當硬件處理能力不能按照摩爾定律垂直發展的時候,選擇了水平發展,多核處理器已經廣泛應用。未來隨著技術的進一步發展,可能出現成百上千個處理核心,但現有的程序運行在多核心處理器上並不能得到較大性能的提升,主要的瓶頸在於程序本身的並發處理能力不強,不能夠合理的利用多核心資源。
現有的處理方案是從軟件入手,試圖采用多線程,是程序在同一時間支持多個任務的計算,這種多線程的處理方案在處理器數目較少的情況下可以較為明顯的提高應用性能,但我們更加青睐於由硬件實現的多線程處理模式,但這一領域至今沒有很好的結果。
ForkJoin是Java7提供的原生多線程並行處理框架,其基本思想是將大人物分割成小任務,最後將小任務聚合起來得到結果。它非常類似於Hadoop提供的MapReduce框架,只是MapReduce的任務可以針對集群內的所有計算節點,可以充分利用集群的能力完成計算任務。ForkJoin更加類似於單機版的MapReduce。
即使不通過mapreduce,僅有應用程序本身進行任務的分解與合成也是可以的,但從實現難度上考慮,自己實現可能會帶來較大規模的復雜度,因此程序員急需一種范式來處理這一類的任務。在處理多線程中已經有了如AKKA這樣的基於ACTOR模型的框架,而FORKJOIN則是針對具有明顯可以進行任務分割特性需求的實現。
其場景為:如果一個應用程序能夠被分解成多個子任務,而且結合多個子任務的結果就能夠得到最終的答案,那麼它就適合使用FORK/JOIN模式來實現。
Fork/Join使用兩個類完成以上兩件事情:
· ForkJoinTask: 我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。
1. RecursiveAction,用於沒有返回結果的任務
2. RecursiveTask,用於有返回值的任務
· ForkJoinPool:task要通過ForkJoinPool來執行,分割的子任務也會添加到當前工作線程的雙端隊列中,進入隊列的頭部。當一個工作線程中沒有任務時,會從其他工作線程的隊列尾部獲取一個任務。
ForkJoin框架使用了工作竊取的思想(work-stealing),算法從其他隊列中竊取任務來執行,其工作流圖為:
[img]http://infoqstatic.com/resource/articles/fork-join-introduction/zh/resources/image3.png[/img]
通過此算法降低線程等待和競爭。
下面是示例:
package com.inspur.jiyq.forkjoin.sum;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer>
{
private static final long serialVersionUID = -3611254198265061729L;
public static final int threshold = 2;
private int start;
private int end;
public CountTask(int start, int end)
{
this.start = start;
this.end = end;
}
@Override
protected Integer compute()
{
int sum = 0;
//如果任務足夠小就計算任務
boolean canCompute = (end - start) <= threshold;
if(canCompute)
{
for (int i=start; i<=end; i++)
{
sum += i;
}
}
else
{
// 如果任務大於阈值,就分裂成兩個子任務計算
int middle = (start + end)/2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle+1, end);
// 執行子任務
leftTask.fork();
rightTask.fork();
//等待任務執行結束合並其結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合並子任務
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args)
{
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一個計算任務,計算1+2+3+4
CountTask task = new CountTask(1, 100);
//執行一個任務
Future<Integer> result = forkjoinPool.submit(task);
try
{
System.out.println(result.get());
}
catch(Exception e)
{
System.out.println(e);
}
}
}
像這種求和以及排序的需求都可以通過FORKJOIN思想來實現,但在實際使用時還是要進行必要的性能測試來確認性能提升的幅度。
在上面這段代碼中,定義了一個累加的任務,在compute方法中,判斷當前值是否小於一個阈值,如果是則計算,如果不是則繼續拆分,並合並子任務的中間結果。
任務定義後執行任務,Fork/Join提供一個和Executor框架的擴展線程來執行任務。