歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Storm基礎框架分析

背景

前期收到的問題:

1、在Topology中我們可以指定spout、bolt的並行度,在提交Topology時Storm如何將spout、bolt自動發布到每個服務器並且控制服務的CPU、磁盤等資源的?

2、Storm處理消息時會根據Topology生成一棵消息樹,Storm如何跟蹤每個消息、如何保證消息不丟失以及如何實現重發消息機制?

上篇:Storm是如何保證at least once語義的
回答了第2個問題。

本篇來建立一個基本的背景,來大概看下構成storm流式計算能力的一些基礎框架,並部分回答第一個問題。

worker、executor、task的關系

  1. worker是一個進程.
  2. executor是一個線程,是運行tasks的物理容器.
  3. task是對spout/bolt/acker等任務的邏輯抽象.

supervisor會定時從zookeeper獲取拓補信息topologies、任務分配信息assignments及各類心跳信息,以此為依據進行任務分配。

在supervisor同步時,會根據新的任務分配情況來啟動新的worker或者關閉舊的worker並進行負載均衡。

worker通過定期的更新connections信息,來獲知其應該通訊的其它worker。

worker啟動時,會根據其分配到的任務啟動一個或多個executor線程。這些線程僅會處理唯一的topology。
如果有新的tolopogy被提交到集群,nimbus會重新分配任務,這個後面會說到。

executor線程負責處理多個spouts或者多個bolts的邏輯,這些spouts或者bolts,也稱為tasks。

具體有多少個worker,多少個executor,每個executor負責多少個task,是由配置和指定的parallelism-hint共同決定的,但這個值並不一定等於實際運行中的數目。

如果計算出的總的executors超過了nimbus的限制,此topology將不會得到執行。

並行度的作用:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 計算所有tolopogy的topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-topology->executors [nimbus storm-ids]
  "compute a topology-id -> executors map"
  (into {} (for [tid storm-ids]
             {tid (set (compute-executors nimbus tid))})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 計算topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-executors [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
        component->executors (:component->executors storm-base)
        storm-conf (read-storm-conf conf storm-id)
        topology (read-storm-topology conf storm-id)
        task->component (storm-task-info topology storm-conf)]
    (->> (storm-task-info topology storm-conf)
         reverse-map
         (map-val sort)
         (join-maps component->executors)
         (map-val (partial apply partition-fixed))
         (mapcat second)
         (map to-executor-id)
         )))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 計算topology的task-info
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       all-components
    ;; 獲取每個組件的並行數
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       (sort-by first)
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
       (into {})
       ))

上述代碼會在nimbus進行任務分配時調用:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; nimbus進行任務分配
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
mk-assignments
->compute-new-topology->executor->node+port
->compute-topology->executors
-> ...

線程模型及消息系統

基本關系如下所示:

  1. worker啟動時,除了啟動多個executor線程,還會啟動多個工作線程來負責消息傳遞。

  2. worker會訂閱到transfer-queue來消費消息,同時也會發布消息到transfer-queue,比如需要進行遠程發布時(某個bolt在另一個進程或者節點上)。

  3. executor會發布消息到executor-send-queue比如emit tuple,同時會從executor-receive-queue消費消息,比如執行ack或者fail。

  4. batch-transfer-worker-handler線程訂閱到executor-send-queue消費消息,並將消息發布到transfer-queue供worker消費。

  5. transfer-thread會訂閱到transfer-queue消費消息,並負責將消息通過socket發送到遠程節點的端口上。

  6. worker通過receive-thread線程來收取遠程消息,並將消息以本地方式發布到消息中指定的executor對應的executor-receive-queue。executor按第3點來消費消息。

  7. 以上所有的消息隊列都是Disruptor Queue,非常高效的線程間通訊框架。

所謂本地發布,是指在worker進程內及executor線程間進行消息發布。
所謂遠程發布,是指在worker進程間、不同的機器間進行消息發布。

任務調度及負載均衡

任務調度的主要角色

  1. nimbus將可以工作的worker稱為worker-slot.

  2. nimbus是整個集群的控管核心,總體負責了topology的提交、運行狀態監控、負載均衡及任務重新分配,等等工作。
    nimbus分配的任務包含了topology代碼所在的路徑(在nimbus本地)、tasks、executors及workers信息。
    worker由node + port唯一確定。

  3. supervisor負責實際的同步worker的操作。一個supervisor稱為一個node。所謂同步worker,是指響應nimbus的任務調度和分配,進行worker的建立、調度與銷毀。
    其通過將topology的代碼從nimbus下載到本地以進行任務調度。

  4. 任務分配信息中包含task到worker的映射信息task -> node + host,所以worker節點可據此信息判斷跟哪些遠程機器通訊。

集群的狀態機:

集群狀態管理

集群的狀態是通過一個storm-cluster-state的對象來描述的。
其提供了許多功能接口,比如:

  1. zookeeper相關的基本操作,如create-node、set-data、remove-node、get-children等.
  2. 心跳接口,如supervisor-heartbeat!、worker-heatbeat!等.
  3. 心跳信息,如executors-beats等.
  4. 啟動、更新、停止storm,如update-storm!等.

如下圖所示:

任務調度的依據

  1. zookeeper是整個集群狀態同步、協調的核心組件。

  2. supervisor、worker、executor等組件會定期向zookeeper寫心跳信息。

  3. 當topology出現錯誤、或者有新的topology提交到集群時,topologies信息會同步到zookeeper。

  4. nimbus會定期監視zookeeper上的任務分配信息assignments,並將重新分配的計劃同步到zookeeper。

所以,nimbus會根據心跳、topologies信息及已分配的任務信息為依據,來重新分配任務,如下圖所示:

任務調度的時機

  1. 如上文的狀態機圖所示,rebalance和do-reblalance(比如來自web調用)會觸發mk-assignments即任務(重新)分配。

  2. 同時,nimbus進程啟動後,會周期性地進行mk-assignments調用,以進行負載均衡和任務分配。

  3. 客戶端通過storm jar ... topology 方式提交topology,會通過thrift接口調用nimbus的提交功能,此時會啟動storm,並觸發mk-assignments調用。

topology提交過程

一個topology的提交過程:

  1. 非本地模式下,客戶端通過thrift調用nimbus接口,來上傳代碼到nimbus並觸發提交操作.

  2. nimbus進行任務分配,並將信息同步到zookeeper.

  3. supervisor定期獲取任務分配信息,如果topology代碼缺失,會從nimbus下載代碼,並根據任務分配信息,同步worker.

  4. worker根據分配的tasks信息,啟動多個executor線程,同時實例化spout、bolt、acker等組件,此時,等待所有connections(worker和其它機器通訊的網絡連接)啟動完畢,此storm-cluster即進入工作狀態。

  5. 除非顯示調用kill topology,否則spout、bolt等組件會一直運行。

主要過程如下圖所示:

結語

以上,基本闡述了storm的基礎框架,但未涉及trident機制,也基本回答了問題1。

終。

Storm進程通信機制分析 http://www.linuxidc.com/Linux/2014-12/110158.htm

Apache Storm 的歷史及經驗教訓  http://www.linuxidc.com/Linux/2014-10/108544.htm

Apache Storm 的詳細介紹:請點這裡
Apache Storm 的下載地址:請點這裡

Copyright © Linux教程網 All Rights Reserved