前期收到的問題:
1、在Topology中我們可以指定spout、bolt的並行度,在提交Topology時Storm如何將spout、bolt自動發布到每個服務器並且控制服務的CPU、磁盤等資源的?
2、Storm處理消息時會根據Topology生成一棵消息樹,Storm如何跟蹤每個消息、如何保證消息不丟失以及如何實現重發消息機制?
上篇:Storm是如何保證at least once語義的
回答了第2個問題。
本篇來建立一個基本的背景,來大概看下構成storm流式計算能力的一些基礎框架,並部分回答第一個問題。
supervisor會定時從zookeeper獲取拓補信息topologies、任務分配信息assignments及各類心跳信息,以此為依據進行任務分配。
在supervisor同步時,會根據新的任務分配情況來啟動新的worker或者關閉舊的worker並進行負載均衡。
worker通過定期的更新connections信息,來獲知其應該通訊的其它worker。
worker啟動時,會根據其分配到的任務啟動一個或多個executor線程。這些線程僅會處理唯一的topology。
如果有新的tolopogy被提交到集群,nimbus會重新分配任務,這個後面會說到。
executor線程負責處理多個spouts或者多個bolts的邏輯,這些spouts或者bolts,也稱為tasks。
具體有多少個worker,多少個executor,每個executor負責多少個task,是由配置和指定的parallelism-hint共同決定的,但這個值並不一定等於實際運行中的數目。
如果計算出的總的executors超過了nimbus的限制,此topology將不會得到執行。
並行度的作用:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 計算所有tolopogy的topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-topology->executors [nimbus storm-ids]
"compute a topology-id -> executors map"
(into {} (for [tid storm-ids]
{tid (set (compute-executors nimbus tid))})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 計算topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-executors [nimbus storm-id]
(let [conf (:conf nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
component->executors (:component->executors storm-base)
storm-conf (read-storm-conf conf storm-id)
topology (read-storm-topology conf storm-id)
task->component (storm-task-info topology storm-conf)]
(->> (storm-task-info topology storm-conf)
reverse-map
(map-val sort)
(join-maps component->executors)
(map-val (partial apply partition-fixed))
(mapcat second)
(map to-executor-id)
)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 計算topology的task-info
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn storm-task-info
"Returns map from task -> component id"
[^StormTopology user-topology storm-conf]
(->> (system-topology! storm-conf user-topology)
all-components
;; 獲取每個組件的並行數
(map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
(sort-by first)
(mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
(map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
(into {})
))
上述代碼會在nimbus進行任務分配時調用:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; nimbus進行任務分配
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
mk-assignments
->compute-new-topology->executor->node+port
->compute-topology->executors
-> ...
基本關系如下所示:
worker啟動時,除了啟動多個executor線程,還會啟動多個工作線程來負責消息傳遞。
worker會訂閱到transfer-queue來消費消息,同時也會發布消息到transfer-queue,比如需要進行遠程發布時(某個bolt在另一個進程或者節點上)。
executor會發布消息到executor-send-queue比如emit tuple,同時會從executor-receive-queue消費消息,比如執行ack或者fail。
batch-transfer-worker-handler線程訂閱到executor-send-queue消費消息,並將消息發布到transfer-queue供worker消費。
transfer-thread會訂閱到transfer-queue消費消息,並負責將消息通過socket發送到遠程節點的端口上。
worker通過receive-thread線程來收取遠程消息,並將消息以本地方式發布到消息中指定的executor對應的executor-receive-queue。executor按第3點來消費消息。
以上所有的消息隊列都是Disruptor Queue,非常高效的線程間通訊框架。
所謂本地發布,是指在worker進程內及executor線程間進行消息發布。
所謂遠程發布,是指在worker進程間、不同的機器間進行消息發布。
nimbus將可以工作的worker稱為worker-slot.
nimbus是整個集群的控管核心,總體負責了topology的提交、運行狀態監控、負載均衡及任務重新分配,等等工作。
nimbus分配的任務包含了topology代碼所在的路徑(在nimbus本地)、tasks、executors及workers信息。
worker由node + port唯一確定。
supervisor負責實際的同步worker的操作。一個supervisor稱為一個node。所謂同步worker,是指響應nimbus的任務調度和分配,進行worker的建立、調度與銷毀。
其通過將topology的代碼從nimbus下載到本地以進行任務調度。
任務分配信息中包含task到worker的映射信息task -> node + host,所以worker節點可據此信息判斷跟哪些遠程機器通訊。
集群的狀態機:
集群的狀態是通過一個storm-cluster-state的對象來描述的。
其提供了許多功能接口,比如:
如下圖所示:
zookeeper是整個集群狀態同步、協調的核心組件。
supervisor、worker、executor等組件會定期向zookeeper寫心跳信息。
當topology出現錯誤、或者有新的topology提交到集群時,topologies信息會同步到zookeeper。
nimbus會定期監視zookeeper上的任務分配信息assignments,並將重新分配的計劃同步到zookeeper。
所以,nimbus會根據心跳、topologies信息及已分配的任務信息為依據,來重新分配任務,如下圖所示:
如上文的狀態機圖所示,rebalance和do-reblalance(比如來自web調用)會觸發mk-assignments即任務(重新)分配。
同時,nimbus進程啟動後,會周期性地進行mk-assignments調用,以進行負載均衡和任務分配。
客戶端通過storm jar ... topology 方式提交topology,會通過thrift接口調用nimbus的提交功能,此時會啟動storm,並觸發mk-assignments調用。
一個topology的提交過程:
非本地模式下,客戶端通過thrift調用nimbus接口,來上傳代碼到nimbus並觸發提交操作.
nimbus進行任務分配,並將信息同步到zookeeper.
supervisor定期獲取任務分配信息,如果topology代碼缺失,會從nimbus下載代碼,並根據任務分配信息,同步worker.
worker根據分配的tasks信息,啟動多個executor線程,同時實例化spout、bolt、acker等組件,此時,等待所有connections(worker和其它機器通訊的網絡連接)啟動完畢,此storm-cluster即進入工作狀態。
除非顯示調用kill topology,否則spout、bolt等組件會一直運行。
主要過程如下圖所示:
以上,基本闡述了storm的基礎框架,但未涉及trident機制,也基本回答了問題1。
終。
Storm進程通信機制分析 http://www.linuxidc.com/Linux/2014-12/110158.htm
Apache Storm 的歷史及經驗教訓 http://www.linuxidc.com/Linux/2014-10/108544.htm
Apache Storm 的詳細介紹:請點這裡
Apache Storm 的下載地址:請點這裡