首页 > 其他 > 详细

Fork/Join框架

时间:2020-08-19 09:56:58      阅读:46      评论:0      收藏:0      [点我收藏+]

Fork/Join框架介绍

          Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务        结 果 后得到大任务结果的框架。Fork/Join框架要完成两件事情:

  1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割

  2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执        行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

 

工作窃取

        算法是指某个线程从其他队列里窃取任务来执行。

        为什么需要使用工作窃取算法呢?

              假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是            把 这 些 子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,              比如A线程负责处理A队列里的任务。

        工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间时竟争。

        工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。

 

Fork/Join框架设计

   1 分割任务

    2 执行任务并合并结果

 

API介绍

 ForkJoinPool

        由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool,而               ForkJoinWorkerThread负责执行这些任务。

有两种构造器方式可以获取ForkJoinPool的实例,第一种使用构造器创建:

  • ForkJoinPool(): 使用默认的构造器创建实例,该构造器创建出的池与系统中可用的处理器数量相等。
  • ForkJoinPool(int parallelism):该构造器指定处理器数量,创建具有自定义并行度级别的池,该级别的并行度必须大于0,且不超过可用处理器的实际数量。

    并行性的级别决定了可以并发执行的线程的数量。换句话说,它决定了可以同时执行的任务的数量——但不能超过处理器的数     量。

    但是,这并不限制池可以管理的任务的数量。ForkJoinPool可以管理比其并行级别多得多的任务。

ForkJoinTask

ForkJoinTask代表运行在ForkJoinPool中的任务。

主要方法:

  • fork()    在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
  • join()    当任务完成的时候返回计算结果,Join主要作用是阻塞当前线程并等待获取结果。
  • invoke()    开始执行任务,如果必要,等待计算完成。

子类:

  • RecursiveAction    一个递归无结果的ForkJoinTask(没有返回值)
  • RecursiveTask    一个递归有结果的ForkJoinTask(有返回值)

   

实例:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class MyForkJoin extends  RecursiveTask<Integer>
{

    private  final  int threshold=3;
    private int  beg;
    private int end;

    public MyForkJoin(int beg, int end) {

        this.beg=beg;
        this.end=end;

    }

    public static void main(String[] arg)
    {
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        //生成一个计算任务

        MyForkJoin forjoin=new MyForkJoin(1,3);
        //执行一个任务
        Future<Integer> result=forkJoinPool.submit(forjoin);

        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }


    @Override
    protected Integer compute() {

        int sum=0;
        if(end-beg>1)
        {

            int middle = (beg + end) / 2;
            MyForkJoin Task1 = new MyForkJoin(beg, middle);
            MyForkJoin Task2 = new MyForkJoin(middle + 1, end);

            // 执行子任务
            Task1.fork();
            Task2.fork();

            // 等待任务执行结束合并其结果
            int Result1 = Task1.join();
            int Result2 = Task2.join();

            // 合并子任务
            sum = Result1 + Result1;

        }
        else
        {
            for (int i = beg; i <= end; i++) {
                sum += i;
            }


        }

        return sum;

    }
}

结果:6
技术分享图片

Fork/Join框架的异常处理

  ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常

 

        

Fork/Join框架

原文:https://www.cnblogs.com/shu-java-net/p/13527468.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!