CountDownLatch

原理很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/// Sync
// 当state不等于0时获取共享锁失败,进入同步队列,当等于0时总是成功
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// 每次state减一
// return true 代表要执行doRelease(),将会唤醒线程并传播。
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0; // 只有0时才需要唤醒并传播。
    }
}

// CountDownLatch

// 初始化state为count
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

// 每次state减一
public void countDown() {
    // release次数满了后就会唤醒所有线程。
    sync.releaseShared(1);
}

// 直到state为0
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

CyclicBarrier

类似于鹿威し,线程满了后让所有线程通行,之后又可以继续阻挡

功能:

实现n个线程(线程之间是独占锁的关系)相互等待,最后一个执行wait操作的线程需要执行指定的命令barrierCommand,并且还需要唤醒其他所有等待的线程,还要初始化相关参数,为下一轮相互等待做好准备。n个线程中任何一个线程发生中断,超时,则设置代标志generation.broken为true,其他线程检测到该标识说明此轮等待失败。

实现方式:

使用的是AQS中独占锁CLH(Craig, Landin, and Hagersten)队列+CONDITION队列的实现方式。当调用barrier.await方法时,该线程要获得锁,因此进入CLH队列队尾,当获取到锁后,若不是最后一个则执行condition.wait操作,将该线程节点从CLH队列队首删除,添加到CONDITION队列中。将count-1个线程以上述方式逐个添加到CONDITIO队列中,当最后一个线程获取到锁后,它会执行barrierCommand任务,然后执行condition.signalAll方法,唤醒CONDITION队列中所有节点,CONDITION队列从队首开始逐个将node添加到CLH队列中,开始获取锁,执行完后继操作后从barrier.await中返回。

可以设置超时,若等待中的线程有一个已经超时则generation会被设置为broken,不用等待线程数满足条件屏障也会打开

线程在等待过程被中断且当前代未结束并且正常,那么打破barrier自身抛出IE,且将导致同一轮的其它线程throw new BrokenBarrierException()

线程在超时后barrier未被打破并且未结束当前代,那么线程将抛出TimoutException

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();//独占锁
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();//条件,lock中的一个CONDITION队列
/** The number of parties */
private final int parties;//参与相互等待的线程数量
/* The command to run when tripped */
private final Runnable barrierCommand;//最后一个执行dowait的线程需要执行的代码,如果generation被broken那么就不会被正确执行。
/** The current generation */
private Generation generation = new Generation();//下一批循环等待的代标志
 
private int count;//还需加入的的线程数量,初始为parties,当需要开始下一轮时会重置为parties

构造方法

1
2
3
4
5
6
7
8
9
10
11
12

//指定参与相互等待的线程的数量parties,
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

内部类

1
2
3
private static class Generation {
    boolean broken = false;
}

参与等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// BrokenBarrierException会被抛出,当超时后调用方将不在等待
// 返回值代表是第几个到的。0代表最后一个到,parties-1代表第一个到
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 非超时等待
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
     // 超时等待 可能会抛出TimeoutException
    return dowait(true, unit.toNanos(timeout));
}
// 超时或本线程被中断都会breakBarrier
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // 会传播会调用方,调用方将不会等待
        if (g.broken)
            throw new BrokenBarrierException();
        // 线程在之前(比如获取锁的期间)被中断,需要通知其它trip.await的线程启动
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 记录下自己是什么时候到的,并修改count
        int index = --count;
        // 我是最后到的,大家可以走了
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 由最后一个线程来执行command
                if (command != null)
                    command.run();
                ranAction = true;
                // 开始下一轮
                nextGeneration();
                return 0;
            } finally {
                // command执行抛出了异常,把它当作broken
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    // 返回值是还需等待的纳秒数
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 线程在等待过程被中断且当前代未结束并且正常,那么打破barrier自身抛出IE,且将导致同一轮的其它线程throw new BrokenBarrierException()
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 设置中断标志设置回去,继续等待
                    Thread.currentThread().interrupt();
                }
            }
            // barrier被broken,抛出异常,可以走了
            if (g.broken)
                throw new BrokenBarrierException();
            // 开始了新一轮,也可以走了
            if (g != generation)
                return index;
            // 超时了,也可以走
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

// 下面两个方法的区别就是一个直接开始进入下一轮(不可重复调用否则又进下一轮)
// 一个表示现在这一轮可以直接通过(意味着只要发现出现异常情况,在同一轮可以被重复调用)

// 设置本轮被broken,重新设置count为parties,通知所有等待的线程
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
// 通知大家走,重新设置count为parties,开始下一轮
private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

手动重置

1
2
3
4
5
6
7
8
9
10
11
12
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 让等待的线程直接走
        breakBarrier();   // break the current generation
        // 初始化下一个generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread() + "开始旅行。");
    }
});

static class Person extends Thread {
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread());
            cyclicBarrier.await(2, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread() + "旅行中。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            // 说明是最后获取锁的
            System.out.println(Thread.currentThread() + "不等了。");
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    Person p1 = new Person();
    Person p2 = new Person();
    Person p3 = new Person();
    Person p4 = new Person();
    p1.start();
    p2.start();
    p3.start();
    p4.start();
}

Semaphore

/ˈseməfɔː(r)/

和可重入锁的区别:它是shared(但是个数有限),可重入锁是exclusive的;

Abstract Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
abstract static class Sync extends AbstractQueuedSynchronizer {

    // 初始的资源数,可以随意释放任意多个(只要不溢出),或获取任意多个(只要不超过当前资源数)
    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }
    
    // 抢到了返回剩余的,没抢到返回负数
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // 抢到了就走
            // 剩余的少于我需要的,返回一个负数,之后会进入同步队列
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 释放资源
    @Override
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            // 为什么释放的数目会出错?因为可以随意释放任意多个资源
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 释放就行
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 手动减少当前剩余资源,不进同步队列不阻塞
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 手动清空当前剩余资源,不进同步队列不阻塞
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

非公平锁

1
2
3
4
5
6
7
8
9
10
static final class NonfairSync extends Sync {

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 还有前辈,不能获取。AQS的方法
            if (hasQueuedPredecessors())
                return -1;
            // 开始获取
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static Semaphore semaphore = new Semaphore(20);

static class Person extends Thread {
    private final int num;
    public Person(int num) {
        this.num = num;
    }
    @Override
    public void run() {
        try {
            if (num != 2) {
                Thread.sleep(50);
            }
            semaphore.acquire(num);
            System.out.println(Thread.currentThread().getId() + String.format(": 我拿了%d个", num));
            if (num == 2) {
                semaphore.drainPermits();
                System.out.println(Thread.currentThread().getId() + ": 我把剩下的都扔了.");
                Thread.sleep(1000);
                semaphore.release(1);
                System.out.println(Thread.currentThread().getId() + ": 我把我的" + 1 + "个给你们.");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    System.out.println("总共" + 20 + "个");
    Person p1 = new Person(2);
    Person p2 = new Person(1);
    Person p3 = new Person(1);
    p1.start();
    p2.start();
    p3.start();
}

结果:

1
2
3
4
5
6
7
8
9
总共20个
13: 我拿了2个
13: 我把剩下的都扔了.
13: 我把我的1个给你们.
14: 我拿了1个

// 强制退出
// 还有一个线程没有
Process finished with exit code -1