ReentrantReadWriteLock
https://blog.csdn.net/fxkcsdn/article/details/82217760
- 读锁被锁定后,任何线程再获取写锁会阻塞,读锁可以继续获取
- 写锁被获取后,获取了写锁的线程可以继续获取读锁或写锁,其它线程任何锁都不能获取
state被分为两份:高16位用于share(数目是持有的锁的个数,重入也计入),低16位用于exclusive(持有的锁的个数,重入当然得计入)
字段
基本使用
1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws IOException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
readLock.lock();
readLock.unlock();
writeLock.lock();
writeLock.lock();
}关键方法和字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// 缓存的是上一次获取过读锁的非firstReader线程的HoldCounter,用作快速查找的缓存,没必要立即可见不需要是volatile
// 使用tid而不是Thread引用的原因是避免影响垃圾回收
private transient HoldCounter cachedHoldCounter;
// 记录每个线程获取的读锁的数量,当降为0时则把它移除(Removed whenever a thread's read hold count drops to 0.)
private transient ThreadLocalHoldCounter readHolds;
/**
* firstReader is the first thread to have acquired the read lock.
* firstReaderHoldCount is firstReader's hold count.
*
* <p>More precisely, firstReader is the unique thread that last
* changed the shared count from 0 to 1, and has not released the
* read lock since then; null if there is no such thread.
*
* <p>Cannot cause garbage retention unless the thread terminated
* without relinquishing its read locks, since tryReleaseShared
* sets it to null.
*
* <p>Accessed via a benign data race; relies on the memory
* model's out-of-thin-air guarantees for references.
*
* https://stackoverflow.com/questions/42588079/what-is-out-of-thin-air-safety
*
* <p>This allows tracking of read holds for uncontended read
* locks to be very cheap.(意义在于,如果只有一个线程获取读锁的话,记录读锁数目所需的消耗会大大降低)
*/
// firstReader不会创建CountHolder也不会被放到readHolds里,毕竟每次查询的时候都会先让currentThread与firstReader比较,如果不是再去比较cachedHoldCounter的tid,如果还不是,最后再去查询readHolds,尽量降低总体查询的时间
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
}
}公平和非公平策略的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
// 若可以获取写锁,则队列内的所有节点都能直接获取
final boolean writerShouldBlock() {
return false; // writers can always barge(乱闯)
}
// 若可以获取读锁,当明确第一个节点(head release后第一个唤醒的节点)是写锁的时候才阻塞,否则直接获取读锁(尽量让head release后第一个唤醒的是自己)
final boolean readerShouldBlock() {
// 如果能够确定第一个节点一定是独占的,则返回true,其实是判断head的下一个节点是否是独占模式的节点
// AQS的方法
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
// 必须等待前面的节点完成
final boolean writerShouldBlock() {
// AQS的方法
return hasQueuedPredecessors();
}
// 必须等待前面的节点完成
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}抽象内部类Sync
tryAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate(饱和), fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible(符合条件的) for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 有读锁(c != 0 and w == 0) 或者 有写锁(也可能同时有读锁但肯定是同一个线程的)但不是自己的
// 说明没有锁升级功能(即使只有自己一个人获取读锁,也不能直接获取写锁)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 只有本线程持有写锁,并尝试获取多个写锁
// 超出写锁数量范围
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 由于只有本线程持有写锁,所以直接修改state
setState(c + acquires);
return true;
}
// 此时说明读锁和写锁的数量都为0,需要根据策略决定当前线程是否需要去获取(即执行compareAndSetState)
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 获取写锁成功
setExclusiveOwnerThread(current);
return true;
}tryRelease
1
2
3
4
5
6
7
8
9
10
11
// 如果tryRelease返回true代表需要unpark
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc); // 写锁在当前线程,所以state可以用setState设置
return free;
}tryAcquireShared
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible(符合条件的) for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed(推迟) to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 如果写锁被其它线程持有返回-1代表失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 写锁为0或者被自己持有,此时可以获取读锁,但是需要根据对应策略判断是否阻塞
// SHARED_UNIT:share部分值为1,exclusive部分为0,相当于对share部分加1
if (!readerShouldBlock() && // 不需要阻塞
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 进入if内部代表已经获取成功
// 之前读锁为0, 作为0~1的线程,设置当前线程为firstReader
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程作为firstReader读锁重入了
firstReaderHoldCount++;
} else {
// cachedHoldCounter缓存的是上一次获取过读锁的非firstReader线程的HoldCounter
// 现在需要更新cachedHoldCounter和它的count
HoldCounter rh = cachedHoldCounter;
// 如果cachedHoldCounter为空,或者不是自己,那么设置成自己
if (rh == null || rh.tid != getThreadId(current))
// get()如果当前线程没有创建过,那么返回一个新的HoldCounter,count==0
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 在tryReleaseShared里,如果readHolds里的HoldCounter count为0会把它从readHolds移除(为了保证firstReader线程不出现在readHolds)。所以需要重新添加到readHolds
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 需要block或者获取失败,因此进入完整的获取操作
return fullTryAcquireShared(current);
}
/**
循环尝试:一旦判断写锁被其它线程持有(没有尝试的必要) 或者 线程需要阻塞且不满足重入条件(出于公平性不应该尝试)时返回-1,否则可以尝试获取资源
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
// 如果写锁被其它线程持有返回-1代表失败
if (getExclusiveOwnerThread() != current)
return -1;
// else 当前线程持有写锁
} else if (readerShouldBlock()) { // 当前线程一定未持有写锁,如果此时获取读锁需要block则:
// Make sure we're not acquiring read lock reentrantly
// 自己拥有读锁,因为firstReader是最近的将读锁从0-1的线程,并且没有释放掉读锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 懒加载当前线程的HoldCounter
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 当前线程没有获取过读锁(不满足重入条件),且需要阻塞,所以不应该进行尝试
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 可以尝试获取资源
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 资源获取成功
// 下面的代码与tryAcquireShared的完全一致,不做解析
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}tryReleaseShared
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 下面的代码在减少count,firstReader完全释放了锁就设置成null,其它线程完全释放了锁就从readHolds删除
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// state的读锁数减一
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 当nextc不为零时,意味着还有读锁被持有。那么写线程没必要唤醒,读线程一定能获取到读锁(通过propagate机制或nonfair读锁,只要不超出资源数)也没必要在释放资源时进行唤醒
// 只有当读锁全部释放完毕时,此时需要唤醒可能一直在队首等待着的写线程
return nextc == 0;
}
}