CyclicBarrier 要做的事情是让一组线程未到达一个屏障(条件)时被阻塞,直到最后一个线程到达屏障(条件)时,屏障(条件)才会开门,所有被屏障拦截的线程一起执行。
举个例子说明 CyclicBarrier 的使用:8 个运动员参加比赛,运动员可能到达赛场的时间不一样,要等8 个运动员到齐了才开始比赛,代码如下:
public class TestCyclicBarrier { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(8, () -> { System.out.println("所有运动员入场,裁判员一声令下!!!"); }); for (int i = 0; i < 8; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 运动员已到达起点,准备就位..."); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 运动员出发..."); }).start(); } } } # 运行结果如下: 运动员准备上场,尖叫声想起来... Thread-0 运动员已到达起点,准备就位... Thread-1 运动员已到达起点,准备就位... Thread-2 运动员已到达起点,准备就位... Thread-3 运动员已到达起点,准备就位... Thread-4 运动员已到达起点,准备就位... Thread-5 运动员已到达起点,准备就位... Thread-6 运动员已到达起点,准备就位... Thread-7 运动员已到达起点,准备就位... 所有运动员入场,裁判员一声令下!!! Thread-7 运动员出发... Thread-0 运动员出发... Thread-2 运动员出发... Thread-3 运动员出发... Thread-1 运动员出发... Thread-6 运动员出发... Thread-5 运动员出发... Thread-4 运动员出发...
CyclicBarrier 和 CountDownLatch 都有等待的意思,其中两个的不同点如下:
CountDownLatch 用于一个线程等待若干个其他线程执行完任务之后才执行,强调一个线程等待,这个线程会阻塞。而 CyclicBarrier 用于一组线程互相等待至某个状态,然后这一组线程再同时执行,强调的是多个线程互等,这多个线程阻塞,等大家都完成,再携手共进;CountDownLatch 是不能复用的,而 CyclicLatch 是可以复用的。使用 reset() 方法将屏障重置为初始状态之后就可以复用;CyclicBarrier 提供了更多的方法,能够通过 getNumberWaiting() 获取阻塞线程的数量,通过 isBroken() 方法可以知道阻塞的线程是否被中断。
CyclicBarrier 是通过 Lock 的 Condition 实现的,每个 CyclicBarrier 对应个 Lock 锁和该锁的 condition 条件。
创建 CyclicBarrier 时设置一个 count 计数,当调用 await() 时做两件事:①将 count-1 ②将线程阻塞并构造成结点加入 condition 条件队列。当 count 变为 0 时,达到等待线程数量要求,condition 将条件队列中的线程全部唤醒。
将parties设置为count值,设置达到等待线程数量后优先执行的线程
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; // 保存parties可循环使用 this.count = parties; // 将parties设置为count值 this.barrierCommand = barrierAction;// 设置达到等待线程数量后优先执行的线程 }
await() 方法:①将 count-1;②将线程阻塞并构造成结点加入 condition 条件队列;③当 count 变为 0 时,condition 将条件队列中的线程全部唤醒。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); // 代失效,唤醒所有线程 throw new InterruptedException(); } int index = --count; // 计数 if (index == 0) { // 达到要求数量 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) // 达到等待线程数量后执行barrierCommand command.run(); ranAction = true; // 唤醒本代所有线程,生成新一代,重置count nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 线程数量未达到要求数量,将线程挂起等待 for (;;) { try { if (!timed) trip.await(); // 将线程加入condition队列挂起 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 特殊情况处理 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } // 当前代失效,唤醒所有线程 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } // 唤醒本代所有线程,生成新一代,重置count private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
在《21 - CountDownLatch 让线程等待其他线程完成》中,我们使用了 CountDownLatch 进行了进行了优化,将 getPOrders() 和 getDOrders() 这两个查询操作并行了,但这两个查询操作和对账操作 check()、save() 之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作,这个过程可以形象化地表述为下面这幅示意图。
针对对账这个项目,我设计了两个队列,并且两个队列的元素之间还有对应关系。具体如下图所示,订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。
用双队列来实现完全的并行。一个最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。
// 订单队列 Vector<P> pos; // 派送单队列 Vector<D> dos; // 执行回调的线程池 Executor executor = Executors.newFixedThreadPool(1); final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(() -> check()); }); void check () { P p = pos.remove(0); D d = dos.remove(0); // 执行对账操作 diff = check(p, d); // 差异写入差异库 save(diff); } void checkAll () { // 循环查询订单库 Thread T1 = new Thread(() -> { while (存在未对账订单) { // 查询订单库 pos.add(getPOrders()); // 等待 barrier.await(); } }); T1.start(); // 循环查询运单库 Thread T2 = new Thread(() -> { while (存在未对账订单) { // 查询运单库 dos.add(getDOrders()); // 等待 barrier.await(); } }); T2.start(); }在上面的代码中,我们首先创建了一个计数器初始值为 2 的 CyclicBarrier,你需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。
线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。
非常值得一提的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。这个功能用起来实在是太方便了。
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。
CyclicBarrier 是通过 Lock 的 Condition 实现的,每个 CyclicBarrier 对应个 Lock 锁和该锁的 condition 条件。创建 CyclicBarrier 时设置一个 count 计数,当调用 await() 时做两件事:①将 count-1 ②将线程阻塞并构造成结点加入condition 条件队列。当 count 变为 0 时,达到等待线程数量要求,condition 将条件队列中的线程全部唤醒。