深入理解 JUC:ReentrantReadWriteLock

上一篇我们分析了 ReentrantLock 锁的设计与实现,对于大部分并发场景来说已经能够满足同步加锁的需求,但是相对于本文将要介绍的 ReentrantReadWriteLock 锁来说,ReentrantLock 主要存在的不足在于不区分读操作和写操作。考虑读多写少的场景,如果将读操作和写操作一视同仁的对待,那么线程之间读操作是互斥的,同一时间只允许一个线程读取数据,势必会影响系统的吞吐性能,并且读操作并不会导致数据的不一致性,这种情况下应该允许多个线程对同一份数据执行并发读取。

ReentrantReadWriteLock 锁在设计上认识到了 ReentrantLock 锁在读多写少场景下的不足,于是分别设计了读锁和写锁,在满足同步加锁需求的基础上支持读锁能够同时被多个线程持有。ReentrantReadWriteLock 锁的特性包括:

  1. 读锁和写锁均是可重入的,并允许多个不同线程同时持有读锁。
  2. 对于不同线程而言,读锁与写锁之间是互斥的,写锁与写锁之间也是互斥的。
  3. 对于相同线程而言,在获取到写锁的前提下,仍然可以获取读锁,反之则不行。
  4. 支持公平锁机制,默认为非公平锁。

ReentrantReadWriteLock 示例

在开始分析 ReentrantReadWriteLock 的实现内幕之前,我们先以官方示例来回忆一下 ReentrantReadWriteLock 的基本使用,具体实现如下(做了一些小小的改动):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private Object data;
private volatile boolean cacheInvalid = true;

private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();

void processCachedData() {
readLock.lock();
if (cacheInvalid) {
readLock.unlock(); // 在获取写锁之前必须释放读锁,否则会造成死锁
writeLock.lock();
try {
// 再次校验状态,考虑其它线程可能已经初始化了缓存数据
if (cacheInvalid) {
// 初始化缓存数据
data = new Object();
cacheInvalid = false;
}
// 获取读锁,在持有写锁的情况下,同一个线程仍然可以获取读锁
readLock.lock();
} finally {
// 释放写锁
writeLock.unlock();
}
}

try {
// 使用缓存数据
this.use(data);
} finally {
// 释放读锁
readLock.unlock();
}
}

线程在读取缓存数据 data 之前需要获取读锁(ReadLock),如果判断缓存数据 data 未被初始化,则尝试获取写锁(WriteLock)以初始化缓存数据。因为读锁是共享的,所有上述示例允许多个线程并发读取 data 数据,但是一旦某个线程检测到 data 未被初始化,则尝试获取写锁并更新缓存数据,期间其它线程需要阻塞等待初始化过程的完成。

上述示例还暗含了一个锁降级的过程,我们将在持有写锁的前提下获取到读锁,然后释放写锁的过程称为锁降级(先释放写锁再获取读锁的过程并不能称为锁降级)。那么锁降级操作的意义何在呢?以上述示例为例,当线程完成对缓存数据 data 的更新之后,并没有立即释放写锁,而是先获取到了读锁后再释放写锁,这样就能够避免其它线程再次获取到写锁去更新缓存数据(因为 ReentrantReadWriteLock 不支持在读锁被持有的情况下去获取写锁)。

ReentrantReadWriteLock 实现内幕

下面来分析一下 ReentrantReadWriteLock 的设计与实现,ReentrantReadWriteLock 在设计上并没有实现 Lock 接口,而是实现了 ReadWriteLock 接口,该接口定义了返回读锁和写锁对象的方法,如下:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

ReentrantReadWriteLock 在内部定义了 ReadLock 和 WriteLock 两个内部类,用于分别表示读锁和写锁,ReadLock 和 WriteLock 均实现自 Lock 接口。与 ReentrantLock 一样,ReentrantReadWriteLock 也支持公平锁(FairSync)与非公平锁(NonfairSync)机制,并且允许在构造 ReentrantReadWriteLock 对象时通过参数指定是否使用公平锁,默认使用非公平锁。

FairSync 和 NonfairSync 均派生自 Sync 抽象类,该抽象类继承自 AbstractQueuedSynchronizer,这一点设计上与 ReentrantLock 如出一辙。Sync 类的字段定义和部分基础方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
abstract static class Sync extends AbstractQueuedSynchronizer {

static final int SHARED_SHIFT = 16;
/** 共享锁(读锁)单位值 */
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/** 共享锁(读锁)最大线程数 */
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

/** 独占锁(写锁)掩码 */
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 读锁线程数目 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

/** 写锁重入次数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

/** 记录各个线程(除第 1 个)获取读锁的重入次数 */
private transient ThreadLocalHoldCounter readHolds;

/** 最近 1 次获取到读锁的线程计数器 */
private transient HoldCounter cachedHoldCounter;

/** 记录第 1 个获取到读锁的线程 */
private transient Thread firstReader = null;
/** 记录第 1 个获取到读锁的线程重入次数 */
private transient int firstReaderHoldCount;

// ... 省略方法定义

}

ReentrantReadWriteLock 复用 AQS 的 state 字段来记录线程的重入次数,区别于 ReentrantLock 锁,因为区分了读锁和写锁,ReentrantReadWriteLock 将 state 字段由中间分割成了两段,其中高位区间(16 位)用于记录持有读锁的线程数(重入则累加),低位区间(16 位)用于记录写锁的重入次数。

前面在分析 AQS 同步队列的 Node 内部类定义时,介绍了结点分为共享(SHARED)和独占(EXCLUSIVE)两类模式,对应到 ReentrantReadWriteLock,我们可以认为读锁是共享的,而写锁是独占的,同一时间读锁能够被多个线程持有,而写锁只能被一个线程持有。Sync 定义了 Sync#sharedCountSync#exclusiveCount 方法,分别用于获取读锁线程重入次数和写锁线程重入次数。

属性 Sync#readHolds 用于记录除第 1 个获取到读锁的线程外的剩余线程的重入次数,类型为 ThreadLocalHoldCounter。ThreadLocalHoldCounter 类继承自 ThreadLocal,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
@Override
public HoldCounter initialValue() {
return new HoldCounter();
}
}

static final class HoldCounter {
int count = 0; // 重入次数
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread()); // 线程 ID
}

ThreadLocalHoldCounter 覆盖实现了 ThreadLocal#initialValue 方法,当某个线程第一次访问 Sync#readHolds 属性时,会为该线程创建一个 HoldCounter 对象(下文简称线程计数器),用于记录当前线程 ID 和重入次数。

属性 Sync#cachedHoldCounter 为 HoldCounter 类型,用于记录最近一次获取到读锁的线程计数器对象,其目的是为了提升 ReadLock 的性能,因为大多数时候调用 ReadLock#unlock 方法释放锁资源的线程就是刚刚(即最近一次)调用 ReadLock#lock 方法获取锁资源的线程,引入 Sync#cachedHoldCounter 字段能够快速获取该线程的计数器对象,避免从 Sync#readHolds 中查找。

属性 Sync#firstReaderSync#firstReaderHoldCount 用于记录首次获取到读锁的线程对象和该线程重入读锁的次数。之所以要设计这样两个字段,同样是为了性能的考量,考虑只有一个线程加锁的场景,此时引入这两个字段就能避免为该线程创建一个 HoldCounter 计数器对象,同时也就避免了从 Sync#readHolds 中查找

读锁:ReadLock

本小节来分析一下 ReadLock 的实现机制,ReadLock 实现自 Lock 接口,针对接口方法的实现均委托给 Sync 对象执行。下面首先来看一下 ReadLock#tryLock() 方法,之前的文章中我们已经介绍过 Lock 接口的 Lock#tryLock() 方法用于尝试获取锁资源,不管获取成功与否该方法都会立即返回,如果获取成功则返回 true,否则返回 false。ReadLock 在实现该方法时直接调用了 Sync 的 Sync#tryReadLock 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// ReadLock#tryLock()
public boolean tryLock() {
return sync.tryReadLock();
}

// Sync#tryReadLock
final boolean tryReadLock() {
// 获取当前线程对象
Thread current = Thread.currentThread();
for (; ; ) {
// 获取 state 状态值
int c = this.getState();
// 如果写锁被其它线程持有,则立即返回 false
if (exclusiveCount(c) != 0 && this.getExclusiveOwnerThread() != current) {
return false;
}
// 获取读锁重入次数
int r = sharedCount(c);
if (r == MAX_COUNT) {
// 读锁重入次数已达上限
throw new Error("Maximum lock count exceeded");
}
// 基于 CAS 操作更新 state 状态值
if (this.compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁被首次被获取
if (r == 0) {
firstReader = current; // 记录第 1 个获取到读锁的线程
firstReaderHoldCount = 1; // 记录第 1 个获取到读锁的线程重入次数
}
// 首个获取该读锁的线程是当前线程,更新线程的重入次数
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 当前线程不是首个获取该读锁的线程,更新计数器
else {
// 最近 1 次获取到读锁的线程计数器
HoldCounter rh = cachedHoldCounter;
// 当前记录的最近 1 次获取到读锁的线程不是当前线程,则更新 cachedHoldCounter 计数器
if (rh == null || rh.tid != getThreadId(current)) {
cachedHoldCounter = rh = readHolds.get();
}
// 当前线程是最近 1 次获取到读锁的线程
else if (rh.count == 0) {
readHolds.set(rh);
}
// 线程重入次数加 1
rh.count++;
}
return true;
}
}
}

尝试获取读锁资源的操作在拿到当前线程对象之后,会基于 CAS 尝试修改 state 的值,具体执行过程可以概括为:

  1. 获取 ReentrantReadWriteLock 的 state 状态值;
  2. 如果当前 ReentrantReadWriteLock 的写锁已被其它线程持有,则返回 false,因为不同线程之间读锁与写锁之间是互斥的;
  3. 否则,获取当前 ReentrantReadWriteLock 读锁的线程重入次数,校验是否达到上限值(即 65535),如果是则抛出异常;
  4. 否则,基于 Unsafe 尝试更新 state 状态值,即将读锁重入次数加 1;
  5. 如果更新成功,则修改当前线程对应的计数器,失败则退回到步骤 1 继续重试。

除了上面介绍的 ReadLock#tryLock() 方法,ReadLock 针对其它获取读锁资源的方法均委托给 AQS 执行,包括:

  • ReadLock#lock:直接调用 AbstractQueuedSynchronizer#acquireShared 方法,请求资源数为 1。
  • ReadLock#lockInterruptibly:直接调用 AbstractQueuedSynchronizer#acquireSharedInterruptibly 方法,请求资源数为 1。
  • ReadLock#tryLock(long, TimeUnit):直接调用 AbstractQueuedSynchronizer#tryAcquireSharedNanos 方法,请求资源数为 1。

AQS 针对上述方法的实现,我们在前面的文章中已经专门介绍过,这些方法均调用了模板方法 AbstractQueuedSynchronizer#tryAcquireShared 尝试获取读锁资源,Sync 类针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for lock wrt state, so ask if it should block because of queue policy.
* If not, try to grant by CASing state and updating count.
* Note that step does not check for reentrant acquires, which is postponed to full version
* to avoid having to check hold count in the more typical non-reentrant case.
* 3. If step 2 fails either because thread apparently not eligible or CAS fails or count saturated,
* chain to version with full retry loop.
*/

// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取 state 状态值
int c = this.getState();
// 如果写锁被其它线程持有,则立即返回 -1
if (exclusiveCount(c) != 0 && this.getExclusiveOwnerThread() != current) {
return -1;
}
// 获取持有读锁的线程数
int r = sharedCount(c);
if (!this.readerShouldBlock() // 对于公平锁,保证公平性,对于非公平锁,判断写锁是否被占用
&& r < MAX_COUNT // 重入次数未达上限
&& this.compareAndSetState(c, c + SHARED_UNIT)) { // 修改 state 状态值成功
// 读锁被首次被获取
if (r == 0) {
firstReader = current; // 记录第 1 个获取到读锁的线程
firstReaderHoldCount = 1; // 记录第 1 个获取到读锁的线程重入次数
}
// 首个获取该读锁的线程是当前线程,更新线程的重入次数
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 当前线程不是首个获取该读锁的线程,更新计数器
else {
// 最近 1 次获取到读锁的线程计数器
HoldCounter rh = cachedHoldCounter;
// 当前记录的最近 1 次获取到读锁的线程不是当前线程,则更新 cachedHoldCounter 计数器
if (rh == null || rh.tid != getThreadId(current)) {
cachedHoldCounter = rh = readHolds.get();
} else if (rh.count == 0) {
readHolds.set(rh);
}
// 线程重入次数加 1
rh.count++;
}
return 1;
}

/*
* 执行到这里,需要满足以下条件之一:
* 1. 对于公平锁而言,前面有等待的线程,为了保证公平性,需要让前面的线程优先获取锁;
* 2. 对于非公平锁而言,写锁已经被持有;
* 3. 读锁重入次数达到上限
* 4. 更新 state 字段失败,说明期间有其它线程获取到锁对象(读锁、写锁)
*/

// 继续尝试获取读锁,更多考虑一些重入的场景
return this.fullTryAcquireShared(current);
}

上述方法尝试获取读锁资源的执行流程可以概括为:

  1. 获取当前线程对象和 state 状态值;
  2. 如果写锁已经被其它线程持有,则获取读锁失败,返回 -1;
  3. 如果写锁未被持有(非公平锁),或者当前没有排队等待获取锁的线程(公平锁),且读锁重入次数未达到上限,则尝试更新 state 字段,即重入次数加 1;
  4. 如果更新成功,则更新线程计数器;
  5. 如果更新不成功,或者 3 中的条件不满足,则继续执行 Sync#fullTryAcquireShared 方法尝试获取读锁。

方法 Sync#fullTryAcquireShared 的实现逻辑与上述方法大同小易,区别在于引入了重试机制,主要用来处理 CAS 操作失败和线程重入的场景。之所以这样设计,主要还是考虑到性能上的需求,对于非重入的场景来说避免了查询线程计数器的开销。方法 Sync#fullTryAcquireShared 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (; ; ) {
// 获取 state 状态值
int c = this.getState();
// 当前写锁已被持有
if (exclusiveCount(c) != 0) {
// 如果持有写锁的线程不是当前线程,则尝试加读锁失败
if (this.getExclusiveOwnerThread() != current) {
return -1;
}
// else we hold the exclusive lock; blocking here would cause deadlock.
}
// 对于公平锁而言,前面存在等待的线程
else if (this.readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 更新计数器,如果当前线程未持有读锁,则移除计数器
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0) {
readHolds.remove();
}
}
}
if (rh.count == 0) {
return -1;
}
}
}

/*
* 执行到这一步说明:
* 1. 写锁未被持有,或者持有写锁的线程是当前线程
* 2. 前面没有排队等待的线程,或者当前线程已经持有了读锁
*/

// 读锁重入次数达到上限
if (sharedCount(c) == MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
// 修改读锁重入次数(加 1)
if (this.compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null) {
rh = cachedHoldCounter;
}
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
} else if (rh.count == 0) {
readHolds.set(rh);
}
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

上述方法和前面分析的 AbstractQueuedSynchronizer#tryAcquireShared 方法均调用了 Sync#readerShouldBlock 方法用于判断当前获取读锁的线程是否应该阻塞。Sync 中仅仅声明了该方法,具体实现交由 FairSync 和 NonfairSync 子类实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// FairSync#readerShouldBlock
final boolean readerShouldBlock() {
return this.hasQueuedPredecessors();
}

// NonfairSync#readerShouldBlock
final boolean readerShouldBlock() {
return this.apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
}

FairSync 在实现上通过调用 AbstractQueuedSynchronizer#hasQueuedPredecessors 方法判断同步队列中是否有排在前面等待获取锁的线程,如果有的话则让渡这些线程,以保证锁的公平性。NonfairSync 在实现上则判断当前同步队列队头等待线程节点是否以 EXCLUSIVE 模式在等待,也就是在等待获取写锁,如果是则当前获取读锁的线程应该让渡获取写锁的线程。

ReadLock 释放读锁资源的方法 ReadLock#unlock 同样是直接委托给 AQS 处理,调用的是 AbstractQueuedSynchronizer#releaseShared 方法。AQS 通过调用模板方法 AbstractQueuedSynchronizer#tryReleaseShared 尝试释放共享资源,成功则返回 true。Sync 针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 如果当前线程是首个获取读锁的线程,则更新对应的重入次数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) {
firstReader = null;
} else {
firstReaderHoldCount--;
}
}
// 否则,修改线程计数器
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
}
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0) {
throw this.unmatchedUnlockException();
}
}
--rh.count;
}

// 基于 CAS 修改 state 状态值
for (; ; ) {
int c = this.getState();
int nextc = c - SHARED_UNIT;
if (this.compareAndSetState(c, nextc)) {
/*
* Releasing the read lock has no effect on readers,
* but it may allow waiting writers to proceed if both read and write locks are now free.
*/
return nextc == 0; // 如果 nextc 为 0,则说明当前锁(读锁、写锁)未被任何线程持有
}
}
}

整体逻辑比较简单,如代码注释。

写锁:WriteLock

本小节一起来分析一下 WriteLock 的实现机制,WriteLock 同样实现自 Lock 接口,针对接口方法的实现均委托给 Sync 对象执行。下面首先来看一下 WriteLock#tryLock() 方法,WriteLock 在实现该方法时直接调用了 Sync#tryWriteLock 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean tryWriteLock() {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取 state 状态值
int c = this.getState();
// 如果不为 0,则说明当前锁(读锁、写锁)已被线程持有
if (c != 0) {
// 获取写锁的重入次数
int w = exclusiveCount(c);
if (w == 0 // 写锁重入次数为 0,说明读锁已被持有,获取写锁失败
|| current != this.getExclusiveOwnerThread()) { // 写锁已经被其它线程持有,获取写锁失败
return false;
}
// 写锁重入次数达到上限
if (w == MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
}
// 更新写锁重入次数(加 1)
if (!this.compareAndSetState(c, c + 1)) {
return false;
}
// 记录当前持有写锁的线程对象
this.setExclusiveOwnerThread(current);
return true;
}

尝试获取写锁的具体执行过程可以概括为:

  1. 获取当前线程对象和 state 状态值;
  2. 如果 state 状态值不为 0,则说明写锁或读锁已被线程持有;
  3. 如果是读锁被持有,则尝试获取写锁失败;否则,如果持有写锁的线程不是当前线程,则尝试获取写锁失败;
  4. 判断写锁重入次数是否达到上限(即 65535),如果是则抛出异常;
  5. 尝试修改写锁重入次数(加 1),修改成功即加锁成功,如果失败则返回 false,对于成功获取到写锁的线程对象需要予以记录,并返回 true;

除了 WriteLock#tryLock() 方法,WriteLock 针对其它获取写锁资源的方法均委托给 AQS 执行,包括:

  • WriteLock#lock:直接调用 AbstractQueuedSynchronizer#acquire 方法,请求资源数为 1。
  • WriteLock#lockInterruptibly:直接调用 AbstractQueuedSynchronizer#acquireInterruptibly 方法,请求资源数为 1。
  • WriteLock#tryLock(long, TimeUnit):直接调用 AbstractQueuedSynchronizer#tryAcquireNanos 方法,请求资源数为 1。

AQS 针对上述方法的实现,我们同样在前面的文章中已经专门分析过,这些方法均调用了模板方法 AbstractQueuedSynchronizer#tryAcquire 尝试获取资源,Sync 类针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if it is either a reentrant acquire or
* queue policy allows it. If so, update state and set owner.
*/

// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取 state 状态值
int c = this.getState();
// 获取写锁的重入次数
int w = exclusiveCount(c);
// 如果不为 0,则说明当前锁(读锁、写锁)已被线程持有
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 // 写锁重入次数为 0,说明读锁已被持有,获取写锁失败
|| current != this.getExclusiveOwnerThread()) { // 写锁已经被其它线程持有,获取写锁失败
return false;
}
// 写锁重入次数达到上限
if (w + exclusiveCount(acquires) > MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
// 更新 state 状态值
this.setState(c + acquires);
return true;
}

// 当前锁对象未被线程持有,对于非公平锁则抢占式获取锁对象,对于公平锁则等待前面的线程优先获取锁
if (this.writerShouldBlock() || !this.compareAndSetState(c, c + acquires)) {
return false;
}
// 记录当前持有写锁的线程对象
this.setExclusiveOwnerThread(current);
return true;
}

上述方法尝试获取写锁资源的执行流程可以概括为:

  1. 获取当前线程对象、state 状态值,以及写锁的重入次数;
  2. 如果 state 状态值不为 0,但写锁重入次数为 0,说明当前读锁已被持有,获取写锁失败;
  3. 如果 state 状态值不为 0,且写锁重入次数也不为 0,说明当前写锁已被持有,如果持有该写锁的线程不是当前线程,则获取写锁失败;
  4. 否则,说明当前线程已经持有该写锁资源,本次加锁相当于重入,此时需要校验写锁重入次数是否已达上限,是则抛出异常,否则说明获取写锁成功,更新 state 状态值;
  5. 如果 2 中检测到 state 状态值为 0,说明当前锁对象未被任何线程持有,对于公平锁而言需要考虑公平性,优先让排在前面的线程先获取锁;
  6. 尝试更新 state 状态值,如果成功则说明加锁成功,需要记录当前持有写锁的线程对象并返回 true,否则返回 false。

上述方法通过调用 Sync#writerShouldBlock 方法来判断当前获取写锁资源的线程是否应该被阻塞。Sync 中同样仅仅声明了该方法,具体实现交由 FairSync 和 NonfairSync 子类实现,如下:

1
2
3
4
5
6
7
8
9
// FairSync#writerShouldBlock
final boolean writerShouldBlock() {
return this.hasQueuedPredecessors();
}

// NonfairSync#writerShouldBlock
final boolean writerShouldBlock() {
return false; // writers can always barge
}

由实现可以看出在获取写锁时,对于公平锁需要让渡排在同步队列前面的等待线程,而非公平锁则允许当前线程抢占获取写锁资源。

WriteLock 释放写锁资源的方法 WriteLock#unlock 同样是直接委托给 AQS 处理,调用的是 AbstractQueuedSynchronizer#release 方法。AQS 通过调用模板方法 AbstractQueuedSynchronizer#tryRelease 尝试释放独占资源,成功则返回 true。Sync 针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryRelease(int releases) {
// 如果写锁未被当前线程持有,则抛出异常
if (!this.isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 计算释放之后,剩余的重入次数
int nextc = this.getState() - releases;
// 如果写锁重入次数为 0,则说明锁被释放
boolean free = exclusiveCount(nextc) == 0;
if (free) {
// 写锁已经被释放,清空持有写锁的线程对象
this.setExclusiveOwnerThread(null);
}
// 更新 state 状态值
this.setState(nextc);
return free;
}

整体逻辑比较简单,如代码注释。

总结

本文我们通过一个官方示例演示了 ReentrantReadWriteLock 的基本使用,并分析了其内在实现机制。ReentrantReadWriteLock 相对于前面介绍的 ReentrantLock 更加适合读多写少的业务场景,通过将读锁与写锁相分离的设计来提升读操作的吞吐性能。然而,灵活的背后也暗藏着更加容易出错的风险,一些不规范的使用很容易造成死锁,比如在已经持有读锁的基础上仍然尝试获取写锁,使用时需要留心。

参考

  1. JDK 1.8 源码
  2. The java.util.concurrent Synchronizer Framework

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议
Powered by hexo & Theme by hiero   Copyright © 2015-2019 浙ICP备 16010916  号,指 · 间 All Rights Reserved.