原理
分而治之:规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解
工作密取 每个小规模里面的问题解决完之后,会从别的地方后面拿取处理完成并归还
返回值、同步/异步处理
ForkJoinTask实现Future
RecursiveTask(有返回值)、RecursiveAction(无返回值)继承ForkJoinTask
同步(invoke)异步(execute)
在compute()中使用**invokeAll(arg)**重新执行该任务的compute()方法,arg参数只会是每一个继承了RecursiveTask、RecursiveAction的对象
案例
同步有返回值
public class SumArray {
public class MakeArray {
public static final int ARRAY_LENGTH
= 100000000;
public static int[] makeArray() {
Random r
= new Random();
int[] result
= new int[ARRAY_LENGTH
];
for(int i
=0;i
<ARRAY_LENGTH
;i
++){
result
[i
] = r
.nextInt(ARRAY_LENGTH
*3);
}
return result
;
}
}
private static class SumTask extends RecursiveTask<Integer>{
private final static int THRESHOLD
= MakeArray
.ARRAY_LENGTH
/10;
private int[] src
;
private int fromIndex
;
private int toIndex
;
public SumTask(int[] src
, int fromIndex
, int toIndex
) {
this.src
= src
;
this.fromIndex
= fromIndex
;
this.toIndex
= toIndex
;
}
@Override
protected Integer
compute() {
if(toIndex
-fromIndex
< THRESHOLD
) {
int count
= 0;
for(int i
=fromIndex
;i
<=toIndex
;i
++) {
count
= count
+ src
[i
];
}
return count
;
}else {
int mid
= (fromIndex
+toIndex
)/2;
SumTask left
= new SumTask(src
,fromIndex
,mid
);
SumTask right
= new SumTask(src
,mid
+1,toIndex
);
invokeAll(left
,right
);
return left
.join()+right
.join();
}
}
}
public static void main(String
[] args
) {
ForkJoinPool pool
= new ForkJoinPool();
int[] src
= MakeArray
.makeArray();
SumTask innerFind
= new SumTask(src
,0,src
.length
-1);
long start
= System
.currentTimeMillis();
pool
.invoke(innerFind
);
System
.out
.println("Task is Running.....");
System
.out
.println("The count is "+innerFind
.join()
+" spend time:"+(System
.currentTimeMillis()-start
)+"ms");
}
}
异步无返回值
public class FindDirsFiles extends RecursiveAction{
private File path
;
public FindDirsFiles(File path
) {
this.path
= path
;
}
public static void main(String
[] args
){
try {
ForkJoinPool pool
= new ForkJoinPool();
FindDirsFiles task
= new FindDirsFiles(new File("F:/"));
pool
.execute(task
);
System
.out
.println("Task is Running......");
Thread
.sleep(1);
int otherWork
= 0;
for(int i
=0;i
<100;i
++){
otherWork
= otherWork
+i
;
}
System
.out
.println("Main Thread done sth......,otherWork="+otherWork
);
task
.join();
System
.out
.println("Task end");
} catch (Exception e
) {
e
.printStackTrace();
}
}
@Override
protected void compute() {
List
<FindDirsFiles> subTasks
= new ArrayList<>();
File
[] files
= path
.listFiles();
if(files
!=null
) {
for(File file
:files
) {
if(file
.isDirectory()) {
subTasks
.add(new FindDirsFiles(file
));
}else {
if(file
.getAbsolutePath().endsWith("txt")) {
System
.out
.println("文件:"+file
.getAbsolutePath());
}
}
}
if(!subTasks
.isEmpty()) {
for(FindDirsFiles subTask
:invokeAll(subTasks
)) {
subTask
.join();
}
}
}
}
}
两者都要重写compute(),通过ForkJoinPool对象 方法调用