監控服務程序實現調度算法
完成nginx服務監控(從nginx配置解析出對應的服務作為監控對象之五,還有可以從數據庫裡讀出待監控的服務)與更新服務後的監控算法:
處理休眠隊列---------將所有的待監控服務記錄放入一個優先級隊列裡(休眠隊列,最小堆的數據結構,堆頂為絕對間隔時間最小的,優先執行),每次只需要檢查堆頂就可以了,需要執行的放進執行隊列裡,刪除的不加入執行隊列
執行線程---------將執行列裡的記錄拋給異步執行的池裡,每一個都是異步調用運行
回收線程----------運行完成的請求回收休眠隊列,不回收已刪除的。
更新線程---------定時加載新的數據,設置好絕對間隔時間,放入休眠隊列
廢話少說,主要實現代碼如下。。
package com.wole.monitor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.wole.monitor.dao.ServiceDao;
import com.wole.servicemonitor.util.ServiceUtils;
/**
* 管理並調度某一個服務數據源的監控池
* @author yzygenuine
*
*/
public class MonitorsManage {
private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);
private ServiceDao dao;
/**
* 執行的一個並發池
*/
private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
/**
*
*/
private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);
/**
* 正在執行中的MonitorService集合
*/
private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();
/**
* 等待優先級隊列
*/
private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();
/**
* 執行隊列
*/
private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();
/**
* 是否關閉
*/
private AtomicBoolean isClose = new AtomicBoolean(false);
/**
* 生產者啟動時間
*/
private AtomicLong startTime = new AtomicLong(0);
/**
* 相對於啟動的間隔時間
*/
private AtomicLong intervalTime = new AtomicLong(0);
public void close() {
logger.info("closing................");
isClose.compareAndSet(false, true);
}
public void init() {
logger.info("初始化");
}
public void work() {
logger.info("開始工作");
// 生產者啟動工作
Thread productThread = new Thread(new ProductMonitor(1000));
// 消費者啟動工作
Thread consumerThread = new Thread(new ConsumerMonitor(1000));
// 回收者啟動工作
Thread recoverThread = new Thread(new RecoverMonitor(1000));
// 啟動定時加載數據工作
Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
productThread.start();
consumerThread.start();
recoverThread.start();
refreshThread.start();
}
/**
* 生產者
*
* @author yzygenuine
*
*/
class ProductMonitor implements Runnable {
long sleepTime = 1000;
public ProductMonitor(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public void run() {
logger.info("生產者開啟工作");
// 開始進行定時監控
long now = System.currentTimeMillis();
long lastTime = now;
startTime.addAndGet(now);
try {
do {
Thread.sleep(sleepTime);
logger.debug("生產者休息{}ms", sleepTime);
now = System.currentTimeMillis();
intervalTime.addAndGet(now - lastTime);
while (sleepQueue.size() > 0) {
MonitorService service = sleepQueue.peek();
if (service.getCurrentTime() - intervalTime.get() < 1) {
service = sleepQueue.poll();// 出隊並檢查是否被刪除,如果沒被刪除則進入執行隊列
if (!currentSet.contains(service)) {
logger.info("service {} 已被刪除,不加入執行隊列了", service.toString());
continue;
}
executeQueue.add(service);
} else {
logger.debug("還有{}秒可執行", service.getCurrentTime() - intervalTime.get());
break;
}
}
if (sleepQueue.size() <= 0) {
logger.debug("生產隊列為空");
}
lastTime = now;
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/**
* 消費者
*
* @author yzygenuine
*
*/
class ConsumerMonitor implements Runnable {
long sleepTime = 1000;
public ConsumerMonitor(long sleepTime) {
this.sleepTime = sleepTime;
if (sleepTime < 1000) {
throw new RuntimeException("請配置sleepTime值大一些");
}
}
@Override
public void run() {
logger.info("消費者開啟工作");
try {
do {
Thread.sleep(sleepTime);
logger.debug("消費者休息{}ms", sleepTime);
while (executeQueue.size() > 0) {
final MonitorService service = executeQueue.poll();
completionService.submit(new ExecuteCallable(service));
}
logger.debug("消費隊列為空");
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/**
* 執行回調類
*
* @author yzygenuine
*
*/
class ExecuteCallable implements Callable<Response> {
final MonitorService service;
public ExecuteCallable(MonitorService service) {
this.service = service;
}
@Override
public Response call() throws Exception {
logger.debug("執行");
Map<String, String> r = new HashMap<String, String>();
Response response = new Response();
response.service = service;
response.response = r;
Monitor m = MonitorFactory.getMonitor(service);
response.isNeedWarn = m.isNeedWarnging(service, r);
if (response.isNeedWarn) {
response.isSucToNotify = m.sendNotify(service, r);
}
return response;
}
}
/**
* 回收者
*
* @author yzygenuine
*
*/
class RecoverMonitor implements Runnable {
private long sleepTime = 1000;
private long count = 0;
public RecoverMonitor(long sleepTime) {
this.sleepTime = sleepTime;
if (sleepTime < 1000) {
throw new RuntimeException("請配置sleepTime值大一些");
}
}
@Override
public void run() {
logger.info("回收者開啟工作");
try {
do {
// Thread.sleep(sleepTime);
Future<Response> response = completionService.take();
// 重置後進入休眠隊列
MonitorService s = response.get().service;
if (!currentSet.contains(s)) {
logger.info("service {} 已被刪除,不回收了", s.toString());
continue;
}
// 當前程序已運動的時間+相對間隔時間=絕對的間隔時間
s.setCurrentTime(s.getIntervalTime() + intervalTime.get());
sleepQueue.add(s);
count++;
logger.info("回收,當前回收數量:" + count);
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/**
* 加載新的數據
*
* @author yzygenuine
*
*/
class RefreshMonitorService implements Runnable {
private long sleepTime = 1000;
private ServiceDao dao;
public RefreshMonitorService(long sleepTime, ServiceDao dao) {
this.sleepTime = sleepTime;
if (sleepTime < 60000) {
logger.warn("刷新加載數據的間隔時間不能太短");
throw new RuntimeException("刷新加載數據的間隔時間不能太短");
}
this.dao = dao;
}
private void firstLoad() {
List<MonitorService> monitorService = dao.getService();
logger.info("加載記錄:" + monitorService.size());
// 將被監控服務加入優先級隊列裡
for (int j = 0; j < monitorService.size(); j++) {
MonitorService service = monitorService.get(j);
// 初始化好時間
service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
currentSet.add(service);
sleepQueue.add(service);
}
}
@Override
public void run() {
logger.info("讀取新的service開啟工作");
firstLoad();
try {
do {
logger.info("定時加載新的數據監聽者休息{}ms", sleepTime);
Thread.sleep(sleepTime);
logger.info("##########開始執行更新數據############");
// 加載新的所有所數據 ,與當前的數據比較
List<MonitorService> deleteList = dao.deleteService();
List<MonitorService> addList = dao.incrementalService();
logger.info("刪除舊的數據共:{}", deleteList.size());
currentSet.removeAll(deleteList);
logger.info("增加新的數據共:{}", addList.size());
currentSet.addAll(addList);
logger.info("更新後的currentSet size:{}", currentSet.size());
for (MonitorService service : addList) {
// 初始化絕對間隔時間
service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
sleepQueue.add(service);
}
logger.info("########這一輪更新結束");
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/**
* 響應的封裝類
*
* @author yzygenuine
*
*/
class Response {
public Map<String, String> response;
public MonitorService service;
public boolean isNeedWarn;
public boolean isSucToNotify;
}
public void setDao(ServiceDao dao) {
this.dao = dao;
}
}