一.概念
消息隊列提供了一種從一個進程向另一個進程發送一個數據塊的方法。 每個數據塊都被認為是有一個類型,接收者進程接收的數據塊可以有不同的類型值。我們可以通過發送消息 來避免命名管道的同步和阻塞問題。消息隊列與管道不同的是,消息隊列是基於消息的, 而管道是基於字節流的,且消息隊列的讀取不?定是先入先出。消息隊列與命名管道有一樣的不足,就是每個消息的最大長度是有上限的(MSGMAX),每個消息隊列的總的字節數是有上限的(MSGMNB),系統上消息隊列的總數也有?個上限(MSGMNI)。
查看機器的三個上限值:
二. IPC對象數據結構
內核為每個IPC對象維護?一個數據結構(/usr/include/linux/ipc.h)
struct ipc_perm { key_t __key; /* Key supplied to xxxget(2) */ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid; /* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode; /* Permissions */ unsigned short __seq; /* Sequence number */ };
消息隊列,共享內存和信號量都有這樣?一個共同的數據結構。
三、消息隊列結構(/usr/include/linux/msg.h)
/* Obsolete, used only for backwards compatibility and libc5 compiles */ struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* first message on queue,unused */ struct msg *msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ };
消息隊列是用鏈表實現的。
四.消息隊列的函數
實現消息隊列的相關函數:
1.生成新消息隊列或者取得已存在的消息隊列
int msgget(ket_t key,int msgflg);key:可以認為是一個端口號,也可以由函數ftok生成。
參數: msg?g:
IPC_CREAT:如果IPC不存在,則創建一個IPC資源,否則打開操作。
IPC_EXCL:只有在共享內存不存在的時候,新的共享內存才建立,否則就產生錯誤。 如果單獨使用IPC_CREAT XXXget()函數要麼返回一個已經存在的共享內存的操作符,要 麼返回一個新建的共享內存的標識符。 如果將IPC_CREAT和IPC_EXCL標志一起使用,XXXget()將返回一個新建的IPC標識符 ;如果該IPC資源已存在,或者返回-1。 IPC_EXEL標志本身並沒有太大的意義,但是和IPC_CREAT標志一起使用可以用來保證所得的對象是新建的,而不是打開已有的對象。
返回值:返回msgid。
2.向隊列讀/寫消息msgrcv取消息:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msg?g);
msgsnd讀消息:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msg?g);
msqid:消息隊列的標識碼。
msgp:指向消息緩沖區的指針,此位置用來暫時存儲發送和接收的消息,是一個用戶可定義的通用結構,形態如下:
struct msgstru { long mtype;//大於0 char mtext[_SIZE_]; //_SIZE_ 用戶指定大小 }
msgtyp:從消息隊列內讀取的消息形態。如果值為零,則表示消息隊列中的所有消息都會被讀取。
msg?g:用來指明核心程序在隊列沒有數據的情況下所應采取的行動。如果msg?g和常數IPC_NOWAIT合用,則在msgsnd()執行時若是消息隊列已滿,則msgsnd()將不會阻塞,而會立即返回-1,如果執行的是msgrcv(),則在消息隊列呈空時,不做等待馬上返回-1,並設定 錯誤碼為ENOMSG。當msg?g為0時,msgsnd()及msgrcv()在隊列呈滿或呈空的情形時,采取 阻塞等待的處理模式。
———使用消息隊列實現一個簡單的客戶端--服務端的通信
實現代碼如下給出:
comm.h
#pragma once #include#include #include #include #include #include #define _PATH_NAME_ "/tmp" #define _PROJ_ID_ 0x6666 #define _SIZE_ 1024 extern int server_type; extern int client_type; struct msgbuf { long mtype; char mtext[_SIZE_]; }; int creat_msg_queue(); int get_msg_queue(); //int creat_msg_queue(int msg_id); int send_msg(int msg_id,int send_type,const char* msg); int recv_msg(int msg_id,int recv_type,char* msg_out); int destroy_queue(int msg_id);
#include"comm.h" int server_type = 1; int client_type = 2; static int comm_msg_queue(int flags) { key_t _key = ftok(_PATH_NAME_,_PROJ_ID_); if(_key < 0) { printf("%d : %s",errno,strerror(errno)); return -1; } int msg_id = msgget(_key,flags); //int msg_id = msgget(_key,IPC_CREAT | IPC_EXCL | 0666); return msg_id; } int creat_msg_queue() { int flags = IPC_CREAT | IPC_EXCL | 0666; return comm_msg_queue(flags); } int get_msg_queue() { int flags = IPC_CREAT; return comm_msg_queue(flags); } int destroy_queue(int msg_id) { if(msgctl(msg_id,IPC_RMID,NULL) != 0) { printf("%d : %s",errno,strerror(errno)); return -1; } return 0; } int send_msg(int msg_id,int send_type,const char* msg) { struct msgbuf _buf; _buf.mtype = send_type; strncpy(_buf.mtext,msg,strlen(msg)+1); if(msgsnd(msg_id,&_buf,sizeof(_buf.mtext),0) < 0) { printf("%d : %s",errno,strerror(errno)); return -1; } return 0; } int recv_msg(int msg_id,int recv_type,char* msg_out) { struct msgbuf _buf; _buf.mtype = 0; memset(_buf.mtext,'\0',sizeof(_buf.mtext)); if(msgrcv(msg_id,&_buf,sizeof(_buf.mtext),recv_type,0) < 0) { printf("%d : %s",errno,strerror(errno)); return -1; } strcpy(msg_out,_buf.mtext); return 0; }
#include"comm.h" int main() { int msg_id = creat_msg_queue(); if(msg_id <0) { printf("%d : %s\n",errno,strerror(errno)); return 1; } char buf[_SIZE_]; while(1) { memset(buf,'\0',sizeof(buf)); recv_msg(msg_id,client_type,buf); printf("client:%s\n",buf); if(strcasecmp(buf,"quit") == 0) { break; } printf("client say done,Please Enter# "); fflush(stdout); ssize_t _s = read(0,buf,sizeof(buf)-1); if(_s > 0) { buf[_s - 1] = '\0'; } send_msg(msg_id,server_type,buf); } destroy_queue(msg_id); return 0; }
#include"comm.h" int main() { //int msg_id = creat_msg_queue(); //if(msg_id <0) //{ // printf("%d : %s\n",errno,strerror(errno)); // return 1; //} // // int msg_id = get_msg_queue(); char buf[_SIZE_]; while(1) { printf("Please Enter:"); fflush(stdout); ssize_t _s = read(0,buf,sizeof(buf)-1); if(_s >0 ) { buf[_s - 1] = '\0'; } send_msg(msg_id,client_type,buf); if(strcasecmp(buf,"quit") == 0) { break; } memset(buf,'\0',sizeof(buf)); recv_msg(msg_id,server_type,buf); printf("server# %s\n",buf); } return 0; }
六.測試過程及結果
頭文件:
各個接口:
int creat_msg_queue(); //創建一個消息隊列 int get_msg_queue(); //獲得消息隊列的msg_id
int send_msg(int msg_id,int send_type,const char* msg); //發送消息
int recv_msg(int msg_id,int recv_type,char* msg_out); //接收消息
int destroy_queue(int msg_id); //銷毀隊列
server.c:創建消息隊列,接收客戶端發送的消息,同時給客戶端發送消息
client.c:獲取msg_id,接收服務端發送的消息,同時給客戶端反饋
Makefile:
要生成兩個可執行程序,所以使用偽目標all。
$@(依賴關系中代表目標文件) $^(依賴關系中:右邊的所有內容)
測試結果:
如有纰漏,歡迎指正。