歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Linux編程之自定義消息隊列

我這裡要講的並不是IPC中的消息隊列,我要講的是在進程內部實現自定義的消息隊列,讓各個線程的消息來推動整個進程的運動。進程間的消息隊列用於進程與進程之間的通信,而我將要實現的進程內的消息隊列是用於有序妥當處理來自於各個線程請求,避免一窩蜂的請求而導致消息的異常丟失。想想socket編程裡的listen函數吧,裡面要設置一個隊列長度的參數,其實來自網絡的請求已經排成一個請求隊列了,只是這個隊列是系統幫我們做好了,我們看不到而已。如果系統不幫我們做這個等待隊列的話,那就需要我們程序員在應用層實現了。

進程內的消息隊列實現並不難,總的來說有以下幾點:
  • 自定義消息結構,並構造隊列
  • 一個線程負責依次從消息隊列中取出消息,並處理該消息
  • 多個線程產生事件,並將消息放進消息隊列,等待處理
長話短說,我們開始動手吧!   一、定義消息結構 先貼代碼再解釋:
typedef struct Msg_Hdr_s  
{  
    uint32 msg_type;  
    uint32 msg_len;  
    uint32 msg_src;  
    uint32 msg_dst;      
}Msg_Hdr_t;  
  
typedef struct Msg_s  
{  
    Msg_Hdr_t hdr;  
    uint8 data[100];  
} Msg_t;
下面是我設計的消息格式內容的解釋:
  • msg_type:標記消息類型,當消息接收者看到該msg_type後就知道他要干什麼事了
  • msg_len:消息長度,待擴展,暫時沒用到(以後會擴展為變長消息)
  • msg_src:消息的源地址,即消息的發起者
  • msg_dst:消息的目的地,即消息的接受者
  • data[100]:消息除去消息頭外可以攜帶的信息量,定義為100字節
由該消息數據結構可以知道,這個消息是定長的,當然也可以實現為變長消息,但現在暫不實現,今天先把定長消息實現了,以後再完善變長消息。     二、構造循環隊列 隊列可以由鏈表實現,也可以由數組實現,這裡就使用數組實現的循環鏈表作為我們消息隊列的隊列模型。
typedef struct Queue_s  
{  
    int head;  
    int rear;  
    sem_t sem;  
    Msg_t data[QUEUE_SIZE];  
}Queue_t;  
  
int MsgQueueInit(Queue_t* Q)  
{  
    if(!Q)  
    {  
        printf("Invalid Queue!\n");  
        return -1;  
    }  
    Q->rear = 0;  
    Q->head = 0;  
    sem_init(&Q->sem, 0, 1);  
    return 0;      
}  
  
int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
{  
    if(!Q)  
    {  
        printf("Invalid Queue!\n");  
        return -1;  
    }  
    if(Q->rear == Q->head) //only one consumer,no need to lock head  
    {  
        printf("Empty Queue!\n");  
        return -1;  
    }  
    memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
    Q->head = (Q->head+1)%QUEUE_SIZE;  
    return 0;         
  
}  
  
int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
{  
    if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
    {  
        printf("Full Queue!\n");  
        return -1;  
    }  
    sem_wait(&Q->sem);  
    memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
    Q->rear = (Q->rear+1)%QUEUE_SIZE;  
    sem_post(&Q->sem);  
    return 0;  
} 
  循環隊列的實現想必大家都比較熟悉,但這裡需要提示的幾點是:
  • 隊列中應加入信號量或鎖來保證進隊時的互斥訪問,因為多個消息可能同時進隊,互相覆蓋其隊列節點
  • 這裡的信號量僅用於進隊而沒用於出隊,理由是消息處理者只有一個,不存在互斥的情形

三、構造消息處理者

if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
{  
    printf("create handler thread fail!\n");  
    return -1;          
}  
  
void msg_printer(Msg_t* msg)  
{  
    if(!msg)  
    {  
        return;  
    }  
    printf("%s: I have recieved a message!\n", __FUNCTION__);  
    printf("%s: msgtype:%d   msg_src:%d  dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
  
}  
  
void msg_handler()  
{  
    sleep(5);  //let's wait 5s when starts  
    while(1)  
    {  
        Msg_t msg;  
        memset(&msg, 0 ,sizeof(Msg_t));  
        int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
        if(res != 0)  
        {  
            sleep(10);  
            continue;  
        }  
        msg_printer(&msg);  
        sleep(1);  
    }  
}
我在進程裡create了一個線程作為消息處理者(handler)來處理消息隊列的消息,甘進入該線程時先等個5秒鐘來讓生產者往隊列裡丟些消息,然後再開始消息處理。當隊列沒消息可取時,就休息十秒,再去取消息。   這裡的消息處理很簡單,我只是簡單地將受到的消息打印一下,證明受到的消息正是其他線程發給我的。當然,你也可以在這裡擴展功能,根據受到的消息類型進一步決定該做什麼事。比如:
enum MSG_TYPE  
{  
    GO_HOME,  
    GO_TO_BED,  
    GO_TO_LUNCH,  
    GO_TO_CINAMA,  
    GO_TO_SCHOOL,  
    GO_DATEING,  
    GO_TO_WORK,//6  
};  
  
void handler()  
{  
    switch(msgtype)  
    {  
        case GO_HOME: go_home(); break;  
        case GO_TO_BED:  go_to_bed(); break;  
        .......  
    }  
}

這裡的handler就是一個簡單的狀態機了,根據給定的消息類型(事件)去做特定的事,推動狀態機的轉動。

四、構造消息生產者

if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
{  
    printf("create thread1 fail!\n");  
    return -1;  
}  
  
if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
{  
    printf("create thread2 fail!\n");  
    return -1;  
}      
  
if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
{  
    printf("create thread3 fail!\n");  
    return -1;  
}      
  
  
void msg_sender1()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD1;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread1 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}  
  
void msg_sender2()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD2;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread2 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}  
  
void msg_sender3()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD3;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread3 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}

這裡我create了三個線程來模擬消息生產者,每個生產者每隔1秒往消息隊列裡寫消息。

五、跑起來看看

先貼完整的代碼: msg_queue.c:  
  1 #include <stdio.h>  
  2 #include <pthread.h>  
  3 #include <semaphore.h>  
  4 #include <unistd.h>  
  5 #include <string.h>  
  6 #include "msg_def.h"  
  7   
  8 Queue_t MsgQueue;  
  9   
 10 int main(int argc, char* argv[])  
 11 {  
 12     int ret;  
 13     pthread_t thread1_id;  
 14     pthread_t thread2_id;  
 15     pthread_t thread3_id;  
 16     pthread_t handler_thread_id;  
 17   
 18     ret = MsgQueueInit((Queue_t*)&MsgQueue);  
 19     if(ret != 0)  
 20     {  
 21         return -1;  
 22     }  
 23   
 24     if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
 25     {  
 26         printf("create handler thread fail!\n");  
 27         return -1;          
 28     }  
 29   
 30   
 31     if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
 32     {  
 33         printf("create thread1 fail!\n");  
 34         return -1;  
 35     }  
 36   
 37     if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
 38     {  
 39         printf("create thread2 fail!\n");  
 40         return -1;  
 41     }      
 42   
 43     if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
 44     {  
 45         printf("create thread3 fail!\n");  
 46         return -1;  
 47     }      
 48   
 49   
 50     while(1)  
 51     {      
 52         sleep(1);  
 53     }  
 54   
 55     return 0;  
 56 }  
 57   
 58   
 59   
 60   
 61 int MsgQueueInit(Queue_t* Q)  
 62 {  
 63     if(!Q)  
 64     {  
 65         printf("Invalid Queue!\n");  
 66         return -1;  
 67     }  
 68     Q->rear = 0;  
 69     Q->head = 0;  
 70     sem_init(&Q->sem, 0, 1);  
 71     return 0;      
 72 }  
 73   
 74 int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
 75 {  
 76     if(!Q)  
 77     {  
 78         printf("Invalid Queue!\n");  
 79         return -1;  
 80     }  
 81     if(Q->rear == Q->head) //only one cosumer,no need to lock head  
 82     {  
 83         printf("Empty Queue!\n");  
 84         return -1;  
 85     }  
 86     memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
 87     Q->head = (Q->head+1)%QUEUE_SIZE;  
 88     return 0;         
 89   
 90 }  
 91   
 92 int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
 93 {  
 94     if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
 95     {  
 96         printf("Full Queue!\n");  
 97         return -1;  
 98     }  
 99     sem_wait(&Q->sem);  
100     memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
101     Q->rear = (Q->rear+1)%QUEUE_SIZE;  
102     sem_post(&Q->sem);  
103     return 0;  
104 }  
105   
106 void msg_printer(Msg_t* msg)  
107 {  
108     if(!msg)  
109     {  
110         return;  
111     }  
112     printf("%s: I have recieved a message!\n", __FUNCTION__);  
113     printf("%s: msgtype:%d   msg_src:%d  dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
114   
115 }  
116   
117 int msg_send()  
118 {  
119   
120     Msg_t msg;  
121     msg.hdr.msg_type = GO_HOME;  
122     msg.hdr.msg_src = THREAD1;  
123     msg.hdr.msg_dst = HANDLER;  
124     return MsgEnQueue((Queue_t*)&MsgQueue, &msg);      
125   
126 }  
127   
128 void msg_handler()  
129 {  
130     sleep(5);  //let's wait 5s when starts  
131     while(1)  
132     {  
133         Msg_t msg;  
134         memset(&msg, 0 ,sizeof(Msg_t));  
135         int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
136         if(res != 0)  
137         {  
138             sleep(10);  
139             continue;  
140         }  
141         msg_printer(&msg);  
142         sleep(1);  
143     }  
144 }  
145   
146   
147 void msg_sender1()  
148 {  
149     int i = 0;  
150     while(1)  
151     {  
152         if(i > 10)  
153         {  
154             i = 0;  
155         }  
156         Msg_t msg;  
157         msg.hdr.msg_type = i++;  
158         msg.hdr.msg_src = THREAD1;  
159         msg.hdr.msg_dst = HANDLER;  
160         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
161         printf("%s: Thread1 send a message!\n",__FUNCTION__);  
162         sleep(1);  
163     }  
164 }  
165   
166 void msg_sender2()  
167 {  
168     int i = 0;  
169     while(1)  
170     {  
171         if(i > 10)  
172         {  
173             i = 0;  
174         }  
175         Msg_t msg;  
176         msg.hdr.msg_type = i++;  
177         msg.hdr.msg_src = THREAD2;  
178         msg.hdr.msg_dst = HANDLER;  
179         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
180         printf("%s: Thread2 send a message!\n",__FUNCTION__);  
181         sleep(1);  
182     }  
183 }  
184   
185 void msg_sender3()  
186 {  
187     int i = 0;  
188     while(1)  
189     {  
190         if(i > 10)  
191         {  
192             i = 0;  
193         }  
194         Msg_t msg;  
195         msg.hdr.msg_type = i++;  
196         msg.hdr.msg_src = THREAD3;  
197         msg.hdr.msg_dst = HANDLER;  
198         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
199         printf("%s: Thread3 send a message!\n",__FUNCTION__);  
200         sleep(1);  
201     }  
202 } 

msg_def.h:

 1 #include <stdio.h>  
 2 #include <pthread.h>  
 3 #include <semaphore.h>  
 4   
 5 typedef unsigned char uint8;  
 6 typedef unsigned short unit16;  
 7 typedef unsigned int uint32;  
 8   
 9 #define QUEUE_SIZE 1000  
10   
11 typedef struct Msg_Hdr_s  
12 {  
13     uint32 msg_type;  
14     uint32 msg_len;  
15     uint32 msg_src;  
16     uint32 msg_dst;      
17 }Msg_Hdr_t;  
18   
19 typedef struct Msg_s  
20 {  
21     Msg_Hdr_t hdr;  
22     uint8 data[100];  
23 } Msg_t;  
24   
25 typedef struct Queue_s  
26 {  
27     int head;  
28     int rear;  
29     sem_t sem;  
30     Msg_t data[QUEUE_SIZE];  
31 }Queue_t;  
32   
33 typedef struct Queue_s QueueNode;  
34   
35 enum MSG_TYPE  
36 {  
37     GO_HOME,  
38     GO_TO_BED,  
39     GO_TO_LUNCH,  
40     GO_TO_CINAMA,  
41     GO_TO_SCHOOL,  
42     GO_DATEING,  
43     GO_TO_WORK,//6  
44 };  
45   
46 enum SRC_ADDR  
47 {  
48     THREAD1,  
49     THREAD2,  
50     THREAD3,  
51     HANDLER,  
52 };  
53   
54   
55 int MsgQueueInit(Queue_t* Q);  
56 int MsgDeQueue(Queue_t* Q, Msg_t* msg);  
57 int MsgEnQueue(Queue_t* Q, Msg_t* msg);  
58 void msg_handler();  
59 void msg_sender1();  
60 void msg_sender2();  
61 void msg_sender3();  
62 void msg_printer(Msg_t* msg);  
63 int msg_send(); 
看看跑起來的現象:     Finish! 現在這套進程內的消息隊列的架構在實際工程中非常實用(當然實際工程的框架會復雜健壯得多),很多工程都需要這種基於事件推動的思想來保證每條請求都可以有條不絮地執行,所以這個框架也是有用武之地的,尤其配合狀態機非常適合!

Copyright © Linux教程網 All Rights Reserved