CountDownLatch和CyclicBarrier都是线程工具类,他们的作用都是可以使得一个线程等待其他线程各自执行完毕后再执行。之前也写过相应的小Demo,最近和朋友聊天时,又遇到了关于他们两个的区别。
CountDownLatch
在使用CountDownLatch最常用的方法是await方法和Count方法。
CountDownLatch latch = new CountDownLatch(5);
latch.await();
latch.countDown();
构造方法
构造方法是设置等待线程的个数。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
在构造方法内部又new了一个Sync。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
}
Sync对象是继承自AQS,那么它的构造方法明显猜到是设置了AQS中的volatile对象State的值为我们设置的等待线程个数。
await方法
await方法是线程等待,由构造方法,countDownLatch中设置了5个等待线程,那它的等待是如何等待的。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
调用它的内部类sync的acquireSharedInterruptibly,根据这个方法名,具体一点,CountDownLatch是使用的AQS中的共享模式。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
内部类Sync重写了tryAcquireShared方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
之前设置过的state,在调用await方法时,getState()返回值会返回-1,然后再调用doAcquireSharedInterruptibly()方法。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//对于CountDownLatch而言,如果计数器值不等于0,那么r 会一直小于0
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
那么根据代码int r的值,是等于-1,所以永远不可能返回,那么就开始等待,await方法就实现了。
countDown方法
countDown方法表示该线程已经执行结束。现在await方法的实现已经明白了,是通过判断计数器即state的值是不是等于0,那现在可以推断出countDown方法就是来设置state的值的。看源码
public void countDown() {
sync.releaseShared(1);
}
调用内部类的releaseShared方法,releaseShared是AQS释放共享模式锁的代码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
内部类Sync已经重写了tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
通过CAS,设置了state的值,并且设置的值的大小是目前state值-1,如果失败就自旋。
CountDownLatch就是通过内部的一个Sync使用的AQS的共享模式。
通过先设置计数器的值,await方法就是通过计数器值来判断是不是该等待,countDown方法就是设置计数器的值,并且减一。
CyclicBarrier
CyclicBarrier的实现和CountDownLatch大相径庭,CountDownLatch基于 AQS的共享模式的使用,而CyclicBarrier基于ReenTrantLock的Condition来实现的。
CyclicBarrier的Barrier是栅栏的意思,来看下它是怎么通过Condition来实现的。
构造方法
继续,看构造方法
//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
构造方法主要就是设置栅栏的个数和当达到栅栏个数所使用的线程。
await方法
它也和CountDownLatch一样,在等待其他线程完成的当前线程需要使用await方法来等待。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
其中调用了dowait方法,继续查看dowait方法具体干了些啥
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//分代
final Generation g = generation;
//当前generation“已损坏”,抛出BrokenBarrierException异常
//抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie
if (g.broken)
//当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常
throw new BrokenBarrierException();
//如果线程中断,终止CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//进来一个线程 count - 1
int index = --count;
//count == 0 表示所有线程均已到位,触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//触发任务
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation
nextGeneration();
return 0;
} finally {
//确保在任务未成功执行时能将所有线程唤醒
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
//如果不是超时等待,则调用Condition.await()方法等待
if (!timed)
trip.await();
else if (nanos > 0L)
//超时等待,调用Condition.awaitNanos()方法等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//generation已经更新,返回index
if (g != generation)
return index;
//“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}
现在ReenTrantLock开始使用,代码主要的逻辑就是在调用dowait方法中每次都将count减1,减完后立马进行判断看看是否等于0,如果等于0的话就会先去执行之前指定好的任务,执行完之后再调用nextGeneration方法将栅栏转到下一代,在该方法中会将所有线程唤醒,将计数器的值重新设为parties,最后会重新设置栅栏代次。
如果计数器此时还不等于0的话就进入for循环,根据参数来决定是调用trip.awaitNanos(nanos)还是trip.await()方法,这两方法对应着定时和非定时等待。如果在等待过程中当前线程被中断就会执行breakBarrier方法,该方法叫做打破栅栏。
接着看它是怎么换代和打破栅栏的
//切换栅栏到下一代
private void nextGeneration() {
//唤醒条件队列所有线程
trip.signalAll();
//设置计数器的值为需要拦截的线程数
count = parties;
//重新设置栅栏代次
generation = new Generation();
}
//打翻当前栅栏
private void breakBarrier() {
//将当前栅栏状态设置为打翻
generation.broken = true;
//设置计数器的值为需要拦截的线程数
count = parties;
//唤醒所有线程
trip.signalAll();
}
ReenTrantLock的Condition对象—trip开始使用。所以,在CyclicBarrier的实现原理已经明白了,他就是在实现多线程代码的时候,通过内部parties值来建立栅栏,然后在代码执行时,又使用ReenTrantLock来完成线程的加锁来安全的维护count值,每次减一,当count为0时,执行换代前该执行的任务并进行换代。就将线程通过Condition来完成等待。
同时CyclicBarrier也被可以循环使用,它相比CountDownLatch,只要没有执行退出的代码,就会一种执行。这个实现也是通过它的Generation来完成。
CountDownLatch与CyclicBarrier比较
难免会将CyclicBarrier与CountDownLatch进行一番比较。这两个类都可以实现一组线程在到达某个条件之前进行等待,它们内部都有一个计数器,当计数器的值不断的减为0的时候所有阻塞的线程将会被唤醒。区别的是:
- CyclicBarrier的计数器由自己控制,而CountDownLatch的计数器则由使用者来控制,在CyclicBarrier中线程调用await方法不仅会将自己阻塞还会将计数器减1,而在CountDownLatch中线程调用await方法只是将自己阻塞而不会减少计数器的值。
- CountDownLatch只能拦截一轮,而CyclicBarrier可以实现循环拦截。一般来说用CyclicBarrier可以实现CountDownLatch的功能,而反之则不能
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
- CyclicBarrier还提供了其他有用的方法,比如getNumberWaiting可以获得CyclicBarrier阻塞的线程;isBroken方法可以用来了解阻塞的线程是否被中断
另外附上之前写过的代码。
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
new Thread(()->{
// 摇人整活
System.out.println("赶紧摇人来整活");
try {
// 憨憨等人上号
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 开始游戏
System.out.println("发车!!!!!");
}).start();
// 循环创建模拟五个人上号的过程
for (int i = 0; i < 5 ; i++) {
new Thread(()->{
// 憨憨上号中。。。
System.out.println(Thread.currentThread().getName()+"正在上号");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 憨憨上号成功 count - 1
System.out.println(Thread.currentThread().getName()+"已上号,等待发车");
latch.countDown();
},"第"+(i+1)+"个人").start();
}
}
}
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 发车线程
Thread play =new Thread(()->{
System.out.println(Thread.currentThread().getName()+"发车发车");
});
CyclicBarrier barrier = new CyclicBarrier(5,play);
for (int i = 0; i < 5 ; i++) {
new Thread(()->{
// 憨憨上号中。。。
System.out.println(Thread.currentThread().getName()+"正在上号");
try {
// 模拟憨憨上号时间
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 憨憨上号成功 count - 1
System.out.println(Thread.currentThread().getName()+"已上号,等待发车");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},"第"+(i+1)+"个人").start();
}
}
}
评论区