前面在《Linux內核數據結構kfifo詳解》一文中詳細解析了 Linux 內核並發無鎖環形隊列kfifo的原理和實現,kfifo鬼斧神工,博大精深,讓人歎為觀止,但遺憾的是kfifo為內核提供服務,並未開放出來。劍不試則利鈍暗,弓不試則勁撓誣,鷹不試則巧拙惑,馬不試則良駑疑,光說不練是不能學到精髓的,下面就動手實現自己的並發無鎖隊列UnlockQueue(單生產者單消費者)。
#ifndef UNLOCK_QUEUE_H
#define UNLOCK_QUEUE_H

/*
 * UnlockQueue - byte-oriented ring buffer modelled on the Linux kernel kfifo.
 *
 * Intended use is one producer thread calling Put() and one consumer thread
 * calling Get(); no lock is taken. With more than one reader or more than one
 * writer the caller must add its own synchronisation.
 *
 * Construction only records the capacity; Initialize() must succeed before
 * Put()/Get() are used, because that is where the buffer is allocated.
 */
class UnlockQueue
{
public:
    /*
     * nSize is the requested capacity in bytes; the constructor rounds it up
     * to a power of two. `explicit` prevents an int from silently converting
     * into a queue.
     */
    explicit UnlockQueue(int nSize);
    virtual ~UnlockQueue();

    /* Allocates the buffer. Returns true on success, false on failure. */
    bool Initialize();

    /*
     * Put copies up to nLen bytes from pBuffer into the queue.
     * Get copies up to nLen bytes out of the queue into pBuffer.
     * Both return the number of bytes actually copied, which may be
     * smaller than nLen (including 0).
     */
    unsigned int Put(const unsigned char *pBuffer, unsigned int nLen);
    unsigned int Get(unsigned char *pBuffer, unsigned int nLen);

    /*
     * Resets both indices, discarding any queued data.
     * NOTE(review): this writes both indices without a barrier, so it looks
     * unsafe while a producer or consumer is running - confirm with callers.
     */
    inline void Clean() { m_nIn = m_nOut = 0; }

    /* Number of bytes currently queued (unsigned wrap-around is intended). */
    inline unsigned int GetDataLen() const { return m_nIn - m_nOut; }

private:
    /*
     * The queue owns m_pBuffer and frees it in the destructor, so a
     * member-wise copy would free the same buffer twice. Copying is disabled
     * by declaring these private and leaving them undefined.
     */
    UnlockQueue(const UnlockQueue &);
    UnlockQueue &operator=(const UnlockQueue &);

    /* True when n is a non-zero power of two. */
    inline bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); }

    /* Rounds val up to a power of two; defined out of line in the .cpp file. */
    unsigned long roundup_power_of_two(unsigned long val);

private:
    unsigned char *m_pBuffer; /* the buffer holding the data */
    unsigned int m_nSize;     /* the size of the allocated buffer */
    unsigned int m_nIn;       /* data is added at offset (in % size) */
    unsigned int m_nOut;      /* data is extracted from off. (out % size) */
};

#endif
UnlockQueue與kfifo 結構相同,也是由以下變量組成:
| UnlockQueue | kfifo | 作用 |
| --- | --- | --- |
| m_pBuffer | buffer | 用於存放數據的緩存 |
| m_nSize | size | 緩沖區空間的大小,圓整為2的次冪 |
| m_nIn | in | 指向buffer中隊頭 |
| m_nOut | out | 指向buffer中的隊尾 |
| UnlockQueue的設計是用在單生產者單消費者情況下,所以不需要鎖 | lock | 如果使用時不能保證任何時間最多只有一個讀線程和一個寫線程,必須使用該lock實施同步。 |

1: UnlockQueue::UnlockQueue(int nSize)
2: :m_pBuffer(NULL)
3: ,m_nSize(nSize)
4: ,m_nIn(0)
5: ,m_nOut(0)
6: {
7: //round up to the next power of 2
8: if (!is_power_of_2(nSize))
9: {
10: m_nSize = roundup_power_of_two(nSize);
11: }
12: }
13:
14: UnlockQueue::~UnlockQueue()
15: {
16: if(NULL != m_pBuffer)
17: {
18: delete[] m_pBuffer;
19: m_pBuffer = NULL;
20: }
21: }
22:
23: bool UnlockQueue::Initialize()
24: {
25: m_pBuffer = new unsigned char[m_nSize];
26: if (!m_pBuffer)
27: {
28: return false;
29: }
30:
31: m_nIn = m_nOut = 0;
32:
33: return true;
34: }
35:
36: unsigned long UnlockQueue::roundup_power_of_two(unsigned long val)
37: {
38: if((val & (val-1)) == 0)
39: return val;
40:
41: unsigned long maxulong = (unsigned long)((unsigned long)~0);
42: unsigned long andv = ~(maxulong&(maxulong>>1));
43: while((andv & val) == 0)
44: andv = andv>>1;
45:
46: return andv<<1;
47: }
1.在構造函數中,對傳入的size進行2的次冪圓整,圓整的好處是可以將 m_nIn % m_nSize 轉化為 m_nIn & (m_nSize - 1),因為“取模運算”的效率並沒有“位運算”的效率高。
2.在構造函數中,未給buffer分配內存,而在Initialize中分配,這樣做的原因是:我們知道在new UnlockQueue的時候有兩步操作,第一步分配內存,第二步調用構造函數,如果將buffer的分配放在構造函數中,那麼buffer就可能分配失敗,而構造函數沒有返回值,無法把失敗告知調用者,後面每次用到buffer都還需要判空。
1: unsigned int UnlockQueue::Put(const unsigned char *buffer, unsigned int len)
2: {
3: unsigned int l;
4:
5: len = std::min(len, m_nSize - m_nIn + m_nOut);
6:
7: /*
8: * Ensure that we sample the m_nOut index -before- we
9: * start putting bytes into the UnlockQueue.
10: */
11: __sync_synchronize();
12:
13: /* first put the data starting from fifo->in to buffer end */
14: l = std::min(len, m_nSize - (m_nIn & (m_nSize - 1)));
15: memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l);
16:
17: /* then put the rest (if any) at the beginning of the buffer */
18: memcpy(m_pBuffer, buffer + l, len - l);
19:
20: /*
21: * Ensure that we add the bytes to the kfifo -before-
22: * we update the fifo->in index.
23: */
24: __sync_synchronize();
25:
26: m_nIn += len;
27:
28: return len;
29: }
30:
31: unsigned int UnlockQueue::Get(unsigned char *buffer, unsigned int len)
32: {
33: unsigned int l;
34:
35: len = std::min(len, m_nIn - m_nOut);
36:
37: /*
38: * Ensure that we sample the fifo->in index -before- we
39: * start removing bytes from the kfifo.
40: */
41: __sync_synchronize();
42:
43: /* first get the data from fifo->out until the end of the buffer */
44: l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1)));
45: memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l);
46:
47: /* then get the rest (if any) from the beginning of the buffer */
48: memcpy(buffer + l, m_pBuffer, len - l);
49:
50: /*
51: * Ensure that we remove the bytes from the kfifo -before-
52: * we update the fifo->out index.
53: */
54: __sync_synchronize();
55:
56: m_nOut += len;
57:
58: return len;
59: }
入隊和出隊操作與kfifo相同,用到的技巧也完全相同,有不理解的童鞋可以參考前面一篇文章《Linux內核數據結構kfifo詳解》。這裡需要指出的是__sync_synchronize()函數:由於linux並未開放出內存屏障函數,這裡改用gcc 4.2以上版本提供的內建函數__sync_synchronize(),其文檔說明為“This builtin issues a full memory barrier”,有興趣的同學可以參考Built-in functions for atomic memory access。
如圖所示,我們設計了兩個線程,一個生產者隨機生成學生信息放入隊列,一個消費者從隊列中取出學生信息並打印,可以看到整個代碼是無鎖的。
#include "UnlockQueue.h"

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <iostream>
9:
10: struct student_info /* Record that the demo threads copy through the queue as raw bytes. */
11: {
12: long stu_id; /* student id; get_student_info() assigns 10000 + rand() % 9999 */
13: unsigned int age; /* age; get_student_info() assigns rand() % 30 */
14: unsigned int score; /* score; get_student_info() assigns rand() % 101 */
15: };
16:
17: void print_student_info(const student_info *stu_info)
18: {
19: if(NULL == stu_info)
20: return;
21:
22: printf("id:%ld\t",stu_info->stu_id);
23: printf("age:%u\t",stu_info->age);
24: printf("score:%u\n",stu_info->score);
25: }
26:
27: student_info * get_student_info(time_t timer)
28: {
29: student_info *stu_info = (student_info *)malloc(sizeof(student_info));
30: if (!stu_info)
31: {
32: fprintf(stderr, "Failed to malloc memory.\n");
33: return NULL;
34: }
35: srand(timer);
36: stu_info->stu_id = 10000 + rand() % 9999;
37: stu_info->age = rand() % 30;
38: stu_info->score = rand() % 101;
39: //print_student_info(stu_info);
40: return stu_info;
41: }
42:
43: void * consumer_proc(void *arg)
44: {
45: UnlockQueue* queue = (UnlockQueue *)arg;
46: student_info stu_info;
47: while(1)
48: {
49: sleep(1);
50: unsigned int len = queue->Get((unsigned char *)&stu_info, sizeof(student_info));
51: if(len > 0)
52: {
53: printf("------------------------------------------\n");
54: printf("UnlockQueue length: %u\n", queue->GetDataLen());
55: printf("Get a student\n");
56: print_student_info(&stu_info);
57: printf("------------------------------------------\n");
58: }
59: }
60: return (void *)queue;
61: }
62:
63: void * producer_proc(void *arg)
64: {
65: time_t cur_time;
66: UnlockQueue *queue = (UnlockQueue*)arg;
67: while(1)
68: {
69: time(&cur_time);
70: srand(cur_time);
71: int seed = rand() % 11111;
72: printf("******************************************\n");
73: student_info *stu_info = get_student_info(cur_time + seed);
74: printf("put a student info to queue.\n");
75: queue->Put( (unsigned char *)stu_info, sizeof(student_info));
76: free(stu_info);
77: printf("UnlockQueue length: %u\n", queue->GetDataLen());
78: printf("******************************************\n");
79: sleep(1);
80: }
81: return (void *)queue;
82: }
83:
84:
85: int main()
86: {
87: UnlockQueue unlockQueue(1024);
88: if(!unlockQueue.Initialize())
89: {
90: return -1;
91: }
92:
93: pthread_t consumer_tid, producer_tid;
94:
95: printf("multi thread test.......\n");
96:
97: if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue))
98: {
99: fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
100: errno, strerror(errno));
101: return -1;
102: }
103:
104: if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue))
105: {
106: fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
107: errno, strerror(errno));
108: return -1;
109: }
110:
111: pthread_join(producer_tid, NULL);
112: pthread_join(consumer_tid, NULL);
113:
114: return 0;
115: }
運行結果: