歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Clojure:通過ZeroMQ推送消息

通過ZeroMQ的pub/sub模式,我們可以實現發送推送消息的功能。以下為示例代碼(入門可參考此文:http://www.linuxidc.com/Linux/2015-01/112315.htm):

(def ctx (ZMQ/context 1))

(def msg-list (atom ()))                                    ; 消息列表
(def stop-signal (atom false))                              ; 停止發送服務標識

(defn msg-publisher
  []
  (let [s (.socket ctx ZMQ/PUB)]
    (.bind s “tcp://x.x.x.x:xxxx”)
    (while (false? @stop-signal)                            ; 遇到停止信號則退出發送循環
      (loop [msgs @msg-list]                                ; 對消息列表進行循環發送處理
        (if (empty? msgs)
          (do
            (reset! msg-list ())                            ; 全部發送後清空消息列表
            (.send s (c/generate-string "0"))              ; 發送結束標識
            (Thread/sleep 1000)                            ; 延時1秒後再重新讀取,以免發送空數據太頻繁
            )
          (do
            (.send s (c/generate-string (first msgs)))      ; 發送消息
            (recur (rest msgs)))                            ; 發送下一條消息
          )))
(.close s)))

通過(future-call msg-publisher)將msg-publisher常駐線程後,msg-publisher會自動讀取msg-list列表,將新增加的內容推送給客戶端。下面附上測試代碼:

(deftest test-msg-publisher
  (do
    (let [f (future-call msg-publisher)
          s (.socket ctx ZMQ/SUB)]
      (reset! stop-signal false)
      f
      (.subscribe s ZMQ/SUBSCRIPTION_ALL)
      (.connect s “tcp://x.x.x.x:xxxx”)
      (reset! msg-list (range 10000))                      ; 產生消息10000條,但是只接收1000條,這是因為連接延時的問題,
      (loop [exec-times 1000                                ; 導致不可能將全部消息收全
            msg-count 0]
        (if (= 0 exec-times)
          (is (= 1000 msg-count))
          (do
            (let [msg (c/parse-string (.recvStr s))]
              ;(println msg)
              (if (not (= "0" msg))                        ; 如果為0則表示不是我們希望要的數據
                (recur (dec exec-times) (inc msg-count))
                (recur (dec exec-times) msg-count)))))
        )
      (.close s)
      (reset! stop-signal true)
      (future-cancel f)
      (is (future-cancelled? f)))))

運行lein test,如果輸出如下就表示運行正常。

Copyright © Linux教程網 All Rights Reserved