Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架 。
它的主要思想是:分而治之。
我们再通过 Fork 和 Join 这两个单词来理解下 Fork/Join 框架,Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。
比如:计算 1+2+... +10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。
Fork/Join 的运行流程图如下:
Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:
为什么 ForkJoin 会存在工作窃取呢?因为我们将任务进行分解成多个子任务的时候。每个子任务的处理时间都不一样。
例如分别有子任务 A \ B 。如果子任务 A 的 1ms 的时候已经执行,子任务 B 还在执行。那么如果我们子任务 A 的线程等待子任务 B 完毕后在进行汇总,那么子任务 A 线程就会在浪费执行时间,最终的执行时间就以最耗时的子任务为准。而如果我们的子任务A执行完毕后,处理子任务 B 的任务,并且执行完毕后将任务归还给子任务 B。这样就可以提高执行效率。而这种就是工作窃取。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
我们已经很清楚 Fork/Join 框架的需求了,那么我们可以思考一下,如果让我们来设计一个 Fork/Join 框架,该如何设计?这个思考有助于你理解 Fork/Join 框架的设计。
1)Fork/Join 框架的设计分为两步:
2)Fork/Join 使用两个类来完成以上两件事情:
使用 Fork/Join 框架计算:1+2+3+……+100000000。
使用 Fork/Join 框架首先要考虑到的是如何分割任务,如果我们希望每个子任务最多执行 10000 个数的相加,那么我们设置分割的阈值是 10000,由于是 100000000 个数字相加,所以会不停的分割,第一次先分割成两部分,即 1~50000000 和 50000001~100000000,第二次继续将 1~50000000 分割成 1~25000000 和 25000001~50000000 ,将50000001~100000000 分割成 50000001~75000000 和 75000001~100000000 ……,一直分割,直到 开始和 结束的的差小于等于 10000 。
使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。
public class ForkJoinDemo extends RecursiveTask<Long> { private long start; // 开始值 private long end; // 结束值 private long temp = 10000L; // 阈值 public ForkJoinDemo(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { // 条件成立:任务量没有超过临界值时计算; if ((end - start) < temp) { Long sum = 0L; for (long i = start; i <= end; i++) { sum += i; } return sum; } else { // 条件不成立:拆分任务 long middle = (end + start) / 2; // 中间值
// 进行递归 ForkJoinDemo fork_1 = new ForkJoinDemo(start, middle); // fork_1.fork(); // fork直接这样使用会导致有一个线程变成boss线程。执行时间会变长。 ForkJoinDemo fork_2 = new ForkJoinDemo(middle + 1, end); // fork_2.fork(); // fork直接这样使用会导致有一个线程变成boss线程。执行时间会变长。 // 执行子任务,应该使用invokeAll(left,right);这样代码效率成倍提升 invokeAll(fork_1,fork_2);
// 返回结果 return fork_1.join() + fork_2.join(); } } }
task 要通过 ForkJoinPool 来执行,分割的子任务也会添加到当前工作线程的双端队列中,进入队列的头部。
当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作窃取)。
public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); // 这是Fork/Join框架的线程池 ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> submit = pool.submit(new ForkJoinDemo(0L, 1_0000_0000L)); // 提交任务 Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum = " + sum + " ,耗时:" + (end - start) + " 毫秒"); } }
执行结果:
sum = 5000000050000000 ,耗时:837 毫秒
三种提交任务到 ForkJoinPool 的方法:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
为公共池提供一个引用,使用预定义的公共池减少了资源消耗,因为这阻碍了每个任务创建一个单独的线程池。
ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常 。使用如下代码:
if(task.isCompletedAbnormally()){ System.out.println(task.getException()); }
getException 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null。
使用 ForkJoin 将相同的计算任务通过多线程的进行执行。从而能提高数据的计算速度。
在 google 的中的大数据处理框架 mapreduce 就通过类似 ForkJoin 的思想。通过多线程提高大数据的处理。但是我们需要注意:
原文:https://www.cnblogs.com/Dm920/p/13368664.html