基于 CAS 机制的 ConcurrentHashMap 实现内幕

曾经写过一篇《基于锁分段机制的 ConcurrentHashMap 实现内幕》的文章,介绍了在 jdk 1.7 之前 ConcurrentHashMap 的实现机制。文章的结尾我们提及到在 jdk 1.8 之后,ConcurrentHashMap 在实现上抛弃了锁分段机制,转而采用 CAS(Compare And Swap) 策略,并和 HashMap 一样引入了红黑树的支持。本文我们将基于 jdk 1.8 源码,分析基于 CAS 机制的 ConcurrentHashMap 实现。

Java 中的 CAS 机制

CAS 属于原子操作的一种,能够保证一次读写操作是原子的。CAS 通过将内存中的值与期望值进行比较,只有在两者相等时才会对内存中的值进行修改。一个 CAS 操作的伪代码实现如下(引自 WIKI):

1
2
3
4
5
6
7
function cas(p : pointer to int, old : int, new : int) returns bool {
if *p ≠ old {
return false
}
*p <- new
return true
}

Java 中的 CAS 实现位于 sun.misc.Unsafe 类中,该类中定义了大量的 native 方法,CAS 的实现也不例外:

1
2
3
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

仅仅从 java 源码层面我们只能看到对应的 native 定义,而具体实现需要依赖于操作系统,这里对方法的参数进行说明:

  • o:目标操作对象
  • offset:目标操作数内存偏移地址
  • expected:期望值
  • x:更新值

CAS 是支撑 java 并发类库(JUC)的基础,除了本文介绍的 ConcurrentHashMap 实现外,典型的应用场景就是 java 中的原子类,例如 AtomicInteger,其中运用了大量的 CAS 操作,在保证性能的同时提供并发场景下的线程安全性,以 AtomicInteger#getAndSet 方法为例:

1
2
3
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}

该方法原子性的将当前 AtomicInteger 类型的变量值设置为 newValue,并返回修改之前的值。整个过程无需加锁,实现上依赖于 Unsafe#getAndSetInt 方法,其中 valueOffset 变量是当前 AtomicInteger 类型变量值的内存偏移地址:

1
2
3
4
5
6
7
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}

方法 Unsafe#getAndSetInt 首先会获取当前 AtomicInteger 类型变量的值,然后基于 CAS 更新变量值为 newValue。

CAS 机制虽然无需加锁、安全且高效,但也存在一些缺点,概括如下:

  1. 循环检查的时间可能较长,不过可以限制循环检查的次数
  2. 只能对一个共享变量执行原子操作
  3. 存在 ABA 问题

所谓 ABA 问题是指在 CAS 两次检查操作期间,目标变量的值由 A 变为 B,又变回 A,但是 CAS 看不到这中间的变换,对它来说目标变量的值并没有发生变化,一直是 A,所以 CAS 操作会继续更新目标变量的值。大部分时候该问题并不会对结果产生实质性影响,如果确实需要关心该问题(例如 lock-free 算法),可以为目标变量引入版本特性,例如 AtomicStampedReference 工具类通过为引用建立类似版本号(stamp)的方式,来解决 ABA 问题。

基于 CAS 的 ConcurrentHashMap 实现

Jdk 1.7 之前,ConcurrentHashMap 通过加锁保证线程安全,并引入锁分段机制以减小加锁的粒度,从而提升性能。Jdk 1.8 中的 ConcurrentHashMap 实现则引入了 CAS 机制以尽量避免加锁操作,虽然仍然有部分同步代码,不过锁的粒度相对于分段锁而言更加细粒度。另外一个重要的设计就是在结点个数达到阈值时会自动将链表转换成红黑树,从而进一步提升性能。

存储结构设计

在存储结构设计上,新的 ConcurrentHashMap 相对于之前看起来更加的简洁。如下图,在一个 Node 类型的数组(下文如不做特殊说明,均使用 table 指代该数组)上挂载着多个链表和红黑树(下文如不做特殊说明,均使用 bin 指代一个完整的链表或红黑树):

image

在结点类型上主要包含:

  • Node<K, V>:基本结点数据结构,用于存储 key、value,以及结点的哈希值
  • ForwardingNode<K, V>:扩容节点,哈希值始终为 -1,在扩容过程中作为一个占位符表示当前结点为 null,或正在迁移
  • ReservationNode<K, V>:同样是一个占位符结点,哈希值始终为 -3,用于 computeIfAbsentcompute 操作
  • TreeNode<K, V>:红黑树结点,除了包含基本的 key、value,以及结点哈希值外,还定义了红黑树结点特有的指针,以及结点颜色标记
  • TreeBin<K, V>:封装红黑树相关的操作

ConcurrentHashMap 针对 ForwardingNode、ReservationNode,以及树根结点都定义了特定的哈希值:

1
2
3
4
5
6
7
8
/** ForwardingNode 结点的 hash 值 */
static final int MOVED = -1; // hash for forwarding nodes

/** 树根结点的 hash 值 */
static final int TREEBIN = -2; // hash for roots of trees

/** ReservationNode 结点的 hash 值 */
static final int RESERVED = -3; // hash for transient reservations

基本方法实现

工具方法

ConcurrentHashMap 主要定义了 3 个工具方法:tabAt、casTabAt 和 setTabAt。

  • tabAt

方法 tabAt 用于获取 table 上下标为 i 的头结点,实现上依赖 Unsafe 类:

1
2
3
static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i) {
return (Node<K, V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
}
  • casTabAt

方法 casTabAt 基于 CAS 尝试更新 table 上下标为 i 的结点的值为 v,实现如下:

1
2
3
static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) {
return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
}
  • setTabAt

方法 setTabAt 用于设置 table 上下标为 i 的结点为 v,相对于 casTabAt 方法的区别在于不关注历史值,实现如下:

1
2
3
static final <K, V> void setTabAt(Node<K, V>[] tab, int i, Node<K, V> v) {
U.putObjectVolatile(tab, ((long) i << ASHIFT) + ABASE, v);
}
构造方法

ConcurrentHashMap 的构造方法存在多个重载版本,这里我们以参数最全的为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
// Use at least as many bins
if (initialCapacity < concurrencyLevel) {
// 如果指定初始化容量小于并行度,则修正初始化容量设置
initialCapacity = concurrencyLevel; // as estimated threads
}
// 计算大于当前指定初始容量的最小 2 次幂
long size = (long) (1.0 + (long) initialCapacity / loadFactor);
int cap = (size >= (long) MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int) size);
this.sizeCtl = cap;
}

该构造方法允许我们指定初始化容量(initialCapacity)、负载因子(loadFactor),以及并行度(concurrencyLevel),并依据这些参数计算 sizeCtl 值。类实例变量 sizeCtl 是一个核心变量,用于控制 table 的初始化和扩容策略,该变量的值定义了几种不同的语义:

  • -1:表示正在初始化
  • -N:表示有 N-1 个线程正在执行扩容操作
  • 0:表示还未执行初始化
  • N:表示初始化或下次扩容的大小
添加或更新键值对:put

方法 put 用于往 ConcurrentHashMap 中添加或更新键值对,这是 map 集合的基础操作,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public V put(K key, V value) {
return this.putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 或 value 不允许为 null
if (key == null || value == null) throw new NullPointerException();
// 计算 key 的哈希码
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
// 1. 如果 table 数组为空,则进行初始化
if (tab == null || (n = tab.length) == 0) {
// 基于 CAS 策略初始化 table,初始化大小为 16
tab = this.initTable();
}
// 2. 否则,计算 hash 值对应的下标,获取 table 上对应下标的头结点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
/*
* table 对应下标的头结点为 null
* 基于 CAS 设置结点,如果成功则本次 put 操作完成,
* 如果失败则说明期间有并发操作,需要进入一轮新的循环
*/
if (casTabAt(tab, i, null, new Node<>(hash, key, value, null))) {
// 设置结点成功,put 操作完成
break;
}
}
// 3. 否则,如果 Map 正在执行扩容操作(MOVED 哈希值表示正在扩容),则帮助扩容
else if ((fh = f.hash) == MOVED) {
tab = this.helpTransfer(tab, f);
}
// 4. 否则,获取到 hash 值对应下标的头结点,且结点不为 null
else {
V oldVal = null;
synchronized (f) { // 加锁
if (tabAt(tab, i) == f) { // 再次校验头结点为 f
// 头结点的哈希值大于等于 0,说明是链表,如果是树的话应该是 -2
if (fh >= 0) {
binCount = 1;
for (Node<K, V> e = f; ; ++binCount) {
K ek;
// 如果是已经存在的 key,则在允许覆盖的前提下直接覆盖已有的值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) {
e.val = value;
}
break;
}
// 如果是不存在的 key,则直接在链表尾部插入一个新的结点
Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<>(hash, key, value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
// 调用红黑树的方法获取到修改的结点,并插入或更新结点(如果允许)
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) {
p.val = value;
}
}
}
}
} // end synchronized

if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) {
/*
* 结点数目大于等于 8,对链表执行转换操作
* - 如果 table 长度小于 64,则执行扩容
* - 如果 table 长度大于等于 64,则转换成红黑树
*/
this.treeifyBin(tab, i);
}
if (oldVal != null) {
return oldVal;
}
break;
}
}
}
// size 加 1
this.addCount(1L, binCount);
return null;
}

方法的执行流程可以概括为:

  1. 计算 key 的哈希值
  2. 如果 table 为空,则执行初始化
  3. 否则,计算 key 哈希值对应的下标,并获取 table 中对应下标的头结点
  4. 如果头结点为 null,则基于 CAS 尝试添加头结点
  5. 否则,如果头结点不为 null,但是头结点的哈希值为 MOVED,说明目前正在执行扩容操作,则帮助扩容
  6. 否则,如果头结点不为 null,且未处于扩容状态,则尝试添加或更新结点
  7. 判断当前 bin 范围内结点数目是否大于阈值,如果大于阈值则执行扩容操作

下面就流程中的一些关键点展开详细说明。

初始化 table

Table 的初始化采用延迟策略,在我们构造 ConcurrentHashMap 对象时,只是初始化了一些参数值,并没有对 table 进行构造,而 table 的初始化发生在第一次使用 table 时,例如这里 put 方法。初始化过程位于 ConcurrentHashMap#initTable 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) {
// 负数表示正在初始化或扩容,等待
Thread.yield(); // lost initialization race; just spin
// compareAndSwapInt 参数:对象,对象变量在内存中的偏移地址,期望值,修改值
} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 执行 CAS 操作,期望将 sizeCtl 设置为 -1,-1 是正在初始化的标识
// CAS 抢到了锁
try {
// 对 table 进行初始化,初始化长度为指定值,或者默认值 16
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
table = tab = nt;
// 指定下次扩容的大小,相当于 0.75 × n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

Table 本质上就是一个 Node 数组,其初始化过程也就是对 Node 数组的初始化过程,方法中使用了 CAS 策略执行初始化操作。初始化流程为:

  1. 判断 sizeCtl 值是否小于 0,如果小于 0 则表示 ConcurrentHashMap 正在执行初始化操作,所以需要先等待一会,如果其它线程初始化失败还可以顶替上去
  2. 如果 sizeCtl 值大于等于 0,则基于 CAS 策略抢占标记 sizeCtl 为 -1,表示 ConcurrentHashMap 正在执行初始化,然后构造 table,并更新 sizeCtl 的值
协助扩容

在 put 过程中,如果当前头结点的哈希值为 MOVED,则说明 ConcurrentHashMap 正在对结点执行扩容操作,此时可以让当前线程加入到扩容工作中协助扩容。该过程位于 ConcurrentHashMap#helpTransfer 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
Node<K, V>[] nextTab;
int sc;
// 当前结点是 ForwardingNode 类型
if (tab != null && (f instanceof ForwardingNode)
&& (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 有 sc-1 个线程正在执行扩容操作
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1
|| sc == rs + MAX_RESIZERS || transferIndex <= 0) {
break;
}
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 添加一个线程执行 transfer 任务
this.transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

该方法的主要作用就是基于 CAS 尝试添加一个线程去协助扩容操作,如果能够成功加入则将 sizeCtl 值加 1。方法 ConcurrentHashMap#transfer 是真正执行扩容操作的地方,并在多个步骤中被触发。这里先给出该方法的定义(如下),具体的实现后面会专门进行分析,
该方法接收 2 个参数,其中 tab 是当前需要被扩容的 table,而 nextTab 是扩容之后的 table,容量上是之前的两倍,helpTransfer 传递的 nextTab 是一个非 null 值,因为触发 helpTransfer 的前提就是当前已经处于扩容阶段。

1
2
3
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
// ...
}
链表转红黑树

ConcurrentHashMap 在设计上并不是一上来就在 table 上建立红黑树数据结构作为 bin,而是先建立一个链表,并在链表长度与 table 长度均达到一定的阈值时才执行转换,即将链表转换成红黑树:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private final void treeifyBin(Node<K, V>[] tab, int index) {
Node<K, V> b;
int n, sc;
if (tab != null) {
// 1. 如果 table 长度小于 64,执行扩容操作
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {
this.tryPresize(n << 1);
}
// 2. 否则,将链表转换成红黑树
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 头结点 hash 大于 0,说明是链表
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K, V> hd = null, tl = null;
// 将链表转换成一棵红黑树
for (Node<K, V> e = b; e != null; e = e.next) {
TreeNode<K, V> p = new TreeNode<>(e.hash, e.key, e.val, null, null);
if ((p.prev = tl) == null) {
hd = p;
} else {
tl.next = p;
}
tl = p;
}
// 将红黑树设置到 table 对应的位置
setTabAt(tab, index, new TreeBin<>(hd));
}
}
}
}
}

方法在链表内元素个数大于等于 8 时触发,此时并没有直接执行转换操作,而是先判断当前 table 的长度是否小于 64,如果小于则先尝试扩容操作,否则才会将链表转换成红黑树。如果是扩容的话会基于 CAS 尝试将 sizeCtl 的值设置为 (rs << RESIZE_STAMP_SHIFT) + 2,然后调用 ConcurrentHashMap#transfer 方法执行扩容,该过程位于 ConcurrentHashMap#tryPresize 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private final void tryPresize(int size) {
// 如果当前期望的大小(size)小于最大允许容量的一半,则扩容大小为 size 的 1.5 倍加 1,在向上取最小的 2 次幂
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) { // 检查当前未处于扩容阶段
Node<K, V>[] tab = table;
int n;
// 初始化 nextTable
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
} else if (c <= sc || n >= MAXIMUM_CAPACITY) {
break;
} else if (tab == table) {
int rs = resizeStamp(n);
// 2. 基于 CAS 将 sc 的值加 1,然后执行 transfer 方法
if (sc < 0) {
Node<K, V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1
|| sc == rs + MAX_RESIZERS || (nt = nextTable) == null
|| transferIndex <= 0) {
break;
}
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 执行 transfer 方法
this.transfer(tab, nt);
}
}
// 1. 基于 CAS 将 sizeCtl 的值设置为 (rs << RESIZE_STAMP_SHIFT) + 2
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) {
// 执行 transfer 方法,此时 nextTable 是 null
this.transfer(tab, null);
}
}
}
}

该方法的核心操作在于最后一个添加 transfer 任务,并设置 sizeCtl 值,该方法第一次调用 transfer 方法时 sizeCtl 一定是大于等于 0 的,所以方法会尝试将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2,这是大负数,并执行 transfer(tab, null) 操作,后面的循环 sizeCtl 均小于 0,所以会执行 transfer(tab, nt),并将 sizeCtl 加 1。注意整个过程中 sizeCtl 值的变化,在一次扩容操作中第一次调用 transfer 方法时将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2,并在扩容过程再次调用 transfer 方法时将 sizeCtl 加 1。这对于下一节理解扩容操作什么时候结束至关重要。

扩容操作

扩容操作简单地说就是新建一个长度翻倍的 nextTable,然后将之前 table 上的结点重新哈希迁移到新的 nextTable 上,并在迁移完成之后使用 nextTable 替换原先的 table。对于一个 table 而言,上面分布着 n 个 bin 结点,而结点迁移的过程可以是并发的,这样可以提升迁移的效率。ConcurrentHashMap 使用了一个 stride 变量用于指定将 stride 个 bin 结点组成一个任务单元由一个线程负责处理,在单核 CPU 下 stride 的值为 table 的长度 n,在多核 CPU 下为 (n >>> 3) / NCPU,最小值为 16。

ConcurrentHashMap 定义了一个类实例变量 transferIndex,用于指定任务的边界。任务划分的过程在 table 上是从后往前进行的,例如现在有 n 个结点,则编号 (n-1-stride, ..., n-1) 的任务交给第 1 个线程进行处理,编号 (n-1-2*stride, ..., n-1-stride) 的任务交给第 2 个线程进行处理,以此类推。当有新的线程加入时可以依据 transferIndex 值知道接下去应该分配哪一块的 bin 结点给当前线程。

整个扩容的核心工作位于 ConcurrentHashMap#transfer 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
int n = tab.length, stride;
/*
* stride 即步进,
* 在单核下为 table 的长度 n,在多核模式下为 (n >>> 3) / NCPU,最小值为 16
*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
stride = MIN_TRANSFER_STRIDE; // subdivide range
}

// 1. 如果 nextTable 未初始化,则先进行初始化,容量是之前的两倍
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1]; // 容量翻倍
nextTab = nt;
} catch (Throwable ex) {
// try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}

// 2. 执行迁移工作

int nextn = nextTab.length;
// ForwardingNode 表示一个正在被迁移的结点,对应的 hash 值是 MOVED
ForwardingNode<K, V> fwd = new ForwardingNode<>(nextTab);
boolean advance = true; // 标记一个结点是否迁移完成
boolean finishing = false; // 标记扩容任务是否完成
// i 是索引,bound 是边界值,从后往前迁移
for (int i = 0, bound = 0; ; ) {
Node<K, V> f;
int fh;
/*
* 2.1 基于 CAS 计算本次任务的边界值,即 i 和 bound 值,
* 将 i 指向 transferIndex,将 bound 指向 transferIndex - stride
*/
while (advance) {
int nextIndex, nextBound;
// 标记当前结点迁移完成
if (--i >= bound || finishing) {
advance = false;
}
// 一旦 transferIndex <= 0,表示所有任务已经分配给相应的线程进行处理
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 基于 CAS 计算 transferIndex 值(即 transferIndex - stride),nextBound 是本次任务的边界
else if (U.compareAndSwapInt(
this,
TRANSFERINDEX,
nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
} // ~end while

/*
* 2.2 执行迁移任务
*/

if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 完成了所有结点的迁移
if (finishing) {
nextTable = null;
table = nextTab; // 更新 table 为 nextTable
sizeCtl = (n << 1) - (n >>> 1); // sizeCtl 值更新为 table 长度的 1.5 倍
return;
}

// 任务继续

/*
* 基于 CAS 将 sizeCtl 减 1
* 在迁移操作开始前会将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2,每一个线程加入迁移任务就会将 sizeCtl 加 1,
* 所以这里执行 sizeCtl 减 1,代表当前任务完成
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
// 当前任务结束,但是整体任务还未完成
return;
}
// 此时 sizeCtl == (rs << RESIZE_STAMP_SHIFT) + 2,说明所有的任务都执行完了
finishing = advance = true;
i = n; // recheck before commit
}
}
// 否则,获取 table 中位置为 i 的头结点,且为 null
else if ((f = tabAt(tab, i)) == null) {
// 在当前位置设置一个空的 ForwardingNode 节点
advance = casTabAt(tab, i, null, fwd);
}
// 否则,当前位置已经是一个 ForwardingNode,代表正在执行迁移工作
else if ((fh = f.hash) == MOVED) {
advance = true; // already processed
}
// 否则,开始对当前结点执行迁移工作
else {
synchronized (f) {
if (tabAt(tab, i) == f) { // 再次校验结点
Node<K, V> ln, hn;
// 当前 bin 是一个链表
if (fh >= 0) {
int runBit = fh & n; // n 为老 table 的长度
Node<K, V> lastRun = f;
// 遍历当前链表,找到最后 p.hash & n 值相同的第一个结点
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// runBit == 0 表示还在老 table 原先的位置
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 此处 runBit 等于老 table 的长度,即 n
else {
hn = lastRun;
ln = null;
}
// 以 lastRun 为界
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0) {
ln = new Node<>(ph, pk, pv, ln);
} else {
hn = new Node<>(ph, pk, pv, hn);
}
}
// 将其中一个链表放置在 nextTable 的 i 位置
setTabAt(nextTab, i, ln);
// 将另外一个链表放置在 nextTable 的 i+n 位置
setTabAt(nextTab, i + n, hn);
// 设置当前 table 的 i 位置为 ForwardingNode 空结点,代表已经处理完
setTabAt(tab, i, fwd);
advance = true;
}
// 当前 bin 是一颗红黑树
else if (f instanceof TreeBin) {
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K, V> p = new TreeNode<>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null) {
lo = p;
} else {
loTail.next = p;
}
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null) {
hi = p;
} else {
hiTail.next = p;
}
hiTail = p;
++hc;
}
}

/* 如果将红黑树一分为二后,结点数目小于 6,则将红黑树转换成链表 */

ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<>(hi) : t;

// 将其中一个红黑树(或链表)放置在 nextTable 的 i 位置
setTabAt(nextTab, i, ln);
// 将另外一个红黑树(或链表)放置在 nextTable 的 i+n 位置
setTabAt(nextTab, i + n, hn);
// 设置当前 table 的 i 位置为 ForwardingNode 空结点,代表已经处理完
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

方法的实现很复杂,不过整体流程可以概括为 2 大部分:

  1. 如果 nextTable 未初始化,则先执行初始化操作,新的 table 容量翻倍
  2. 执行迁移任务

其中过程 1 比较简单,不过需要注意的是并不是所有触发 transfer 方法都需要执行初始化 table 的操作,只有主动触发扩容的线程需要执行该操作,对于后来加入“帮忙”的线程会跳过过程 1,直接进入过程 2。

过程 2 通过 transferIndex 实例变量协调任务的分配,并为每个线程分配 stride 个结点进行迁移,任务分配的过程实际上就是确定当前线程迁移结点的上下界的过程,该过程位于 while 循环中(即代码注释 2.1),该循环整体上就是一个 CAS 操作,如果迁移任务已经完成,或者没有剩余的结点可以迁移(实例变量 transferIndex 小于等于 0),则退出 CAS,否则尝试为本次任务分配新的上下界,同时更新 transferIndex 值。

接下来正式开始迁移工作,整体流程可以概括为:

  1. 检查整体迁移任务是否完成,如果完成则更新 table 和 sizeCtl 值
  2. 否则,检查当前任务是否已经完成,如果完成则退出本次任务
  3. 对于仍在进行中的任务会继续执行迁移操作,如果当前结点是一个空结点,则在该位置设置一个空的 ForwardingNode 结点,用于标记当前结点正在迁移中
  4. 否则,如果当前结点是一个 ForwardingNode 结点,即当前结点正在迁移中,进入下一轮任务分配
  5. 否则,对当前结点执行迁移操作

下面针对流程中的一些关键点进行说明,首先来看一下过程 2 相关的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
* 基于 CAS 将 sizeCtl 减 1
* 在迁移操作开始前会将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2,每一个线程加入迁移任务就会将 sizeCtl 加 1,
* 所以这里执行 sizeCtl 减 1,代表当前任务完成
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
// 当前任务结束,但是整体任务还未完成
return;
}
// 此时 sizeCtl == (rs << RESIZE_STAMP_SHIFT) + 2,说明所有的任务都执行完了
finishing = advance = true;
i = n; // recheck before commit
}

前面我们曾提到当新增一个线程支持迁移任务时会执行 U.compareAndSwapInt(this, SIZECTL, sc, sc + 1) 操作,并且在扩容操作开始前会设置 sizeCtl 的值为 (rs << RESIZE_STAMP_SHIFT) + 2,而这里在完成一个任务的时候会执行 U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1) 操作将 sizeCtl 的值减 1。上面的代码会判定当前 sizeCtl 值是否等于 (rs << RESIZE_STAMP_SHIFT) + 2,如果相等则说明整体扩容任务完成,否则仅说明当前任务完成,将线程任务数减 1。

接下来我们继续来看一个结点迁移的过程,迁移区分链表和红黑树,不过基本思想是想通的,这里以链表进行说明,相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 当前 bin 是一个链表
if (fh >= 0) {
int runBit = fh & n; // n 为老 table 的长度
Node<K, V> lastRun = f;
// 遍历当前链表,找到最后 p.hash & n 值相同的第一个结点
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// runBit == 0 表示还在老 table 原先的位置
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 此处 runBit 等于老 table 的长度,即 n
else {
hn = lastRun;
ln = null;
}
// 以 lastRun 为界
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0) {
ln = new Node<>(ph, pk, pv, ln);
} else {
hn = new Node<>(ph, pk, pv, hn);
}
}
// 将其中一个链表放置在 nextTable 的 i 位置
setTabAt(nextTab, i, ln);
// 将另外一个链表放置在 nextTable 的 i+n 位置
setTabAt(nextTab, i + n, hn);
// 设置当前 table 的 i 位置为 ForwardingNode 空结点,代表已经处理完
setTabAt(tab, i, fwd);
advance = true;
}

这一段代码如果希望更好的理解,建议自己模拟一个 table,并 debug 一下执行流程。其实也不难,这段代码的工作就是将一个链表的拆分成两个链表,并将它们插入到新 table 适当的位置。假设老的 table 长度为 16,那么上面的实现有一个巧妙的地方在于对链表中所有结点的哈希值执行 p.hash & n 操作,其结果不是 0 就是 16(老 table 的长度),所以我们可以依据 p.hash & n 的值将一个链表拆分成两个链表,其中值均为 0 的结点构成的链表仍然放置在新 table 的当前位置 i,而值均为 16 的结点构成的链表则放置在新的位置,即 i + 16。变量 lastRun 所表示的结点实际上是最后几个具备相同 p.hash & n 值的连续结点的最左边结点,因为这样可以减少该结点右边几个结点的迁移工作,因为它们具备相同的 p.hash & n 值,自然也就位于同一个链表上。

获取指定键值:get

方法 put 的执行流程可以加深我们对于 ConcurrentHashMap 存储结构的理解,而理解了 ConcurrentHashMap 的存储结构,那么分析 get 方法的运行机制也是水到渠成的事情,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public V get(Object key) {
Node<K, V>[] tab;
Node<K, V> e, p;
int n, eh;
K ek;
// 计算 key 的 hash 值
int h = spread(key.hashCode());
// table 表不为空,且 key 对应的 table 头结点存在
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek))) {
// 找到对应的 key,返回 value
return e.val;
}
}
// 当前 bin 为树,执行 find 方法检索
else if (eh < 0) {
return (p = e.find(h, key)) != null ? p.val : null;
}
// 当前 bin 是链表,直接遍历检索
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
return e.val;
}
}
}
return null;
}

方法首先依据相同的实现计算 key 的哈希值,然后定位 key 在 table 中的 bin 位置。如果 bin 结点存在,则依据当前 bin 类型(链表或红黑树)检索目标值。

参考

  1. jdk 1.8 源码
  2. Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议
Powered by hexo & Theme by hiero   Copyright © 2015-2019 浙ICP备 16010916  号,指 · 间 All Rights Reserved.