《Java并发编程实践》笔记8——Fork/Join框架
《java并发编程实践》书中并没有介绍Fork/Join框架,该框架和并发编程关系密切,也是由本书作者之一Doug Lea在JDK1.7引入,因此作为最后一篇笔记简单介绍。
Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,它把一个大任务分割为若干个小任务执行,最后将小任务的执行结果汇总得到大任务的结果。当看到这个定义时,我想很多人会和我一样立刻想到现在非常热门的Map-Reduce思想。二者的对比如下:
共同点:
都是用于执行并行任务的。基本思想都是把复杂问题分解为一个个简单的子问题分别计算,再合并结果。应该说并行计算都是这种思想,彼此独立的或可分解的。从名字上看Fork和Map都有切分的意思,Join和Reduce都有合并的意思,比较类似。
区别:
(1).环境差异:
分布式 vs 单机多核:Fork/Join设计初衷针对单机多核(处理器数量很多的情况)。Map-Reduce一开始就明确是针对很多机器组成的集群环境的。也就是说一个是想充分利用多处理器,而另一个是想充分利用很多机器做分布式计算。这是两种不同的的应用场景,有很多差异,因此在细的编程模式方面有很多不同。
(2).编程差异:
Map-Reduce一般是:做较大粒度的切分,一开始就先切分好任务然后再执行,并且彼此间在最后合并之前不需要通信。这样可伸缩性更好,适合解决巨大的问题,但限制也更多。Fork/Join可以是较小粒度的切分,任务自己知道该如何切分自己,递归地切分到一组合适大小的子任务来执行,因为是同一个JVM内,所以彼此间通信是很容易的,更像是传统编程方式。
综上所述,我们可以认为Fork/Join是同一个JVM内的Map-Reduce的一种实现方式。
Fork/Join的基本工作流程如下图:
Fork-Join框架的实现原理为:
首先,分割任务,只要任务的粒度超过阀值,就不停地将任务分拆为小任务;
然后,将分割的任务任务添加到双端队列中,启动线程从双端队列获取任务执行,将执行结果统一放到一个队列中;
最后,再启动一个线程合并结果队列的值。
Fork-Join框架涉及的主要类如下:
RecursiveAction:用于没有返回值的任务。
RecursiveTask:用于需要返回值的任务,通过泛型参数设置计算的返回值类型。
ForkJoinPool:提供了一系列的submit方法,计算ForkJoinTask(需要实现computer方法)。
我们以计算1+2+3+4结果为例,来演示Fork/Join的使用,我们设置阀值为2,则由于是4个数字相加,因此Fork/Join框架会分拆为两个子任务,一个计算1+2,另一个计算3+4,最后在把两个子任务的结果合并,例子代码如下:
public class CounterTask extends RecursiveTask<Integer>{ private static final int THRESHOLD = 2; private int start; private int end; public CounterTask(int start, int end){ this.start = start; this.end = end; } protected Integer computer(){ int sum = 0; boolean canComputer = (end - start) <= THRESHOLD; if(canComputer){ for(int i = start; i <= end; i++){ sum += i; } }else{ int middle = (start + end) / 2; CounterTask leftTask = new CounterTask(start, middle); CounterTask rightTask = new CounterTask(middle + 1, end); leftTask.fork(); rightTask.fork(); int leftResult = leftTask.join(); int rightResult = rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args){ ForkJoinPool pool = new ForkJoinPool(); CounterTask task = new CounterTask(1, 4); Future<Integer> result = pool.submit(task); try{ System.out.println("result is:" + result.get()); }catch(InterruptedException e){ }catch(ExecutionException e){ }finally{ pool.shutdown(); } } }