一.概念
消息隊列提供了一種從一個進程向另一個進程發送一個數據塊的方法。 每個數據塊都被認為是有一個類型,接收者進程接收的數據塊可以有不同的類型值。我們可以通過發送消息 來避免命名管道的同步和阻塞問題。消息隊列與管道不同的是,消息隊列是基於消息的, 而管道是基於字節流的,且消息隊列的讀取不?定是先入先出。消息隊列與命名管道有一樣的不足,就是每個消息的最大長度是有上限的(MSGMAX),每個消息隊列的總的字節數是有上限的(MSGMNB),系統上消息隊列的總數也有?個上限(MSGMNI)。
查看機器的三個上限值:
二. IPC對象數據結構
內核為每個IPC對象維護?一個數據結構(/usr/include/linux/ipc.h)
- structipc_perm
- {
- key_t__key;/*Keysuppliedtoxxxget(2)*/
- uid_tuid;/*EffectiveUIDofowner*/
- gid_tgid;/*EffectiveGIDofowner*/
- uid_tcuid;/*EffectiveUIDofcreator*/
- gid_tcgid;/*EffectiveGIDofcreator*/
- unsignedshortmode;/*Permissions*/
- unsignedshort__seq;/*Sequencenumber*/};
消息隊列,共享內存和信號量都有這樣?一個共同的數據結構。
三、消息隊列結構(/usr/include/linux/msg.h)
- /*Obsolete,usedonlyforbackwardscompatibilityandlibc5compiles*/
- structmsqid_ds{
- structipc_permmsg_perm;
- structmsg*msg_first;/*firstmessageonqueue,unused*/
- structmsg*msg_last;/*lastmessageinqueue,unused*/
- __kernel_time_tmsg_stime;/*lastmsgsndtime*/
- __kernel_time_tmsg_rtime;/*lastmsgrcvtime*/
- __kernel_time_tmsg_ctime;/*lastchangetime*/
- unsignedlongmsg_lcbytes;/*Reusejunkfieldsfor32bit*/
- unsignedlongmsg_lqbytes;/*ditto*/
- unsignedshortmsg_cbytes;/*currentnumberofbytesonqueue*/
- unsignedshortmsg_qnum;/*numberofmessagesinqueue*/
- unsignedshortmsg_qbytes;/*maxnumberofbytesonqueue*/
- __kernel_ipc_pid_tmsg_lspid;/*pidoflastmsgsnd*/
- __kernel_ipc_pid_tmsg_lrpid;/*lastreceivepid*/
- };
可以看到第?個紅色條目就是IPC結構體,即是共有的,後面的都是消息隊列所私有的成員。消息隊列是用鏈表實現的。
四.消息隊列的函數
實現消息隊列的相關函數:
1.生成新消息隊列或者取得已存在的消息隊列
intmsgget(ket_tkey,intmsgflg); key:可以認為是一個端口號,也可以由函數ftok生成。
參數: msg?g:
IPC_CREAT:如果IPC不存在,則創建一個IPC資源,否則打開操作。
IPC_EXCL:只有在共享內存不存在的時候,新的共享內存才建立,否則就產生錯誤。 如果單獨使用IPC_CREAT XXXget()函數要麼返回一個已經存在的共享內存的操作符,要 麼返回一個新建的共享內存的標識符。 如果將IPC_CREAT和IPC_EXCL標志一起使用,XXXget()將返回一個新建的IPC標識符 ;如果該IPC資源已存在,或者返回-1。 IPC_EXEL標志本身並沒有太大的意義,但是和IPC_CREAT標志一起使用可以用來保證所得的對象是新建的,而不是打開已有的對象。
返回值:返回msgid。
2.向隊列讀/寫消息
msgrcv取消息:
ssize_tmsgrcv(intmsqid,void*msgp,size_tmsgsz,longmsgtyp,intmsg?g);
msgsnd讀消息:
intmsgsnd(intmsqid,constvoid*msgp,size_tmsgsz,intmsg?g);
msqid:消息隊列的標識碼。
msgp:指向消息緩沖區的指針,此位置用來暫時存儲發送和接收的消息,是一個用戶可定義的通用結構,形態如下:
- structmsgstru
- {
- longmtype;//大於0
- charmtext[_SIZE_];//_SIZE_用戶指定大小
- }
msgsz:消息的大小。
msgtyp:從消息隊列內讀取的消息形態。如果值為零,則表示消息隊列中的所有消息都會被讀取。
msg?g:用來指明核心程序在隊列沒有數據的情況下所應采取的行動。如果msg?g和常數IPC_NOWAIT合用,則在msgsnd()執行時若是消息隊列已滿,則msgsnd()將不會阻塞,而會立即返回-1,如果執行的是msgrcv(),則在消息隊列呈空時,不做等待馬上返回-1,並設定 錯誤碼為ENOMSG。當msg?g為0時,msgsnd()及msgrcv()在隊列呈滿或呈空的情形時,采取 阻塞等待的處理模式。
五.應用
———使用消息隊列實現一個簡單的客戶端--服務端的通信
實現代碼如下給出:
comm.h
- #pragmaonce
-
- #include
- #include
- #include
- #include
- #include
- #include
-
- #define_PATH_NAME_"/tmp"
- #define_PROJ_ID_0x6666
- #define_SIZE_1024
-
- externintserver_type;
- externintclient_type;
-
-
- structmsgbuf
- {
- longmtype;
- charmtext[_SIZE_];
- };
-
- intcreat_msg_queue();
- intget_msg_queue();
- //intcreat_msg_queue(intmsg_id);
- intsend_msg(intmsg_id,intsend_type,constchar*msg);
- intrecv_msg(intmsg_id,intrecv_type,char*msg_out);
- intdestroy_queue(intmsg_id);
comm.c:
- #include"comm.h"
-
- intserver_type=1;
- intclient_type=2;
-
-
- staticintcomm_msg_queue(intflags)
- {
- key_t_key=ftok(_PATH_NAME_,_PROJ_ID_);
- if(_key<0)
- {
- printf("%d:%s",errno,strerror(errno));
- return-1;
- }
-
- intmsg_id=msgget(_key,flags);
- //intmsg_id=msgget(_key,IPC_CREAT|IPC_EXCL|0666);
- returnmsg_id;
- }
-
- intcreat_msg_queue()
- {
- intflags=IPC_CREAT|IPC_EXCL|0666;
- returncomm_msg_queue(flags);
- }
-
- intget_msg_queue()
- {
- intflags=IPC_CREAT;
- returncomm_msg_queue(flags);
- }
-
- intdestroy_queue(intmsg_id)
- {
- if(msgctl(msg_id,IPC_RMID,NULL)!=0)
- {
- printf("%d:%s",errno,strerror(errno));
- return-1;
- }
-
- return0;
- }
-
- intsend_msg(intmsg_id,intsend_type,constchar*msg)
- {
- structmsgbuf_buf;
- _buf.mtype=send_type;
- strncpy(_buf.mtext,msg,strlen(msg)+1);
-
- if(msgsnd(msg_id,&_buf,sizeof(_buf.mtext),0)<0)
- {
- printf("%d:%s",errno,strerror(errno));
- return-1;
- }
- return0;
- }
-
- intrecv_msg(intmsg_id,intrecv_type,char*msg_out)
- {
- structmsgbuf_buf;
- _buf.mtype=0;
- memset(_buf.mtext,'\0',sizeof(_buf.mtext));
-
- if(msgrcv(msg_id,&_buf,sizeof(_buf.mtext),recv_type,0)<0)
- {
- printf("%d:%s",errno,strerror(errno));
- return-1;
- }
-
- strcpy(msg_out,_buf.mtext);
- return0;
- }
server.c:
- #include"comm.h"
-
- intmain()
- {
- intmsg_id=creat_msg_queue();
- if(msg_id<0)
- {
- printf("%d:%s\n",errno,strerror(errno));
- return1;
- }
-
- charbuf[_SIZE_];
- while(1)
- {
- memset(buf,'\0',sizeof(buf));
- recv_msg(msg_id,client_type,buf);
- printf("client:%s\n",buf);
- if(strcasecmp(buf,"quit")==0)
- {
- break;
- }
-
- printf("clientsaydone,PleaseEnter#");
- fflush(stdout);
- ssize_t_s=read(0,buf,sizeof(buf)-1);
- if(_s>0)
- {
- buf[_s-1]='\0';
- }
-
- send_msg(msg_id,server_type,buf);
-
- }
-
- destroy_queue(msg_id);
- return0;
- }
client.c:
- #include"comm.h"
-
- intmain()
- {
- //intmsg_id=creat_msg_queue();
-
- //if(msg_id<0)
- //{
- //printf("%d:%s\n",errno,strerror(errno));
- //return1;
- //}
- //
- //
- intmsg_id=get_msg_queue();
-
- charbuf[_SIZE_];
- while(1)
- {
- printf("PleaseEnter:");
- fflush(stdout);
- ssize_t_s=read(0,buf,sizeof(buf)-1);
- if(_s>0)
- {
- buf[_s-1]='\0';
- }
- send_msg(msg_id,client_type,buf);
- if(strcasecmp(buf,"quit")==0)
- {
- break;
- }
-
- memset(buf,'\0',sizeof(buf));
- recv_msg(msg_id,server_type,buf);
- printf("server#%s\n",buf);
- }
-
- return0;
- }
-
六.測試過程及結果
頭文件:
各個接口:
- intcreat_msg_queue();//創建一個消息隊列
-
- intget_msg_queue();//獲得消息隊列的msg_id
intsend_msg(intmsg_id,intsend_type,constchar*msg);//發送消息
intrecv_msg(intmsg_id,intrecv_type,char*msg_out);//接收消息
intdestroy_queue(intmsg_id);//銷毀隊列