unix早期通信機制中的信號能夠傳送的信息量有限,管道則只能傳送無格式字節流,這遠遠是不夠的。
消息隊列(也叫報文隊列)客服了這些缺點:
消息隊列就是一個消息的鏈表。
可以把消息看作一個記錄,具有特定的格式。
進程可以按照一定的規則向消息隊列中添加新消息;另一些進程可以從消息隊列中讀走消息。
消息隊列是隨內核持續的,只有內核重啟或人工刪除時,該消息隊列才會被刪除。
system V消息隊列使用消息隊列標識符標識。具有足夠特權的任何進程都可以往一個給定隊列放置一個消息,具有足夠特權的任何進程都可以從一個給定隊列讀出一個消息。
消息隊列具有一定的FIFO特性,但是它可以實現消息的隨機查詢,比FIFO具有更大的優勢。同時這些消息又存在於內核中,由“隊列”ID來標識消息隊列的實現包括創建或打開消息隊列,添加消息,讀取消息和控制消息隊列這四種操作。
對於系統中的每個消息隊列,內核維護一個定義在<sys/msg.h>頭文件中的信息結構。
struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* first message on queue,unused */ struct msg *msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ };
我們可以將內核中的某個特定的消息隊列畫為一個消息鏈表,如圖假設有一個具有三個消息的隊列,消息長度分別為1字節,2字節和3字節,而且這些消息就是以這樣的順序寫入該隊列的。再假設這三個消息的類型分別為100,200,300.
1.msgget函數
msgget函數用於創建一個新的消息隊列或訪問一個已存在的消息隊列。
#include <sys/msg.h> int msgget(key_t key,int oflag);
返回值是一個整數標識符,其他三個msg函數就用它來指代該隊列。它是基於指定的key產生的,而key即可以是ftok的返回值,也可以是常值IPC_PRIVATE。
oflag是讀寫權限值得組合。它還可以與IPC_CREAT或IPC_CREAT | IPC_EXCL按位或,IPC_NOWAIT --- 讀寫消息隊列要求無法得到滿足時,不阻塞。
當創建一個新消息隊列時,msqid_ds結構的如下成員被初始化。
(1)msg_perm結構的uid和cuid成員被設置成當前進程的有效用戶ID,gid和cgid成員被設置成當前進程的有效組ID。
(2)oflag中的讀寫權限位存放在msg_perm.mode中。
(3)msg_qnum,msg_lspid,msg_lrpid,msg_stime和msg_rtime被置為0.
(4)msg_ctime被設置成當前時間。
(5)msg_qbytes被設置成系統限制值。
huangcheng@ubuntu:~$ ipcs -q ----查看消息隊列 ------ Message Queues -------- key msqid owner perms used-bytes messages
2.msgsnd函數
使用msgget函數打開一個消息隊列後,我們使用msgsnd往其上放置一個消息。
#include <sys/msg.h> int msgsnd(int msgid,const void *ptr,size_t length,int flag);
其中msqid是由msgget返回的標識符。ptr是一個結構指針,該結構具有如下模板,它定義在<sys/msg.h>中。
struct msgbuf { long mtype; /* message type, must be > 0 */ char mtext[1]; /* message data */ };
msgsnd的length參數以字節為單位指定待發送消息的長度。這是位於長整數消息類型之後的用戶自定義數據的長度(注意:不包括消息類型)。該長度可以是0.
flag參數既可以是0,也可以是IPC_NUWAIT。IPC_NOWAIT標志使得msgsnd調用非阻塞:如果沒有存放新消息的可用空間(即消息隊列已滿),該函數馬上返回。這個條件發生的情形包括:
(1)在指定的隊列中已有太多的字節(對於該隊列的msqid_ds結構中的msg_qbytes值);
(2)在系統范圍存在太多的消息。
如果這兩個條件中有一個存在,而且IPC_NUWAIT標志已指定,msgsnd就返回一個EAGAIN錯誤。如果這兩個條件有一個存在,但是IPC_NUWAIT標志未指定,那麼調用線程被投入睡眠直到:
(1)具備存放新消息的空間;
(2)由msqid標志的消息隊列從系統中刪除(這種情況下返回一個EIDRM錯誤);
(3)調用線程被某個捕獲的信號所中斷(這種情況下返回一個EINTR錯誤)。
3.msgrcv函數
使用msgrcv函數從某個消息隊列中讀取一個消息。
#include <sys/msg.h> ssize_t msgrcv(int msqid,void *ptr,size_t lengh,long type,int flag);
其中ptr參數指定所接收消息的存放位置。跟msgsnd一樣,該指針指向緊挨在真正的消息數據之前返回的長整數類型字段。
length指定了由ptr指向的緩沖區中數據部分的大小。這是該函數能返回的最大數據量。該長度不包括長整數類型字段。
type指定希望從所給定的隊列中讀出什麼樣的消息。
(1)如果type為0,那就返回該隊列中的第一個消息,既然每個消息隊列都是作為一個FIFO鏈表維護的,因此type為0指定返回該隊列中最早的消息。
(2)如果type大於0,那就返回其類型值為type的第一個消息。
(3)如果type小於0,那就返回其類型小於或等於type參數的絕對值的消息中類型值最小的第一個消息。
msgrcv的flag參數指定所請求類型的消息不在所指定的隊列中時該做何處理。在沒有消息可得的情況下,如果設置了flag中的IPC_NOWAIT位,msgrcv函數就立即返回一個ENOMSG錯誤。否則,設置了flag為0,調用者被阻塞到下列某個事件發生為止:
(1)有一個所請求類型的消息可獲取;
(2)由msqid標志的消息隊列從系統中刪除(這種情況下返回一個EIDRM錯誤);
(3)調用線程被某個捕獲的信號所中斷(這種情況下返回一個EINTR錯誤)。
flag參數中另有一位可以指定:MSG_NOERROR。當所接收消息的真正數據部分大於length參數時,如果設置了該位,msgrcv函數就只是截短數據部分,而不返回錯誤。否則,ms_grcv返回一個E2BIC錯誤。
成功返回時,msgrcv返回的是所接收消息中數據的字節數。它不包括也通過ptr參數返回的長整數消息類型所需的幾個字節。
4.msgctl函數
msgctl函數提供在一個消息隊列上的各種控制操作。
#include <sys/msg.h> int msgctl(int msqid,int cmd,struct msqid_ds *buf);
msgctl函數提供3個命令。
IPC_RMID 從系統中刪除由msqid指定的消息隊列。當前在該隊列上的任何消息都被丟棄。對於該命令而言,msgctl函數的第三個參數被忽略。
IPC_SET 給所指定的消息隊列設置其msgid_ds結構的以下4個成員:msg_perm.uid,msg_perm.gid,msg_perm.mode和msg_qbytes。他們的值來自buff參數指向的結構中的相應的成員。
IPC_STAT 讀取消息隊列的屬性,並將其保存在buf指向的緩沖區中。
msgrcv.c用於接收消息,msgsend.c用於發送消息:
msgrcv.c
/* Here's the receiver program. */ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/msg.h> struct my_msg_st { long int my_msg_type; char some_text[BUFSIZ]; }; int main() { int running = 1; int msgid; struct my_msg_st some_data; long int msg_to_receive = 0; /* First, we set up the message queue. */ msgid = msgget((key_t)1234, 0666 | IPC_CREAT); if (msgid == -1) { fprintf(stderr, "msgget failed with error: %d\n", errno); exit(EXIT_FAILURE); } /* Then the messages are retrieved from the queue, until an end message is encountered. Lastly, the message queue is deleted. */ while(running) { if (msgrcv(msgid, (void *)&some_data, BUFSIZ,msg_to_receive, 0) == -1) { fprintf(stderr, "msgrcv failed with error: %d\n", errno); exit(EXIT_FAILURE); } printf("You wrote: %s", some_data.some_text); if (strncmp(some_data.some_text, "end", 3) == 0) running = 0; } if (msgctl(msgid, IPC_RMID, 0) == -1) { fprintf(stderr, "msgctl(IPC_RMID) failed\n"); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); }
msgsend.c
查看本欄目更多精彩內容:http://www.bianceng.cn/OS/unix/
/* The sender program is very similar to msg1.c. In the main set up, delete the msg_to_receive declaration and replace it with buffer[BUFSIZ], remove the message queue delete and make the following changes to the running loop. We now have a call to msgsnd to send the entered text to the queue. */ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/msg.h> #define MAX_TEXT 512 struct my_msg_st { long int my_msg_type; char some_text[MAX_TEXT]; }; int main() { int running = 1; struct my_msg_st some_data; int msgid; char buffer[BUFSIZ]; msgid = msgget((key_t)1234, 0666 | IPC_CREAT); if (msgid == -1) { fprintf(stderr, "msgget failed with error: %d\n", errno); exit(EXIT_FAILURE); } while(running) { printf("Enter some text: "); fgets(buffer, BUFSIZ, stdin); some_data.my_msg_type = 1; strcpy(some_data.some_text, buffer); if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1) { fprintf(stderr, "msgsnd failed\n"); exit(EXIT_FAILURE); } if (strncmp(buffer, "end", 3) == 0) running = 0; } exit(EXIT_SUCCESS); }
運行結果:
huangcheng@ubuntu:~$ ./msgsend Enter some text: ctt Enter some text: huangcheng Enter some text: end huangcheng@ubuntu:~$ ./msgrcv You wrote: ctt You wrote: huangcheng You wrote: end