posix消息隊列與system v消息隊列的差別:
(1)對posix消息隊列的讀總是返回最高優先級的最早消息,對system v消息隊列的讀則可以返回任意指定優先級的消息。隊列中的每個消息具有如下屬性:
1、一個無符號整數優先級(posix)或一個長整數類型(system v)Posix消息隊列操作函數如下:
1. 創建/獲取一個消息隊列
mqd_t mq_open(const char *name, int oflag); //專用於打開一個消息隊列 mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
參數:
name: 消息隊列名字;
oflag: 與open函數類型, 可以是O_RDONLY, O_WRONLY, O_RDWR, 還可以按位或上O_CREAT, O_EXCL, O_NONBLOCK.
mode: 如果oflag指定了O_CREAT, 需要指定mode參數;
attr: 指定消息隊列的屬性;
返回值:
成功: 返回消息隊列文件描述符;
失敗: 返回-1;
注意-Posix IPC名字限制:
1. 必須以”/”開頭, 並且後面不能還有”/”, 形如:/file-name;
2. 名字長度不能超過NAME_MAX
3. 鏈接時:Link with -lrt(Makefile中使用實時鏈接庫-lrt)
2. 關閉一個消息隊列
#includeint mq_close(mqd_t mqdes);
返回: 成功時為0,出錯時為-1。
功能: 關閉已打開的消息隊列。
注意:System V沒有此功能函數調用
3. 刪除一個消息隊列
int mq_unlink(const char *name); /** System V 消息隊列 通過msgctl函數, 並將cmd指定為IPC_RMID來實現 int msgctl(int msqid, int cmd, struct msqid_ds *buf); **/返回: 成功時為0,出錯時為-1
int main() { mqd_t mqid = mq_open("/abc", O_CREAT|O_RDONLY, 0666, NULL); if (mqid == -1) err_exit("mq_open error"); cout << "mq_open success" << endl; mq_close(mqid); mq_unlink("/abc"); cout << "unlink success" << endl; }4. 獲取/設置消息隊列屬性
#include均返回:成功時為0, 出錯時為-1int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);
參數:
newattr: 需要設置的屬性
oldattr: 原來的屬性
每個消息隊列有四個屬性:struct mq_attr { long mq_flags; /* message queue flag : 0, O_NONBLOCK */ long mq_maxmsg; /* max number of messages allowed on queue*/ long mq_msgsize; /* max size of a message (in bytes)*/ long mq_curmsgs; /* number of messages currently on queue */ };
int main(int argc,char **argv) { mqd_t mqid = mq_open("/test", O_RDONLY|O_CREAT, 0666, NULL); if (mqid == -1) err_exit("mq_open error"); struct mq_attr attr; if (mq_getattr(mqid, &attr) == -1) err_exit("mq_getattr error"); cout << "Max messages on queue: " << attr.mq_maxmsg << endl; cout << "Max message size: " << attr.mq_msgsize << endl; cout << "current messages: " << attr.mq_curmsgs << endl; mq_close(mqid); return 0; }對比System V:
通過msgctl函數, 並將cmd指定為IPC_STAT/IPC_SET來實現
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
另外每個消息均有一個優先級,它是一個小於MQ_PRIO_MAX的無符號整數5. 發送消息/讀取消息
#includeint mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio); ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
返回:成功時為0,出錯為-1
返回:成功時為消息中的字節數,出錯為-1
參數: 最後一個是消息的優先級
消息隊列的限制:
MQ_OPEN_MAX : 一個進程能夠同時擁有的打開著消息隊列的最大數目
MQ_PRIO_MAX : 任意消息的最大優先級值加1
/** 示例: 向消息隊列中發送消息, prio需要從命令行參數中讀取 **/ struct Student { char name[36]; int age; }; int main(int argc,char **argv) { if (argc != 2) err_quit("./send"); mqd_t mqid = mq_open("/test", O_WRONLY|O_CREAT, 0666, NULL); if (mqid == -1) err_exit("mq_open error"); struct Student stu = {"xiaofang", 23}; unsigned prio = atoi(argv[1]); if (mq_send(mqid, (const char *)&stu, sizeof(stu), prio) == -1) err_exit("mq_send error"); mq_close(mqid); return 0; }
/** 示例: 從消息隊列中獲取消息 **/ int main(int argc,char **argv) { mqd_t mqid = mq_open("/test", O_RDONLY); if (mqid == -1) err_exit("mq_open error"); struct Student buf; int nrcv; unsigned prio; struct mq_attr attr; if (mq_getattr(mqid, &attr) == -1) err_exit("mq_getattr error"); if ((nrcv = mq_receive(mqid, (char *)&buf, attr.mq_msgsize, &prio)) == -1) err_exit("mq_receive error"); cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: " << buf.name << ", age: " << buf.age << endl; mq_close(mqid); return 0; }6.建立/刪除消息到達通知事件
#include返回: 成功時為0,出錯時為-1int mq_notify(mqd_t mqdes, const struct sigevent *notification);
sigev_notify代表通知的方式: 一般常用兩種取值:SIGEV_SIGNAL, 以信號方式通知; SIGEV_THREAD, 以線程方式通知
如果以信號方式通知: 則需要設定一下兩個參數:
sigev_signo: 信號的代碼
sigev_value: 信號的附加數據(實時信號)
如果以線程方式通知: 則需要設定以下兩個參數:
sigev_notify_function
sigev_notify_attributes
union sigval { int sival_int; /* Integer value */ void *sival_ptr; /* pointer value */ }; struct sigevent { int sigev_notify; /* SIGEV_{ NONE, ISGNAL, THREAD} */ int sigev_signo; /* signal number if SIGEV_SIGNAL */ union sigval sigev_value; /* passed to signal handler or thread */ void (*sigev_notify_function)(union sigval); pthread_attr_t *sigev_notify_attribute; };
參數sevp:
NULL: 表示撤銷已注冊通知;
非空: 表示當消息到達且消息隊列當前為空, 那麼將得到通知;
通知方式:
1. 產生一個信號, 需要自己綁定
2. 創建一個線程, 執行指定的函數
注意: 這種注冊的方式只是在消息隊列從空到非空時才產生消息通知事件, 而且這種注冊方式是一次性的!
** Posix IPC所特有的功能, System V沒有 **//**示例: 將下面程序多運行幾遍, 尤其是當消息隊列”從空->非空”, 多次”從空->非空”, 當消息隊列不空時運行該程序時, 觀察該程序的狀態; **/ mqd_t mqid; long size; void sigHandlerForUSR1(int signo) { //將數據的讀取轉移到對信號SIGUSR1的響應函數中來 struct Student buf; int nrcv; unsigned prio; if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1) err_exit("mq_receive error"); cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: " << buf.name << ", age: " << buf.age << endl; } int main(int argc,char **argv) { // 安裝信號響應函數 if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR) err_exit("signal error"); mqid = mq_open("/test", O_RDONLY); if (mqid == -1) err_exit("mq_open error"); // 獲取消息的最大長度 struct mq_attr attr; if (mq_getattr(mqid, &attr) == -1) err_exit("mq_getattr error"); size = attr.mq_msgsize; // 注冊消息到達通知事件 struct sigevent event; event.sigev_notify = SIGEV_SIGNAL; //指定以信號方式通知 event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知 if (mq_notify(mqid, &event) == -1) err_exit("mq_notify error"); //死循環, 等待信號到來 while (true) pause(); mq_close(mqid); return 0; }
/** 示例:多次注冊notify, 這樣就能過多次接收消息, 但是還是不能從隊列非空的時候進行接收, 將程序改造如下: **/ mqd_t mqid; long size; struct sigevent event; void sigHandlerForUSR1(int signo) { // 注意: 是在消息被讀走之前進行注冊, // 不然該程序就感應不到消息隊列"從空->非空"的一個過程變化了 if (mq_notify(mqid, &event) == -1) err_exit("mq_notify error"); //將數據的讀取轉移到對信號SIGUSR1的響應函數中來 struct Student buf; int nrcv; unsigned prio; if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1) err_exit("mq_receive error"); cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: " << buf.name << ", age: " << buf.age << endl; } int main(int argc,char **argv) { // 安裝信號響應函數 if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR) err_exit("signal error"); mqid = mq_open("/test", O_RDONLY); if (mqid == -1) err_exit("mq_open error"); // 獲取消息的最大長度 struct mq_attr attr; if (mq_getattr(mqid, &attr) == -1) err_exit("mq_getattr error"); size = attr.mq_msgsize; // 注冊消息到達通知事件 event.sigev_notify = SIGEV_SIGNAL; //指定以信號方式通知 event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知 if (mq_notify(mqid, &event) == -1) err_exit("mq_notify error"); //死循環, 等待信號到來 while (true) pause(); mq_close(mqid); return 0; }
mq_notify 注意點總結:
1. 任何時刻只能有一個進程可以被注冊為接收某個給定隊列的通知;
2. 當有一個消息到達某個先前為空的隊列, 而且已有一個進程被注冊為接收該隊列的通知時, 只有沒有任何線程阻塞在該隊列的mq_receive調用的前提下, 通知才會發出;
3. 當通知被發送給它的注冊進程時, 該進程的注冊被撤銷. 進程必須再次調用mq_notify以重新注冊(如果需要的話),但是要注意: 重新注冊要放在從消息隊列讀出消息之前而不是之後(如同示例程序);
異步信號安全函數#includeint sigwait(const sigset_t *set, int *sig);
可以使用sigwait函數代替信號處理程序的信號通知,將信號阻塞到某個函數中,僅僅等待該信號的遞交。采用sigwait實現上面的程序如下:
#include啟動線程處理消息通知,程序如下:#include #include #include #include #include #include int main(int argc,char *argv[]) { mqd_t mqd; int signo; void *buff; ssize_t n; sigset_t newmask; struct mq_attr attr; struct sigevent sigev; if(argc != 2) { printf("usage :mqnotify "); exit(0); } mqd = mq_open(argv[1],O_RDONLY); mq_getattr(mqd,&attr); buff = malloc(attr.mq_msgsize); sigemptyset(&newmask); sigaddset(&newmask,SIGUSR1); sigprocmask(SIG_BLOCK,&newmask,NULL); sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; if(mq_notify(mqd,&sigev) == -1) { perror("mq_notify error"); exit(-1); } for(; ;) { sigwait(&newmask,&signo); //阻塞並等待該信號 if(signo == SIGUSR1) { mq_notify(mqd,&sigev); while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0) printf("read %ld bytes\n",(long) n); if(errno != EAGAIN) { perror("mq_receive error"); exit(-1); } } } eixt(0); }
#include#include #include #include #include #include #include mqd_t mqd; struct mq_attr attr; struct sigevent sigev; static void notify_thread(union sigval); int main(int argc,char *argv[]) { if(argc != 2) { printf("usage :mqnotify "); exit(0); } mqd = mq_open(argv[1],O_RDONLY | O_NONBLOCK); mq_getattr(mqd,&attr); sigev.sigev_notify = SIGEV_THREAD; sigev.sigev_value.sival_ptr = NULL; sigev.sigev_notify_function = notify_thread; sigev.sigev_notify_attributes = NULL; if(mq_notify(mqd,&sigev) == -1) { perror("mq_notify error"); exit(-1); } for(; ;) { pause(); } eixt(0); } static void notify_thread(union sigval arg) { ssize_t n; void *buff; printf("notify_thread started\n"); buff = malloc(attr.mq_msgsize); mq_notify(mqd,&sigev); while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0) printf("read %ld bytes\n",(long) n); if(errno != EAGAIN) { perror("mq_receive error"); exit(-1); } free(buff); pthread_exit(NULL); }
附-查看已經成功創建的Posix消息隊列
#其存在與一個虛擬文件系統中, 需要將其掛載到系統中才能查看
Mounting the message queue filesystem On Linux, message queues are created in a virtual filesystem.
(Other implementations may also provide such a feature, but the details are likely to differ.) This
file system can be mounted (by the superuser, 注意是使用root用戶才能成功) using the following commands:
mkdir /dev/mqueue
mount -t mqueue none /dev/mqueue
還可以使用cat查看該消息隊列的狀態, rm刪除:
cat /dev/mqueue/abc
rm abc
還可umount該文件系統
umount /dev/mqueue