1、代码一
package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author haoxiansheng */ @Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; service.execute(()-> { try { testCountDownLatch(threadNum); } catch (InterruptedException e) { log.error("Exception=>{}", e.getMessage()); } finally { countDownLatch.countDown(); // -1 } }); } countDownLatch.await(); log.info("=========finish"); service.shutdown(); } private static void testCountDownLatch(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("threadNum=====>{}", threadNum); Thread.sleep(100); } }2、代码二
package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author haoxiansheng */ @Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; service.execute(()-> { try { testCountDownLatchTimeOut(threadNum); } catch (InterruptedException e) { log.error("Exception=>{}", e.getMessage()); } finally { countDownLatch.countDown(); // -1 } }); } countDownLatch.await(10, TimeUnit.MICROSECONDS); log.info("=========finish"); service.shutdown(); } private static void testCountDownLatchTimeOut(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("threadNum=====>{}", threadNum); Thread.sleep(100); } }1、代码一
package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author haoxiansheng */ @Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5, ()->{ log.info("优先执行我 callback is running"); }); public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { testCyclicBarrier(threadNum); } catch (Exception e) { log.error("exception=>{}", e); } }); } executor.shutdown(); } private static void testCyclicBarrier(int threadNum) throws InterruptedException, BrokenBarrierException { Thread.sleep(1000); log.info("thread-{} say: I am is ready", threadNum); barrier.await(); log.info("thread-{} say: continue", threadNum); } }2、代码二
package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * @author haoxiansheng */ @Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { testCyclicBarrierAwait(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void testCyclicBarrierAwait(int threadNum) throws InterruptedException, BrokenBarrierException, TimeoutException { Thread.sleep(1000); log.info("thread-{} say: I am is ready", threadNum); try{ barrier.await(2000, TimeUnit.MILLISECONDS); } catch (BrokenBarrierException | TimeoutException e) { log.warn("e", e); } log.info("thread-{} say: continue", threadNum); } }1、代码一
package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * @author haoxiansheng * 单个许可 */ @Slf4j public class SemaphoreExample1 { private final static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(5); // 一共有多少线程同步等待 for (int i = 0; i < threadCount; i++) { final int threadNum = i; service.execute(()-> { try { semaphore.acquire(); // 拿到许可 testSemaphore(threadNum); semaphore.release(); // 释放许可 } catch (InterruptedException e) { log.error("Exception=>{}", e.getMessage()); } finally { } }); } log.info("=========finish"); service.shutdown(); } private static void testSemaphore(int threadNum) throws InterruptedException { log.info("threadNum=====>{}", threadNum); Thread.sleep(1000); } }2、代码二
package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author haoxiansheng * 多个许可 */ @Slf4j public class SemaphoreExample2 { private final static int threadCount = 20; public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(5); for (int i = 0; i < threadCount; i++) { final int threadNum = i; service.execute(()-> { try { semaphore.acquire(3); // 拿到许可 testSemaphore(threadNum); semaphore.release(1); // 释放许可 log.info("release=====2"); semaphore.release(2); } catch (InterruptedException e) { log.error("Exception=>{}", e.getMessage()); } finally { } }); } log.info("=========finish"); service.shutdown(); } private static void testSemaphore(int threadNum) throws InterruptedException { log.info("threadNum=====>{}", threadNum); Thread.sleep(1000); } }3、代码三
package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * @author haoxiansheng * 单个许可 */ @Slf4j public class SemaphoreExample3 { private final static int threadCount = 30; public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(5); for (int i = 0; i < threadCount; i++) { final int threadNum = i; service.execute(() -> { try { //if (semaphore.tryAcquire()) { // 尝试拿到许可 if (semaphore.tryAcquire(2000, TimeUnit.MILLISECONDS)) { // 尝试拿到许可 testSemaphore(threadNum); semaphore.release(); // 释放许可 } } catch (InterruptedException e) { log.error("Exception=>{}", e.getMessage()); } finally { } }); } log.info("=========finish"); service.shutdown(); } private static void testSemaphore(int threadNum) throws InterruptedException { log.info("threadNum=====>{}", threadNum); Thread.sleep(1000); } }