搜索
您的当前位置:首页正文

JUC并发包——ForkJoin框架简单了解

来源:吉趣旅游网

一、ForkJoin框架

         Fork/Join 分叉/结合框架。

         1. 什么是ForkJoin框架 适用场景

虽然目前处理器核心数已经发展到很大数目,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果。

 

Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务最终汇总每个小任务结果后得到大任务结果的框架

 

 

 

2.工作窃取算法(work-stealing

       一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。

但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。

A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行。

 

注意:线程池中的每个线程都有自己的工作队列(PS,这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。

 

工作窃取算法的优点:

         利用了线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:

         1、如果双端队列中只有一个任务时,线程间会存在竞争。

         2、窃取算法消耗了更多的系统资源,如会创建多个线程和多个双端队列。

 

3.主要类

1ForkJoinTask:使用该框架,需要创建一个ForkJoin任务,它提供在任务中执行fork和join操作的机制。一般情况下,我们并不需要直接继承ForkJoinTask类,只需要继承它的子类,它的子类有两个:

aRecursiveAction:用于没有返回结果的任务。

bRecursiveTask:用于有返回结果的任务。

2ForkJoinPool:任务ForkJoinTask需要通过ForkJoinPool来执行。

3ForkJoinWorkerThreadForkJoinPool线程池中的一个执行任务的线程,我们一般接触不到,线程池底层是这个类的实例

 

 

 

import java.util.concurrent.*;

public class
SumTask extends RecursiveTask<Long>{
   
//每次进行任务分解都必须有起始元素和终止元素
   
private int start;
    private int
end;
    private int
step = 20; //步长,任务最小分到20个数来运算

   
public SumTask() {
    }

   
public SumTask(int start, int end) {
       
this.start = start;
        this
.end = end;
   
}


   
@Override
   
protected Long compute() {
       
long sum= 0;
        if
(end-start<=step){
           
for (int i = start; i <= end; i++) {
                sum+=i
;
           
}
        }
else{
           
int mid = (start+end)/2;
           
SumTask leftTask = new SumTask(start, mid);
           
SumTask rightTask = new SumTask(mid+1, end);
           
//我们只管把分任务的规则告诉他就行
            //他会再次调用SumTask的compute()方法,一级一级往下分,分到符合我们条件为止。
           
leftTask.fork();
           
rightTask.fork();

           
//在做一件事
           
Long leftResult = leftTask.join();
           
Long rightResult = rightTask.join();

           
sum = leftResult+rightResult;
       
}
       
return sum;
   
}

   
public static void main(String[] args) throws ExecutionException, InterruptedException {
//       //只有一个核在使用的运算任务
//        int sum=0;
//        for (int i = 0; i < 1000; i++) {
//            sum+=i;
//        }
//        System.out.println(sum);
        //创建一个ForkJoin线程池
       
ForkJoinPool pool = new ForkJoinPool();
       
//算算花了多少时间
       
long startTime = System.currentTimeMillis();

        
//使用线程池
       
RecursiveTask<Long> task = new SumTask(1,1000000);
       
Future<Long> future = pool.submit(task);//只要涉及到返回值的都要把结果返回给Future

       
Long result = future.get();
       
System.out.println(result);
       
//算时间
       
long endTime = System.currentTimeMillis();
       
System.out.println("耗费时间pool:" + (endTime - startTime));
       
//关闭
       
pool.shutdown();
   
}

 

 

 

因篇幅问题不能全部显示,请点此查看更多更全内容

Top