https://blog.csdn.net/fxkcsdn/article/details/82217760

  • 读锁被锁定后,任何线程再获取写锁会阻塞,读锁可以继续获取
  • 写锁被获取后,获取了写锁的线程可以继续获取读锁或写锁,其它线程任何锁都不能获取

state被分为两份:高16位用于share(数目是持有的锁的个数,重入也计入),低16位用于exclusive(持有的锁的个数,重入当然得计入)

字段

基本使用

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws IOException {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    readLock.lock();
    readLock.unlock();
    writeLock.lock();
    writeLock.lock();
}

关键方法和字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 缓存的是上一次获取过读锁的非firstReader线程的HoldCounter,用作快速查找的缓存,没必要立即可见不需要是volatile
        // 使用tid而不是Thread引用的原因是避免影响垃圾回收
    	private transient HoldCounter cachedHoldCounter; 

        // 记录每个线程获取的读锁的数量,当降为0时则把它移除(Removed whenever a thread's read hold count drops to 0.)
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * firstReader is the first thread to have acquired the read lock.
         * firstReaderHoldCount is firstReader's hold count.
         *
         * <p>More precisely, firstReader is the unique thread that last
         * changed the shared count from 0 to 1, and has not released the
         * read lock since then; null if there is no such thread.
         *
         * <p>Cannot cause garbage retention unless the thread terminated
         * without relinquishing its read locks, since tryReleaseShared
         * sets it to null.
         *
         * <p>Accessed via a benign data race; relies on the memory
         * model's out-of-thin-air guarantees for references.
         *
         * https://stackoverflow.com/questions/42588079/what-is-out-of-thin-air-safety
         *
         * <p>This allows tracking of read holds for uncontended read
         * locks to be very cheap.(意义在于,如果只有一个线程获取读锁的话,记录读锁数目所需的消耗会大大降低)
         */
        // firstReader不会创建CountHolder也不会被放到readHolds里,毕竟每次查询的时候都会先让currentThread与firstReader比较,如果不是再去比较cachedHoldCounter的tid,如果还不是,最后再去查询readHolds,尽量降低总体查询的时间
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;
    }
}

公平和非公平策略的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    // 若可以获取写锁,则队列内的所有节点都能直接获取
    final boolean writerShouldBlock() {
        return false; // writers can always barge(乱闯)
    }
    // 若可以获取读锁,当明确第一个节点(head release后第一个唤醒的节点)是写锁的时候才阻塞,否则直接获取读锁(尽量让head release后第一个唤醒的是自己)
    final boolean readerShouldBlock() {
        // 如果能够确定第一个节点一定是独占的,则返回true,其实是判断head的下一个节点是否是独占模式的节点
        // AQS的方法
        return apparentlyFirstQueuedIsExclusive();
    }
}

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    // 必须等待前面的节点完成
    final boolean writerShouldBlock() {
        // AQS的方法
        return hasQueuedPredecessors();
    }
    // 必须等待前面的节点完成
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

抽象内部类Sync

tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate(饱和), fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible(符合条件的) for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 有读锁(c != 0 and w == 0) 或者 有写锁(也可能同时有读锁但肯定是同一个线程的)但不是自己的
        // 说明没有锁升级功能(即使只有自己一个人获取读锁,也不能直接获取写锁)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        
        // 只有本线程持有写锁,并尝试获取多个写锁
        
        // 超出写锁数量范围
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        
        // 由于只有本线程持有写锁,所以直接修改state
        setState(c + acquires);
        return true;
    }
    // 此时说明读锁和写锁的数量都为0,需要根据策略决定当前线程是否需要去获取(即执行compareAndSetState)
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 获取写锁成功
    setExclusiveOwnerThread(current);
    return true;
}

tryRelease

1
2
3
4
5
6
7
8
9
10
11
// 如果tryRelease返回true代表需要unpark
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc); // 写锁在当前线程,所以state可以用setState设置
    return free;
}

tryAcquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible(符合条件的) for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed(推迟) to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 如果写锁被其它线程持有返回-1代表失败
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    // 写锁为0或者被自己持有,此时可以获取读锁,但是需要根据对应策略判断是否阻塞
    // SHARED_UNIT:share部分值为1,exclusive部分为0,相当于对share部分加1
    if (!readerShouldBlock() && // 不需要阻塞
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 进入if内部代表已经获取成功
        
        // 之前读锁为0, 作为0~1的线程,设置当前线程为firstReader
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { 
            // 当前线程作为firstReader读锁重入了
            firstReaderHoldCount++;
        } else {
            // cachedHoldCounter缓存的是上一次获取过读锁的非firstReader线程的HoldCounter
            // 现在需要更新cachedHoldCounter和它的count
            HoldCounter rh = cachedHoldCounter;
            // 如果cachedHoldCounter为空,或者不是自己,那么设置成自己
            if (rh == null || rh.tid != getThreadId(current))
                // get()如果当前线程没有创建过,那么返回一个新的HoldCounter,count==0
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 在tryReleaseShared里,如果readHolds里的HoldCounter count为0会把它从readHolds移除(为了保证firstReader线程不出现在readHolds)。所以需要重新添加到readHolds
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 需要block或者获取失败,因此进入完整的获取操作
    return fullTryAcquireShared(current);
}

/**
循环尝试:一旦判断写锁被其它线程持有(没有尝试的必要) 或者 线程需要阻塞且不满足重入条件(出于公平性不应该尝试)时返回-1,否则可以尝试获取资源
*/
final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        
        if (exclusiveCount(c) != 0) {
            // 如果写锁被其它线程持有返回-1代表失败
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else 当前线程持有写锁
        } else if (readerShouldBlock()) { // 当前线程一定未持有写锁,如果此时获取读锁需要block则:
            // Make sure we're not acquiring read lock reentrantly
            // 自己拥有读锁,因为firstReader是最近的将读锁从0-1的线程,并且没有释放掉读锁
            if (firstReader == current) { 
                // assert firstReaderHoldCount > 0;
            } else {
                // 懒加载当前线程的HoldCounter
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 当前线程没有获取过读锁(不满足重入条件),且需要阻塞,所以不应该进行尝试
                if (rh.count == 0)
                    return -1;
            }
        }
        
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        
        // 可以尝试获取资源
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 资源获取成功
            // 下面的代码与tryAcquireShared的完全一致,不做解析
            
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

tryReleaseShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    
    // 下面的代码在减少count,firstReader完全释放了锁就设置成null,其它线程完全释放了锁就从readHolds删除
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    
    // state的读锁数减一
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 当nextc不为零时,意味着还有读锁被持有。那么写线程没必要唤醒,读线程一定能获取到读锁(通过propagate机制或nonfair读锁,只要不超出资源数)也没必要在释放资源时进行唤醒
            // 只有当读锁全部释放完毕时,此时需要唤醒可能一直在队首等待着的写线程
            return nextc == 0; 
    }
}