基於生產者-消費者模型實現先進先出的共享內存段
生產者消費者問題:該問題描述了兩個共享固定大小緩沖區的進程——即所謂的“生產者”和“消費者”——在實際運 行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然後重復此過程。與此同時,消費者也在緩沖區 消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據。
我們可以用信號量解決生產者消費者問題,如下圖:
定義3個信號量,sem_full 和 sem_empty 用於生產者進程和消費者進程之間同步,即緩沖區為空才能生產,緩沖區不為 空才能消費。由於共享同一塊緩沖區,在生產一個產品過程中不能生產/消費產品,在消費一個產品的過程中不能生產/消費 產品,故再使用一個 sem_mutex 信號量來約束行為,即進程間互斥。
下面基於生產者消費者模型,來實現一個先進 先出的共享內存段:
如上圖所示,定義兩個結構體,shmhead 是共享內存段的頭部,保存了塊大小,塊數,讀寫索引。shmfifo 保存了共享 內存頭部的指針,有效負載的起始地址,創建的共享內存段的shmid,以及3個信號量。
下面來封裝幾個函數:
#include "shmfifo.h" #include <assert.h> shmfifo_t *shmfifo_init(int key, int blksize, int blocks) { shmfifo_t *fifo = (shmfifo_t *)malloc(sizeof(shmfifo_t)); assert(fifo != NULL); memset(fifo, 0, sizeof(shmfifo_t)); int shmid; shmid = shmget(key, 0, 0); int size = sizeof(shmhead_t) + blksize * blocks; if (shmid == -1) { fifo->shmid = shmget(key, size, IPC_CREAT | 0666); if (fifo->shmid == -1) ERR_EXIT("shmget"); fifo->p_shm = (shmhead_t *)shmat(fifo->shmid, NULL, 0); if (fifo->p_shm == (shmhead_t *) - 1) ERR_EXIT("shmat"); fifo->p_payload = (char *)(fifo->p_shm + 1); fifo->p_shm->blksize = blksize; fifo->p_shm->blocks = blocks; fifo->p_shm->rd_index = 0; fifo->p_shm->wr_index = 0; fifo->sem_mutex = sem_create(key); fifo->sem_full = sem_create(key + 1); fifo->sem_empty = sem_create(key + 2); sem_setval(fifo->sem_mutex, 1); sem_setval(fifo->sem_full, blocks); sem_setval(fifo->sem_empty, 0); } else { fifo->shmid = shmid; fifo->p_shm = (shmhead_t *)shmat(fifo->shmid, NULL, 0); if (fifo->p_shm == (shmhead_t *) - 1) ERR_EXIT("shmat"); fifo->p_payload = (char *)(fifo->p_shm + 1); fifo->sem_mutex = sem_open(key); fifo->sem_full = sem_open(key + 1); fifo->sem_empty = sem_open(key + 2); } return fifo; } void shmfifo_put(shmfifo_t *fifo, const void *buf) { sem_p(fifo->sem_full); sem_p(fifo->sem_mutex); memcpy(fifo->p_payload + fifo->p_shm->blksize * fifo->p_shm->wr_index, buf, fifo->p_shm->blksize); fifo->p_shm->wr_index = (fifo->p_shm->wr_index + 1) % fifo->p_shm->blocks; sem_v(fifo->sem_mutex); sem_v(fifo->sem_empty); } void shmfifo_get(shmfifo_t *fifo, void *buf) { sem_p(fifo->sem_empty); sem_p(fifo->sem_mutex); memcpy(buf, fifo->p_payload + fifo->p_shm->blksize * fifo->p_shm->rd_index, fifo->p_shm->blksize); fifo->p_shm->rd_index = (fifo->p_shm->rd_index + 1) % fifo->p_shm->blocks; sem_v(fifo->sem_mutex); sem_v(fifo->sem_full); } void shmfifo_destroy(shmfifo_t *fifo) { sem_d(fifo->sem_mutex); sem_d(fifo->sem_full); sem_d(fifo->sem_empty); shmdt(fifo->p_shm); shmctl(fifo->shmid, IPC_RMID, 0); free(fifo); }
1、shmfifo_init:先分配shmfifo 結構體的內存,如果嘗試打開共享內存失敗則創建,創建的共享內存段大小 = shmhead大小 + 塊大小×塊數目,然後shmat將此共享內存段映射到進程地址空間,然後使用sem_create 創建3個信號量 集,每個信號集只有一個信號量,即上面提到的3個信號量,設置每個信號量的資源初始值。如果共享內存已經存在,則直 接sem_open 打開即可。sem_xxx 系列封裝函數參考這裡。
2、shmfifo_put:參照第一個生產者消費者的圖,除去sem_p,sem_v 操作之外,中間就將buf 的內容memcpy 到對應緩沖 區塊,然後移動wr_index。
3、shmfifo_get:與shmfifo_put 類似,執行的是相反的操作。
4、 shmfifo_destroy:刪除3個信號量集,將共享內存段從進程地址空間剝離,刪除共享內存段,釋放shmfifo 結構體的內存。
下面是生產者程序和消費者程序:
shmfifo_send.c
#include "shmfifo.h" typedef struct stu { char name[32]; int age; } STU; int main(void) { shmfifo_t *fifo = shmfifo_init(1234, sizeof(STU), 3); STU s; memset(&s, 0, sizeof(STU)); s.name[0] = 'A'; int i; for (i = 0; i < 5; i++) { s.age = 20 + i; shmfifo_put(fifo, &s); s.name[0] = s.name[0] + 1; printf("send ok\n"); } return 0; }
shmfifo_recv.c
#include "shmfifo.h" typedef struct stu { char name[32]; int age; } STU; int main(void) { shmfifo_t *fifo = shmfifo_init(1234, sizeof(STU), 3); STU s; memset(&s, 0, sizeof(STU)); int i; for (i = 0; i < 5; i++) { shmfifo_get(fifo, &s); printf("name = %s age = %d\n", s.name, s.age); } shmfifo_destroy(fifo); return 0; }
先運行生產者進程,輸出如下:
simba@ubuntu:~/Documents/code/linux_programming/UNP/system_v/shmfifo$ ./shmfifo_send
send ok
send ok
send ok
因為共享內存只有3塊block,故發送了3次後再次P(semfull)就 阻塞了,等待消費者讀取數據,現在運行消費者進程
simba@ubuntu:~/Documents/code/linux_programming/UNP/system_v/shmfifo$ ./shmfifo_recv
name = A age = 20
name = B age = 21
name = C age = 22
name = D age = 23
name = E age = 24
因為生產者已經創 建了一塊共享內存,故消費者只是打開而已,當讀取了第一塊數據之後,生產者會再次寫入,依次輸出後兩個 send ok,可 以推論的是D是重新寫到共享內存開始的第一塊,E是第二塊,類似環形隊列。
從輸出可以看出,的確實現了數據的 先進先出。