前期收到的問題:
1、在Topology中我們可以指定spout、bolt的並行度,在提交Topology時Storm如何將spout、bolt自動發布到每個服務器並且控制服務的CPU、磁盤等資源的?
2、Storm處理消息時會根據Topology生成一棵消息樹,Storm如何跟蹤每個消息、如何保證消息不丟失以及如何實現重發消息機制?
本篇看看storm是通過什麼機制來保證消息至少處理一次的語義的,並回答第2個問題。
要說明上面的問題,得先了解storm中的一些原語,比如:
tuple和message
在storm中,消息是通過tuple來抽象表示的,每個tuple知道它從哪裡來,應往哪裡去,包含了其在tuple-tree(如果是anchored的話)或者DAG中的位置,等等信息。
spout
spout充當了tuple的發送源,spout通過和其它消息源,比如kafka交互,將消息封裝為tuple,發送到流的下游。
acker
acker是一種特殊的bolt,其接收來自spout和bolt的消息,主要功能是追蹤tuple的處理情況,如果處理完成,會向tuple的源頭spout發送確認消息,否則,會發送失敗消息,spout收到失敗的消息,根據配置和自定義的情況會進行消息的丟棄、重放處理。
簡單的關系如下所示:
上圖展示了spout、bolts等形成了一個DAG,如何追蹤這個DAG的執行過程,就是storm保證僅處理一次消息的語義的機制所在。
spout在調用emit/emitDirect方法發送tuple時,會以單播或者廣播的方式,將消息發送給流的下游的component/task/bolt,如果配置了acker,那麼會在每次emit調用之後,向acker發送請求ack的消息:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; spout向acker發送請求ack消息
;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; rooted?表示是否設置了acker
(if (and rooted?
(not (.isEmpty out-ids)))
(do
(.put pending root-id [task-id
message-id
{:stream out-stream-id :values values}
(if (sampler) (System/currentTimeMillis))])
(task/send-unanchored task-data
;;表示這是一個流初始化的消息
ACKER-INIT-STREAM-ID
;;將下游組件的out-id和0組成一個異或鏈,發送給acker用於追蹤
[root-id (bit-xor-vals out-ids) task-id]
overflow-buffer))
;; 如果沒有配置acker,則調用自身的ack方法
(when message-id
(ack-spout-msg executor-data task-data message-id
{:stream out-stream-id :values values}
(if (sampler) 0) "0:")))
從上面的代碼可以看出,每次emit tuple後,spout會向acker發送一個流ID為ACKER-INIT-STREAM-ID
的消息,用於將DAG或者tuple-tree中的節點信息交給acker,acker會利用這個信息來追蹤tuple-tree或DAG的完成。
而spout調用emit/emitDirect方法,將tuple發到下游的bolts,也同時會發送用於追蹤DAG完成情況的信息:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; spout向流的下游emit消息
;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(let [tuple-id (if rooted?
;; 如果有acker,tuple的MessageId會包含一個<root-id,id>的哈希表
;; root-id和id都是long型64位整數
(MessageId/makeRootId root-id id)
(MessageId/makeUnanchored))
;;實例化tuple
out-tuple (TupleImpl. worker-context
values
task-id
out-stream-id
tuple-id)]
;; 發送至隊列,最終發送給流的下游的task/bolt
(transfer-fn out-task
out-tuple
overflow-buffer)
))
這個追蹤信息是什麼呢?
如果是spout -> bolt或者bolt -> bolt,這個信息就是tuple的MessageId,其內部維護一個哈希表:
// map anchor to id
private Map<Long, Long> _anchorsToIds;
鍵為root-id,表示spout,值表示tuple在tuple-tree或者DAG的根(spout)或者經過的邊(bolt),但這裡沒有利用任何常規意義上的“樹”的算法,而是采用異或的方式來存儲這個值:
如果是spout -> acker,或者bolt -> acker,那麼用於追蹤的是tuple的values:
[root-id (bit-xor-vals out-ids) task-id]
[root (bit-xor id ack-val) ..]
下面給出上面調用的bit-xor-vals和bit-xor方法的代碼:
(defn bit-xor-vals
[vals]
(reduce bit-xor 0 vals))
(defn bit-xor
"Bitwise exclusive or"
{:inline (nary-inline 'xor)
:inline-arities >1?
:added "1.0"}
([x y] (. clojure.lang.Numbers xor x y))
([x y & more]
(reduce1 bit-xor (bit-xor x y) more)))
說起來有點抽象,看個例子。
假設我們有1個spout,n個bolt,1個acker:
spout發送tuple到下游的bolts:
;; id_1是發送到bolt_1的tuple-id,依此類推
spout :
->bolt_1 : id_1
->bolt_2 : id_2
..
->bolt_n : id_n
bolt收到tuple,在execute方法中進行必要的處理,然後調用emit方法,最後調用ack方法:
;; bolt_1調用emit方法,追蹤消息的這樣一個值:讓id_1和bid_1按位進行異或.
;; bid_1和id_1類似,是個long型的64位隨機整數,在emit這一步生成
bolt_1 emit : id_1 ^ bid_1
;; bolt_1調用ack方法,並將值表達為如下方式的異或鏈的結果
bolt_1 ack : 0 ^ bid_1 ^ id_1 ^ bid_1 = 0 ^ id_1
以上,可以看出bolt進行了emit-ack組合後,其自身在異或鏈中的作用消失了,也就是說tuple在此bolt得到了處理。
(當然,此時的ack還沒有得到acker的確認,假設acker確認了,那麼上面所說的tuple在bolt得到了處理就成立了。)
來看看acker的確認。
acker收到來自spout的tuple:
;; spout發消息給acker,tuple的MessageId包含下面的異或鏈的結果
spout -> acker : 0 ^ id_1 ^ id_2 ^ .. ^ id_n
;; acker收到來spout的消息,對tuple的ackVal進行處理,如下所示:
acker : 0 ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_1 ^ id_2 ^ .. ^ id_n
acker收到來自bolt的tuple:
;; bolt_1發消息給acker:
bolt_1 -> acker : 0 ^ id_1
;; acker維護的對應此tuple的源spout的ackVal :
ackVal : 0 ^ id_1 ^ id_2 ^ .. ^ id_n
;; acker進行確認,也就是拿上面的兩個值進行異或:
acker : (0 ^ id_1) ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_2 ^ .. ^ id_n
可以看出,bolt_1向acker請求ack,acker收到請求ack,異或之後,id_1的作用消失。也就是說,bolt_1已處理完畢這個tuple。
所以,在acker看來,如果某個bolt的處理完成,則此bolt在異或鏈中的作用就消失了。
如果所有的bolt 都得到處理,那麼acker將會觀察到ackVal值變成了0:
ackVal = 0
= (0 ^ id_1) ^ (0 ^ id_1 ^ .. ^ id_n) ^ .. ^ (0 ^ id_n)
= (0 ^ 0) ^ (id_1 ^ id_1) ^ (id_2 ^ id_2) ^ .. ^ (id_n ^ id_n)
如果出現了ackVal = 0,說明兩個可能:
如果ackVal不為0,說明tuple-tree或DAG沒有完成。如果長時間不為0,通過超時,可以觸發一個超時回調,在這個回調中調用spout的fail方法,來進行重放。
如此,就保證了消息處理不會漏掉,但可能會重復。
以上,就是storm保證消息至少處理一次的語義的機制 。
Storm進程通信機制分析 http://www.linuxidc.com/Linux/2014-12/110158.htm
Apache Storm 的歷史及經驗教訓 http://www.linuxidc.com/Linux/2014-10/108544.htm
Apache Storm 的詳細介紹:請點這裡
Apache Storm 的下載地址:請點這裡