Fork-Join的介绍及使用

    技术2025-10-01  19

    原理

    分而治之:规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解

    工作密取 每个小规模里面的问题解决完之后,会从别的地方后面拿取处理完成并归还

    返回值、同步/异步处理

    ForkJoinTask实现Future

    RecursiveTask(有返回值)、RecursiveAction(无返回值)继承ForkJoinTask

    同步(invoke)异步(execute)

    在compute()中使用**invokeAll(arg)**重新执行该任务的compute()方法,arg参数只会是每一个继承了RecursiveTask、RecursiveAction的对象

    案例

    同步有返回值 public class SumArray { public class MakeArray { //数组长度 public static final int ARRAY_LENGTH = 100000000; public static int[] makeArray() { //new一个随机数发生器 Random r = new Random(); int[] result = new int[ARRAY_LENGTH]; for(int i=0;i<ARRAY_LENGTH;i++){ //用随机数填充数组 result[i] = r.nextInt(ARRAY_LENGTH*3); } return result; } } //继承RecursiveTask有返回值 private static class SumTask extends RecursiveTask<Integer>{ private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;//每组的个数 private int[] src; //表示我们要实际统计的数组 private int fromIndex;//开始统计的下标 private int toIndex;//统计到哪里结束的下标 public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override protected Integer compute() { if(toIndex-fromIndex < THRESHOLD) { int count = 0; for(int i=fromIndex;i<=toIndex;i++) { //SleepTools.ms(1); count = count + src[i]; } return count; }else { //fromIndex....mid....toIndex //1...................70....100 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid+1,toIndex); invokeAll(left,right); return left.join()+right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); int[] src = MakeArray.makeArray(); SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); pool.invoke(innerFind);//同步调用 System.out.println("Task is Running....."); //join()合并结果 System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } } 异步无返回值 //继承RecursiveAction无返回值 public class FindDirsFiles extends RecursiveAction{ private File path;//当前任务需要搜寻的目录 public FindDirsFiles(File path) { this.path = path; } public static void main(String [] args){ try { // 用一个 ForkJoinPool 实例调度总任务 ForkJoinPool pool = new ForkJoinPool(); FindDirsFiles task = new FindDirsFiles(new File("F:/")); pool.execute(task);//异步调用 System.out.println("Task is Running......"); Thread.sleep(1); int otherWork = 0; for(int i=0;i<100;i++){ otherWork = otherWork+i; } System.out.println("Main Thread done sth......,otherWork="+otherWork); task.join();//阻塞的方法 System.out.println("Task end"); } catch (Exception e) { e.printStackTrace(); } } @Override protected void compute() { List<FindDirsFiles> subTasks = new ArrayList<>(); File[] files = path.listFiles(); if(files!=null) { for(File file:files) { if(file.isDirectory()) { subTasks.add(new FindDirsFiles(file)); }else { //遇到文件,检查 if(file.getAbsolutePath().endsWith("txt")) { System.out.println("文件:"+file.getAbsolutePath()); } } } if(!subTasks.isEmpty()) { //循环目录 for(FindDirsFiles subTask:invokeAll(subTasks)) { subTask.join();//等待子任务执行完成 } } } } }

    两者都要重写compute(),通过ForkJoinPool对象 方法调用

    Processed: 0.009, SQL: 9