基於生產者-消費者模型實現先進先出的共享內存段
生產者消費者問題:該問題描述了兩個共享固定大小緩沖區的進程——即所謂的“生產者”和“消費者”——在實際運 行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然後重復此過程。與此同時,消費者也在緩沖區 消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據。
我們可以用信號量解決生產者消費者問題,如下圖:

定義3個信號量,sem_full 和 sem_empty 用於生產者進程和消費者進程之間同步,即緩沖區為空才能生產,緩沖區不為 空才能消費。由於共享同一塊緩沖區,在生產一個產品過程中不能生產/消費產品,在消費一個產品的過程中不能生產/消費 產品,故再使用一個 sem_mutex 信號量來約束行為,即進程間互斥。
下面基於生產者消費者模型,來實現一個先進 先出的共享內存段:

如上圖所示,定義兩個結構體,shmhead 是共享內存段的頭部,保存了塊大小,塊數,讀寫索引。shmfifo 保存了共享 內存頭部的指針,有效負載的起始地址,創建的共享內存段的shmid,以及3個信號量。
下面來封裝幾個函數:
#include "shmfifo.h"
#include <assert.h>
shmfifo_t *shmfifo_init(int key, int blksize, int blocks)
{
shmfifo_t *fifo = (shmfifo_t *)malloc(sizeof(shmfifo_t));
assert(fifo != NULL);
memset(fifo, 0, sizeof(shmfifo_t));
int shmid;
shmid = shmget(key, 0, 0);
int size = sizeof(shmhead_t) + blksize * blocks;
if (shmid == -1)
{
fifo->shmid = shmget(key, size, IPC_CREAT | 0666);
if (fifo->shmid == -1)
ERR_EXIT("shmget");
fifo->p_shm = (shmhead_t *)shmat(fifo->shmid, NULL, 0);
if (fifo->p_shm == (shmhead_t *) - 1)
ERR_EXIT("shmat");
fifo->p_payload = (char *)(fifo->p_shm + 1);
fifo->p_shm->blksize = blksize;
fifo->p_shm->blocks = blocks;
fifo->p_shm->rd_index = 0;
fifo->p_shm->wr_index = 0;
fifo->sem_mutex = sem_create(key);
fifo->sem_full = sem_create(key + 1);
fifo->sem_empty = sem_create(key + 2);
sem_setval(fifo->sem_mutex, 1);
sem_setval(fifo->sem_full, blocks);
sem_setval(fifo->sem_empty, 0);
}
else
{
fifo->shmid = shmid;
fifo->p_shm = (shmhead_t *)shmat(fifo->shmid, NULL, 0);
if (fifo->p_shm == (shmhead_t *) - 1)
ERR_EXIT("shmat");
fifo->p_payload = (char *)(fifo->p_shm + 1);
fifo->sem_mutex = sem_open(key);
fifo->sem_full = sem_open(key + 1);
fifo->sem_empty = sem_open(key + 2);
}
return fifo;
}
void shmfifo_put(shmfifo_t *fifo, const void *buf)
{
sem_p(fifo->sem_full);
sem_p(fifo->sem_mutex);
memcpy(fifo->p_payload + fifo->p_shm->blksize * fifo->p_shm->wr_index,
buf, fifo->p_shm->blksize);
fifo->p_shm->wr_index = (fifo->p_shm->wr_index + 1) % fifo->p_shm->blocks;
sem_v(fifo->sem_mutex);
sem_v(fifo->sem_empty);
}
void shmfifo_get(shmfifo_t *fifo, void *buf)
{
sem_p(fifo->sem_empty);
sem_p(fifo->sem_mutex);
memcpy(buf, fifo->p_payload + fifo->p_shm->blksize * fifo->p_shm->rd_index,
fifo->p_shm->blksize);
fifo->p_shm->rd_index = (fifo->p_shm->rd_index + 1) % fifo->p_shm->blocks;
sem_v(fifo->sem_mutex);
sem_v(fifo->sem_full);
}
void shmfifo_destroy(shmfifo_t *fifo)
{
sem_d(fifo->sem_mutex);
sem_d(fifo->sem_full);
sem_d(fifo->sem_empty);
shmdt(fifo->p_shm);
shmctl(fifo->shmid, IPC_RMID, 0);
free(fifo);
}
1、shmfifo_init:先分配shmfifo 結構體的內存,如果嘗試打開共享內存失敗則創建,創建的共享內存段大小 = shmhead大小 + 塊大小×塊數目,然後shmat將此共享內存段映射到進程地址空間,然後使用sem_create 創建3個信號量 集,每個信號集只有一個信號量,即上面提到的3個信號量,設置每個信號量的資源初始值。如果共享內存已經存在,則直 接sem_open 打開即可。sem_xxx 系列封裝函數參考這裡。
2、shmfifo_put:參照第一個生產者消費者的圖,除去sem_p,sem_v 操作之外,中間就將buf 的內容memcpy 到對應緩沖 區塊,然後移動wr_index。
3、shmfifo_get:與shmfifo_put 類似,執行的是相反的操作。
4、 shmfifo_destroy:刪除3個信號量集,將共享內存段從進程地址空間剝離,刪除共享內存段,釋放shmfifo 結構體的內存。
下面是生產者程序和消費者程序:
shmfifo_send.c
#include "shmfifo.h"
typedef struct stu
{
char name[32];
int age;
} STU;
int main(void)
{
shmfifo_t *fifo = shmfifo_init(1234, sizeof(STU), 3);
STU s;
memset(&s, 0, sizeof(STU));
s.name[0] = 'A';
int i;
for (i = 0; i < 5; i++)
{
s.age = 20 + i;
shmfifo_put(fifo, &s);
s.name[0] = s.name[0] + 1;
printf("send ok\n");
}
return 0;
}
shmfifo_recv.c
#include "shmfifo.h"
typedef struct stu
{
char name[32];
int age;
} STU;
int main(void)
{
shmfifo_t *fifo = shmfifo_init(1234, sizeof(STU), 3);
STU s;
memset(&s, 0, sizeof(STU));
int i;
for (i = 0; i < 5; i++)
{
shmfifo_get(fifo, &s);
printf("name = %s age = %d\n", s.name, s.age);
}
shmfifo_destroy(fifo);
return 0;
}
先運行生產者進程,輸出如下:
simba@ubuntu:~/Documents/code/linux_programming/UNP/system_v/shmfifo$ ./shmfifo_send
send ok
send ok
send ok
因為共享內存只有3塊block,故發送了3次後再次P(semfull)就 阻塞了,等待消費者讀取數據,現在運行消費者進程
simba@ubuntu:~/Documents/code/linux_programming/UNP/system_v/shmfifo$ ./shmfifo_recv
name = A age = 20
name = B age = 21
name = C age = 22
name = D age = 23
name = E age = 24
因為生產者已經創 建了一塊共享內存,故消費者只是打開而已,當讀取了第一塊數據之後,生產者會再次寫入,依次輸出後兩個 send ok,可 以推論的是D是重新寫到共享內存開始的第一塊,E是第二塊,類似環形隊列。
從輸出可以看出,的確實現了數據的 先進先出。