JUC 并发工具类总结

    技术2026-04-23  15

    文章目录

    简介1. CountDownLatch2. CyclicBarrier2.1 await方法源码 3. Semaphore4. Exchanger5 LockSupport

    简介

    JUC封装了许多并发工具类方便的去对并发线程间的同步访问控制,减少代码的重复开发.

    1. CountDownLatch

    通过给CountDownLatch设置一个初始值, 然后阻塞程序运行,当计数器初始值变为0后程序才可以继续执行

    伪代码案例

    将计算年收入这个大任务分成3个小任务并发执行.3个小任务分别是计算1-4月收入, 计算5-9月收入,计算10-12月收入等到3个小任务都执行完后, 才去根据3个计算任务的结果汇总年收入 CountDownLatch latch = new CountDownLatch(3); //计数器值为3 // 给每个任务创建一个线程并发执行 //线程1执行操作A { do something .. 计算1-4月收入 latch.countDown(); //计数器值减 1 } //线程2执行操作B { do something ...计算5-9月收入 latch.countDown(); //计数器值减 1 } //线程3执行操作C { do something ...计算10-12月收入 latch.countDown(); //计数器值减 1 } //阻塞当前线程,直到latch的值为0才继续向下执行 latch.await(); //对年收入进行汇总 计算年收入 = 1-4月收入 + 5-9月收入 + 10-12月收入 ....

    2. CyclicBarrier

    循环屏障, 使用起来有点像可循环的循环计数器(计算线程等待的数量)

    给循环屏障,CyclicBarrier设置一个等待线程的数量A(屏障点) ,然后各个线程互相去等待, 直到等待的线程的数量达到A(屏障点)时这些所有等待的线程才会一起继续往下执行.如下图

    CyclicBarrier内部通过一个boolean变量维护CyclicBarrier是否破损(“坏掉”), 当屏障出现破损时所有功能失效,并清空唤醒在此屏障锁上等待的所有线程

    屏障破损情况:

    CyclicBarrier初始化时,broken=false,表示屏障未破损。如果正在等待的线程被中断,则broken=true,表示屏障破损。如果正在等待的线程超时,则broken=true,表示屏障破损。如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。

    伪代码案例

    有100个人参加活动,每个人等待组成队伍, 每3个人可以组成一队. 就可以开始参与活动. 然后不断循环. //创建CyclicBarrier 屏障 ,每当等待线程的数量达到3后就会执行这个线程回调方法 CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { public void run() { System.out.println("3名人员组队成功, 开始参与活动"); } }); //创建10个线程并发去抢占活动名额 for (int i = 0; i < 100; i++) { new Thread(()->{ Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + "-到达,等待组队完成"); // 当Thread-3到来时,由于是vip之前等待的线程不算要重新开始 if(Thread.currentThread().getName().equals("Thread-3")) { //将屏障设置为初始状态,清空在屏障锁等待的线程.这样barrier就可以重复使用 barrier.reset(); } try { // 线程等待, CyclicBarrier还需达到屏障点的等待线程数-1 //(当屏障状态破损,此方法不生效)当前线程进入此屏障锁的等待队列 //当等待超过1分钟都没到达屏障点时置屏障为破损状态 barrier.await(1, TimeUnit.SECONDS); } catch (Exception e) { } // 参加完活动后,程序继续往下执行 3人组队参与活动结束各回各家各找爸妈.... }).start(); } // 每隔0.5秒统计等待组成人员个数 ----监控等待线程数 new Thread(()->{ while(true) { Thread.sleep(500); //获取在循环屏障的等待线程个数 System.out.println("等待的线程数 " + barrier.getNumberWaiting()); //屏障是否破损,为true就是破损,需reset重置才能循环使用 System.out.println("is broken " + barrier.isBroken()); } }).start();

    CyclicBarrier 与 CountDownLatch 区别

    CountDownLatch 是一次性的,CyclicBarrier提供reset功能 是可以可循环利用的CountDownLatch是一个线程B等到其他N个线程都完成了,线程B才继续往下执行.而CyclicBarrier是N个线程都到达同一个水平线才一起开始起跑

    2.1 await方法源码

    就是在真正阻塞前,判断是否到达屏障点是就不用阻塞,然后判断是否破损,判断是否需要打破屏障,再去真正的阻塞. 所以才说当屏障破损时是不可以去继续使用的. private int dowait(boolean timed, long nanos){ final ReentrantLock lock = this.lock; lock.lock(); final Generation g = generation; // 屏障破损直接抛出异常 if (g.broken) throw new BrokenBarrierException(); // 线程被中断 if (Thread.interrupted()) { breakBarrier(); // 打破屏障 throw new InterruptedException(); } int index = --count;// 屏障点-1 // 触发屏障点 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); //执行设置的回调任务 ranAction = true; nextGeneration(); // 循环初始化下一次屏障功能 return 0; } finally { if (!ranAction) breakBarrier(); } } // 未达到屏障点 for (;;) { try { // 非超时等待直接在条件锁Condition上等待即可 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } // 如果线程被中断 catch (InterruptedException ie) { // 当屏障未破损时 if (g == generation && ! g.broken) { breakBarrier(); //打破屏障 throw ie; } else { // 重新设置中断标志位 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } lock.unlock(); } //打破屏障: 恢复为非破损状态,并唤醒在此CyclicBarrier锁上等待队列的线程 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } // 重置CyclicBarrier,开启下一轮循环 private void nextGeneration() { // 唤醒在此CyclicBarrier锁上等待队列的线程n trip.signalAll(); count = parties; generation = new Generation(); }

    3. Semaphore

    信号量控制访问同步资源的线程数量 // Semaphore s = new Semaphore(10); public void toExecute(){ semaphore.acquire(); ... do something semaphore.release(); } //最多有10个线程能同时执行 toExecute()方法 for(;;){ new Thread(()->{ toExecute(); }).start }

    4. Exchanger

    线程间的数据交换工具,用于线程间通信,数据交换用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据

    结合上图和下面伪代码, 加入当线程A先执行到exch.exchange(“data”); 则线程A会往 Object1空间里填充数据“data”,然后线程A阻塞. 不久后线程B执行到了exch.exchange(“bbq”)于是线程B把数据“bbq填充进Object2空间然后线程B也进入阻塞, Exchanger此时发现Object1空间和Object2空间的数据均不为空,于是 Exchanger会发Object1空间和Object2空间间的数据进行交换, 此时Object1空间的数据为“bbq”,Object2空间的数据为“data”, 交换完成后线程A从阻塞态中返回,并把Obejct1的数据“bbq”返回给res1接收, 线程B也从阻塞态中返回,并把Obejct2的数据“data”返回给res2接收

    伪代码:

    //交换String类型数据 Exchanger<String> exch = new Exchanger<>(); //线程A { String res1 = exch.exchange(“data”); //在交换前会阻塞在此 } //线程B里 { String res2 = exch.exchange(“bbq”); //在交换前会阻塞在此 } //两个线程同时执行,当两个线程都存储了数据后,会把数据交换并返回 这时res1结果为bbq, res2结果为data

    5 LockSupport

    线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,当然阻塞之后肯定得有唤醒的方法。功能上类似 等待/通知 机制 class TheradA { public void run(){ // do something .... LoackSupport.park(); // 阻塞当前线程 } } // ThreadA a = new TheradA(); a.start(); LockSupport.unpark(a); // 唤醒线程a

    和 wait/notify机制区别

    wait和notify都是Object中的方法, wait和notify方法要在同步块synchronized中才能调用. 并且在调用这两个方法前必须先获得锁对象.而park不需要获取某个对象的锁就可以锁住线程。notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。
    Processed: 0.009, SQL: 9