《Java后端知识体系》系列之CyclicBarrier的原理剖析

    技术2022-07-10  99

    CyclicBarrier原理刨析
    场景:

    CountDownLatch的计数器是一次性的,也就是计数器值变为0之后,再调用CountDownLatch的await和countdown方法都会返回,这就起不到线程同步的效果,所以为了满足计数器可以重置的需要,这里就需要使用CyclicBarrier。CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行,这里之所以叫做回环是因为当所有等待线程执行完毕,并重置CyclicBarrier状态后可以被重用。之所以叫做屏障是因为线程调用await方法后会被阻塞,这个阻塞的点就是屏障点,等所有的线程都调用了await方法后,线程就会打破屏障,继续执行。 实现方式如下:

    /** * @author admin */ public class CyclicBarrierTest { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread()+"step1"); try { cyclicBarrier.await(); System.out.println(Thread.currentThread()+"step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread()+"step3"); } catch (Exception e){ e.printStackTrace(); } } }); executorService.submit(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread()+"step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread()+"step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread()+"step3"); }catch (Exception e){ e.printStackTrace(); } } }); executorService.shutdown(); } } 在以上代码中每个子线程在执行完step1之后都调用await方法,等到所有的线程都达到屏障点后才会一起向下执行,这就保证了所有的线程都完成了step1之后才会开始执行step2,以此类推。
    原理:CyclicBanrrier是基于独占锁实现的,底层还是基于AQS,parties记录线程个数,count记录到达屏障点的个数。

    int await()

    当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:

    parties个数的线程都调用了await方法,并且所有的线程都达到了屏障点其它线程调用了当前线程的interrupt()方法中断了当前线程与当前屏障点关联的Generation对象的broken标志设置为true,会抛出BrokenBarrierException异常,然后返回 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }

    在如上代码中内部是调用了dowait方法,第一个参数为false则说明不设置超时时间,第二个参数就没有意义。

    boolean await(long timeout, TimeUnit unit )

    该方法中第一个参数是超时时间,第二个参数是时间单位。在该方法中也是调用了doawait方法,第一个参数为true则说明设置了超时时间,这时候第二个参数是超时时间。

    int dowait(boolean timed, long nanos)

    该方法实现了CyclicBarrier的核心功能,其代码如下: /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //11 (1 )击。采 index==O 则说明所有线程都到了屏障点,此时执行初始化时传递的任务 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //执行任务 if (command != null) command.run(); ranAction = true; //激活其他因调用 await 方法而被阻塞的线程,并重置CyclicBarrier nextGeneration(); //返回 return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 4如果index != 0 for (;;) { try { //没有设置超时时间 if (!timed) trip.await(); //设置了超时时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void nextGeneration() { // signal completion of last generation //唤醒条件队列中的阻塞线程 trip.signalAll(); //重置CyclicBarrier // set up next generation count = parties; generation = new Generation(); }

    在以上代码中当一个线程调用dowait方法后,首先会获取独占锁lock,如果创建CyclicBarrier时传递的参数为10,那么后面9个调用线程都会被阻塞。然后当前获取到锁的线程会被计数器count进行递减操作,递减后count==index=9,因为index!=0,则需要调用index!=0处。如果当前线程调用的是无参数的await方法,则timed=false,所以当前线程会被放入条件变量trip的条件阻塞队列中,当前线程会被挂起并释放lock锁。 当一个获取锁的线程释放锁之后,其它线程会竞争lock锁,然后执行与第一个线程一样的操作,以此往复那么count会递减到0,然后线程再一起执行,并且会重置CyclicBarrier,这样就能实现线程同步的复用。

    总结:
    CyclicBarrier对比于CountDownLatch是可以复用的,并且CyclicBarrier是通过独占锁ReentrantLock来实现计数器原子性更新,并使用条件变量队列来实现线程同步。

    我是每天敲代码的汤姆猫!!

    Processed: 0.013, SQL: 9