一、ForkJoin框架
Fork/Join 分叉/结合框架。
1. 什么是ForkJoin框架 适用场景
虽然目前处理器核心数已经发展到很大数目,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果。
Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
2.工作窃取算法(work-stealing)
一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。
但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。
A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行。
注意:线程池中的每个线程都有自己的工作队列(PS,这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。
工作窃取算法的优点:
利用了线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点:
1、如果双端队列中只有一个任务时,线程间会存在竞争。
2、窃取算法消耗了更多的系统资源,如会创建多个线程和多个双端队列。
3.主要类
1、ForkJoinTask:使用该框架,需要创建一个ForkJoin任务,它提供在任务中执行fork和join操作的机制。一般情况下,我们并不需要直接继承ForkJoinTask类,只需要继承它的子类,它的子类有两个:
a、RecursiveAction:用于没有返回结果的任务。
b、RecursiveTask:用于有返回结果的任务。
2、ForkJoinPool:任务ForkJoinTask需要通过ForkJoinPool来执行。
3、ForkJoinWorkerThread:ForkJoinPool线程池中的一个执行任务的线程,我们一般接触不到,线程池底层是这个类的实例。
import java.util.concurrent.*;
public class SumTask extends RecursiveTask<Long>{ //每次进行任务分解都必须有起始元素和终止元素 private int start; private int end; private int step = 20; //步长,任务最小分到20个数来运算
public SumTask() { }
public SumTask(int start, int end) { this.start = start; this.end = end; }
@Override protected Long compute() { long sum= 0; if(end-start<=step){ for (int i = start; i <= end; i++) { sum+=i; } }else{ int mid = (start+end)/2; SumTask leftTask = new SumTask(start, mid); SumTask rightTask = new SumTask(mid+1, end); //我们只管把分任务的规则告诉他就行 //他会再次调用SumTask的compute()方法,一级一级往下分,分到符合我们条件为止。 leftTask.fork(); rightTask.fork();
//在做一件事 Long leftResult = leftTask.join(); Long rightResult = rightTask.join();
sum = leftResult+rightResult; } return sum; }
public static void main(String[] args) throws ExecutionException, InterruptedException { // //只有一个核在使用的运算任务 // int sum=0; // for (int i = 0; i < 1000; i++) { // sum+=i; // } // System.out.println(sum); //创建一个ForkJoin线程池 ForkJoinPool pool = new ForkJoinPool(); //算算花了多少时间 long startTime = System.currentTimeMillis();
//使用线程池 RecursiveTask<Long> task = new SumTask(1,1000000); Future<Long> future = pool.submit(task);//只要涉及到返回值的都要把结果返回给Future
Long result = future.get(); System.out.println(result); //算时间 long endTime = System.currentTimeMillis(); System.out.println("耗费时间pool:" + (endTime - startTime)); //关闭 pool.shutdown(); } |