歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

監控服務程序調度算法實現

監控服務程序實現調度算法

完成nginx服務監控(從nginx配置解析出對應的服務作為監控對象之五,還有可以從數據庫裡讀出待監控的服務)與更新服務後的監控算法:

處理休眠隊列---------將所有的待監控服務記錄放入一個優先級隊列裡(休眠隊列,最小堆的數據結構,堆頂為絕對間隔時間最小的,優先執行),每次只需要檢查堆頂就可以了,需要執行的放進執行隊列裡,刪除的不加入執行隊列

執行線程---------將執行列裡的記錄拋給異步執行的池裡,每一個都是異步調用運行

回收線程----------運行完成的請求回收休眠隊列,不回收已刪除的。

更新線程---------定時加載新的數據,設置好絕對間隔時間,放入休眠隊列

廢話少說,主要實現代碼如下。。

package com.wole.monitor;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.wole.monitor.dao.ServiceDao;
import com.wole.servicemonitor.util.ServiceUtils;

/**
 * 管理並調度某一個服務數據源的監控池
 * @author yzygenuine
 *
 */
public class MonitorsManage {
 private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);

 private ServiceDao dao;

 /**
  * 執行的一個並發池
  */
 private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
   new SynchronousQueue<Runnable>());

 /**
  *
  */
 private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);

 /**
  * 正在執行中的MonitorService集合
  */
 private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();

 /**
  * 等待優先級隊列
  */
 private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();

 /**
  * 執行隊列
  */
 private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();

 /**
  * 是否關閉
  */
 private AtomicBoolean isClose = new AtomicBoolean(false);

 /**
  * 生產者啟動時間
  */
 private AtomicLong startTime = new AtomicLong(0);
 /**
  * 相對於啟動的間隔時間
  */
 private AtomicLong intervalTime = new AtomicLong(0);

 public void close() {
  logger.info("closing................");
  isClose.compareAndSet(false, true);
 }

 

 public void init() {
  logger.info("初始化");

 }

 public void work() {
  logger.info("開始工作");
  // 生產者啟動工作

  Thread productThread = new Thread(new ProductMonitor(1000));
  // 消費者啟動工作
  Thread consumerThread = new Thread(new ConsumerMonitor(1000));
  // 回收者啟動工作
  Thread recoverThread = new Thread(new RecoverMonitor(1000));

  // 啟動定時加載數據工作
  Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
  productThread.start();
  consumerThread.start();
  recoverThread.start();
  refreshThread.start();

 }

 /**
  * 生產者
  *
  * @author yzygenuine
  *
  */
 class ProductMonitor implements Runnable {
  long sleepTime = 1000;

  public ProductMonitor(long sleepTime) {
   this.sleepTime = sleepTime;
  }

  @Override
  public void run() {
   logger.info("生產者開啟工作");
   // 開始進行定時監控
   long now = System.currentTimeMillis();
   long lastTime = now;
   startTime.addAndGet(now);
   try {
    do {
     Thread.sleep(sleepTime);
     logger.debug("生產者休息{}ms", sleepTime);
     now = System.currentTimeMillis();
     intervalTime.addAndGet(now - lastTime);
     while (sleepQueue.size() > 0) {
      MonitorService service = sleepQueue.peek();
      if (service.getCurrentTime() - intervalTime.get() < 1) {
       service = sleepQueue.poll();// 出隊並檢查是否被刪除,如果沒被刪除則進入執行隊列
       if (!currentSet.contains(service)) {
        logger.info("service {} 已被刪除,不加入執行隊列了", service.toString());
        continue;
       }
       executeQueue.add(service);
      } else {
       logger.debug("還有{}秒可執行", service.getCurrentTime() - intervalTime.get());
       break;
      }
     }

     if (sleepQueue.size() <= 0) {
      logger.debug("生產隊列為空");
     }
     lastTime = now;
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }

  }

 }

 /**
  * 消費者
  *
  * @author yzygenuine
  *
  */
 class ConsumerMonitor implements Runnable {
  long sleepTime = 1000;

  public ConsumerMonitor(long sleepTime) {
   this.sleepTime = sleepTime;
   if (sleepTime < 1000) {
    throw new RuntimeException("請配置sleepTime值大一些");
   }
  }

  @Override
  public void run() {
   logger.info("消費者開啟工作");
   try {
    do {
     Thread.sleep(sleepTime);
     logger.debug("消費者休息{}ms", sleepTime);
     while (executeQueue.size() > 0) {
      final MonitorService service = executeQueue.poll();
      completionService.submit(new ExecuteCallable(service));
     }
     logger.debug("消費隊列為空");
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }
  }

 }

 /**
  * 執行回調類
  *
  * @author yzygenuine
  *
  */
 class ExecuteCallable implements Callable<Response> {
  final MonitorService service;

  public ExecuteCallable(MonitorService service) {
   this.service = service;
  }

  @Override
  public Response call() throws Exception {
   logger.debug("執行");
   Map<String, String> r = new HashMap<String, String>();
   Response response = new Response();
   response.service = service;
   response.response = r;
   Monitor m = MonitorFactory.getMonitor(service);
   response.isNeedWarn = m.isNeedWarnging(service, r);
   if (response.isNeedWarn) {
    response.isSucToNotify = m.sendNotify(service, r);
   }
   return response;
  }

 }

 /**
  * 回收者
  *
  * @author yzygenuine
  *
  */
 class RecoverMonitor implements Runnable {
  private long sleepTime = 1000;

  private long count = 0;

  public RecoverMonitor(long sleepTime) {
   this.sleepTime = sleepTime;
   if (sleepTime < 1000) {
    throw new RuntimeException("請配置sleepTime值大一些");
   }
  }

  @Override
  public void run() {
   logger.info("回收者開啟工作");
   try {
    do {
     // Thread.sleep(sleepTime);
     Future<Response> response = completionService.take();
     // 重置後進入休眠隊列
     MonitorService s = response.get().service;
     if (!currentSet.contains(s)) {
      logger.info("service {} 已被刪除,不回收了", s.toString());
      continue;
     }
     // 當前程序已運動的時間+相對間隔時間=絕對的間隔時間
     s.setCurrentTime(s.getIntervalTime() + intervalTime.get());
     sleepQueue.add(s);
     count++;
     logger.info("回收,當前回收數量:" + count);
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }
  }
 }

 /**
  * 加載新的數據
  *
  * @author yzygenuine
  *
  */
 class RefreshMonitorService implements Runnable {
  private long sleepTime = 1000;
  private ServiceDao dao;

  public RefreshMonitorService(long sleepTime, ServiceDao dao) {
   this.sleepTime = sleepTime;
   if (sleepTime < 60000) {
    logger.warn("刷新加載數據的間隔時間不能太短");
    throw new RuntimeException("刷新加載數據的間隔時間不能太短");
   }
   this.dao = dao;
  }

  private void firstLoad() {
   List<MonitorService> monitorService = dao.getService();
   logger.info("加載記錄:" + monitorService.size());

   // 將被監控服務加入優先級隊列裡
   for (int j = 0; j < monitorService.size(); j++) {
    MonitorService service = monitorService.get(j);
    // 初始化好時間
    service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
    currentSet.add(service);
    sleepQueue.add(service);
   }

  }

  @Override
  public void run() {
   logger.info("讀取新的service開啟工作");
   firstLoad();
   try {
    do {
     logger.info("定時加載新的數據監聽者休息{}ms", sleepTime);
     Thread.sleep(sleepTime);
     logger.info("##########開始執行更新數據############");
     // 加載新的所有所數據 ,與當前的數據比較
     List<MonitorService> deleteList = dao.deleteService();
     List<MonitorService> addList = dao.incrementalService();
     logger.info("刪除舊的數據共:{}", deleteList.size());
     currentSet.removeAll(deleteList);
     logger.info("增加新的數據共:{}", addList.size());
     currentSet.addAll(addList);
     logger.info("更新後的currentSet size:{}", currentSet.size());

     for (MonitorService service : addList) {
      // 初始化絕對間隔時間
      service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
      sleepQueue.add(service);
     }
     logger.info("########這一輪更新結束");
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }
  }
 }

 /**
  * 響應的封裝類
  *
  * @author yzygenuine
  *
  */
 class Response {
  public Map<String, String> response;
  public MonitorService service;
  public boolean isNeedWarn;
  public boolean isSucToNotify;

 }

 public void setDao(ServiceDao dao) {
  this.dao = dao;
 }

}

Copyright © Linux教程網 All Rights Reserved