深入理解 JUC:LinkedBlockingQueue

上一篇我们分析了 ConcurrentLinkedQueue 无界非阻塞线程安全队列的设计与实现,本篇我们继续来分析线程安全队列 LinkedBlockingQueue 的实现机制。由 Blocking 字样可以推断出 LinkedBlockingQueue 是阻塞队列,前面我们介绍过阻塞队列和非阻塞队列在实现上的区别,知道阻塞队列一般是基于锁机制来保证线程安全,本文我们就一起来分析一下 LinkedBlockingQueue 是如何基于锁构建线程安全队列的。

同样由 Linked 关键字我们可以推断出 LinkedBlockingQueue 底层依赖于链表实现,在 LinkedBlockingQueue 的内部实现了一个单链表,用以存放队列元素。其中,结点 Node 类定义如下:

1
2
3
4
5
6
7
8
9
static class Node<E> {

E item;
Node<E> next;

Node(E x) {
item = x;
}
}

其中 next 指针的指向分为 3 种情况:

  1. 指向某个具体的后继结点。
  2. 指向自己,意味着后继结点为 head.next
  3. 指向 null,说明当前结点是队列的尾结点,没有后继结点。

LinkedBlockingQueue 定义了 head 和 last 指针分别指向队列的头结点和尾结点。此外,LinkedBlockingQueue 还定义了如下字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {

/** 当前队列的容量上限 */
private final int capacity;
/** 记录当前队列的元素个数 */
private final AtomicInteger count = new AtomicInteger();

/** 队列头结点 */
transient Node<E> head;
/** 队列尾结点 */
private transient Node<E> last;

/** 用于控制 take、poll 等操作,保证同一时间只有一个线程从队列获取元素 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 条件队列,记录出队列时因为队列为空而等待的线程 */
private final Condition notEmpty = takeLock.newCondition();

/** 用户控制 put、offer 等操作,保证同一时间只有一个线程往队列添加元素 */
private final ReentrantLock putLock = new ReentrantLock();
/** 条件队列,记录入队列时因为队列已满而等待的线程 */
private final Condition notFull = putLock.newCondition();

// ... 省略方法定义

}

由上述字段定义可以看出,LinkedBlockingQueue 限制了队列的容量上限,并使用 AtomicInteger 类型字段对队列中的元素个数进行计数。虽然 LinkedBlockingQueue 底层依赖于链表实现,理论上是无界的,但是 LinkedBlockingQueue 在实现上却限制了队列的容量上限(默认为 Integer.MAX_VALUE)。

此外,针对出队列和入队列操作,LinkedBlockingQueue 分别设置了一把独占可重入锁,即 takeLock 和 putLock,从而保证同一时间只有一个线程执行出队列操作,只有一个线程执行入队列操作,且出队列的线程与入队列的线程彼此之间不相互影响。针对一些阻塞版本的出队列入队列方法,如果队列为空,则出队列线程会被记录到条件队列 notEmpty 中进行等待,如果队列已满,则入队列线程会被记录到条件队列 notFull 中进行等待。

BlockingQueue 接口

BlockingQueue 接口继承自 Queue 接口,用于描述阻塞队列。当队列无法及时响应用户请求时,例如当我们尝试从空队列中获取元素,或者继续往已满的有界队列中添加元素,BlockingQueue 定义了以下 4 种响应形式:

  1. 抛出异常。
  2. 立即返回特殊值,例如 null 或 false。
  3. 无限期阻塞当前请求,直到队列状态变为可用。
  4. 超时阻塞当前请求,直到队列状态变为可用。

BlockingQueue 接口的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface BlockingQueue<E> extends Queue<E> {

boolean offer(E e);
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean add(E e);
void put(E e) throws InterruptedException;

E poll(long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;

boolean remove(Object o);

boolean contains(Object o);
int remainingCapacity();

int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);

}

针对各方法的含义说明如下:

  • offer:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则返回 false,而不是抛出异常。BlockingQueue 同时还声明了超时版本的 offer 方法。
  • add:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则抛出 IllegalStateException 异常。
  • put:往队列中添加元素,对于有界队列来说,如果队列已满则阻塞当前请求,期间支持响应中断。
  • poll:移除队列头结点,并返回结点元素值,如果队列为空则等待指定时间,并在超时时返回 null,期间支持响应中断。
  • take:仅获取头结点元素值而不删除结点,如果队列为空则阻塞等待,期间支持响应中断。
  • remove:接收一个参数,从队列中删除值等于该参数的结点,如果存在多个结点满足要求,则删除第一个。
  • contains:接收一个参数,判断队列中是否存在值等于该参数的结点。
  • remainingCapacity:返回队列的剩余容量,如果是无界队列,则返回 Integer.MAX_VALUE
  • drainTo:从队列中移除所有(或指定个数)结点,并将结点元素放入参数指定的集合中返回,相对于逐个移除更加高效。

核心方法实现

LinkedBlockingQueue 实现自 BlockingQueue 接口,下面针对核心方法的实现逐一进行分析。

添加元素:offer & add & put

针对添加元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#offerLinkedBlockingQueue#addLinkedBlockingQueue#put 方法,其中 LinkedBlockingQueue#add 是对 LinkedBlockingQueue#offer 的封装,并在队列已满时抛出 IllegalStateException 异常。

下面主要展开分析 LinkedBlockingQueue#offerLinkedBlockingQueue#put 方法的实现,首先来看一下 LinkedBlockingQueue#offer 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public boolean offer(E e) {
// 不允许添加 null 元素
if (e == null) {
throw new NullPointerException();
}
final AtomicInteger count = this.count;
// 当前队列已满,直接返回 false
if (count.get() == capacity) {
return false;
}
int c = -1;
// 创建待添加元素对应的结点对象
Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
// 加锁
putLock.lock();
try {
// 再次校验队列是否已满
if (count.get() < capacity) {
// 往队列末端追加结点
this.enqueue(node);
// 队列元素个数计数加 1,并返回添加之前队列的大小
c = count.getAndIncrement();
// 当前队列在执行添加操作之后仍然存在空闲位置,尝试唤醒一个之前因为队列已满而等待的线程
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
// 释放锁
putLock.unlock();
}
// c == 0 说明队列中至少存在一个元素(当前添加的),尝试唤醒一个之前因为队列为空而等待的线程
if (c == 0) {
this.signalNotEmpty();
}
return c >= 0;
}

如果待添加元素为 null,或者队列已满,则上述方法会直接返回 false,表示添加失败,否则创建待添加元素对应的结点对象,并继续执行:

  1. 加锁,保证同一时间只有一个线程在执行添加操作;
  2. 再次校验队列是否已满,如果已满则跳转至步骤 5,否则执行 LinkedBlockingQueue#enqueue 方法往队列末端插入结点;
  3. 结点个数计数加 1;
  4. 如果在完成本次添加操作之后,队列仍然未满,则尝试唤醒一个之前因为队列已满而等待的线程;
  5. 释放锁;
  6. 如果本次成功添加了一个元素,则调用 LinkedBlockingQueue#signalNotEmpty 方法尝试唤醒一个之前因为队列为空而等待的线程;
  7. 返回。

其中 LinkedBlockingQueue#signalNotEmpty 方法的实现比较简单,读者可以参考源码实现。这里简单提一下 LinkedBlockingQueue#enqueue 方法,实现如下:

1
2
3
private void enqueue(Node<E> node) {
last = last.next = node;
}

在 LinkedBlockingQueue 对象被构造出来时,head 和 last 指针均指向一个元素值为 null 的标记结点。由上述方法的实现可以看出当执行入队列操作时,是将结点赋值给 last 结点的 next 指针,并没有移除队列头部的 null 结点,下文在介绍出队列操作时返回的都是 head.next 结点元素值,理解了上述插入操作的执行过程也就不会疑惑为什么出队列时不是直接返回 head 结点的元素值。

LinkedBlockingQueue 还定义了超时版本的 LinkedBlockingQueue#offer(E, long, TimeUnit) 方法,当队列已满时,该方法会阻塞等待指定的时间。

下面再来看一下 LinkedBlockingQueue#put 方法,相对于上面介绍的 LinkedBlockingQueue#offer 方法,对于有界队列而言,如果队列已满则该方法将无限期阻塞,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void put(E e) throws InterruptedException {
// 不允许添加 null 元素
if (e == null) {
throw new NullPointerException();
}
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 加锁,期间支持响应中断
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is not protected by lock.
* This works because count can only decrease at this point (all other puts are shut out by lock),
* and we (or some other waiting put) are signalled if it ever changes from capacity.
* Similarly for all other uses of count in other wait guards.
*/

// 队列已满,则等待
while (count.get() == capacity) {
notFull.await();
}
// 执行入队列操作
this.enqueue(node);
// 队列元素个数计数加 1,并返回添加之前队列的大小
c = count.getAndIncrement();
// 当前队列在执行添加操作之后仍然存在空闲位置,尝试唤醒一个之前因为队列已满而等待的线程
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
// c == 0 说明队列中至少存在一个元素(当前添加的),尝试唤醒一个之前因为队列为空而等待的线程
if (c == 0) {
this.signalNotEmpty();
}
}

由上述实现可以看出,相对于 LinkedBlockingQueue#offer 方法在队列已满时的直接返回,方法 LinkedBlockingQueue#put 会将当前线程添加到条件队列中等待其它线程释放队列空间。

获取元素:poll & peek & take

针对获取元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#pollLinkedBlockingQueue#peekLinkedBlockingQueue#take 方法,其中 LinkedBlockingQueue#peek 方法仅获取队列头结点元素值,而不移除头结点,实现上比较简单。下面展开分析 LinkedBlockingQueue#pollLinkedBlockingQueue#take 方法的实现机制。

首先来看一下 LinkedBlockingQueue#poll 方法,LinkedBlockingQueue 针对该方法定义了两个版本,区别在于当队列为空时是立即返回还是阻塞等待一段时间,而在实现思路上是一致的。这里以不带超时参数的版本为例展开分析,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public E poll() {
final AtomicInteger count = this.count;
// 当前队列为空,直接返回 null
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lock();
try {
// 如果当前队列不为空
if (count.get() > 0) {
// 获取队列头结点元素,并移除头结点
x = this.dequeue();
// 队列元素计数值减 1,这里返回的是减 1 之前的值
c = count.getAndDecrement();
// 队列在执行移除操作后至少还存在一个元素,尝试唤醒一个之前因为队列为空而阻塞的线程
if (c > 1) {
notEmpty.signal();
}
}
} finally {
// 释放锁
takeLock.unlock();
}
/*
* 之前队列已满,但是经过本次 poll 操作之后,至少有一个空闲位置,
* 尝试唤醒一个之前因为队列已满而阻塞的线程
*/
if (c == capacity) {
this.signalNotFull();
}
return x;
}

如果队列为空则上述方法会直接返回 null,否则继续执行:

  1. 加锁,保证同一时间只有一个线程在执行获取操作;
  2. 再次校验队列是否为空,如果为空则跳转至步骤 5,否则执行 LinkedBlockingQueue#dequeue 方法移除队列头结点,并返回结点元素值;
  3. 结点个数计数减 1;
  4. 如果在完成本次移除操作之后,队列仍然非空,则尝试唤醒一个之前因为队列为空而等待的线程;
  5. 释放锁;
  6. 如果本次成功移除了一个元素,则调用 LinkedBlockingQueue#signalNotFull 方法尝试唤醒一个之前因为队列已满而等待的线程;
  7. 返回。

其中 LinkedBlockingQueue#signalNotFull 方法的实现比较简单,读者可以参考源码实现。前面我们分析了入队列 LinkedBlockingQueue#enqueue 方法,下面来看一下出队列方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
// 自引用,等待 GC 回收
h.next = h; // help GC
head = first;
// 获取真正队列头结点的元素值
E x = first.item;
// 将队列头结点元素值置为 null
first.item = null;
return x;
}

理解了前面入队列的过程,则上述出队列的实现也就一目了然,只要清楚队列的头结点一直是一个值为 null 的结点,而真正有效的队列头结点是该结点的 next 结点。

LinkedBlockingQueue 还定义了超时版本的 LinkedBlockingQueue#poll(long, TimeUnit) 方法,当队列为空时,该方法会阻塞等待指定的时间。

下面再来看一下 LinkedBlockingQueue#take 方法,相对于上面介绍的 LinkedBlockingQueue#poll 方法,对于有界队列而言,如果队列为空则该方法将无限期阻塞,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lockInterruptibly();
try {
// 如果队列为空,则等待
while (count.get() == 0) {
notEmpty.await();
}
// 获取队列头结点元素,并移除头结点
x = this.dequeue();
// 队列元素计数值减 1,这里返回的是减 1 之前的值
c = count.getAndDecrement();
// 队列在执行移除操作后至少还存在一个元素,尝试唤醒一个之前因为队列为空而阻塞的线程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 释放锁
takeLock.unlock();
}
/*
* 之前队列已满,但是经过本次 poll 操作之后,至少有一个空闲位置,
* 尝试唤醒一个之前因为队列已满而阻塞的线程
*/
if (c == capacity) {
this.signalNotFull();
}
return x;
}

由上述实现可以看出,相对于 LinkedBlockingQueue#poll 方法在队列为空时的直接返回,方法 LinkedBlockingQueue#take 会将当前线程添加到条件队列中等待其它线程添加新的队列元素。

移除元素:remove

针对移除元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#remove 方法,并提供了有参和无参的版本,其中无参版本实际上是委托给 LinkedBlockingQueue#poll 方法执行的。下面来分析一下有参版本的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean remove(Object o) {
if (o == null) {
return false;
}
// 锁定出队列、入队列操作
this.fullyLock();
try {
// 从头开始遍历队列
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 如果找到第一个目标元素,则移除
if (o.equals(p.item)) {
// 移除 p 结点,如果执行移除之后队列有空闲位置,
// 则尝试唤醒一个之前因为队列已满而阻塞的线程
this.unlink(p, trail);
return true;
}
}
return false;
} finally {
// 释放出队列、入队列操作
this.fullyUnlock();
}
}

上述方法接收一个参数,并执行删除元素值等于该参数的结点,如果存在多个满足条件的结点,则删除第一个。在执行删除操作之前会获取 putLock 和 takeLock 两把锁,以防止删除期间有其它线程执行出队列或入队列操作。

其它操作:size & contains

最后来看一下 LinkedBlockingQueue#containsLinkedBlockingQueue#size 方法的实现,前者用于检查队列是否包含值等于参数的结点,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean contains(Object o) {
if (o == null) {
return false;
}
// 锁定出队列、入队列操作
this.fullyLock();
try {
// 从头开始遍历链表,并逐一比对
for (Node<E> p = head.next; p != null; p = p.next) {
if (o.equals(p.item)) {
return true;
}
}
return false;
} finally {
// 释放出队列、入队列操作
this.fullyUnlock();
}
}

方法 LinkedBlockingQueue#size 用于返回队列的结点个数,前面已经介绍了 LinkedBlockingQueue 定义了一个 AtomicInteger 类型的字段用于计数队列的结点个数,所以 LinkedBlockingQueue#size 方法能够精确的返回,且几乎没有性能开销,同时在实现上非常简单,如下:

1
2
3
public int size() {
return count.get();
}

既然定义一个 AtomicInteger 类型的计数变量有这么多优势,那么不禁要思考为什么上一篇介绍的 ConcurrentLinkedQueue 没有这么做呢?这主要还是因为 ConcurrentLinkedQueue 是非阻塞队列,基于 CAS 机制来保证线程安全,但是 CAS 的缺点在于无法像锁一样同时保证多个操作的原子性,所以无法引入计数原子变量。

总结

本文分析了 LinkedBlockingQueue 的设计与实现,LinkedBlockingQueue 底层依赖于单链表作为存储结构,并基于可重入锁 ReentrantLock 保证线程安全,同时为入队列和出队列分别设置了一把锁以提升操作性能,减少阻塞开销。

相对于上一篇介绍的 ConcurrentLinkedQueue 而言,LinkedBlockingQueue 与其功能相同,但是底层却是两套不同的设计与实现。此外,JUC 还提供了以数组作为底层存储结构的有界阻塞线程安全队列 ArrayBlockingQueue,该组件与本文介绍的 LinkedBlockingQueue 在设计思路上是一致的,同样基于 ReentrantLock 锁保证线程安全,并支持在构造时使用公平锁策略。

总而言之,在线程安全队列的使用上 JUC 给我们提供了多种选择,具体开发时如何确定使用哪个组件还是要取决于具体的应用场景,甚至有必要进行一些压测,以结果引导决策。

参考

  1. JDK 1.8 源码

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议
Powered by hexo & Theme by hiero   Copyright © 2015-2019 浙ICP备 16010916  号,指 · 间 All Rights Reserved.