關鍵詞:集群通信 高可用集群 Linux Heartbeat
引言
Heartbeat是Linux-HA工程的一個組件,自1999年開始到現在,發布了眾多版本,是目前開源Linux-HA項目最成功的一個例子,在行業內得到了廣泛的應用,這裡分析的是2007年1月18日發布的版本2.0.8,可以從Linux-HA的官方網站www.linux-ha.org下載到最新版本。
隨著Linux在關鍵行業應用的逐漸增多,它必將提供一些原來由IBM和SUN這樣的大型商業公司所提供的服務,這些商業公司所提供的服務都有一個關鍵特性,就是高可用集群。
高可用集群是指一組通過硬件和軟件連接起來的獨立計算機,它們在用戶面前表現為一個單一系統,在這樣的一組計算機系統內部的一個或者多個節點停止工作,服務會從故障節點切換到正常工作的節點上運行,不會引起服務中斷。從這個定義可以看出,集群必須檢測節點和服務何時失效,何時恢復為可用。這個任務通常由一組被稱為“心跳”的代碼完成。在Linux-HA裡這個功能由一個叫做heartbeat的程序完成。
Heartbeat消息通信模型
Heartbeat包括以下幾個組件:
heartbeat – 節點間通信校驗模塊
CRM - 集群資源管理模塊
CCM - 維護集群成員的一致性
LRM - 本地資源管理模塊
Stonith Daemon - 提供節點重啟服務
logd - 非阻塞的日志記錄
apphbd - 提供應用程序級的看門狗計時器
Recovery Manager - 應用故障恢復
底層結構–包括插件接口、進程間通信等
CTS – 集群測試系統,集群壓力測試
這裡主要分析的是Heartbeat的集群通信機制,所以這裡主要關注的是heartbeat模塊。heartbeat模塊由以下幾個進程構成:
master進程(master process)
FIFO子進程(fifo child)
read子進程(read child)
write子進程(write child)
在heartbeat裡每一條通信通道對應於一個write子進程和一個read子進程,假設n是通信通道數,p為heartbeat模塊的進程數,則p、n有以下關系:
p = 2 * n + 2
在heartbeat裡,master進程把自己的數據或者是客戶端發送來的數據,通過IPC發送到write子進程,write子進程把數據發送到網絡;同時read子進程從網絡讀取數據,通過IPC發送到master進程,由master進程處理或者由master進程轉發給其客戶端處理。這幾個進程間的數據流如圖一:
圖一:heartbeat模塊進程間的數據流
Heartbeat啟動的時候,由master進程來啟動FIFO子進程、write子進程和read子進程,最後再啟動client進程。
可靠消息通信
Heartbeat通過插件技術實現了集群間的串口、多播、廣播和組播通信,在配置的時候可以根據通信媒介選擇采用的通信協議,heartbeat啟動的時候檢查這些媒介是否存在,如果存在則加載相應的通信模塊。這樣開發人員可以很方便地添加新的通信模塊,比如添加紅外線通信模塊。
對於高可用集群系統,如果集群間的通信不可靠,那麼很明顯集群本身也不可靠。Heartbeat采用UDP協議和串口進行通信,它們本身是不可靠的,可靠性必須由上層應用來提供。那麼怎樣保證消息傳遞的可靠性呢?
Heartbeat通過冗余通信通道和消息重傳機制來保證通信的可靠性。Heartbeat檢測主通信鏈路工作狀態的同時也檢測備用通信鏈路狀態,並把這一狀態報告給系統管理員,這樣可以大大減少因為多重失效引起的集群故障不能恢復。例如,某個工作人員不小心撥下了一個備份通信鏈路,一兩個月以後主通信鏈路也失效了,系統就不能再進行通信了。通過報告備份通信鏈路的工作狀態和主通信鏈路的狀態,可心完全避免這種情況。因為這樣在主通信鏈路失效以前,就可以檢測到備份工作鏈路失效,從而在主通信鏈路失效前修復備份通信鏈路。
Heartbeat通過實現不同的通信子系統,從而避免了某一通信子系統失效而引起的通信失效。最典型的就是采用以太網和串口相結合的通信方式。這被認為是當前的最好實踐,有幾個理由可以使我們選擇采用串口通信:
(1)IP通信子系統的失效不太可能影響到串口子系統。
(2)串口不需要復雜的外部設備和電源。
(3)串口設備簡單,在實踐中非常可靠。
(4)串口可以非常容易地專用於集群通信。
(5)串口的直連線因為偶然性掉線事件很少。
不管是采用串口還是以太網IP協議進行通信,heartbeat都實現了一套消息重傳協議,保證消息包的可靠傳遞。實現消息包重傳有兩種協議,一種是發送者發起,另一種是接收者發起。
對於發送者發起協議,一般情況下接收者會發送一個消息包的確認。發送者維護一個計時器,並在計時器到時的時候重傳那些還沒有收到確認的消息包。這種方法容易引起發送者溢出,因為每一台機器的每一個消息包都需要確認,使得要發送的消息包成倍增長。這種現像被稱為發送者(或者ACK)內爆(implosion)。
對於接收者發起協議,采用這種協議通信雙方的接收者通過序列號負責進行錯誤檢測。當檢測到消息包丟失時,接收者請求發送者重傳消息包。采用這種方法,如果消息包沒有被送達任何一個接收者,那麼發送者容易因NACK溢出,因為每個接收者都會向發送者發送一個重傳請求,這會引起發送者的負載過高。這種現像被稱為NACK內爆(implosion)。
Heartbeat實現的是接收者發起協議的一個變種,它采用計時器來限制過多的重傳,在計時器時間內限制接收者請求重傳消息包的次數,這樣發送者重傳消息包的次數也被相應的限制了,從而嚴格的限制了NACK內爆。
可靠消息通信的實現
一般集群通信有兩類消息包,一類是心跳消息包,這類消息包通告集群內節點的存活情況;另一類是控制消息包,這類消息包負責集群的節點和資源管理。heartbeat把心跳消息包看成是控制消息包的一個特例,采用相同的通信通道進行發送,這使得協議的實現簡單化,而且很有效,並把相應的代碼限制在幾百行之內。
在heartbeat裡,一切流向網絡的數據都由master進程發送到write子進程進行發送。master進程調用send_cluster_msg()函數把消息發送到所有的write子進程。下面通過一些代碼片段看看heartbeat是怎麼發送消息的。在介紹代碼之前先介紹相關的重要數據結構
Heartbeat的消息包數據結構 struct ha_msg { int nfields; /*消息包數據域的個數*/ int nalloc; /*己分配的內存塊個數*/ char **names; /*消息包數據域的名稱*/ size_t *nlens; /*各個數據域稱的長度*/ void **values; /*與數據域名稱對應的數據值*/ size_t *vlens; /*各個數據域對應的數據值的長度*/ int *types; /*消息包的類型*/ };
Heartbeat的歷史消息隊列 struct msg_xmit_hist { struct ha_msg *msgq[MAXMSGHIST]; /*歷史消息隊列*/ seqno_t seqnos[MAXMSGHIST]; /*歷史消息序列號*/ longclock_t lastrexmit[MAXMSGHIST]; /*上一次重傳的時間*/ int lastmsg; /*上一次重傳到的消息序列號*/ seqno_t hiseq; /*最大消息序列號*/ seqno_t lowseq; /*最小消息序列號*/ seqno_t ackseq; /*確認了的消息序列號*/ struct node_info *lowest_acknode; /*確認的節點*/ };
代碼所屬文件heartbeat/heartbeat.c
int send_cluster_msg(struct ha_msg *msg) { ... pid_t ourpid = getpid(); ...
if (ourpid == processes[0]) { /*來自master進程的消息*/ /*添加控制信息,包括源節點名,源節點全局標識符,序列號,代數,時間等*/ if ((msg = add_control_msg_fields(msg)) != NULL) { /*可靠的多播消息包傳遞*/ rc = process_outbound_packet(&msghist, msg); } } else { /*來自client進程的消息*/ int ffd = -1; char *smsg = NULL;
...
/*發送到FIFO進程*/
if ((smsg = msg2wirefmt_noac(msg, &len)) == NULL) { ... } else if ((ffd = open(FIFONAME, O_WRONLY | O_APPEND)) < 0) { ... } else if ((writerc = write(ffd, smsg, len – 1)) != (ssize_t)(len-1)) { ... } } }
Heartbeat在process_outbound_packet()函數裡實現了一個可靠的多播協議,它利用一個循環隊列來存放歷史消息,對於帶有序列號的心跳消息,先存放到歷史消息隊列裡然後發送,接收者可以請求發送傳重傳該消息,對於不帶序列號的控制消息,不會進行重傳。下面是這個函數的實現代碼。
static int process_outbound_packet(struct msg_xmit_hist *hist, struct ha_msg *msg) { ... const char *cseq; seqno_t seqno = -1; const char *to; int IsToUs; size_t len; ...
if ((type = ha_msg_value(msg, F_TYPE)) == NULL) { ... return HA_FAIL; }
if ((cseq = ha_msg_value(msg, F_SEQ)) != NULL) { if (sscanf(cseq, “%lx”, &seqno) != 1 || seqno nodename) == 0);
/*把消息轉換成字符串*/ smsg = msg2wirefmt(msg, &len);
...
if (cseq != NULL) { /*存放到歷史消息隊列裡,通過序列號記錄,如果需要,則進行重傳*/ add2_xmit_hist(hist, msg, seqno); }
...
/*通過write子進程發送到所有的網絡接口上*/ send_to_all_media(smsg, len);
...
return HA_OK; }
add2_xmit_hist()函數把發送的消息發到一個歷史消息隊列裡去,隊列的最大長度為200。如果接收者請求重傳消息,發送者通過序列號在該隊列裡查找要重傳的消息,如果找到則進行重傳。下面是相關代碼。
static void add2_xmit_hist (struct msg_xmit_hist *hist, struct ha_msg *msg, seqno_t seq) { int slot; struct ha_msg *slotmsg;
...
/*查找隊列裡消息存放的位置*/ slot = hist->lastmsg + 1; if (slot >= MAXMSGHIST) { /*到達隊尾,從頭開始。在這裡實現循環隊列*/ slot = 0; }
hist->hiseq = seq; slotmsg = hist->msgq[slot];
/*刪除隊列中找到的位置上的舊消息*/ if (slotmsg != NULL) { hist->lowseq = hist->seqnos[slot]; hist->msgq[slot] = NULL; if (!ha_is_allocated(slotmsg)) { ... } else { ha_msg_del(slotmsg); } }
hist->msgq[slot] = msg; hist->seqnos[slot] = seq; hist->lastrexmit[slot] = 0L; hist->lastmsg = slot;
if (enable_flow_control && live_node_count > 1 && (hist->hiseq – hist->lowseq) > ((MAXMSGHIST*3)/4)) { /*消息隊列長度大於告警長度,記錄日志*/ ... } if (enable_flow_control && hist->hiseq – hist->ackseq > FLOWCONTROL_LIMIT) { /*消息隊列的長度大於流控限制長度*/ if (live_node_count < 2) { /*集群裡只有本機節點為存活節點,更新歷史消息隊列,刪除舊消息, 以防止歷史消息隊列滿*/ update_ackseq(hist->hiseq – (FLOWCONTROL_LIMIT – 1)); all_clients_resume(); } else { /*client進程發送消息過快,暫停所有的client進程*/ all_clients_pause(); hist_display(hist); } }
}
當發送者收到接收者的重傳請求後,通過回調函數HBDoMsg_T_REXMIT()函數調用process_rexmit()函數進行消息重傳。
#define MAX_REXMIT_BATCH 50 /*每次最多重傳的消息包數*/
static void process_rexmit(struct msg_xmit_hist *hist, struct ha_msg *msg) { const char *cfseq; const char *clseq; seqno_t fseq = 0; seqno_t lseq = 0; seqno_t thisseq; int firstslot = hist->lastmsg – 1; int rexmit_pkt_count = 0; const char *fromnodename = ha_msg_value(msg, F_ORIG); struct node_info *fromnode = NULL;
...
/*取得要重傳的消息包的起始序列號*/ if ((cfseq = ha_msg_value(msg, F_FIRSTSEQ)) == NULL || (clseq = ha_msg_value(msg, F_LASTSEQ)) == NULL || (fseq = atoi(cfseq)) hiseq) { /*序列號大於消息隊列中最大序列號*/ ... continue; }
for (msgslot = firstslot; !foundit && msgslot != (firstslot+1); --msgslot) { char *smsg; longclock_t now = time_longclock(); longclock_t last_rexmit; size_t len;
...
/*重傳上一次重傳剩下的消息包*/ last_rexmit = hist->lastrexmit[msgslot];
if (cmp_longclock(last_rexmit, zero_longclock) != 0 && longclockto_ms(sub_longclock(now, last_rexmit)) < (ACCEPT_REXMIT_REQ_MS)) { goto NextReXmit; }
/*一次不能發送太多數據包,如果數據包太多的話,可能會引起串口溢出*/ ++rexm