前面的一片文章我們已經講過使用信號量解決生產者消費者問題,那麼什麼情況下我們需要引入條件變量呢?
假設有共享的資源sum,與之相關聯的mutex 是lock_s.假設每個線程對sum的操作很簡單的,與sum的狀態無關,比如只是sum++.那麼只用mutex足夠了.程序員只要確保每個線程操作前,取得lock,然後sum++,再unlock即可.每個線程的代碼將像這樣:
add() { pthread_mutex_lock(lock_s); sum++; pthread_mutex_unlock(lock_s); }
如果操作比較復雜,假設線程t0,t1,t2的操作是sum++,而線程t3則是在sum到達100的時候,打印出一條信息,並對sum清零. 這種情況下,如果只用mutex, 則t3需要一個循環,每個循環裡先取得lock_s,然後檢查sum的狀態,如果sum>=100,則打印並清零,然後unlock.如果sum<100,則unlock,並sleep()本線程合適的一段時間。
這個時候,t0,t1,t2的代碼不變,t3的代碼如下:
print() { while (1) { pthread_mutex_lock(lock_s); if(sum<100) { printf(“sum reach 100!”); pthread_mutex_unlock(lock_s); } else { pthread_mutex_unlock(lock_s); my_thread_sleep(100); return OK; } } }
這種辦法有兩個問題
1) sum在大多數情況下不會到達100,那麼對t3的代碼來說,大多數情況下,走的是else分支,只是lock和unlock,然後sleep().這浪費了CPU處理時間.
2) 為了節省CPU處理時間,t3會在探測到sum沒到達100的時候sleep()一段時間.這樣卻又帶來另外一個問題,亦即t3響應速度下降.可能在sum到達200的時候,t4才會醒過來.
3) 這樣,程序員在設置sleep()時間的時候陷入兩難境地,設置得太短了節省不了資源,太長了又降低響應速度.真是難辦啊!
這個時候,condition variable,從天而降,拯救了焦頭爛額的你.
你首先定義一個condition variable.
pthread_cond_t cond_sum_ready=PTHREAD_COND_INITIALIZER;
t0,t1,t2的代碼只要後面加兩行,像這樣:
add() { pthread_mutex_lock(lock_s); sum++; pthread_mutex_unlock(lock_s); if(sum>=100) pthread_cond_signal(&cond_sum_ready); } 而t3的代碼則是 print { pthread_mutex_lock(lock_s); while(sum<100) pthread_cond_wait(&cond_sum_ready, &lock_s); printf(“sum is over 100!”); sum=0; pthread_mutex_unlock(lock_s); return OK; }
注意兩點:
1) 在thread_cond_wait()之前,必須先lock相關聯的mutex, 因為假如目標條件未滿足,pthread_cond_wait()實際上會unlock該mutex, 然後block,在目標條件滿足後再重新lock該mutex, 然後返回.
2) 為什麼是while(sum<100),而不是if(sum<100) ?這是因為在pthread_cond_signal()和pthread_cond_wait()返回之間,有時間差,假設在這個時間差內,還有另外一個線程t4又把sum減少到100以下了,那麼t3在pthread_cond_wait()返回之後,顯然應該再檢查一遍sum的大小.這就是用 while的用意
Posix條件變量常用API:
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr); int pthread_cond_destroy(pthread_cond_t *cond); int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime); int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond);
條件變量的使用規范:
(一)、等待條件代碼 pthread_mutex_lock(&mutex); while (條件為假) pthread_cond_wait(cond, mutex); 修改條件 pthread_mutex_unlock(&mutex); (二)、給條件發送通知代碼 pthread_mutex_lock(&mutex); 設置條件為真 pthread_cond_signal(cond); pthread_mutex_unlock(&mutex);注意是while而不是if,原因是在信號的中斷後還能正常運行。
解決生產者消費者問題(無界緩沖區):
#include#include #include #include #include #include #include #include #define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(EXIT_FAILURE); \ } while(0) #define CONSUMERS_COUNT 2 #define PRODUCERS_COUNT 1 pthread_mutex_t g_mutex; pthread_cond_t g_cond; pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT]; int nready = 0; void *consume(void *arg) { int num = (int)arg; while (1) { pthread_mutex_lock(&g_mutex); while (nready == 0) { printf("%d begin wait a condtion ...\n", num); pthread_cond_wait(&g_cond, &g_mutex); } printf("%d end wait a condtion ...\n", num); printf("%d begin consume product ...\n", num); --nready; printf("%d end consume product ...\n", num); pthread_mutex_unlock(&g_mutex); sleep(1); } return NULL; } void *produce(void *arg) { int num = (int)arg; while (1) { pthread_mutex_lock(&g_mutex); printf("%d begin produce product ...\n", num); ++nready; printf("%d end produce product ...\n", num); pthread_cond_signal(&g_cond); printf("%d signal ...\n", num); pthread_mutex_unlock(&g_mutex); sleep(1); } return NULL; } int main(void) { int i; pthread_mutex_init(&g_mutex, NULL); pthread_cond_init(&g_cond, NULL); for (i = 0; i < CONSUMERS_COUNT; i++) pthread_create(&g_thread[i], NULL, consume, (void *)i); sleep(1); for (i = 0; i < PRODUCERS_COUNT; i++) pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)i); for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++) pthread_join(g_thread[i], NULL); pthread_mutex_destroy(&g_mutex); pthread_cond_destroy(&g_cond); return 0; }