Java8 CompletableFuture 用法总结及Stream并行流对比

    技术2025-08-10  9

     

    Shop计算相关类 package com.future; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private String name ; public Shop(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice(String product) { delay(); Random random = new Random(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } /** * 在另一个线程中计算,并设置Future的返回值,为了不等待 * @param product * @return */ public Future<Double> getPriceAsync(String product){ CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(()->{ try { double price = getPrice(product); futurePrice.complete(price); } catch (Exception e) { futurePrice.completeExceptionally(e); } }).start(); return futurePrice; } public Future<Double> getPriceSupplyAsync(String product){ return CompletableFuture.supplyAsync(() -> getPrice(product)); } public static void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } public String getPriceString(String product) { double price = calculatePrice(product); Random random = new Random(); Discount.Code code = Discount.Code.values()[ random.nextInt(Discount.Code.values().length)]; return String.format("%s:%.2f:%s", name, price, code); } private double calculatePrice(String product) { delay(); Random random = new Random(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } public static void randomDelay() { Random random = new Random(); int delay = 500 + random.nextInt(2000); try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); } } } Discount 折扣类 package com.future; public class Discount { public enum Code { NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20); private final int percentage; Code(int percentage) { this.percentage = percentage; } } public static String applyDiscount(Quote quote) { return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode()); } private static double apply(double price, Code code) { delay(); return (price * (100 - code.percentage) / 100); } // Discount类的具体实现这里暂且不表示,参见代码清单11-14 public static void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } } Quote关系类 package com.future; public class Quote { private final String shopName; private final double price; private final Discount.Code discountCode; public Quote(String shopName, double price, Discount.Code code) { this.shopName = shopName; this.price = price; this.discountCode = code; } public static Quote parse(String s) { String[] split = s.split(":"); String shopName = split[0]; double price = Double.parseDouble(split[1]); Discount.Code discountCode = Discount.Code.valueOf(split[2]); return new Quote(shopName, price, discountCode); } public String getShopName() { return shopName; } public double getPrice() { return price; } public Discount.Code getDiscountCode() { return discountCode; } } FutureTest测试类 import com.future.Discount; import com.future.Quote; import com.future.Shop; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.Stream; public class FutureTest { private List<Shop> shops = Arrays.asList(new Shop("AAAAA"), new Shop("BBBB"), new Shop("CCCC"), new Shop("DDDD"), new Shop("EEEE")); private Shop shop = new Shop("BestShop"); private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); /** * 1s+ 主线程优先于getPriceAsync方法打印,因为采用了子线程计算,但是get等待了子线程返回值 */ @Test public void test1() { long start = System.nanoTime(); Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); long iTime = ((System.nanoTime() - start) / 1000000); System.out.println("return after " + iTime + " mesecs"); //doSomethingElse(); Double price = null; try { price = futurePrice.get(); System.out.println(" Price is " + price); } catch (Exception e) { e.printStackTrace(); } long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } /** * stream顺序查询,4S+ */ @Test public void test2() { long start = System.nanoTime(); findPrices(shops); long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } public List<String> findPrices(List<Shop> shops) { return shops.stream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(shop.getName()))) .collect(Collectors.toList()); } /** * stream并行流查询,2S+ */ @Test public void test3() { long start = System.nanoTime(); findParallePrices(shops); long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } public List<String> findParallePrices(List<Shop> shops) { return shops.parallelStream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(shop.getName()))) .collect(Collectors.toList()); } /** * CompletableFuture并行查询, 2S+ */ @Test public void test4() { long start = System.nanoTime(); findPrices("TTTTTT"); long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } public List<String> findPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getName() + " price is " + shop.getPrice(product))) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } /** * CompletableFuture + executor 解决默认后台线程启动数量 * public ForkJoinPool() { * this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), * defaultForkJoinWorkerThreadFactory, null, false); * } * 默认线程数取决于 Runtime.getRuntime().availableProcessors()) 的返回值 */ @Test public void test5() { long start = System.nanoTime(); threadFindPrices("TTTTTT"); long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } public List<String> threadFindPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getName() + " price is " + shop.getPrice(product), executor)) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } // 并行——使用流还是CompletableFutures? // 目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map // 这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助 // 你确保整体的计算不会因为线程都在等待I/O而发生阻塞。 // 我们对使用这些API的建议如下。 // ❑如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实 // 现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要 // 创建比处理器核数更多的线程)。 // ❑反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用 // CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者 // W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的 // 流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。 /** * stream流处理5个+5次折扣 耗时10s+ */ @Test public void test6() { long start = System.nanoTime(); findDiscountPrices("TTTTTT"); long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } /** * 折扣计算 * * @param product * @return */ //为Stream底层依赖的是线程数量固定的通用线程池 public List<String> findDiscountPrices(String product) { return shops.stream() .map(shop -> shop.getPriceString(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); } /** * 采用CompletableFuture实现5个商品 5次折扣 2s+ */ @Test public void test7() { long start = System.nanoTime(); findDiscountFuturPrices("TTTTTT"); long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } // thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其 结果作为参数传递给第二个操作 public List<String> findDiscountFuturPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPriceString(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } // thenCombine 要将两个完全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务 // Future<Double> futurePriceInUSD = // CompletableFuture.supplyAsync(() -> shop.getPrice(product)) // .thenCombine( // CompletableFuture.supplyAsync( // () -> exchangeService.getRate(Money.EUR, Money.USD)), // (price, rate) -> price * rate // ); /** * 2s+ */ @Test public void test8() { long start = System.nanoTime(); findPricesStream("TTTTTT").map(f -> f.thenAccept(System.out::println)); CompletableFuture[] futures = findPricesStream("myPhone") .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); // CompletableFuture.allOf(futures).join(); try { CompletableFuture.anyOf(futures).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } //allOf 等待最初stream中所有的CompletableFuture对象执行完毕 //anyOf 任何一个返回了结果都能满足你的需求 long overTime = ((System.nanoTime() - start) / 1000000); System.out.println("return over after " + overTime + " mesecs"); } public Stream<CompletableFuture<String>> findPricesStream(String product) { return shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPriceString(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))); } /** * 2s + */ @Test public void test9() { long start = System.nanoTime(); CompletableFuture[] futures = findPricesStream("myPhone27S") .map(f -> f.thenAccept( s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)"))) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join(); System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs"); } }

     

    Processed: 0.012, SQL: 9