Master-Worker設計模式核心思想是將原來串行的邏輯並行化,並將邏輯拆分成很多獨立模塊並行執行,其中主要包含兩個主要組件Master和Worker,Master主要講邏輯進行查分,拆分為互相獨立的部分,同時維護了Worker隊列,將每個獨立部分下發到多個Worker並行執行,Worker主要進行實際邏輯
計算,並將結果返回給Master。
其核心框架如下:
Master部分實現代碼
package com.yf.designpattern.masterworker;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
//任務隊列,保存所有的任務
protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
//Worker進程隊列
protected Map<String,Thread> threadMap=new HashMap<String,Thread>();
//任務處理結果集
protected Map<String,Object> resultMap=new ConcurrentHashMap<String,Object>();
//判斷是否所有子任務都完成了
public boolean isComplete(){
for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
if(entry.getValue().getState()!=Thread.State.TERMINATED){
return false;
}
}
return true;
}
//Master的構造,需要一個Worker進程實例,和需要的worker進程數量
public Master(Worker worker,int count){
worker.setResultMap(resultMap);
worker.setWorkQueue(workQueue);
for(int i=0;i<count;i++){
threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
}
}
//提交一個任務
public void submit(Object job){
workQueue.add(job);
}
//返回子任務結果集
public Map<String,Object> getResultMap(){
return resultMap;
}
//開始運行所有的Worker進程,進行處理
public void execute(){
for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
entry.getValue().start();
}
}
}
Master主要維護了任務隊列、Worker隊列、結果隊列和開啟工作線程,添加任務等邏輯,可以通過Master添加任務,獲取結果,具體任務執行過程在Worker
Worker的主要結構如下:
package com.yf.designpattern.masterworker;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable {
// 任務隊列,用於取得子任務
protected Queue<Object> workQueue;
// 子任務處理結果集
protected Map<String, Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
// 子類處理的業務邏輯,在子類中實現具體邏輯
public Object handle(Object input) {
return input;
}
@Override
public void run() {
while (true) {
//獲取子任務
Object input = this.workQueue.poll();
if (input == null)
break;
//處理子任務
Object re = this.handle(input);
//將處理結果寫回結果集
this.resultMap.put(Integer.toString(input.hashCode()), re);
}
}
}
Worker主要是從Master的任務隊列中獲取一個任務,並且執行,將結果保存到Master的ResultMap中,每個Worker都持有Master的工作隊列和ResultMap。
以上就是Master-Worker模式的核心框架,還可以將上述框架進行擴展,擴展為分布式結構,Master與Worker分布在不同機器,Master與Worker之間通過一定協議來進行通信,同時Worker還可以水平擴展為多台,能夠支撐大壓力、高並發需求。
例如現在需要計算1到100,每個數字的立方相加的結果
這裡可以將計算立方的邏輯交個每個Worker去執行,Worker計算完成後,將結果保存到Master的ResultMap中,Master只需要檢查ResultMap是否有元素,有就進行相加計算,而不用等待每個數字都計算完成,
Worker的具體實現如下:
package com.yf.designpattern.masterworker;
public class PlusWorker extends Worker {
@Override
public Object handle(Object input){
Integer i=(Integer)input;
return i*i*i;
}
}
測試代碼如下:
package com.yf.designpattern.masterworker;
import java.util.Map;
import java.util.Set;
public class Main {
public static void main(String[] args) {
//實例化一個有5個Worker的Master
long start=System.currentTimeMillis();
Master m=new Master(new PlusWorker(),5);
for(int i=0;i<=1000;i++){
m.submit(i);
}
m.execute();
int re=0;
Map<String,Object> resultMap=m.getResultMap();
//只要有一個Worker計算完成,則開始計算
while(resultMap.size()>0||!m.isComplete()){
Set<String> keys=resultMap.keySet();
String key=null;
//每次獲取一個結果
for(String k:keys){
key=k;
break;
}
Integer i=null;
if(key!=null){
i=(Integer)resultMap.get(key);
//將結果相加
if(i!=null){
re+=i;
}
//刪除已經計算過的子結果
resultMap.remove(key);
}
}
long time=System.currentTimeMillis()-start;
System.out.println("The result is:"+re+",time:"+time);
}
}