22 - CyclicBarrier 一组线程之间相互等待

    技术2025-09-28  37

    CyclicBarrier 一组线程之间相互等待

    1. CyclicBarrier 的使用1.1 主要方法1.2 使用举例1.3 与 CountDownLatch 比较 2. 源码分析2.1 类结构2.2 构造器2.3 await 3. 对账系统4. 总结

    1. CyclicBarrier 的使用

      CyclicBarrier 要做的事情是让一组线程未到达一个屏障(条件)时被阻塞,直到最后一个线程到达屏障(条件)时,屏障(条件)才会开门,所有被屏障拦截的线程一起执行。   

    1.1 主要方法

    CyclicBarrier(int parties):构造方法,parties表示拦截线程的数量。 CyclicBarrier(int parties, Runnable barrierAction) :barrierAction 是一个回调线程, 用于在线程到达屏障时执行的线程,用于处理更加复杂的业务场景。 await():将当前线程阻塞,等到所有的线程都到达指定的临界点后一起执行。 getNumberWaiting():获取当前有多少个线程阻塞等待在临界点上。 reset():将屏障重置为初始状态。

      

    1.2 使用举例

      举个例子说明 CyclicBarrier 的使用:8 个运动员参加比赛,运动员可能到达赛场的时间不一样,要等8 个运动员到齐了才开始比赛,代码如下:

    public class TestCyclicBarrier { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(8, () -> { System.out.println("所有运动员入场,裁判员一声令下!!!"); }); for (int i = 0; i < 8; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 运动员已到达起点,准备就位..."); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 运动员出发..."); }).start(); } } } # 运行结果如下: 运动员准备上场,尖叫声想起来... Thread-0 运动员已到达起点,准备就位... Thread-1 运动员已到达起点,准备就位... Thread-2 运动员已到达起点,准备就位... Thread-3 运动员已到达起点,准备就位... Thread-4 运动员已到达起点,准备就位... Thread-5 运动员已到达起点,准备就位... Thread-6 运动员已到达起点,准备就位... Thread-7 运动员已到达起点,准备就位... 所有运动员入场,裁判员一声令下!!! Thread-7 运动员出发... Thread-0 运动员出发... Thread-2 运动员出发... Thread-3 运动员出发... Thread-1 运动员出发... Thread-6 运动员出发... Thread-5 运动员出发... Thread-4 运动员出发...

      

    1.3 与 CountDownLatch 比较

      CyclicBarrier 和 CountDownLatch 都有等待的意思,其中两个的不同点如下:

    CountDownLatch 用于一个线程等待若干个其他线程执行完任务之后才执行,强调一个线程等待,这个线程会阻塞。而 CyclicBarrier 用于一组线程互相等待至某个状态,然后这一组线程再同时执行,强调的是多个线程互等,这多个线程阻塞,等大家都完成,再携手共进;CountDownLatch 是不能复用的,而 CyclicLatch 是可以复用的。使用 reset() 方法将屏障重置为初始状态之后就可以复用;CyclicBarrier 提供了更多的方法,能够通过 getNumberWaiting() 获取阻塞线程的数量,通过 isBroken() 方法可以知道阻塞的线程是否被中断。

      

    2. 源码分析

      CyclicBarrier 是通过 Lock 的 Condition 实现的,每个 CyclicBarrier 对应个 Lock 锁和该锁的 condition 条件。

      创建 CyclicBarrier 时设置一个 count 计数,当调用 await() 时做两件事:①将 count-1 ②将线程阻塞并构造成结点加入 condition 条件队列。当 count 变为 0 时,达到等待线程数量要求,condition 将条件队列中的线程全部唤醒。

      

    2.1 类结构

    public class CyclicBarrier { // 内部类,当有parties个线程到达barrier就会更新换代 private static class Generation { boolean broken = false; // 是否损坏 } // 重入锁 private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); // 等待线程总数量 private final int parties; // 达到等待线程数量后执行的线程 private final Runnable barrierCommand; // 当有parties个线程到达barrier,就会更新换代 private Generation generation = new Generation(); // 记录当前线程数量 private int count; }

      

    2.2 构造器

      将parties设置为count值,设置达到等待线程数量后优先执行的线程

    public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; // 保存parties可循环使用 this.count = parties; // 将parties设置为count值 this.barrierCommand = barrierAction;// 设置达到等待线程数量后优先执行的线程 }

      

    2.3 await

      await() 方法:①将 count-1;②将线程阻塞并构造成结点加入 condition 条件队列;③当 count 变为 0 时,condition 将条件队列中的线程全部唤醒。

    public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); // 代失效,唤醒所有线程 throw new InterruptedException(); } int index = --count; // 计数 if (index == 0) { // 达到要求数量 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) // 达到等待线程数量后执行barrierCommand command.run(); ranAction = true; // 唤醒本代所有线程,生成新一代,重置count nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 线程数量未达到要求数量,将线程挂起等待 for (;;) { try { if (!timed) trip.await(); // 将线程加入condition队列挂起 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 特殊情况处理 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } // 当前代失效,唤醒所有线程 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } // 唤醒本代所有线程,生成新一代,重置count private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }

      

    3. 对账系统

      在《21 - CountDownLatch 让线程等待其他线程完成》中,我们使用了 CountDownLatch 进行了进行了优化,将 getPOrders() 和 getDOrders() 这两个查询操作并行了,但这两个查询操作和对账操作 check()、save() 之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作,这个过程可以形象化地表述为下面这幅示意图。

      针对对账这个项目,我设计了两个队列,并且两个队列的元素之间还有对应关系。具体如下图所示,订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。

      用双队列来实现完全的并行。一个最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。

    // 订单队列 Vector<P> pos; // 派送单队列 Vector<D> dos; // 执行回调的线程池 Executor executor = Executors.newFixedThreadPool(1); final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(() -> check()); }); void check () { P p = pos.remove(0); D d = dos.remove(0); // 执行对账操作 diff = check(p, d); // 差异写入差异库 save(diff); } void checkAll () { // 循环查询订单库 Thread T1 = new Thread(() -> { while (存在未对账订单) { // 查询订单库 pos.add(getPOrders()); // 等待 barrier.await(); } }); T1.start(); // 循环查询运单库 Thread T2 = new Thread(() -> { while (存在未对账订单) { // 查询运单库 dos.add(getDOrders()); // 等待 barrier.await(); } }); T2.start(); }

      在上面的代码中,我们首先创建了一个计数器初始值为 2 的 CyclicBarrier,你需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。

      线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。

      非常值得一提的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。这个功能用起来实在是太方便了。

      

    4. 总结

      CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

      CyclicBarrier 是通过 Lock 的 Condition 实现的,每个 CyclicBarrier 对应个 Lock 锁和该锁的 condition 条件。创建 CyclicBarrier 时设置一个 count 计数,当调用 await() 时做两件事:①将 count-1 ②将线程阻塞并构造成结点加入condition 条件队列。当 count 变为 0 时,达到等待线程数量要求,condition 将条件队列中的线程全部唤醒。

    Processed: 0.010, SQL: 9