[并发实现原理] 0701 ForkJoinPool 用法

    技术2025-09-23  37

    一、介绍

    1、分治。其基本思路是:将一个大的任务分为若干个子任务,子任务分别计算,然后合并出最终结果,计算过程中通常会用到递归。

    2、ForkJoinPool

    JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。 可以将ForkJoinPool看作一个单机版的Map/Reduce,这里的并行不是多台机器并行计算,而是多个线程并行计算。

    3、ThreadPoolExecutor,ForkJoinPool 对比

    ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。 假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,会导致1个线程很忙,其他4个线程则处于空闲状态。 利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。

    例子

    快排

    a: 利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组里面的元素比该元素大

    b: 对左右的2个子数组分别排序

    package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; /** * @author haoxiansheng * sortTask */ @Slf4j public class ForkJoinTaskExample2 extends RecursiveAction { final long[] array; final int lo; final int hi; private int THRESHLOD = 0; // dor demo only public ForkJoinTaskExample2(long[] array) { this.array = array; this.lo = 0; this.hi = array.length - 1; } public ForkJoinTaskExample2(long[] array, int lo, int hi) { this.array = array; this.lo = lo; this.hi = hi; } @Override protected void compute() { if (lo < hi) { int pivot = partition(array, lo, hi); // 划分 ForkJoinTaskExample2 left = new ForkJoinTaskExample2(array, lo, pivot - 1); ForkJoinTaskExample2 right = new ForkJoinTaskExample2(array, pivot + 1, hi); left.fork(); right.fork(); left.join(); right.join(); } } private int partition(long[] array, int lo, int hi) { long x = array[hi]; int i = lo - 1; for (int j = 0; j < hi; j++) { if (array[j] <= x) { i++; swap(array, i + 1, hi); } } swap(array, i + 1, hi); return i + 1; } private void swap(long[] array, int i, int j) { if (i != j) { long temp = array[i]; array[i] = array[j]; array[j] = temp; } } public static void main(String[] args) throws InterruptedException { long array[] = {1, 2, 5, 3, 9, 4, 8}; ForkJoinTaskExample2 task = new ForkJoinTaskExample2(array); ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.submit(task); // 提交任务 forkJoinPool.shutdown(); // 结束,ForkJoinPool 内部会开多个线程,同时执行上面的子任务 log.info("array=>{}", array); forkJoinPool.awaitTermination(30, TimeUnit.SECONDS); } } 求和 package com.current.flame.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * @author haoxiansheng * 递归拆分任务 求和 */ @Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshhold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 如果任务足够小就计算任务 boolean canCompute = (end - start) <= threshhold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任务大于阀值,就分裂成两个自任务计算 int middle = (start + end)/2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle+1, end); leftTask.fork(); rightTask.fork(); // 等待任务结束合并结果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任务 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); // 生成一个计算任务 计算 1+2+3+4 ForkJoinTaskExample taskExample = new ForkJoinTaskExample(1, 100); // 执行一个任务 Future<Integer> result = forkJoinPool.submit(taskExample); try{ log.info("result:{}",result.get()); } catch (Exception e) { log.error("exception", e); } } }

    4、RecursiveAction 和RecursiveTask 两个类,都继承自抽象类ForkJoinTask

    区别一个有返回值,一个没有

    Processed: 0.009, SQL: 9