CompletableFuture api详解

    技术2022-07-10  130

    花了两天时间学习 CompletableFuture API。该类位于 JUC(java.util.concurrent)包中,而不是 java.util.concurrent.atomic 原子包。我通过单元测试代码把所有 public API 方法跑了一遍,大致了解了底层实现。初学乍练,还有很多一知半解的地方,待后续有了深入理解再来补充。

    package test.java.util.concurrent;

    import java.util.concurrent.*;
    import java.util.function.*;
    import org.junit.Test;
    import static java.lang.Thread.*;

    /**
     * Walks through the public API of {@link CompletableFuture}, one JUnit test per method or
     * overload.
     *
     * <p>The tests are demonstrations rather than assertions: each one prints what it observes to
     * standard output. Tests named {@code ...Async1} use the overload without an executor, tests named
     * {@code ...Async} pass {@link ForkJoinPool#commonPool()} explicitly.
     *
     * @author zqw
     * @date 2020-06-29 21:33:34
     */
    public class CompletableFutureTest {

      /** Number of times each runAfterEither demo is repeated. */
      private static final int EITHER_ROUNDS = 500;

      /** Pause after each runAfterEither round, in milliseconds. */
      private static final long EITHER_ROUND_PAUSE_MILLIS = 500;

      /** Separator printed at the end of each runAfterEither round. */
      private static final String ROUND_SEPARATOR = "==============================";

      // ---------------------------------------------------------------- helpers

      /** Sleeps for {@code millis}; on interruption restores the interrupt flag and logs the trace. */
      private static void pause(long millis) {
        try {
          sleep(millis);
        } catch (InterruptedException e) {
          // Restore the flag so code further up the stack can still see the interruption.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }
      }

      /** Returns a supplier that sleeps for {@code millis} and then yields {@code value}. */
      private static <T> Supplier<T> delayed(long millis, T value) {
        return () -> {
          pause(millis);
          return value;
        };
      }

      /**
       * Returns a supplier that optionally sleeps, prints {@code id} and yields it as a string.
       *
       * @param id number to print and return (as text)
       * @param delayMillis sleep before printing; no sleep when not positive
       */
      private static Supplier<String> announcing(int id, long delayMillis) {
        return () -> {
          if (delayMillis > 0) {
            pause(delayMillis);
          }
          System.out.println(id);
          return String.valueOf(id);
        };
      }

      /** Starts an async stage that prints the name of the thread running it and yields "1". */
      private static CompletableFuture<String> startTracedStage() {
        return CompletableFuture.supplyAsync(() -> {
          System.out.println("1 " + Thread.currentThread().getName());
          return "1";
        });
      }

      /** Callback for whenComplete: prints the value, the failure and the executing thread's name. */
      private static BiConsumer<String, Throwable> traceOutcome() {
        return (value, failure) -> {
          System.out.println(value);
          System.out.println(failure);
          System.out.println("2 " + Thread.currentThread().getName());
        };
      }

      /** Callback for handle: prints like {@link #traceOutcome()} and replaces the result by "2". */
      private static BiFunction<String, Throwable, String> traceAndReplace() {
        return (value, failure) -> {
          System.out.println(value);
          System.out.println(failure);
          System.out.println("2 " + Thread.currentThread().getName());
          return "2";
        };
      }

      /**
       * Repeats one runAfterEither experiment: starts two async printers ("run1", "run2"), lets
       * {@code attach} register the follow-up action on them, pauses, then prints a separator.
       *
       * @param attach receives the two freshly started futures and registers the dependent action
       * @throws InterruptedException if the pause between rounds is interrupted
       */
      private static void repeatRunAfterEither(
          BiConsumer<CompletableFuture<Void>, CompletableFuture<Void>> attach)
          throws InterruptedException {
        for (int round = 0; round < EITHER_ROUNDS; round++) {
          CompletableFuture<Void> first =
              CompletableFuture.runAsync(() -> System.out.println("run1"));
          CompletableFuture<Void> second =
              CompletableFuture.runAsync(() -> System.out.println("run2"));
          attach.accept(first, second);
          Thread.sleep(EITHER_ROUND_PAUSE_MILLIS);
          System.out.println(ROUND_SEPARATOR);
        }
      }

      // ---------------------------------------------------------------- creation

      /** No-arg constructor: creates an incomplete future and prints its string form. */
      @Test
      public void testConstruct0() throws Exception {
        CompletableFuture<Object> pending = new CompletableFuture<>();
        System.out.println(pending.toString());
      }

      /** supplyAsync(Supplier): computes a value asynchronously on the default executor. */
      @Test
      public void testSupplyAsync1() throws Exception {
        Supplier<Integer> supplier = () -> 33 * 33333333 + 321321;
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(supplier);
        System.out.println(future.get());
      }

      /** supplyAsync(Supplier, Executor): computes a value asynchronously on the given executor. */
      @Test
      public void testSupplyAsync() throws Exception {
        Supplier<Integer> supplier = () -> 33 * 3 + 1;
        Executor executor = ForkJoinPool.commonPool();
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(supplier, executor);
        System.out.println(future.get());
      }

      /** runAsync(Runnable): runs a task asynchronously; the resulting future carries no value. */
      @Test
      public void testRunAsync1() throws Exception {
        Runnable task = () -> System.out.println(23123);
        CompletableFuture<Void> future = CompletableFuture.runAsync(task);
        System.out.println(future.get());
      }

      /** runAsync(Runnable, Executor): same as above, but on the given executor. */
      @Test
      public void testRunAsync() throws Exception {
        Executor executor = ForkJoinPool.commonPool();
        Runnable task = () -> System.out.println(23123);
        CompletableFuture<Void> future = CompletableFuture.runAsync(task, executor);
        System.out.println(future.get());
      }

      /** completedFuture(value): returns a future that is already completed with the value. */
      @Test
      public void testCompletedFuture() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.completedFuture(3243L);
        System.out.println(future.get());
      }

      // ---------------------------------------------------------------- reading the result

      /** isDone(): reports whether the future has completed in any way. */
      @Test
      public void testIsDone() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.completedFuture(3243L);
        System.out.println(future.isDone());
        System.out.println(future.get());
        System.out.println(future.isDone());
      }

      /** get(): returns the result, blocking until the future completes. */
      @Test
      public void testGet1() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.completedFuture(3243L);
        System.out.println(future.get());
      }

      /** get(timeout, unit): returns the result, blocking for at most the given time. */
      @Test
      public void testGet() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.completedFuture(3243L);
        System.out.println(future.get(1000, TimeUnit.SECONDS));
      }

      /**
       * join(): blocks for the result like get(), but reports failures through unchecked exceptions
       * instead of checked ones.
       */
      @Test
      public void testJoin() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.completedFuture(3243L);
        System.out.println(future.join());
      }

      /** getNow(fallback): returns the result if already complete, otherwise the fallback value. */
      @Test
      public void testGetNow() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        System.out.println(future.getNow(12));
      }

      // ---------------------------------------------------------------- explicit completion

      /** complete(value): completes a still-pending future with the given value. */
      @Test
      public void testComplete() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        System.out.println(future.complete(12));
        System.out.println(future.get());
      }

      /**
       * completeExceptionally(ex): completes a still-pending future with a failure. The following
       * get() then throws ExecutionException; its cause is printed so the demo shows the outcome
       * instead of erroring out.
       */
      @Test
      public void testCompleteExceptionally() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        System.out.println(future.completeExceptionally(new Throwable("321321")));
        try {
          System.out.println(future.get());
        } catch (ExecutionException e) {
          System.out.println(e.getCause());
        }
      }

      // ---------------------------------------------------------------- thenApply

      /** thenApply(fn): derives a new future by applying a function to the result. */
      @Test
      public void testThenApply() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Function<Integer, Object> mapper = ignored -> 333 * 3;
        CompletableFuture<Object> mapped = future.thenApply(mapper);
        System.out.println(future.get());
        System.out.println(mapped.get());
      }

      /** thenApplyAsync(fn): like thenApply, with the function run on the default async executor. */
      @Test
      public void testThenApplyAsync1() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Function<Integer, Object> mapper = ignored -> 333 * 3;
        CompletableFuture<Object> mapped = future.thenApplyAsync(mapper);
        System.out.println(future.get());
        System.out.println(mapped.get());
      }

      /** thenApplyAsync(fn, executor): like thenApply, with the function run on the given executor. */
      @Test
      public void testThenApplyAsync() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Function<Integer, Object> mapper = ignored -> 333 * 3;
        CompletableFuture<Object> mapped = future.thenApplyAsync(mapper, ForkJoinPool.commonPool());
        System.out.println(future.get());
        System.out.println(mapped.get());
      }

      // ---------------------------------------------------------------- thenAccept

      /** thenAccept(consumer): feeds the result to a consumer; the new future carries no value. */
      @Test
      public void testThenAccept() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Consumer<Integer> consumer = ignored -> System.out.println(333);
        CompletableFuture<Void> accepted = future.thenAccept(consumer);
        System.out.println(future.get());
        System.out.println(accepted.get());
      }

      /** thenAcceptAsync(consumer): like thenAccept, run on the default async executor. */
      @Test
      public void testThenAcceptAsync1() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Consumer<Integer> consumer = ignored -> System.out.println(333);
        CompletableFuture<Void> accepted = future.thenAcceptAsync(consumer);
        System.out.println(future.get());
        System.out.println(accepted.get());
      }

      /** thenAcceptAsync(consumer, executor): like thenAccept, run on the given executor. */
      @Test
      public void testThenAcceptAsync() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Consumer<Integer> consumer = ignored -> System.out.println(333);
        CompletableFuture<Void> accepted =
            future.thenAcceptAsync(consumer, ForkJoinPool.commonPool());
        System.out.println(future.get());
        System.out.println(accepted.get());
      }

      // ---------------------------------------------------------------- thenRun

      /** thenRun(action): runs an action after completion; the action does not see the result. */
      @Test
      public void testThenRun() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Runnable action = () -> System.out.println(33333);
        CompletableFuture<Void> after = future.thenRun(action);
        System.out.println(after.get());
      }

      /** thenRunAsync(action): like thenRun, run on the default async executor. */
      @Test
      public void testThenRunAsync1() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Runnable action = () -> System.out.println(33333);
        CompletableFuture<Void> after = future.thenRunAsync(action);
        System.out.println(after.get());
      }

      /** thenRunAsync(action, executor): like thenRun, run on the given executor. */
      @Test
      public void testThenRunAsync() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(delayed(2000, 2333));
        Runnable action = () -> System.out.println(33333);
        CompletableFuture<Void> after = future.thenRunAsync(action, ForkJoinPool.commonPool());
        System.out.println(after.get());
      }

      // ---------------------------------------------------------------- thenCombine

      /** thenCombine(other, fn): merges the results of two futures once both have completed. */
      @Test
      public void testThenCombine() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> other = CompletableFuture.supplyAsync(delayed(3000, "qwqq"));
        BiFunction<String, String, String> concat = (left, right) -> left + right;
        CompletableFuture<String> combined = future.thenCombine(other, concat);
        System.out.println(combined.get());
      }

      /** thenCombineAsync(other, fn): like thenCombine, run on the default async executor. */
      @Test
      public void testThenCombineAsync1() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> other = CompletableFuture.supplyAsync(delayed(3000, "qwqq"));
        BiFunction<String, String, String> concat = (left, right) -> left + right;
        CompletableFuture<String> combined = future.thenCombineAsync(other, concat);
        System.out.println(combined.get());
      }

      /** thenCombineAsync(other, fn, executor): like thenCombine, run on the given executor. */
      @Test
      public void testThenCombineAsync() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> other = CompletableFuture.supplyAsync(delayed(3000, "qwqq"));
        BiFunction<String, String, String> concat = (left, right) -> left + right;
        CompletableFuture<String> combined =
            future.thenCombineAsync(other, concat, ForkJoinPool.commonPool());
        System.out.println(combined.get());
      }

      // ---------------------------------------------------------------- thenAcceptBoth

      /** thenAcceptBoth(other, consumer): hands both results to a consumer once both are done. */
      @Test
      public void testThenAcceptBoth() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "31321");
        BiConsumer<String, String> printer = (left, right) -> System.out.println(left + right);
        CompletableFuture<String> other = CompletableFuture.supplyAsync(() -> "qwqq");
        CompletableFuture<Void> both = future.thenAcceptBoth(other, printer);
        System.out.println(both.get());
      }

      /** thenAcceptBothAsync(other, consumer): like thenAcceptBoth, on the default async executor. */
      @Test
      public void testThenAcceptBothAsync1() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "31321");
        BiConsumer<String, String> printer = (left, right) -> System.out.println(left + right);
        CompletableFuture<String> other = CompletableFuture.supplyAsync(() -> "qwqq");
        CompletableFuture<Void> both = future.thenAcceptBothAsync(other, printer);
        System.out.println(both.get());
      }

      /** thenAcceptBothAsync(other, consumer, executor): like thenAcceptBoth, on the given executor. */
      @Test
      public void testThenAcceptBothAsync() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "31321");
        BiConsumer<String, String> printer = (left, right) -> System.out.println(left + right);
        CompletableFuture<String> other = CompletableFuture.supplyAsync(() -> "qwqq");
        CompletableFuture<Void> both =
            future.thenAcceptBothAsync(other, printer, ForkJoinPool.commonPool());
        System.out.println(both.get());
      }

      // ---------------------------------------------------------------- runAfterBoth

      /**
       * runAfterBoth(other, action): runs the action after both futures completed; the two sources
       * may finish in either order. The combined stage is joined so that the test does not return
       * before the final action has run.
       */
      @Test
      public void testRunAfterBoth() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println(1));
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println(2));
        Runnable last = () -> System.out.println(3);
        future2.runAfterBoth(future, last).join();
      }

      /** runAfterBothAsync(other, action): like runAfterBoth, on the default async executor. */
      @Test
      public void testRunAfterBothAsync1() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println(1));
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println(2));
        Runnable last = () -> System.out.println(3);
        future2.runAfterBothAsync(future, last).join();
      }

      /** runAfterBothAsync(other, action, executor): like runAfterBoth, on the given executor. */
      @Test
      public void testRunAfterBothAsync() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println(1));
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println(2));
        Runnable last = () -> System.out.println(3);
        future2.runAfterBothAsync(future, last, ForkJoinPool.commonPool()).join();
      }

      // ---------------------------------------------------------------- applyToEither

      /** applyToEither(other, fn): applies the function to whichever result arrives first. */
      @Test
      public void testApplyToEither() throws Exception {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "qwqq");
        Function<String, String> suffix = s -> s + "qw";
        CompletableFuture<String> winner = slow.applyToEither(fast, suffix);
        System.out.println(winner.get());
      }

      /** applyToEitherAsync(other, fn): like applyToEither, on the default async executor. */
      @Test
      public void testApplyToEitherAsync1() throws Exception {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "qwqq");
        Function<String, String> suffix = s -> s + "qw";
        CompletableFuture<String> winner = slow.applyToEitherAsync(fast, suffix);
        System.out.println(winner.get());
      }

      /** applyToEitherAsync(other, fn, executor): like applyToEither, on the given executor. */
      @Test
      public void testApplyToEitherAsync() throws Exception {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "qwqq");
        Function<String, String> suffix = s -> s + "qw";
        CompletableFuture<String> winner =
            slow.applyToEitherAsync(fast, suffix, ForkJoinPool.commonPool());
        System.out.println(winner.get());
      }

      // ---------------------------------------------------------------- acceptEither

      /** acceptEither(other, consumer): feeds whichever result arrives first to the consumer. */
      @Test
      public void testAcceptEither() throws Exception {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "qwqq");
        Consumer<String> printer = s -> System.out.println(s + "qw");
        CompletableFuture<Void> winner = slow.acceptEither(fast, printer);
        System.out.println(winner.get());
      }

      /** acceptEitherAsync(other, consumer): like acceptEither, on the default async executor. */
      @Test
      public void testAcceptEitherAsync1() throws Exception {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "qwqq");
        Consumer<String> printer = s -> System.out.println(s + "qw");
        CompletableFuture<Void> winner = slow.acceptEitherAsync(fast, printer);
        System.out.println(winner.get());
      }

      /** acceptEitherAsync(other, consumer, executor): like acceptEither, on the given executor. */
      @Test
      public void testAcceptEitherAsync() throws Exception {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(delayed(2000, "31321"));
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "qwqq");
        Consumer<String> printer = s -> System.out.println(s + "qw");
        CompletableFuture<Void> winner =
            slow.acceptEitherAsync(fast, printer, ForkJoinPool.commonPool());
        System.out.println(winner.get());
      }

      // ---------------------------------------------------------------- runAfterEither

      /** runAfterEither(other, action): runs "run3" as soon as either of the two tasks has finished. */
      @Test
      public void testRunAfterEither() throws Exception {
        repeatRunAfterEither((first, second) ->
            first.runAfterEither(second, () -> System.out.println("run3")));
      }

      /** runAfterEitherAsync(other, action): like runAfterEither, on the default async executor. */
      @Test
      public void testRunAfterEitherAsync1() throws Exception {
        repeatRunAfterEither((first, second) ->
            first.runAfterEitherAsync(second, () -> System.out.println("run3")));
      }

      /** runAfterEitherAsync(other, action, executor): like runAfterEither, on the given executor. */
      @Test
      public void testRunAfterEitherAsync() throws Exception {
        repeatRunAfterEither((first, second) ->
            first.runAfterEitherAsync(
                second, () -> System.out.println("run3"), ForkJoinPool.commonPool()));
      }

      // ---------------------------------------------------------------- thenCompose

      /** thenCompose(fn): passes the first result to a function that starts the next future. */
      @Test
      public void testThenCompose() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "pp1")
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "pp2"));
        System.out.println(future.get());
      }

      /** thenComposeAsync(fn): like thenCompose, with the function run on the default executor. */
      @Test
      public void testThenComposeAsync1() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "pp1")
            .thenComposeAsync(s -> CompletableFuture.supplyAsync(() -> s + "pp2"));
        System.out.println(future.get());
      }

      /** thenComposeAsync(fn, executor): like thenCompose, with the function on the given executor. */
      @Test
      public void testThenComposeAsync() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "pp1")
            .thenComposeAsync(
                s -> CompletableFuture.supplyAsync(() -> s + "pp2"), ForkJoinPool.commonPool());
        System.out.println(future.get());
      }

      // ---------------------------------------------------------------- whenComplete

      /**
       * whenComplete(action): observes value and failure without changing the result. Thread names
       * are printed so it can be seen which thread ran the callback; for the non-async variant that
       * may be the completing thread or the calling thread, depending on timing.
       */
      @Test
      public void testWhenComplete() throws Exception {
        CompletableFuture<String> future = startTracedStage();
        // Thread running the test method itself.
        System.out.println(Thread.currentThread().getName());
        CompletableFuture<String> observed = future.whenComplete(traceOutcome());
        observed.join();
        System.out.println(observed.get());
      }

      /** whenCompleteAsync(action): like whenComplete, callback run on the default async executor. */
      @Test
      public void testWhenCompleteAsync1() throws Exception {
        CompletableFuture<String> future = startTracedStage();
        // Thread running the test method itself.
        System.out.println(Thread.currentThread().getName());
        CompletableFuture<String> observed = future.whenCompleteAsync(traceOutcome());
        observed.join();
        System.out.println(observed.get());
      }

      /** whenCompleteAsync(action, executor): like whenComplete, callback run on the given executor. */
      @Test
      public void testWhenCompleteAsync() throws Exception {
        CompletableFuture<String> future = startTracedStage();
        // Thread running the test method itself.
        System.out.println(Thread.currentThread().getName());
        CompletableFuture<String> observed =
            future.whenCompleteAsync(traceOutcome(), ForkJoinPool.commonPool());
        observed.join();
        System.out.println(observed.get());
      }

      // ---------------------------------------------------------------- handle

      /**
       * handle(fn): receives value and failure and produces the result of the new future. Thread
       * names are printed so it can be seen which thread ran the callback.
       */
      @Test
      public void testHandle() throws Exception {
        CompletableFuture<String> future = startTracedStage();
        CompletableFuture<String> handled = future.handle(traceAndReplace());
        handled.join();
        System.out.println(handled.get());
      }

      /** handleAsync(fn): like handle, callback run on the default async executor. */
      @Test
      public void testHandleAsync1() throws Exception {
        CompletableFuture<String> future = startTracedStage();
        CompletableFuture<String> handled = future.handleAsync(traceAndReplace());
        handled.join();
        System.out.println(handled.get());
      }

      /** handleAsync(fn, executor): like handle, callback run on the given executor. */
      @Test
      public void testHandleAsync() throws Exception {
        CompletableFuture<String> future = startTracedStage();
        CompletableFuture<String> handled =
            future.handleAsync(traceAndReplace(), ForkJoinPool.commonPool());
        handled.join();
        System.out.println(handled.get());
      }

      // ---------------------------------------------------------------- misc

      /** toCompletableFuture(): returns the future itself. */
      @Test
      public void testToCompletableFuture() throws Exception {
        CompletableFuture<String> future = startTracedStage();
        CompletableFuture<String> same = future.toCompletableFuture();
        System.out.println(future.get());
        System.out.println(same.get());
      }

      /** exceptionally(fn): when the source fails, maps the failure to a replacement value. */
      @Test
      public void testExceptionally() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
          System.out.println("1 " + Thread.currentThread().getName());
          throw new NullPointerException("333");
        });
        CompletableFuture<String> recovered = future.exceptionally(failure -> failure + "222");
        System.out.println(recovered.get());
      }

      /** allOf(futures...): the returned future completes once every given future has completed. */
      @Test
      public void testAllOf() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(announcing(1, 0));
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(announcing(2, 0));
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(announcing(3, 0));
        CompletableFuture.allOf(future, future2, future3).get();
      }

      /** anyOf(futures...): the returned future completes as soon as any given future completes. */
      @Test
      public void testAnyOf() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(announcing(1, 300_000));
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(announcing(2, 300_000));
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(announcing(3, 0));
        CompletableFuture.anyOf(future, future2, future3).get();
      }

      /**
       * cancel(flag): completes a pending future with a CancellationException. Per the
       * CompletableFuture documentation the {@code mayInterruptIfRunning} flag has no effect in this
       * implementation, so the sleeping task itself is not interrupted.
       */
      @Test
      public void testCancel() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(announcing(1, 300_000));
        future.cancel(false);
      }

      /** isCancelled(): reports whether the future was cancelled before completing normally. */
      @Test
      public void testIsCancelled() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(announcing(1, 300_000));
        future.cancel(false);
        System.out.println(future.isCancelled());
      }

      /**
       * isCompletedExceptionally(): reports whether the future ended with a failure. The recovery
       * stage is awaited first, which guarantees the failing source has completed by then.
       */
      @Test
      public void testIsCompletedExceptionally() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
          throw new NullPointerException("222");
        });
        System.out.println(future.exceptionally(failure -> failure + "23123").get());
        System.out.println(future.isCompletedExceptionally());
      }

      /** obtrudeValue(value): forcibly overwrites the result, even if already completed. */
      @Test
      public void testObtrudeValue() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(announcing(1, 0));
        System.out.println(future.get());
        future.obtrudeValue("123");
        System.out.println(future.get());
      }

      /**
       * obtrudeException(ex): forcibly overwrites the result with a failure, even if already
       * completed. The following get() throws ExecutionException; its cause is printed so the demo
       * shows the outcome instead of erroring out.
       */
      @Test
      public void testObtrudeException() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(announcing(1, 0));
        System.out.println(future.get());
        future.obtrudeException(new Throwable("333"));
        try {
          System.out.println(future.get());
        } catch (ExecutionException e) {
          System.out.println(e.getCause());
        }
      }

      /** toString(): prints the future's identity together with its completion state. */
      @Test
      public void testToString() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(announcing(1, 0));
        System.out.println(future.toString());
      }
    }

     

    Processed: 0.012, SQL: 9