浅显理解ForkJoin

    技术2022-07-11  72

    ForkJoin

    1 什么是ForkJoin

    Fork/Join 框架是 Java7 供的原生多线程并行处理框架,其思想就是将线程任务分而治之, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果

    2 ForkJoin特点

    ForkJoin把任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应

    工作窃取(work-stealing) : 任务线程干完了自身的任务队列里的任务,去窃取完成其他任务线程队列中的任务,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行

    工作窃取算法的优缺点

    优点 : 充分利用线程进行并行计算,并减少了线程间的竞争

    缺点 : 在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列

    Hierarchy(层次结构)

    源码层面本人实力有限待我学成归来再剖析源码

    通过一个简单的Demo来加强对ForkJoin API的调用

    Demo

    /** * @author dearth * @version 1.0 * @date 2020/7/1 16:43 * <p> * 求和计算任务 * 计算10亿的数据 * * 普通的方法 * 使用ForkJoin * 使用Stream流,并行计算 */ public class ForkJoinDemo extends RecursiveTask<Long> { private long start; private long end; //有参构造为了ForkJoin的任务执行参数 public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { long divide = 10000L; if ((end-start)<divide) { long sum = 0L; for (long i = 0L; i < 10_0000_0000; i++) { sum += i; } return sum; } else { //分支合并计算 long mid = (start + end) / 2; ForkJoinDemo task1 = new ForkJoinDemo(start, mid); task1.fork(); //拆分任务,将任务加入线程队列 ForkJoinDemo task2 = new ForkJoinDemo(mid + 1, end); task2.fork(); //拆分任务,将任务加入线程队列 return task1.join() + task2.join(); } } } public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { //test1(); //test2(); test3(); } //sum: 500000000500000000 执行的时间329 public static void test1 () { long start = System.currentTimeMillis(); long sum = 0L; for (long i = 1L; i < 10_0000_0000; i++) { sum += i; } long end = System.currentTimeMillis(); System.out.println("sum: "+sum+" 执行的时间"+ (end-start)); } //sum: 500000000500000000 执行的时间310 public static void test2 () throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum: "+sum+" 执行的时间"+ (end-start)); } //sum: 500000000500000000 执行的时间195 public static void test3 () { long start = System.currentTimeMillis(); long sum = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("sum: "+sum+" 执行的时间"+ (end-start)); } }
    Processed: 0.016, SQL: 9