歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

並行設計模式-Master/Worker

Master-Worker設計模式核心思想是將原來串行的邏輯並行化,並將邏輯拆分成很多獨立模塊並行執行,其中主要包含兩個主要組件Master和Worker,Master主要講邏輯進行查分,拆分為互相獨立的部分,同時維護了Worker隊列,將每個獨立部分下發到多個Worker並行執行,Worker主要進行實際邏輯

計算,並將結果返回給Master。

其核心框架如下:

Master部分實現代碼

package com.yf.designpattern.masterworker;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
 //任務隊列,保存所有的任務
 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
 //Worker進程隊列
 protected Map<String,Thread> threadMap=new HashMap<String,Thread>();
 
 //任務處理結果集
 protected Map<String,Object> resultMap=new ConcurrentHashMap<String,Object>();
 
 //判斷是否所有子任務都完成了
 public boolean isComplete(){
  for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
   if(entry.getValue().getState()!=Thread.State.TERMINATED){
    return false;
   }
  }
  return true;
 }
 
 //Master的構造,需要一個Worker進程實例,和需要的worker進程數量
 public Master(Worker worker,int count){
  worker.setResultMap(resultMap);
  worker.setWorkQueue(workQueue);
  for(int i=0;i<count;i++){
   threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
  }
 }
 //提交一個任務
 public void submit(Object job){
  workQueue.add(job);
 }
 //返回子任務結果集
 public Map<String,Object> getResultMap(){
  return resultMap;
 }
 
 //開始運行所有的Worker進程,進行處理
 public void execute(){
  for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
   entry.getValue().start();
  }
 }
}

Master主要維護了任務隊列、Worker隊列、結果隊列和開啟工作線程,添加任務等邏輯,可以通過Master添加任務,獲取結果,具體任務執行過程在Worker

Worker的主要結構如下:

package com.yf.designpattern.masterworker;

import java.util.Map;
import java.util.Queue;

public class Worker implements Runnable {
 // 任務隊列,用於取得子任務
 protected Queue<Object> workQueue;
 // 子任務處理結果集
 protected Map<String, Object> resultMap;

 public void setWorkQueue(Queue<Object> workQueue) {
  this.workQueue = workQueue;
 }

 public void setResultMap(Map<String, Object> resultMap) {
  this.resultMap = resultMap;
 }

 // 子類處理的業務邏輯,在子類中實現具體邏輯
 public Object handle(Object input) {
  return input;
 }

 @Override
 public void run() {
  while (true) {
   //獲取子任務
   Object input = this.workQueue.poll();
   if (input == null)
    break;
   //處理子任務
   Object re = this.handle(input);
   //將處理結果寫回結果集
   this.resultMap.put(Integer.toString(input.hashCode()), re);
  }

 }

}

Worker主要是從Master的任務隊列中獲取一個任務,並且執行,將結果保存到Master的ResultMap中,每個Worker都持有Master的工作隊列和ResultMap。

以上就是Master-Worker模式的核心框架,還可以將上述框架進行擴展,擴展為分布式結構,Master與Worker分布在不同機器,Master與Worker之間通過一定協議來進行通信,同時Worker還可以水平擴展為多台,能夠支撐大壓力、高並發需求。

例如現在需要計算1到100,每個數字的立方相加的結果

這裡可以將計算立方的邏輯交個每個Worker去執行,Worker計算完成後,將結果保存到Master的ResultMap中,Master只需要檢查ResultMap是否有元素,有就進行相加計算,而不用等待每個數字都計算完成,

Worker的具體實現如下:

package com.yf.designpattern.masterworker;

public class PlusWorker extends Worker {
 
 @Override
 public Object handle(Object input){
  Integer i=(Integer)input;
  return i*i*i;
 }
}

測試代碼如下:

package com.yf.designpattern.masterworker;

import java.util.Map;
import java.util.Set;

public class Main {

 public static void main(String[] args) {
  //實例化一個有5個Worker的Master
  long start=System.currentTimeMillis();
  Master m=new Master(new PlusWorker(),5);
  for(int i=0;i<=1000;i++){
   m.submit(i);
  }
  m.execute();
  int re=0;
  Map<String,Object> resultMap=m.getResultMap();
  //只要有一個Worker計算完成,則開始計算
  while(resultMap.size()>0||!m.isComplete()){
   Set<String> keys=resultMap.keySet();
   String key=null;
   //每次獲取一個結果
   for(String k:keys){
    key=k;
    break;
   }
   Integer i=null;
   if(key!=null){
    i=(Integer)resultMap.get(key);
    //將結果相加
    if(i!=null){
     re+=i;
    }
    //刪除已經計算過的子結果
    resultMap.remove(key);
   }
  }
  long time=System.currentTimeMillis()-start;
  System.out.println("The result is:"+re+",time:"+time);
 }

}

Copyright © Linux教程網 All Rights Reserved