下面用一个小 demo,对 CyclicBarrier 有一个初步的印象。
public class Test implements Runnable{
//定义一个循环栅栏
private CyclicBarrier cyclicBarrier = new CyclicBarrier(3,this);
//存放每个线程的数据
private BlockingQueue< Integer > list = new ArrayBlockingQueue< >(3);
public void count() {
//运行 3 个线程
for(int i = 0;i< 3;i++){
new Thread(new Runnable() {
public void run() {
//这里可以存放更复杂的操作,比如查询 SQL、写入 Excel 等等
Random r = new Random();
int a = r.nextInt(100);
System.out.println("线程 " + Thread.currentThread().getName() + "获得数字:" + a);
list.add(a);
try {
//线程到达栅栏后,等待
cyclicBarrier.await();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
}
//这里就是线程一起跨过栅栏后执行的任务
public void run() {
int result = 0;
for (Integer i: list) {
result += i;
}
System.out.println(result);
}
public static void main(String[] args){
Test test = new Test();
test.count();
}
}
这个 demo 中一共 3 个线程,每个线程都随机获取一个数字(在实际生产代码中会有更复杂的操作),最后将每个线程获取的数字相加后打印最后的结果。
CyclicBarrier 的一个内部类,Generation 被翻译成为“代”。当这一代的所有线程都到达栅栏后可以开启下一代,所以才被成为循环栅栏。broken 属性表示栅栏是否被打破了。
private static class Generation {
boolean broken = false;
}
//可重入的 ReentrantLock 锁,非公平锁
private final ReentrantLock lock = new ReentrantLock();
//lock 的条件队列
private final Condition trip = lock.newCondition();
//线程的数量
private final int parties;
//所有线程到达后,可执行的方法
private final Runnable barrierCommand;
//当前代
private Generation generation = new Generation();
//当前代还需要等待线程的数量
private int count;
public CyclicBarrier (int parties) {
//调用 2 个参数的构造函数
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
//检查线程的数量是否小于 0
if (parties <= 0) throw new IllegalArgumentException();
//设置线程数、需要等待的线程数量
this.parties = parties;
this.count = parties;
//设置所有线程都到达的时候需要运行的方法
this.barrierCommand = barrierAction;
}
从上面的内容和 demo 粗略的可以看出,CyclicBarrier 在初始化时设置了线程数量 parties,必须等待所有的线程都到栅栏处 cyclicBarrier.await() 时才可以运行 barrierCommand 方法。
如果还有线程没有到达栅栏处,会将先到达栅栏处的线程放入 trip 条件队列中等待最后一个线程到达。
//CyclicBarrier.await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
//调用了 dowait()
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
外部调用 await() 方法,等待线程到达栅栏后一起执行后续的操作。await() 可以被复用,每多调用一次 await() 就表示多增加一代,第一次调用是一代、第二次调用是二代、第三次调用是三代...。
dowait() 是 CyclicBarrier 的核心方法。
//CyclicBarrier.dowait()
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//判断当前代的栅栏是否被打断了
if (g.broken)
throw new BrokenBarrierException();
//线程是否被中断
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//到达一个线程,数量就减去 1,直到最后一个线程
int index = --count;
if (index == 0) {
//是否执行了 barrierCommand 的标识位
boolean ranAction = false;
try {
//所有线程到达后,一起执行的方法
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//进入下一代
nextGeneration();
return 0;
} finally {
//barrierCommand 没被执行,打破栅栏
if (!ranAction)
breakBarrier();
}
}
//无限循环,这里只有最后一个线程不进入循环
for (;;) {
try {
//没有设置需要等待的时间
if (!timed)
//到达栅栏的线程进入 trip 条件队列等待被唤醒
trip.await();
//等待的时间还没有超时
else if (nanos > 0L)
//等待指定的时间
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//出了异常且当前代没有打破栅栏,那么打破栅栏并且抛出异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//中断当前线程
Thread.currentThread().interrupt();
}
}
//检查
if (g.broken)
throw new BrokenBarrierException();
//不是当前的代
if (g != generation)
return index;
//等待超时了
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//解锁,出队
lock.unlock();
}
}
dowait() 的运行被分成了 2 部分:
//CyclicBarrier.nextGeneration()
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
CyclicBarrier 为什么会一代结束后可以开始下一代,就靠这个 nextGeneration() 方法,它干了三件事:
CyclicBarrier 使用了两个队列,一个条件队列,一个 AQS 队列,在 trip.await() 出进入条件队列。当最后一个线程到达栅栏出的时候,条件队列中的线程全部移动到 AQS 队列中,要注意的是最后一个线程并没有进入 AQS 队列中。在 lock.unlock() 的时候 AQS 队列中的线程出队。
CyclicBarrier 基于 ReentrantLock 和 Condition 实现同步线程的逻辑。
全部0条评论
快来发表一下你的评论吧 !