深入理解 JUC:ConcurrentLinkedQueue

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,在 JUC 包中,线程安全的队列按照实现方式可以分为阻塞队列和非阻塞队列两大类,前者基于锁来保证线程安全,而后者则基于 CAS 来保证线程安全,阻塞队列一般在类名中都带有 Blocking 的字样。

由 Linked 关键字我们可以推断出 ConcurrentLinkedQueue 底层依赖于链表实现,在 ConcurrentLinkedQueue 的内部实现了一个单链表,用以存放队列元素。其中,结点 Node 类定义如下:

1
2
3
4
5
6
7
8
9
10
private static class Node<E> {

/** 结点元素值 */
volatile E item;
/** 指针,指向下一个结点 */
volatile Node<E> next;

// ... 省略方法定义

}

Node 类是一个典型的链表结点定义,此外,Node 类还定义了一些方法用于修改结点的元素值和 next 结点,这些方法均基于 Unsafe 实现。ConcurrentLinkedQueue 的 head 和 tail 字段分别指向队列的头结点和尾结点,如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, Serializable {

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
head = tail = new Node<>(null);
}

// ... 省略方法实现

}

当我们构造一个空的 ConcurrentLinkedQueue 对象时,链表的 head 和 tail 均指向一个元素值为 null 的标记结点。在 ConcurrentLinkedQueue 中不允许添加 null 元素,因为值为 null 的结点在 ConcurrentLinkedQueue 中扮演着特殊的角色。

Queue 接口

Queue 接口继承自 Collection 接口,增加了队列相关的操作,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
public interface Queue<E> extends Collection<E> {

boolean add(E e);
boolean offer(E e);

E poll();
E peek();
E element();

E remove();

}

针对各方法的含义说明如下:

  • add:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则抛出 IllegalStateException 异常。
  • offer:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则返回 false,而不是抛出异常。
  • poll:移除队列头结点,并返回结点元素值,如果队列为空则返回 null。
  • peek:仅获取头结点元素值而不删除结点,如果队列为空则返回 null。
  • element:仅获取头结点元素值而不删除结点,如果队列为空则抛出 NoSuchElementException 异常。
  • remove:移除队列头结点,并返回结点元素值,如果队列为空则抛出 NoSuchElementException 异常。

核心方法实现

ConcurrentLinkedQueue 实现自 Queue 接口,下面来分析一下其针对 Queue 中声明的核心操作方法的实现。

添加元素:add & offer

ConcurrentLinkedQueue 实现了 Queue 接口中声明的往队列中添加元素方法,即 Queue#addQueue#offer。这两个方法虽然都是往队列末端追加元素,但是区别在于如果队列已满,则 Queue#offer 方法会拒绝添加。对于 ConcurrentLinkedQueue 而言,这两个方法在实现上并没有区别,因为 ConcurrentLinkedQueue 是无界的。

下面来看一下 ConcurrentLinkedQueue 针对 Queue#offer 的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean offer(E e) {
// 待添加元素不能为 null
checkNotNull(e);
// 创建待添加元素对应的 Node 结点
final Node<E> newNode = new Node<>(e);

// 添加到尾结点
for (Node<E> t = tail, p = t; ; ) {
Node<E> q = p.next;
if (q == null) { // 1
// p 已经是尾结点,基于 CAS 设置结点 newNode 为 p 的 next 结点
if (p.casNext(null, newNode)) {
/*
* Successful CAS is the linearization point for e to become an element of this queue,
* and for newNode to become "live".
*/
if (p != t) { // hop two nodes at a time
// 更新 tail 结点
this.casTail(t, newNode); // Failure is OK.
}
return true;
}
// Lost CAS race to another thread; re-read next
} else if (p == q) { // 2
/*
* We have fallen off list. If tail is unchanged, it will also be off-list,
* in which case we need to jump to head, from which all live nodes are always reachable.
* Else the new tail is a better bet.
*/
p = (t != (t = tail)) ? t : head;
} else { // 3
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
}

对于待添加的元素,上述方法首先会判断是否为 null,上面已经提及过 ConcurrentLinkedQueue 不允许向其中添加 null 元素,这主要是因为元素值为 null 的结点在 ConcurrentLinkedQueue 中是一个特殊的标记结点。如果待添加元素不为 null,则上述方法会将元素包装成 Node 结点(令该结点为 N)添加到队列的末端。

下面通过图示演示各种不同的元素添加场景,本小节中均使用青色表示 head 结点,使用橘黄色表示 tail 结点。假设当前队列的元素构成如下图 (1) 所示,此时 q 结点为 null(说明:本文中所有虚线表示的结点均指代结点本身为 null,而非结点元素值为 null),即运行到上述代码 1 位置。需要基于 CAS 操作将 p 的 next 结点由 null 修改为 N 结点,成功则返回 true。此时 p 不等于 t,操作完成之后队列的元素构成如下图 (2) 所示。

image

考虑上述过程存在多个线程竞争,假设现在有两个线程 A 和 B,其中 A 在执行代码 1 中的 Node#casNext 时成功将 p 的 next 结点由 null 更新为 node1 结点,如下图 (1) 所示。此时线程 B 再执行 Node#casNext 企图将 p 的 next 结点由 null 更新为 node2 结点时将失败,因为 p 的 next 结点此时已经不为 null,所以线程 B 将进入下一轮 for 循环,但此时 q 已经不为 null,且不等于 p,所以进入代码 3,这一步的运行结果就是将 q 赋值给 p,如下图 (2) 所示。接着线程 B 继续进入下一轮 for 循环,执行 Node<E> q = p.next;,如下图 (3) 所示。因为此时 q 等于 null,所以继续执行代码 1 将 p 的 next 结点由 null 修改为 node2,如下图 (4) 所示。但此时的 p 不等于 t,所以需要执行 ConcurrentLinkedQueue#casTail 更新 tail 结点,如下图 (5) 所示。

image

最后再来分析一下什么情况下会执行到代码 2。假设当前队列的元素构成如下图 (1) 所示,此种结构一般由其它线程执行 poll 方法所造成,下一小节会进行分析。此时 tail 结点形成了自引用,开始执行 for 循环时 p 和 t 均指向 tail 结点,当将 p 的 next 结点赋值给 q 时,因为 p 的 next 结点即为 tail 结点自己,所以 q 也指向 tail 结点。此时,q 结点不为 null,且 p 等于 q,所以执行代码 2 将 head 结点赋值给 p,如下图 (2) 所示。所以这一步的目的在于跳出自引用,后续的执行流程参考下图 (3)、(4) 和 (5),读者可以自行梳理。

image

除了上面介绍的 ConcurrentLinkedQueue#offer 方法,ConcurrentLinkedQueue 还实现了 ConcurrentLinkedQueue#add 方法同样用于往队列末端追加元素,不过因为 ConcurrentLinkedQueue 是无界队列,所以该方法也只是简单的调用了 ConcurrentLinkedQueue#offer 方法,不再重复介绍。

获取元素:poll & peek & element

针对获取元素的操作,Queue 接口声明了 3 个方法,包括 Queue#pollQueue#peek,以及 Queue#element。其中 Queue#pollQueue#peek 的区别一般都比较熟悉,而 Queue#element 方法在功能上与 Queue#peek 方法类似,都是获取队列的头结点元素值而不删除结点,区别在于当队列为空时,方法 Queue#peek 返回 null,而 Queue#element 则抛出异常。ConcurrentLinkedQueue 针对 Queue#element 方法的实现实际上也是委托给 ConcurrentLinkedQueue#peek 方法执行的,只是对该方法的返回值进行了处理,如果返回值为 null 则抛出 NoSuchElementException 异常。

下面首先来看一下 ConcurrentLinkedQueue 针对方法 Queue#poll 的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public E poll() {
restartFromHead:
for (; ; ) {
// 从头结点获取元素
for (Node<E> h = head, p = h, q; ; ) {
E item = p.item;

// 如果当前头结点不为 null,则尝试基于 CAS 将其修改为 null
if (item != null && p.casItem(item, null)) { // 1
// CAS 操作成功
// Successful CAS is the linearization point for item to be removed from this queue.
if (p != h) { // hop two nodes at a time
this.updateHead(h, ((q = p.next) != null) ? q : p);
}
return item;
}
// 当前队列为空
else if ((q = p.next) == null) { // 2
this.updateHead(h, p);
return null;
} else if (p == q) { // 3
continue restartFromHead;
} else { // 4
p = q;
}
}
}
}

final void updateHead(Node<E> h, Node<E> p) {
// 将头结点由 h 更新为 p
if (h != p && this.casHead(h, p)) {
// 更新 h 的 next 结点为 h 自己
h.lazySetNext(h);
}
}

上述方法使用了 continue 标签语法来控制代码的执行逻辑,其中标签名为 restartFromHead,此外,break 关键字同样支持标签语法,与 continue 一起实现类似 goto 关键字的功能。

下面同样通过图示演示各种不同的获取元素场景,本小节中均使用青色表示 head 结点,使用橘黄色表示 tail 结点。假设当前队列的元素构成如下图 (1) 所示,此时 p 和 h 均指向 head 结点,而 q 结点未赋值,所以暂未标出。此时 p 所指向的结点元素值不为 null,所以尝试执行 Node#casItem 方法基于 CAS 修改结点元素值为 null,即运行上述代码 1。假设当前线程 CAS 操作成功,如下图 (2) 所示,因为此时 p 等于 h,所以直接返回结点元素值,即出队列成功。

image

继续演示一些其它情况,假设现在队列的头结点元素值为 null,所以直接跳转到代码 2 执行,将 q 赋值为 p 的 next 结点,如下图 (1) 所示,但是因为结点不为 null,所以继续往下执行。此时 p 不等于 q,所以执行代码 4 将 q 赋值给 p,如下图 (2) 所示,然后进入下一轮循环。

此时结点 p 的元素值不为 null,所以进入代码 1。考虑存在多个线程竞争的场景,假设现在有两个线程 A 和 B,其中 A 在执行代码 1 中的 Node#casItem 时成功将 p 的元素值更新为 null,如下图 (3-1) 所示。因为此时 p 不等于 h,所以执行 ConcurrentLinkedQueue#updateHead 方法将头结点由 h 更新为 p,并重定向 h 的 next 指针指向自己,如下图 (4-1) 所示。最后返回结点元素值,即出队列成功。

image

因为线程 A 已经操作成功,所以线程 B 在执行 Node#casItem 方法时必然失败,于是继续向下执行代码 2,将 q 指向 p 的 next 结点,如上图 (3-2) 所示。因为此时 q 结点为 null,所以执行 ConcurrentLinkedQueue#updateHead 方法将头结点由 h 更新为 p,并重定向 h 的 next 指针指向自己,如上图 (4-2) 所示。最后返回 null 值,即出队列成功。

最后再来分析一下什么情况下会执行到代码 3。假设当前队列的元素构成如下图 (1) 所示,并且当前有两个线程(A 和 B)竞争执行出队列操作,线程 A 首先执行代码 1 基于 CAS 将结点 p 元素值修改为 null,如下图 (2) 所示。但在线程 A 继续执行代码 1 中的 ConcurrentLinkedQueue#updateHead 方法尝试更新 head 结点之前,B 线程进入了 for 循环,如下图 (3) 所示,此时 B 线程的 h 和 p 指针均指向 head 结点,但是在 B 继续向下执行之前,A 线程执行了 ConcurrentLinkedQueue#updateHead 方法,将 head 结点由 h 更新为 p,并修改 h 的 next 指针指向自己,最后返回元素值,如下图 (4) 所示。

image

此时,如上图 (5) 所示,B 线程再继续执行时发现 p 结点元素值为 null,所以跳转执行代码 2 将 p 的 next 结点赋值给 q,如上图 (6) 所示。因为此时 p 结点不为 null,且是自引用,所以 p 也就等于 q,继续执行代码 3 跳出本次 for 循环从头再来。再次进入 for 循环时,B 线程看到的队列结构就变成了如上图 (7) 所示。

最后来看一下 ConcurrentLinkedQueue#peek 方法的实现,相对于 ConcurrentLinkedQueue#poll 方法,该方法的区别在于仅获取队头元素,而不移除头结点。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (; ; ) {
for (Node<E> h = head, p = h, q; ; ) {
E item = p.item;
if (item != null || (q = p.next) == null) { // 1
this.updateHead(h, p);
return item;
} else if (p == q) { // 2
continue restartFromHead;
} else { // 3
p = q;
}
}
}
}

上述实现初看起来似乎不太好理解,但是如果像上面一样结合图示去分析就会一目了然,这里就不再继续演示各个步骤的执行逻辑,感兴趣的读者可以自己动手画一下。

移除元素:remove

针对删除元素操作,Queue 仅声明了 Queue#remove 方法,用于删除队列头结点并返回结点元素值,区别于 Queue#poll 方法返回 null 值,当队列为空时该方法将抛出异常。ConcurrentLinkedQueue 还实现了带参数的 remove 方法(继承自 Collection 接口),该方法用于从当前队列中删除目标元素值对应的结点,如果存在多个则删除第 1 个。下面主要来看一下带参数版本的 remove 方法实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
// 遍历队列中的元素
for (Node<E> p = this.first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
// 如果当前遍历元素不是期望删除的元素,则继续获取后继结点
if (!o.equals(item)) {
next = this.succ(p);
continue;
}
// 当前遍历元素是期望删除的元素,基于 CAS 将该结点置为 null
removed = p.casItem(item, null);
}

/*
* 指向到这里分为两种情况:
* 1. 当前结点为 null。
* 2. 当前结点为待删除结点。
*/

// 获取当前结点的后继结点
next = this.succ(p);
// 更新前驱结点的 next 指针指向当前结点的后继结点
if (pred != null && next != null) { // unlink
pred.casNext(p, next);
}
if (removed) {
return true;
}
}
}
return false;
}

删除的过程从队列头部开始遍历,并在遇到待删除元素时基于 CAS 将对应结点元素值更新为 null,在遍历过程中会剔除掉所有元素值为 null 的结点。

其它操作:size & contains

ConcurrentLinkedQueue 提供了 ConcurrentLinkedQueue#size 方法用于获取队列的长度,该方法在实现上会从头开始对队列进行遍历,并计数元素值不为 null 的结点,并以 Integer.MAX_VALUE 作为计数值上界。需要注意的一点是不同于一般的集合,ConcurrentLinkedQueue 整个计算队列大小的过程时间复杂度为 O(n),并且结果是不准确的。如果期间有其它线程对队列执行增删操作,将不会在 ConcurrentLinkedQueue#size 方法的返回值中体现。

方法 ConcurrentLinkedQueue#contains 用于判断队列中是否包含参数指定的元素值,在实现上与 ConcurrentLinkedQueue#size 方法思想想通,都需要从头开始遍历队列并对元素值进行比对。

总结

本文分析了 ConcurrentLinkedQueue 的设计与实现,并运用图示梳理了队列元素添加、获取,以及删除的过程。ConcurrentLinkedQueue 底层依赖于单链表作为存储结构,并基于 CAS 对链表的结点进行修改,从而实现在不阻塞的前提下保证线程安全,避免线程阻塞和唤醒所带来的性能开销。ConcurrentLinkedQueue 的设计思路还是比较简单明了的,难点在于访问结点过程中对链表的操作,并不是特别直观,所以本文引入了大量的图示演示相关的操作过程,希望能够简化理解。

参考

  1. JDK 1.8 源码

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议
Powered by hexo & Theme by hiero   Copyright © 2015-2019 浙ICP备 16010916  号,指 · 间 All Rights Reserved.