【Java】--CyclicBarrier的介绍及应用

    技术2022-07-10  150

    CyclicBarrier

    简介

    CyclicBarrier是java.util.concurrent包下的一个类,CyclicBarrier的字面意思是 可循环(Cyclic) 使用的 屏障(Barrier) 。

    CyclicBarrier能让一组线程到达一个屏障(也可叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。

    CyclicBarrier会等齐其他线程在继续进行,即CyclicBarrier会每次加加直到某个数字,正好与CountDownLatch相反(CountDownLatch会每次减减直到0)

    方法说明

    构造函数

    /** *参数表示屏障拦截的线程数量, *每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。 **/ public CyclicBarrier(int parties) { this(parties, null); } /** *用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。 **/ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

    await

    调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法:

    public int await() throws InterruptedException, BrokenBarrierException { try { // 不超时等待 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }

    这两个方法最终都会调用dowait(boolean, long)方法,它也是CyclicBarrier的核心方法,该方法定义如下:

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 获取独占锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 当前代 final Generation g = generation; // 如果这代损坏了,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果线程中断了,抛出异常 if (Thread.interrupted()) { // 将损坏状态设置为true // 并通知其他阻塞在此栅栏上的线程 breakBarrier(); throw new InterruptedException(); } // 获取下标 int index = --count; // 如果是 0,说明最后一个线程调用了该方法 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 执行栅栏任务 if (command != null) command.run(); ranAction = true; // 更新一代,将count重置,将generation重置 // 唤醒之前等待的线程 nextGeneration(); return 0; } finally { // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 如果没有时间限制,则直接等待,直到被唤醒 if (!timed) trip.await(); // 如果有时间限制,则等待指定时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 当前代没有损坏 if (g == generation && ! g.broken) { // 让栅栏失效 breakBarrier(); throw ie; } else { // 上面条件不满足,说明这个线程不是这代的 // 就不会影响当前这代栅栏的执行,所以,就打个中断标记 Thread.currentThread().interrupt(); } } // 当有任何一个线程中断了,就会调用breakBarrier方法 // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常 if (g.broken) throw new BrokenBarrierException(); // g != generation表示正常换代了,返回当前线程所在栅栏的下标 // 如果 g == generation,说明还没有换代,那为什么会醒了? // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。 // 正是因为这个原因,才需要generation来保证正确。 if (g != generation) return index; // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放独占锁 lock.unlock(); } }

    dowait(boolean, long)方法的主要逻辑处理比较简单,如果该线程不是最后一个调用await方法的线程,则它会一直处于等待状态,除非发生以下情况:

    最后一个线程到达,即index == 0某个参与线程等待超时某个参与线程被中断调用了CyclicBarrier的reset()方法。该方法会将屏障重置为初始状态

    在上面的源代码中,我们可能需要注意Generation 对象,在上述代码中我们总是可以看到抛出BrokenBarrierException异常,那么什么时候抛出异常呢?如果一个线程处于等待状态时,如果其他线程调用reset(),或者调用的barrier原本就是被损坏的,则抛出BrokenBarrierException异常。同时,任何线程在等待时被中断了,则其他所有线程都将抛出BrokenBarrierException异常,并将barrier置于损坏状态。

    同时,Generation描述着CyclicBarrier的更新换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier之后,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。

    private static class Generation { boolean broken = false; }

    默认barrier是没有损坏的。当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程:

    private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }

    在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

    当所有线程都已经到达barrier处(index == 0),则会通过nextGeneration()进行更新换地操作,在这个步骤中,做了三件事:唤醒所有线程,重置count,generation:

    private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }

    除了上面讲到的栅栏更新换代以及损坏状态,我们在使用CyclicBarrier时还要要注意以下几点:

    CyclicBarrier使用独占锁来执行await方法,并发性可能不是很高如果在等待过程中,线程被中断了,就抛出异常。但如果中断的线程所对应的CyclicBarrier不是这代的,比如,在最后一次线程执行signalAll后,并且更新了这个“代”对象。在这个区间,这个线程被中断了,那么,JDK认为任务已经完成了,就不必在乎中断了,只需要打个标记。该部分源码已在dowait(boolean, long)方法中进行了注释。如果线程被其他的CyclicBarrier唤醒了,那么g肯定等于generation,这个事件就不能return了,而是继续循环阻塞。反之,如果是当前CyclicBarrier唤醒的,就返回线程在CyclicBarrier的下标。完成了一次冲过栅栏的过程。该部分源码已在dowait(boolean, long)方法中进行了注释。

    应用案例

    public class CyclicBarrierDemo { public static void main(String[] args) { int num = 7; CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> { System.out.println("*****开始召唤神龙******"); }); for (int i = 1; i <= num; i++) { final int temp = i; new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t收集到第" + temp + "龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i) ).start(); } } }

    运行结果:

    CyclicBarrier和CountDownLatch的区别

    CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置

    参考文章: Java并发编程之CyclicBarrier详解

    Processed: 0.010, SQL: 9