Fork/Join框架和异步

定义:

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。类似于递归或者分而治之的思想。

引用《Java并发编程的艺术》

Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结

果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果

工作窃取算法 :

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

当大任务需要处理时,我们把其分割成多个子任务,存放在每个队列中,并且每个线程处理不同队列中的子任务,每当有线程(A)提前完成任务了,那么(A)线程会去其他的队列中窃取任务处理,这是A线程与当前的线程一起处理同一个队列。

由此引出为了减少窃取任务线程和被窃取任务线程之间的竞争,队列采用双端队列。多线程处理同队列的流程是:

被窃取任务线程永远从双端队列的头部拿任务执行,

窃取任务的线程(A)永远从双端队列的尾部拿任务执行。

其优缺点:

工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

使用Fork/Join框架

需求是:计算1+2+3+4+......+100的结果

阈值设置为10,希望每个子任务最大执行10个数的相加。

package com.JucPool;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask; class MyTask extends RecursiveTask{ private static final int THRESHOLD = 10; // 阈值
private int start;
private int end;
private int result; public MyTask(int start, int end) {
this.start = start;
this.end = end;
} @Override
protected Integer compute() {
if((end-start) <= THRESHOLD){
for(int i = start; i <= end; i++){
result +=i;
}
}else{
int mid = (start+end)>>1;
MyTask task1 = new MyTask(start, mid);
MyTask task2 = new MyTask(mid+1, end);
//执行子任务
task1.fork();
task2.fork();
//得到最后结果
result = task1.join()+task2.join();
}
return result;
}
} public class demo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ForkJoinPool joinPool = new ForkJoinPool();
//资源
MyTask myTask = new MyTask(1,100);
//执行任务
ForkJoinTask submit = joinPool.submit(myTask);
System.out.println(submit.get());
}
}

使用ForkJoinTask资源需要继承RecursiveTask(用于有返回结果的任务)--ForkJoinTask子类。

首先需要实现compute方法,我们在该方法中判断任务的大小是否小于我们设置的阈值。如果小于阈值,就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

JUC之Fork/Join框架的


扫描二维码,在手机上阅读!