Fork/Join框架是并行执行任务的框架,把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。 ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool,而ForkJoinWorkerThread负责执行这些任务。当我们调用ForkJoinTask的fork方法时,程序会把任务放在ForkJoinWorkerThread的pushTask的workQueue中,异步地执行这个任务,然后立即返回结果。
ForkJoinTask:我们要使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类: a.RecursiveAction:用于没有返回结果的任务 b.RecursiveTask: 用于有返回结果的任务
ForkJoinTask需要通过ForkJoinPool来执行
package fork_join; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /*实现从start到end的累加*/ public class CountTask extends RecursiveTask<Integer>{ private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if (canCompute){ for (int i = start;i <= end;i++){ sum += i; } }else { int middle = (start + end)/2; CountTask leftTask = new CountTask(start,middle); CountTask rightTask = new CountTask(middle+1,end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务执行,并得到其结果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); //合并子任务 sum = leftResult+rightResult; } return sum; } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1,4); //执行task,此处的summit可用invoke代替,都是执行任务 Future<Integer> result = forkJoinPool.submit(task); //isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了 if (task.isCompletedAbnormally()) task.getException(); //如果任务执行完毕,则关闭线程池 if (task.isDone()) forkJoinPool.shutdown(); System.out.println(result.get()); } }