多线程-工具类-CountDownLatch

    技术2022-07-10  140

    多线程-工具类-CountDownLatch

    文章目录

    多线程-工具类-CountDownLatch简介使用示例多线程同步功能 原理详解`new`初始化`countDown()`方法`countDwon()`流程图`countDwon()`源码解析 `await()`方法`await()`流程图`await()`源码解析

    简介

    CountDownLatch是jdk自带并发工具类,实现了类似倒计数器的功能。通过countDown()方法和await()方法实现多线程任务同步。 使用await()方法阻塞的线程,需要等待其他线程调用足够次数的countDown()方法,才能解除阻塞。

    使用示例

    多线程同步功能

    import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class CountDownLatchDemo { public static void main(String[] args) { // 实例化CountDownLatch对象 final CountDownLatch latch = new CountDownLatch(5); // 用于产生随机长度的子线程任务运行时间 final Random random = new Random(); // 不推荐直接使用new Thread() 创建线程,本例是为了减少理解难度 // Thread-0 new Thread(new CountDownLatchRunnable(latch, random)).start(); // Thread-1 new Thread(new CountDownLatchRunnable(latch, random)).start(); // Thread-2 new Thread(new CountDownLatchRunnable(latch, random)).start(); // Thread-3 new Thread(new CountDownLatchRunnable(latch, random)).start(); // Thread-4 new Thread(new CountDownLatchRunnable(latch, random)).start(); // CountDownLatch减小到0后不再减小 // Thread-5 new Thread(new CountDownLatchRunnable(latch, random)).start(); System.out.println("main thread waiting"); try { // latch阻塞主线程,直到latch值减小到0 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // 会在latch解除阻塞后运行 System.out.println("end"); // 主线程会等待所有子线程运行结束才会结束 } static class CountDownLatchRunnable implements Runnable { private CountDownLatch latch; private Random random; CountDownLatchRunnable(CountDownLatch latch, Random random) { this.latch = latch; this.random = random; } @Override public void run() { // 产生随机长度的子线程运行时间 int time = random.nextInt(10); try { // 模拟子线程任务执行 TimeUnit.SECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } // 该子线程任务完成,倒计数器减一 latch.countDown(); // 输出当前线程信息以及CountDownLatch值 System.out.println(Thread.currentThread().getName() + " finished, cost time: " + time + ", current count:" + latch.getCount()); } } }

    原理详解

    new初始化

    public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 初始化Sync对象,该对象继承自AbstractQueuedSynchronizer this.sync = new Sync(count); } /** * count是实例化CountDownLatch时传入的倒计数 * Sync是继承自AbstractQueuedSynchronizer的用于同步的抽象队列实现 */ Sync(int count) { // 该方法是给volatile int state初始化赋值,该属性通过volatile修饰, // 实现state属性的多线程安全 setState(count); }

    countDown()方法

    该方法是线程安全的方法。通过私有的静态内部类Sync对象倒计数器减一功能。

    countDwon()流程图

    countDwon()源码解析
    /** * 倒计数器值减一 */ public void countDown() { // sync对象减一 sync.releaseShared(1); } /** * 该方法在Sync对象父类AbstractQueuedSynchronizer中实现 */ public final boolean releaseShared(int arg) { // 通过循环重试和UnSafe类中的CAS操作保证倒计数值减一 // 并判断当前倒计数值是否为0,如果为0,则解除阻塞 if (tryReleaseShared(arg)) { // 解除阻塞 doReleaseShared(); return true; } return false; } /** * 通过循环重试和UnSafe类中的CAS操作保证倒计数值减一 */ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取当前的倒计数值 int c = getState(); // 如果倒计数值已经为0,直接返回 if (c == 0) return false; // 计算减一后的倒计数值 int nextc = c-1; // 比较并设置,原子操作,保证减一操作时在最新的数值上进行 // 如果当前值是c,即本线程刚刚读取并使用的值,未被其他线程修改,就尝试赋予新值nextc, // 操作成功返回true,即倒计数减一成功,返回; // 操作失败返回false,循环重试 if (compareAndSetState(c, nextc)) // 倒计数值更新成功,返回当前值是否为0 return nextc == 0; } } /** * 该方法在Sync父类AbstractQueuedSynchronizer中 * 实现解除线程阻塞 */ private void doReleaseShared() { for (;;) { // 获取头节点,循环保证头节点的后继节点持有线程被解除阻塞 Node h = head; // 如果队列不为空 if (h != null && h != tail) { // 头节点的等待状态标志 int ws = h.waitStatus; // 如果节点的状态为阻塞状态 if (ws == Node.SIGNAL) { // CAS操作将本节点状态还原为0 // 如果成功,则解除线程阻塞,否则循环重试 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 解除后继节点持有线程阻塞 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态为0,就尝试将节点状态置为-3 // 该状态表示等待其他线程请求同步阻塞方法,即{@code await()}方法 // 循环等待 continue; } // 如果头节点未被其他线程修改,表明头节点的后继节点持有的线程已经被解除阻塞 // 是则结束阻塞,不是则继续解除头节点后继节点的线程阻塞 if (h == head) break; } } /** * 节点的{@code waitStatus}标识的是后继节点的线程状态 * 解除线程阻塞,执行{@code LockSupport.unpark}方法 */ private void unparkSuccessor(Node node) { // 当前节点的等待状态 int ws = node.waitStatus; if (ws < 0) // 当前节点的等待状态置为0,原子操作,不保证成功 compareAndSetWaitStatus(node, ws, 0); // 获取该节点的后继节点 Node s = node.next; // 如果后继节点是空,或者后继节点的waitStatus大于0 // 则从队尾开始遍历,找到位于队列最前方的waitStatus小于等于0的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果当前节点的后继节点不为空,则解除后继节点线程的阻塞状态 if (s != null) LockSupport.unpark(s.thread); }

    await()方法

    该方法也是通过sync对象的同步队列和LockSupport的park()、unpark()实现阻塞。

    await()流程图

    await()源码解析
    /** * 与countDown()方法构成多线程同步操作 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 该方法在Syncd父类AbstractQueuedSynchronizer中实现 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 判断当前倒计数值是否为0 // 如果是,直接返回,不阻塞当前线程 // 如果不是,阻塞线程 if (tryAcquireShared(arg) < 0) // 阻塞线程 doAcquireSharedInterruptibly(arg); } /** * Sync实现判断当前倒计数值是否为0 */ protected int tryAcquireShared(int acquires) { // 判断当前倒计数值是否为0,如果0,返回1,否则返回-1 return (getState() == 0) ? 1 : -1; } /** * 该方法在父类AbstractQueuedSynchronizer中实现 */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 阻塞队列队尾新增共享节点,该节点持有当前线程 // 前驱节点的waitStatus标识了该节点线程的状态 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 循环获取当前节点的前驱节点 // 当阻塞状态结束后,判断是否该节点的前驱节点是否为头节点, // 以此来判断该节点是否需要解除阻塞 final Node p = node.predecessor(); // 如果当前节点是头节点,表明解除阻塞从该节点开始 if (p == head) { // 获取倒计数器值 int r = tryAcquireShared(arg); // 如果倒计数器值为0, if (r >= 0) { // 将该节点设为头节点,并释放该节点持有的前驱节点引用和线程 // 并将解除后继节点持有线程的阻塞状态 setHeadAndPropagate(node, r); // 释放引用,帮助GC p.next = null; // help GC failed = false; return; } } // 先将线程所属节点的状态置为阻塞状态(-1), // 成功后通过{@code LockSupport}阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // 如果被中断,取消阻塞 if (failed) cancelAcquire(node); } } /** * 新建节点,并入队 * 线程安全 */ private Node addWaiter(Node mode) { // 新建共享类型节点 Node node = new Node(Thread.currentThread(), mode); // 从队尾入队 Node pred = tail; // 如果队尾不为空,第一次尝试通过CAS将新节点从队尾入队 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 新节点添加进队列尾部,方法内循环执行,保证新节点入队成功 enq(node); return node; } /** * 新节点入队 */ private Node enq(final Node node) { // 循环重试,CAS原子操作,保证新节点进入到队列尾部, // 并记录新节点前驱 // 线程安全 for (;;) { Node t = tail; if (t == null) { // Must initialize // 如果队尾节点为null,则对队列进行初始化 if (compareAndSetHead(new Node())) tail = head; } else { // 节点入队 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * 将线程的节点状态置为阻塞状态(-1) */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 如果当前节点的后继节点已经处于阻塞状态(-1)(此时后继节点持有的线程还可能未被阻塞),返回true if (ws == Node.SIGNAL) return true; if (ws > 0) { // 从后向前遍历找到状态为阻塞状态的前驱节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // CAS原子操作将当前节点状态置为阻塞状态(-1)(标识了后继节点的将要进行阻塞操作 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 阻塞当前线程 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } /** * 将该节点设置为头节点,并释放该节点持有的前驱节点引用和线程 * (该方法是在倒计数器为0时才会被调用,且该节点线程已经被接触阻塞) * 并调用{@code doReleaseShared}方法解除该节点后继节点持有的线程阻塞 */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 将该节点设置为头节点,并释放该节点持有的前驱节点引用和线程 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } /** * 将该节点设置为头节点,并释放该节点持有的前驱节点引用和线程 */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * 该方法位于父类AbstractQueuedSynchronizer中 * 取消阻塞等待 */ private void cancelAcquire(Node node) { // 如果节点为null,忽略 if (node == null) return; node.thread = null; // 遍历到最前端的,waitStatus>0的前驱节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 记录前驱节点的后继节点 Node predNext = pred.next; // 将当前节点的waitStatus置为取消(1) node.waitStatus = Node.CANCELLED; // 如果节点是队尾节点,删除该节点 if (node == tail && compareAndSetTail(node, pred)) { // 更新前驱节点的next为null compareAndSetNext(pred, predNext, null); } else { // 如果是队尾节点,删除失败,或者不是队尾节点 // 那么判断前驱节点是否是头节点,waitStatus标识是否为阻塞(-1) //(如果不是,尝试更新为-1,更新成功则true,否则为false), // 前驱节点是否持有线程, // 如果是,且当前节点的后继节点不为空,并且后继节点waitStatus也为阻塞状态,则从队列中删除当前节点,将前驱节点的后继改为当前节点的后继 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果前驱节点不是头节点 // 解除当前节点的后继节点持有线程的阻塞状态 unparkSuccessor(node); } // 自闭引用,帮助GC node.next = node; // help GC } }
    Processed: 0.009, SQL: 9