AQS (AbstractQueuedSynchronizer)
AbstractQueuedSynchronizer
字段
- exclusiveOwnerThread:The current owner of exclusive mode synchronization. 从父类
AbstractOwnableSynchronizer继承而来。 head,当队列未初始化时为null;不为null时代表同步队列头节点,并不代表的是已经获取了资源的线程或者已经释放了资源的线程或者是aqs初始化时设置的空节点tail,当队列未初始化时为null;不为null时代表最后一个节点
node字段
- waitStatus:等待状态。
CANCELLED:值为 1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点,其结点的 waitStatus 为 CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
SIGNAL:值为 - 1,结点释放锁,会通知后继节点运行,每个节点在阻塞前,需要标记其前驱节点的状态为 SIGNAL。
CONDITION:值为 - 2,与 Condition 相关,该标识的节点处于另一个队列中(等待队列),结点的线程等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION 状态的结点将从等待队列转移到同步队列中并且状态设置为0,等待获取同步锁。
PROPAGATE:值为 - 3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
0 状态:值为 0,代表初始化状态。
- prev:前驱节点
- next:后继节点
- thread:当前节点代表的线程
- nextWaiter:Node 既可以作为同步队列节点使用,也可以作为 Condition 的等待队列节点使用 (将会在后面讲 Condition 时讲到)。在作为同步队列节点时,nextWaiter 可能有两个值:
Node.EXCLUSIVE = new Node()、Node.SHARED = null标识当前节点是独占模式还是共享模式;在作为等待队列节点使用时,nextWaiter 保存后继节点。
CLH(Craig, Landin, and Hagersten)队列+CONDITION队列
使用
state 的访问方式有三种:
getState()、setState()、compareAndSetState(),在AQS中都不会被调用,这说明state是提供给用户使用的。一般都在下面这些扩展方法中修改和访问。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():boolean:该线程是否正在独占资源。只有用到 condition 才需要去实现它。比如在condition.signal()时需要判断是否是独占的,不是独占的不能signal。tryAcquire(int):boolean:获取独占资源(修改state),如果获取成功返回true,否则返回 false。获取独占资源的入口方法:acquire(int a):void中调用,在此方法中如果tryAcquire失败会执行可能阻塞的操作:acquireQueued(Node),会重试tryAcquire(int):boolean。tryRelease(int):boolean:返回true代表成功,将继续执行doRelease,否则将不执行,release将直接返回。要确保release成功得在里面循环重试。tryAcquireShared(int a):int:如果以共享方式获取a个资源,将会剩下几个(r)。返回值在AQS里只用于r < 0或r >= 0的判断,用于代表是否需要开始执行可能会阻塞的获取操作(doAcquireShared(int a):void,可能获取失败/成功),并不参与计算或存储。获取共享资源的入口在acquireShared(int a):void,方法的逻辑:一般r < 0代表着state修改失败,r >= 0代表state修改成功,如果state用户代表Semaphore中的资源数,那么r >= 0代表我已经取完了a个资源(state -= a),之后直接返回即可。而如果r < 0那么要执行doAcquireShared(int a),如果资源足够那么会获取成功,否则会阻塞等待被唤醒,唤醒后会重试tryAcquireShared(int a):int。
tryReleaseShared(int a):boolean:只在释放共享资源的入口方法:releaseShared(int a):boolean中调用。释放资源(修改state),如果允许唤醒后续等待结点(doReleaseShared():void)返回 true(比如释放成功了),否则返回 false(比如释放失败了)。
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
// 阻塞获取,如果中途被interrupt,那么当方法返回时线程的interrupted状态被设置成true
public acquire(int);
// 阻塞获取,如果中途被interrupt将取消获取,并抛出异常InterruptedException,interrupted状态会被清除
public acquireInterruptibly(int);
// 进行也是阻塞获取,只是在当前时间超过截止时间时必定会自动醒来,此时如果还没有获取成功则取消获取并返回false。如果中途被interrupt也将取消获取,并抛出异常InterruptedException,interrupted状态会被清除
public tryAcquireNanos(int, long): boolean;
// 进行一次资源获取的尝试
protected tryAcquire(int): boolean;
// 以及以上所有方法的shared版本独占式流程
同一时间只能有一个线程持有锁
队列入队时修改tail,队列出队时修改head(实际是在新线程获取了锁后修改head为当前线程)。
acquire:tryAcquire失败时将本线程入队,等待被唤醒。当自己被唤醒时,代表自己在同步队列中是最前的一个节点,并且是唯一一个被唤醒的节点。这个节点醒来后将不断重试tryAcquire(由于只有这个线程是被唤醒的,这个线程只需要与那些刚进入acquire方法的线程争夺资源)并在成功时将自己作为head(也就是将之前已经执行完的线程节点出队)。release:需要唤醒队列最前面的一个等待的节点,让这个节点与刚进来的线程争夺资源cancel:当线程在队列中,并且在争夺资源的过程中抛出异常就会进入cancelAcquire。在这个方法中,假设本线程的节点是node,它会:- 压缩
node的prev,跳过已经取消的节点cancelNodes(由于自己的当前状态不是cancel,所以可以安全修改prev) - 将自己设置成
cancel - 尝试设置
node.prev.next = node.next,跳过自己以及刚才所有的cancelNodes。这里不需要保证修改成功,如果修改失败,那意味着自己已经是tail并且其它线程入队将自己连接到了node的后面。如果自己是最后一个,那么修改tail为前一个节点
- 压缩
acquire(int)
独占模式下线程获取共享资源的入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试去获取资源,默认没提供实现,true代表获取成功
// addwaiter 将本线程作为一个独占模式的Node添加到同步队列后面
// 若队列没有head则新建一个空的Node作为head,再添加到同步队列后面
// 若获取资源成功且被中断过,自己中断一次
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 在队列里循环等待条件满足,若条件满足则不断尝试获取资源,
// 这里的条件指的是前驱结点是head,并且获取锁的park期间会清除中断,
// 所以返回值为true代表发生过中断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
// 每次取出前驱,判断前驱是否是head,若是则tryAcquire,
// 成功后设置自己为head,把原先的head删掉
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// setHead总是在成功获取资源后设置
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否需要park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 独享与共享模式都用到这个方法
// node是当前线程所在节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 若前驱为signal说明它release后会unpark后继,这时可以park
if (ws == Node.SIGNAL)
return true;
if (ws > 0) { // 1: canceled状态的节点
// 删除前面的canceled的节点,直到找到非canceled节点,对prev关系进行压缩
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 若是其它状态的将其改为signal,不理解为什么不管是否设置成功都返回false,明明设置成功可以返回true的(注释写着 Caller will need to retry to make sure it cannot acquire before parking. 保证在park前自己是获取不到资源的)
// 这里使用cas是为了避免在pred已经进入cancelAcquire的情况下,把已经修改成cancel状态的pred节点又改成了signal
// cas成功代表pred的当前状态不是cancel,可以成功park(这就是signal状态存在的意义)
// 如果在这里执行cas成功,有两种可能,但是都能保证一定会重新进入shouldParkAfterFailedAcquire方法,并且返回true,从而进行park操作。
// (1),前置节点没有变成head,那么必然重新进入此方法并返回true
// (2),前置节点变成head,那么由于前置节点持有资源且为独享资源,所以tryAcquire理应返回false,也必然进入此方法并返回true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}响应中断的acquire操作叫doAcquireInterruptibly和这里的acquireQueued是对应的只不过它会抛出异常,而不是通过返回值来判断是否被中断过
线程释放资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final boolean release(int arg) {
// 如果tryRelease返回true则unpark当前节点的后继或从后往前找一个未被取消的节点
if (tryRelease(arg)) {
Node h = head;
// 0 说明已经unpark了或者没有需要unpark的后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 如果是release进入的此方法,那么node就是head
// 如果是cancelAcquire进入的,那么node此时已经是cancel状态,并且前面没有singl节点,所有才需要自己来唤醒后面的节点(如果有的话)(在这种情况下实际是head节点已经结束了唤醒了node,轮到node tryAcquire时抛出了异常导致的cancelAcquire)
// 也就是说进入这个方法意味着node的前面不存在signal = -1的节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// cancelAcquire进入的不会执行
if (ws < 0)
// 表示自己已经unpark过node的后面一个节点,
compareAndSetWaitStatus(node, ws, 0);
/*
* unpark后继节点,如果为null或者被取消了(只有cancelled为正数)则从后往前找最前面的一个未被取消的节点
s = null的原因是,在addWaiter中修改tail时使用这样的代码(node是新创建的对象,node.next为null)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
当刚刚设置tail,未让pred.next指向tail时,pred.next = null
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}tryAcquire:
addWaiter(mode):将本线程作为一个独占模式的Node添加到同步队列
acquireQueued():在队列里循环等待条件满足,若条件满足则不断尝试获取资源,这里的条件指的是前驱结点是head,并且获取锁的park期间会清除中断,所以返回值为true代表发生过中断,获取资源后自己设置中断标记
关键方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// 向同步队列添加节点
// mode:代表节点的初始状态
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先快速进行一次入队尝试,如果不行就执行enq进行入队过程
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 独享模式时:如果node被其它线程所唤醒后在执行tryAcquire时抛出了异常,此时也就意味着前面没有等待状态的节点了,如果后面有等待的节点,那么自己之前的状态一定是signal,所以自己需要负责唤醒后面的节点(同时也可以理解为:如果自己不断重试成功执行tryAcquire后最终会执行release,也就是说自己总是需要去执行一次唤醒操作的(如果后面的节点有需要的话))。而如果node是超时后自动醒来返回false的情况,
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors,对prev关系进行压缩,查找node压缩后的prev(pred)
// 不用担心pred为null,node一定不是head(异常只能是tryAcquireXXX抛出的,而此时setHead还未执行),并且head的waitStatus一定不是>0,所有pred不为null
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//(在此之前我们不会被其它线程的压缩操作跳过,所以我们在前面可以安全的压缩)
node.waitStatus = Node.CANCELLED;
// 对next关系进行压缩
// If we are the tail, remove ourselves.
// 设置失败说明别的线程入队了
if (node == tail && compareAndSetTail(node, pred)) {
// 不用在意是否修改成功,如果修改失败,那说明有别的node接到tail后了,是正常现象
// 这里的cas只是为了防止把这个接到tail后的node又给断开了
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
// 如果自己前面还有非取消的节点,那么把它设置成signal让他到时候去唤醒后继,并且把自己移除;否则自己唤醒
// 为什么要去唤醒后继:见方法说明
int ws;
// pred如果是head,那么前面一定没节点了所以自己唤醒
// 前面已经是signal(独占模式下不可能出现这样的情况)或者自己设置前面是signal,所以不用自己唤醒
// 如果pred.thread是空那么前面不存在实际线程也需要自己唤醒
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 这里只设置了next(node.pred.next = node.next),没有设置prev(node.next.pred = node.pred,这是因为防止node.next已经是head,这会导致修改head的prev)。由于node的状态已经设置成了cancel,所以实际这个prev关系将在shouldParkAfterFailedAcquire或者这个节点的后面某个节点执行的cancelAquire中进行处理(一种延迟处理的机制)
// 如果失败,说明node.next在shouldpark中修改了next,将node完全移除了
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}共享式流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// 和独占类似
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 注意也会被设置成signal
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 不清楚h == null是什么??(估计是为了防止h.waitStatus抛出异常,同时尽量让if条件宽松)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// s == null是为了考虑到addWaiter刚刚加入的节点,节点还没连接到pred的next。如果后继是共享节点那么叫醒它
if (s == null || s.isShared())
doReleaseShared();
}
}
/*** 释放共享锁 ***/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/*
如果是setHeadAndPropagate进入的,那有三种情况(其实releaseShared进入的也一样这三种情况):
(1),head状态为0(propagate > 0 || h == null或者s == null):此时将head设置为PROPAGATE,能够让这个head能在setHeadAndPropagate里进入doReleaseShared从而更加快速地唤醒需要唤醒的后续节点
(2),head状态为signal:直接唤醒
(3),head状态为propagate:如果在执行时状态被线程改成signal,那么能够直接唤醒。如果没有,那此时大概率head后没有等待的节点,那么也代表着不需要进行propagate,并不会产生什么问题。小概率情况下head在判断完if后被后续节点A设置成signal,然后节点A park前的tryAcauireShared操作,也由于另外一个新线程先一步执行了tryAcauireShared抢走了资源导致失败,从而让节点A park了(Non Fair的抢夺),那此时只能等待有线程执行release才能让A唤醒。
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 说明后继节点需要unpark,unpark成功后即任务完成
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
// 若head状态为0,说明head调用了setHeadAndPropagate(或许这个方法还没结束,只执行了setHead),并且后面没有park的线程
// 1,假设这个方法还没结束,正要执行if:
// 此时将head设置为PROPAGATE,能够让这个head能在setHeadAndPropagate里进入doReleaseShared从而更加快速地唤醒需要唤醒的后续节点
//(即head在已进入doReleaseShared中时,这个节点把head改成signal,之后进行park,再之后head再判断是signal,从而能直接快速地唤醒这个节点,达到快速propagate的效果不会浪费临界资源)
// 2,假设这个方法结束了且未进行propagate,那此时大概率head后没有等待的节点,那么也代表着不需要进行propagate,并不会产生什么问题。
// 小概率情况下head在判断完if后被后续节点A设置成signal,然后节点A park前的tryAcauireShared操作,由于另外一个新线程提前执行了tryAcauireShared抢走了资源导致失败从而让节点A park了(Non Fair的抢夺),那此时只能等待有线程执行release才能让A唤醒。
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 确保整个流程head都没变,都没变才算执行成功
if (h == head) // loop if head changed
break;
}
}其它方法
1
2
3
4
5
6
7
8
9
10
11
12
13
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 首尾不同并且头节点之后不是自己(null或其它线程的节点):有前辈
// 首尾相同或者头节点之后是自己:没有前辈
// 说明还没轮到自己或者自己根本就没在排队
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}ConditionObject
属性
1
2
3
4
5
6
7
8
// 注意不需要 volatile
// 等待队列中没有伪头节点,全部都是等待节点
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;Condition 是为独占模式提供的一个等待-唤醒机制。
await
并不要去资源是独占的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
这个方法必定是独占模式下所以很多地方不需要考虑同步问题
* Implements interruptible condition wait.
* 1. If current thread is interrupted, throw InterruptedException.
* 2. Save lock state returned by {@link #getState}.
* 3. Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* 4. Block until signalled or interrupted.
* 5. Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* 6. If interrupted while blocked in step 4, throw InterruptedException.
*
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
// 释放当前线程持有的所有资源,由于是独占的,也相当于释放当前同步器的所有资源(即release(getState()))
// 如果释放失败,那么让刚才添加的节点状态改成0,代表取消 CONDITION 状态,同时抛出异常
// savedState: 所释放的资源数
int savedState = fullyRelease(node);
int interruptMode = 0;
// 等待这个节点移动到同步队列中,也就是等待其它线程对这个conditionObject执行signal由这个执行signal的线程enq,或者被中断后由自己enq
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 被唤醒了,如果是被中断的那么保证自己enq
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 节点已经移动到同步队列中了,开始参与资源获取。(可能获取成功;也可能抛出异常而取消获取,同时停止整个await)
// 如果是抛出异常而取消获取,await又向上抛出异常,此时是没问题的,因为自己持有的资源之前已经全部释放了
// 现在考虑正常情况,这里获取成功了
// 如果参与资源获取过程中被中断了,且之前等待时不是THROW_IE,那就设置成REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 单纯地进行一次清理
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根据interruptMode进行异常抛出或设置中断标志,
// THROW_IE: throw new InterruptedException()
// REINTERRUPT: selfInterrupt()
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 检查在等待过程中线程是否被中断,如果未被中断说明是通过signal里的unpark醒来的,返回0;否则:
// transferAfterCancelledWait返回true代表是意外醒来,所以需要THROW_IE,否则虽然是被中断而醒来但由于并没有破坏await-signal流程,所以只需要设置interrupt标志
// 当然如果不是被中断而醒来的,就不必执行transferAfterCancelledWait了,
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 方法需要保证返回时node已经enq成功,即保证能跳出while (!isOnSyncQueue(node)),进行await()的后续操作
final boolean transferAfterCancelledWait(Node node) {
// 尝试从CONDITION转为0,同时移入同步队列。
// 如果成功意味着自己并不是signal唤醒的,在被signal之前就意外醒了
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 移入同步队列
enq(node);
return true;
}
// 自己的状态不是CONDITION,这意味着之前已经被signal唤醒过了,同时已经进入transferForSignal的enq部分(这个enq的代码是由调用signal的线程执行的),所以此时需要等待enq完成才能退出
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
// 把当前线程添加到等待队列末尾
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除已经取消 CONDITION 状态的节点,获得最新的lastWaiter
unlinkCancelledWaiters(); // 无需同步,所以此方法就不分析了
t = lastWaiter;
}
// 连接到新lastWaiter的后面
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// lastWaiter修改成此node
lastWaiter = node;
return node;
}signal
只有当前线程是获取的是独占资源时才可signal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 简单粗暴
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// 唤醒第一个节点
private void doSignal(Node first) {
do {
// 从等待队列中完全移除当前节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 成功移入同步队列 或者 等待队列已经为空时退出循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将节点移动到同步队列
final boolean transferForSignal(Node node) {
// 如果失败,那说明节点已经取消CONDITION了,可能在transferAfterCancelledWait中移入了或将要移入同步队列,也可能是release中抛出了异常而取消的
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 节点移入同步队列,p是node.prev
Node p = enq(node);
int ws = p.waitStatus;
// 前面节点取消了或者前面节点设置成SIGNAL失败,意味着被signal的node可以直接醒来尝试参与await中调用的acquireQueued了
// 否则(前面节点A当前已经被设置成SIGNAL)不进行unpark,由同步队列中前面的节点释放资源时进行唤醒(unparkSuccessor)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}参考
https://www.zhihu.com/question/295925198/answer/1622051796