CountDownLatch,CyclicBarrier,Semaphore
CountDownLatch
原理很简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/// Sync
// 当state不等于0时获取共享锁失败,进入同步队列,当等于0时总是成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 每次state减一
// return true 代表要执行doRelease(),将会唤醒线程并传播。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 只有0时才需要唤醒并传播。
}
}
// CountDownLatch
// 初始化state为count
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 每次state减一
public void countDown() {
// release次数满了后就会唤醒所有线程。
sync.releaseShared(1);
}
// 直到state为0
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}CyclicBarrier
类似于鹿威し,线程满了后让所有线程通行,之后又可以继续阻挡
功能:
实现n个线程(线程之间是独占锁的关系)相互等待,最后一个执行wait操作的线程需要执行指定的命令barrierCommand,并且还需要唤醒其他所有等待的线程,还要初始化相关参数,为下一轮相互等待做好准备。n个线程中任何一个线程发生中断,超时,则设置代标志generation.broken为true,其他线程检测到该标识说明此轮等待失败。
实现方式:
使用的是AQS中独占锁CLH(Craig, Landin, and Hagersten)队列+CONDITION队列的实现方式。当调用barrier.await方法时,该线程要获得锁,因此进入CLH队列队尾,当获取到锁后,若不是最后一个则执行condition.wait操作,将该线程节点从CLH队列队首删除,添加到CONDITION队列中。将count-1个线程以上述方式逐个添加到CONDITIO队列中,当最后一个线程获取到锁后,它会执行barrierCommand任务,然后执行condition.signalAll方法,唤醒CONDITION队列中所有节点,CONDITION队列从队首开始逐个将node添加到CLH队列中,开始获取锁,执行完后继操作后从barrier.await中返回。
可以设置超时,若等待中的线程有一个已经超时则generation会被设置为broken,不用等待线程数满足条件屏障也会打开
线程在等待过程被中断且当前代未结束并且正常,那么打破barrier自身抛出IE,且将导致同一轮的其它线程throw new BrokenBarrierException()
线程在超时后barrier未被打破并且未结束当前代,那么线程将抛出TimoutException
成员变量
1
2
3
4
5
6
7
8
9
10
11
12
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();//独占锁
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();//条件,lock中的一个CONDITION队列
/** The number of parties */
private final int parties;//参与相互等待的线程数量
/* The command to run when tripped */
private final Runnable barrierCommand;//最后一个执行dowait的线程需要执行的代码,如果generation被broken那么就不会被正确执行。
/** The current generation */
private Generation generation = new Generation();//下一批循环等待的代标志
private int count;//还需加入的的线程数量,初始为parties,当需要开始下一轮时会重置为parties构造方法
1
2
3
4
5
6
7
8
9
10
11
12
//指定参与相互等待的线程的数量parties,
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}内部类
1
2
3
private static class Generation {
boolean broken = false;
}参与等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// BrokenBarrierException会被抛出,当超时后调用方将不在等待
// 返回值代表是第几个到的。0代表最后一个到,parties-1代表第一个到
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 非超时等待
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
// 超时等待 可能会抛出TimeoutException
return dowait(true, unit.toNanos(timeout));
}
// 超时或本线程被中断都会breakBarrier
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 会传播会调用方,调用方将不会等待
if (g.broken)
throw new BrokenBarrierException();
// 线程在之前(比如获取锁的期间)被中断,需要通知其它trip.await的线程启动
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 记录下自己是什么时候到的,并修改count
int index = --count;
// 我是最后到的,大家可以走了
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 由最后一个线程来执行command
if (command != null)
command.run();
ranAction = true;
// 开始下一轮
nextGeneration();
return 0;
} finally {
// command执行抛出了异常,把它当作broken
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
// 返回值是还需等待的纳秒数
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 线程在等待过程被中断且当前代未结束并且正常,那么打破barrier自身抛出IE,且将导致同一轮的其它线程throw new BrokenBarrierException()
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 设置中断标志设置回去,继续等待
Thread.currentThread().interrupt();
}
}
// barrier被broken,抛出异常,可以走了
if (g.broken)
throw new BrokenBarrierException();
// 开始了新一轮,也可以走了
if (g != generation)
return index;
// 超时了,也可以走
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 下面两个方法的区别就是一个直接开始进入下一轮(不可重复调用否则又进下一轮)
// 一个表示现在这一轮可以直接通过(意味着只要发现出现异常情况,在同一轮可以被重复调用)
// 设置本轮被broken,重新设置count为parties,通知所有等待的线程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
// 通知大家走,重新设置count为parties,开始下一轮
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}手动重置
1
2
3
4
5
6
7
8
9
10
11
12
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 让等待的线程直接走
breakBarrier(); // break the current generation
// 初始化下一个generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread() + "开始旅行。");
}
});
static class Person extends Thread {
@Override
public void run() {
try {
System.out.println(Thread.currentThread());
cyclicBarrier.await(2, TimeUnit.SECONDS);
System.out.println(Thread.currentThread() + "旅行中。");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (TimeoutException e) {
// 说明是最后获取锁的
System.out.println(Thread.currentThread() + "不等了。");
}
}
}
public static void main(String[] args) throws InterruptedException {
Person p1 = new Person();
Person p2 = new Person();
Person p3 = new Person();
Person p4 = new Person();
p1.start();
p2.start();
p3.start();
p4.start();
}Semaphore
/ˈseməfɔː(r)/
和可重入锁的区别:它是shared(但是个数有限),可重入锁是exclusive的;
Abstract Sync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
abstract static class Sync extends AbstractQueuedSynchronizer {
// 初始的资源数,可以随意释放任意多个(只要不溢出),或获取任意多个(只要不超过当前资源数)
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 抢到了返回剩余的,没抢到返回负数
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// 抢到了就走
// 剩余的少于我需要的,返回一个负数,之后会进入同步队列
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 释放资源
@Override
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// 为什么释放的数目会出错?因为可以随意释放任意多个资源
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 释放就行
if (compareAndSetState(current, next))
return true;
}
}
// 手动减少当前剩余资源,不进同步队列不阻塞
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 手动清空当前剩余资源,不进同步队列不阻塞
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}非公平锁
1
2
3
4
5
6
7
8
9
10
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 还有前辈,不能获取。AQS的方法
if (hasQueuedPredecessors())
return -1;
// 开始获取
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static Semaphore semaphore = new Semaphore(20);
static class Person extends Thread {
private final int num;
public Person(int num) {
this.num = num;
}
@Override
public void run() {
try {
if (num != 2) {
Thread.sleep(50);
}
semaphore.acquire(num);
System.out.println(Thread.currentThread().getId() + String.format(": 我拿了%d个", num));
if (num == 2) {
semaphore.drainPermits();
System.out.println(Thread.currentThread().getId() + ": 我把剩下的都扔了.");
Thread.sleep(1000);
semaphore.release(1);
System.out.println(Thread.currentThread().getId() + ": 我把我的" + 1 + "个给你们.");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("总共" + 20 + "个");
Person p1 = new Person(2);
Person p2 = new Person(1);
Person p3 = new Person(1);
p1.start();
p2.start();
p3.start();
}结果:
1
2
3
4
5
6
7
8
9
总共20个
13: 我拿了2个
13: 我把剩下的都扔了.
13: 我把我的1个给你们.
14: 我拿了1个
// 强制退出
// 还有一个线程没有
Process finished with exit code -1