java 并发编程工具类之 CountDownLatch

    技术2022-07-10  178

    CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

    public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { new Thread(() -> { countDownLatch.countDown(); System.out.println("===="); }).start(); } countDownLatch.await(); System.out.println("done"); }

    原理:

     内部类Sync

    /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 如果计数器(state)为0,返回正数,表示主程序可以执行了, // 计数器(state)不为0, 返回负数,表示主程序不可执行. protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }

    由上可知,AQS的state在此处代表计数器

    await方法核心实现

    public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); // AQS->acquireSharedInterruptibly } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //CountDownLatch->tryAcquireShared doAcquireSharedInterruptibly(arg);// AQS->doAcquireSharedInterruptibly } // AQS->doAcquireSharedInterruptibly private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

    主线程执行await方法,先执行tryAcquireShared方法尝试获取锁,如果此时计数器state != 0,返回-1,表示主线程不可执行,则加入到等待队列中,主线程通过LockSupport.park(this)被挂起。

    countDown方法核心实现

    public void countDown() {sync.releaseShared(1);} //AQS$releaseShared: public final boolean releaseShared(int arg) { // countDownLatch -> tryReleaseShared if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //AQS$doReleaseShared private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }

    通过unsafe.compareAndSwapInt方法让state值减1;如果state为0,通过LockSupport.unpark唤醒await方法中挂起的主线程。

    Processed: 0.011, SQL: 9