【高并发】11 AQS代码案例

    技术2022-07-12  64

    一、CountDownLatch

    1、代码一

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**
     * CountDownLatch demo: the main thread submits {@code threadCount} tasks and blocks on the
     * latch until every task has counted down once, then logs a finish line.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class CountDownLatchExample1 {

        private final static int threadCount = 200;

        /**
         * Submits all tasks, waits (without timeout) for the latch to reach zero, then shuts the
         * pool down.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while waiting on the latch
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newCachedThreadPool();
            final CountDownLatch latch = new CountDownLatch(threadCount);

            int next = 0;
            while (next < threadCount) {
                final int id = next;
                pool.execute(() -> runThenCountDown(id, latch));
                next++;
            }

            latch.await();
            log.info("=========finish");
            pool.shutdown();
        }

        /** Runs one task and always decrements the latch, even when the task is interrupted. */
        private static void runThenCountDown(int id, CountDownLatch latch) {
            try {
                testCountDownLatch(id);
            } catch (InterruptedException e) {
                log.error("Exception=>{}", e.getMessage());
            } finally {
                // One decrement per task; the latch opens after threadCount decrements.
                latch.countDown();
            }
        }

        /**
         * Simulated work: sleep, log the task number, sleep again.
         *
         * @param threadNum index of the submitted task
         * @throws InterruptedException if interrupted during either sleep
         */
        private static void testCountDownLatch(int threadNum) throws InterruptedException {
            final long pauseMillis = 100;
            Thread.sleep(pauseMillis);
            log.info("threadNum=====>{}", threadNum);
            Thread.sleep(pauseMillis);
        }
    }

    2、代码二

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
     * CountDownLatch demo with a bounded wait: the main thread waits on the latch only for a
     * fixed timeout and then continues, whether or not all tasks have counted down.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class CountDownLatchExample2 {

        private final static int threadCount = 200;

        /**
         * Submits all tasks, waits on the latch with a timeout, logs a finish line and requests
         * pool shutdown.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while waiting on the latch
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newCachedThreadPool();
            final CountDownLatch latch = new CountDownLatch(threadCount);

            int next = 0;
            while (next < threadCount) {
                final int id = next;
                pool.execute(() -> runThenCountDown(id, latch));
                next++;
            }

            // NOTE(review): the timeout is 10 MICROseconds; each task sleeps 200 ms in total, so the
            // wait will normally time out first. Possibly MILLISECONDS was intended - confirm.
            latch.await(10, TimeUnit.MICROSECONDS);
            log.info("=========finish");
            // shutdown() does not cancel tasks that were already submitted.
            pool.shutdown();
        }

        /** Runs one task and always decrements the latch, even when the task is interrupted. */
        private static void runThenCountDown(int id, CountDownLatch latch) {
            try {
                testCountDownLatchTimeOut(id);
            } catch (InterruptedException e) {
                log.error("Exception=>{}", e.getMessage());
            } finally {
                latch.countDown();
            }
        }

        /**
         * Simulated work: sleep, log the task number, sleep again.
         *
         * @param threadNum index of the submitted task
         * @throws InterruptedException if interrupted during either sleep
         */
        private static void testCountDownLatchTimeOut(int threadNum) throws InterruptedException {
            final long pauseMillis = 100;
            Thread.sleep(pauseMillis);
            log.info("threadNum=====>{}", threadNum);
            Thread.sleep(pauseMillis);
        }
    }

    二、CyclicBarrier

    1、代码一

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**
     * CyclicBarrier demo: ten tasks are started one second apart and meet at a barrier of five
     * parties; the barrier action logs a line each time the barrier trips.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class CyclicBarrierExample1 {

        // Five parties per trip; the barrier action is logged each time the fifth party arrives.
        private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            log.info("优先执行我 callback is running");
        });

        /**
         * Starts ten tasks, pausing one second before each submission, then requests shutdown.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while pausing
         */
        public static void main(String[] args) throws InterruptedException {
            final int taskTotal = 10;
            ExecutorService pool = Executors.newCachedThreadPool();

            int index = 0;
            while (index < taskTotal) {
                final int id = index++;
                Thread.sleep(1000);
                pool.execute(() -> {
                    try {
                        testCyclicBarrier(id);
                    } catch (Exception e) {
                        log.error("exception=>{}", e);
                    }
                });
            }

            pool.shutdown();
        }

        /**
         * Announces readiness, waits at the barrier, then announces that it continues.
         *
         * @param threadNum index of the submitted task
         * @throws InterruptedException   if interrupted while sleeping or waiting at the barrier
         * @throws BrokenBarrierException if the barrier is broken while this task waits
         */
        private static void testCyclicBarrier(int threadNum)
                throws InterruptedException, BrokenBarrierException {
            Thread.sleep(1000);
            log.info("thread-{} say: I am is ready", threadNum);
            barrier.await();
            log.info("thread-{} say: continue", threadNum);
        }
    }

    2、代码二

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /**
     * CyclicBarrier demo with a bounded wait: each task waits at a five-party barrier for at
     * most two seconds, logs a warning if the wait fails, and continues either way.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class CyclicBarrierExample2 {

        private static CyclicBarrier barrier = new CyclicBarrier(5);

        /**
         * Starts ten tasks, pausing one second before each submission, then requests shutdown.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while pausing
         */
        public static void main(String[] args) throws InterruptedException {
            final int taskTotal = 10;
            ExecutorService pool = Executors.newCachedThreadPool();

            int index = 0;
            while (index < taskTotal) {
                final int id = index++;
                Thread.sleep(1000);
                pool.execute(() -> {
                    try {
                        testCyclicBarrierAwait(id);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }

            pool.shutdown();
        }

        /**
         * Announces readiness, waits at the barrier with a 2000 ms timeout, then continues.
         * A timeout or a broken barrier is logged as a warning and does not stop the task.
         *
         * @param threadNum index of the submitted task
         * @throws InterruptedException if interrupted while sleeping or waiting at the barrier
         */
        private static void testCyclicBarrierAwait(int threadNum) throws InterruptedException {
            Thread.sleep(1000);
            log.info("thread-{} say: I am is ready", threadNum);
            try {
                barrier.await(2000, TimeUnit.MILLISECONDS);
            } catch (BrokenBarrierException e) {
                log.warn("e", e);
            } catch (TimeoutException e) {
                log.warn("e", e);
            }
            log.info("thread-{} say: continue", threadNum);
        }
    }

    三、Semaphore

    1、代码一

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    /**
     * Semaphore demo, one permit per task: 200 tasks are submitted but a semaphore with five
     * permits limits how many run the guarded section at the same time.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class SemaphoreExample1 {

        private final static int threadCount = 200;

        /**
         * Submits all tasks, logs a finish line right after submission (tasks are still running
         * at that point) and requests pool shutdown.
         *
         * @param args unused
         * @throws InterruptedException declared for signature compatibility
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newCachedThreadPool();
            // Number of permits: at most five tasks hold a permit at any moment.
            final Semaphore semaphore = new Semaphore(5);
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                service.execute(() -> {
                    try {
                        semaphore.acquire(); // blocks until a permit is available
                        try {
                            testSemaphore(threadNum);
                        } finally {
                            // Release in finally so a failing task cannot leak its permit.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag for the pool's worker thread.
                        Thread.currentThread().interrupt();
                        log.error("Exception=>{}", e.getMessage());
                    }
                });
            }
            log.info("=========finish");
            service.shutdown();
        }

        /**
         * Simulated work: log the task number, then sleep for one second.
         *
         * @param threadNum index of the submitted task
         * @throws InterruptedException if interrupted during the sleep
         */
        private static void testSemaphore(int threadNum) throws InterruptedException {
            log.info("threadNum=====>{}", threadNum);
            Thread.sleep(1000);
        }
    }

    2、代码二

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    /**
     * Semaphore demo, several permits per task: each task acquires three of the five permits
     * at once and gives them back in two steps (one, then two).
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class SemaphoreExample2 {

        private final static int threadCount = 20;

        /**
         * Submits all tasks, logs a finish line right after submission (tasks are still running
         * at that point) and requests pool shutdown.
         *
         * @param args unused
         * @throws InterruptedException declared for signature compatibility
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(5);
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                service.execute(() -> {
                    try {
                        semaphore.acquire(3); // take three permits in one call
                        try {
                            testSemaphore(threadNum);
                        } finally {
                            // Give back all three permits even if the task fails; the two-step
                            // release of the original demo is kept.
                            semaphore.release(1);
                            log.info("release=====2");
                            semaphore.release(2);
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag for the pool's worker thread.
                        Thread.currentThread().interrupt();
                        log.error("Exception=>{}", e.getMessage());
                    }
                });
            }
            log.info("=========finish");
            service.shutdown();
        }

        /**
         * Simulated work: log the task number, then sleep for one second.
         *
         * @param threadNum index of the submitted task
         * @throws InterruptedException if interrupted during the sleep
         */
        private static void testSemaphore(int threadNum) throws InterruptedException {
            log.info("threadNum=====>{}", threadNum);
            Thread.sleep(1000);
        }
    }

    3、代码三

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    /**
     * Semaphore demo with a timed attempt, one permit per task: a task that cannot obtain a
     * permit within 2000 ms skips its work instead of waiting indefinitely.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class SemaphoreExample3 {

        private final static int threadCount = 30;

        /**
         * Submits all tasks, logs a finish line right after submission (tasks are still running
         * at that point) and requests pool shutdown.
         *
         * @param args unused
         * @throws InterruptedException declared for signature compatibility
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(5);
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                service.execute(() -> {
                    try {
                        // Timed attempt; the no-argument tryAcquire() would instead give up
                        // immediately when no permit is free.
                        if (semaphore.tryAcquire(2000, TimeUnit.MILLISECONDS)) {
                            try {
                                testSemaphore(threadNum);
                            } finally {
                                // Release only what was acquired, and do so even on failure.
                                semaphore.release();
                            }
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag for the pool's worker thread.
                        Thread.currentThread().interrupt();
                        log.error("Exception=>{}", e.getMessage());
                    }
                });
            }
            log.info("=========finish");
            service.shutdown();
        }

        /**
         * Simulated work: log the task number, then sleep for one second.
         *
         * @param threadNum index of the submitted task
         * @throws InterruptedException if interrupted during the sleep
         */
        private static void testSemaphore(int threadNum) throws InterruptedException {
            log.info("threadNum=====>{}", threadNum);
            Thread.sleep(1000);
        }
    }

    四、Future

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    /**
     * Future demo: a callable is submitted to a pool, the main thread does other work, and the
     * result is then fetched through the returned {@link Future}.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class FutureExample {

        /** Task that logs, sleeps for five seconds and returns {@code "ok"}. */
        static class MyCallable implements Callable<String> {

            @Override
            public String call() throws Exception {
                log.info("do something......");
                Thread.sleep(5000);
                return "ok";
            }
        }

        /**
         * Submits the task, sleeps ten seconds on the main thread, then logs the task result.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while sleeping or waiting
         * @throws ExecutionException   if the submitted task threw an exception
         */
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService pool = Executors.newCachedThreadPool();
            Callable<String> job = new MyCallable();
            Future<String> pending = pool.submit(job);

            log.info("do other something......");
            Thread.sleep(10000);

            // get() blocks until the task has produced its value.
            log.info("future=>{}", pending.get());
            pool.shutdown();
        }
    }

    五、FutureTask

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;

    /**
     * FutureTask demo: a callable is wrapped in a {@link FutureTask}, run on a dedicated thread,
     * and its result is read back on the main thread.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class FutureTaskExample {

        /**
         * Starts the task on a new thread, sleeps one second, then waits for and logs the result.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while sleeping or waiting
         * @throws ExecutionException   if the task threw an exception
         */
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            // The same task could equally be written as an anonymous Callable<String> class.
            Callable<String> job = () -> {
                log.info("do something......");
                Thread.sleep(5000);
                return "ok";
            };
            FutureTask<String> task = new FutureTask<>(job);

            Thread worker = new Thread(task);
            worker.start();

            log.info("do other something");
            Thread.sleep(1000);

            // get() blocks until the worker thread has finished the task.
            log.info("future=>{}", task.get());
        }
    }

    六、ForkJoinTask

    package com.current.flame.aqs;

    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveTask;

    /**
     * Fork/join demo: sums the integers in the inclusive range {@code [start, end]} by
     * recursively splitting the range in half until a piece is small enough to add up directly.
     *
     * @author haoxiansheng
     */
    @Slf4j
    public class ForkJoinTaskExample extends RecursiveTask<Integer> {

        /** A range whose {@code end - start} is at most this value is summed without splitting. */
        public static final int threshhold = 2;

        private final int start;
        private final int end;

        /**
         * @param start first integer of the range (inclusive)
         * @param end   last integer of the range (inclusive)
         */
        public ForkJoinTaskExample(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /**
         * Sums the range directly when it is small enough, otherwise splits it into two
         * subtasks and adds their results.
         *
         * @return the sum of all integers from {@code start} to {@code end}, inclusive
         */
        @Override
        protected Integer compute() {
            if (end - start <= threshhold) {
                int sum = 0;
                for (int i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            }

            // Overflow-safe midpoint (start + end could exceed the int range).
            int middle = start + (end - start) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // invokeAll runs one subtask on the current worker instead of forking both and
            // leaving this worker blocked in join().
            invokeAll(leftTask, rightTask);

            return leftTask.join() + rightTask.join();
        }

        /**
         * Sums 1..100 on a fresh {@link ForkJoinPool} and logs the result.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            try {
                // Build the task: 1 + 2 + ... + 100.
                ForkJoinTaskExample taskExample = new ForkJoinTaskExample(1, 100);
                Future<Integer> result = forkJoinPool.submit(taskExample);
                log.info("result:{}", result.get());
            } catch (Exception e) {
                log.error("exception", e);
            } finally {
                forkJoinPool.shutdown();
            }
        }
    }
    Processed: 0.024, SQL: 9