Java组合式异步编程---CompletableFuture接口

    技术2025-05-25  40

    1、Future接口

    Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不需要等待耗时的操作完成。

    示例:使用Future以异步的方式执行一个耗时的操作

    ExecutorService executor = Executors.newCachedThreadPool(); Future<Double> future = executor.submit(new Callable<Double>() { //向ExecutorService提交一个Callable对象 public Double call() { return doSomeLongComputation();//以异步方式在新线程中执行耗时的操作 } }); doSomethingElse(); // 伪码 try { Double result = future.get(1, TimeUnit.SECONDS);//获取异步操作结果,如果被阻塞,无法得到结果,在等待1秒钟后退出 } catch (ExecutionException ee) { // 计算抛出一个异常 } catch (InterruptedException ie) { // 当前线程在等待过程中被中断 } catch (TimeoutException te) { // 在Future对象完成之前超时 }

    这种编程方式让你的线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他任务。如果已经运行到没有异步操作的结果就无法继续进行时,可以调用它的get方法去获取操作结果。如果操作已经完成,该方法会立刻返回操作结果,否则它会阻塞线程,直到操作完成,返回相应的结果。 为了处理长时间运行的操作永远不返回的可能性,虽然Future提供了一个无需任何参数的get方法,但还是推荐使用重载版本的get方法,它接受一个超时的参数,可以定义线程等待Future结果的时间,而不是永无止境地等待下去。

    Future接口的局限性 Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果。但这些特性还不足以让你编写简洁的并发代码。

    将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。等待Future集合中的所有任务都完成。仅等待Future集合中快结束的任务完成,并返回它的结果。通过编程方式完成一个Future任务的执行。应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步操作,不只是简单地阻塞等待操作结果)。

    如上参考博文:https://www.jianshu.com/p/11327ad1d645

    于是,就有了CompletableFuture,它是一个具体的类,实现了两个接口,一个是Future,另一个是CompletionStage,Future表示异步任务的结果,而CompletionStage字面意思是完成阶段,多个CompletionStage可以以流水线的方式组合起来,对于其中一个CompletionStage,它有一个计算任务,但可能需要等待其他一个或多个阶段完成才能开始,它完成后,可能会触发其他阶段开始运行。CompletionStage提供了大量方法,使用它们,可以方便地响应任务事件,构建任务流水线,实现组合式异步编程。

    在Java 8中, 新增加了一个包含50个方法左右的类:CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

    示例:

    import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CompletableFutureMain { public static void main(String[] args) { System.out.println("main class,即将执行异步线程 getPriceAsync()"); CompletableFuture<Double> futurePrice = getPriceAsync(); //do anything you want, 当前线程不被阻塞 System.out.println("main class,do anything you want, 当前线程不被阻塞"); //线程任务完成的话,执行回调函数,不阻塞后续操作:whenComplete是异步的 // futurePrice.whenComplete((aDouble, throwable) -> { // System.out.println("main 线程获取异步线程的返回,返回值为="+aDouble); // //do something else // }); try { // 轮询判断 //if(futurePrice.isDone()){ //while(futurePrice.isDone()){ Double aDouble = futurePrice.get(5, TimeUnit.SECONDS); // 阻塞获取异步线程结果 System.out.println("main 线程获取异步线程的返回,返回值为="+aDouble); //} } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }catch (Exception e) { e.printStackTrace(); } System.out.println("main线程写在最后面的代码,但不一定是最后执行的。"); } static CompletableFuture<Double> getPriceAsync() { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { System.out.println("异步线程getPriceAsync()执行 start,线程睡眠3秒 -------------"); Thread.sleep(3000); System.out.println("异步线程getPriceAsync()执行,线程睡眠3秒结束 -------------"); Double dou = 666.66d; System.out.println("异步线程getPriceAsync()执行 end,结果="+ dou +" -------------"); futurePrice.complete(dou); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { // 将异常包装返回 futurePrice.completeExceptionally(e); } }).start(); return futurePrice; } }

    执行结果:

    关于CompletableFuture的错误处理:使用get(long timeout)并将CompletableFuture内发生问题的异常抛出

    如上参考博文:https://www.jianshu.com/p/80633d4f70e7   

    Processed: 0.011, SQL: 9