在進行多線程編程時,難免還要碰到兩個問題,那就線程間的互斥與同步:
線程同步是指線程之間所具有的一種制約關系,一個線程的執行依賴另一個線程的消息,當它沒有得到另一個線程的消息時應等待,直到消息到達時才被喚醒。
線程互斥是指對於共享的進程系統資源,在各單個線程訪問時的排它性。當有若干個線程都要使用某一共享資源時,任何時刻最多只允許一個線程去使用,其它要使用該資源的線程必須等待,直到占用資源者釋放該資源。線程互斥可以看成是一種特殊的線程同步(下文統稱為同步)。
生產者消費者問題就是一個著名的線程同步問題,該問題描述如下:有一個生產者在生產產品,這些產品將提供給若干個消費者去消費,為了使生產者和消費者能並發執行,在兩者之間設置一個具有多個緩沖區的緩沖池,生產者將它生產的產品放入一個緩沖區中,消費者可以從緩沖區中取走產品進行消費,顯然生產者和消費者之間必須保持同步,即不允許消費者到一個空的緩沖區中取產品,也不允許生產者向一個已經放入產品的緩沖區中再次投放產品。
關於線程同步和互斥的詳細說明可以看:http://blog.csdn.net/big_bit/article/details/51356381這篇文章
線程間的同步方法大體可分為兩類:用戶模式和內核模式。顧名思義,內核模式就是指利用系統內核對象的單一性來進行同步,使用時需要切換內核態與用戶態,而用戶模式就是不需要切換到內核態,只在用戶態完成操作。
用戶模式下的方法有:原子操作(例如一個單一的全局變量),臨界區。內核模式下的方法有:事件,信號量,互斥量。下面我們來分別看一下這些方法:
一、互斥鎖或互斥量(mutex)
下面是用互斥量來解決生產者和消費者問題。為了現集中體現互斥量這個概念(就是一次只能有一個線程訪問,其他線程阻塞),我們先簡化一下問題:緩沖區或者倉庫無限大(生產者和消費者都可以生產和消費產品,而且產品初始化時候數量就是無限多,這裡我們主要體現),只有一個生產者和一個消費者, 我們這個時候就可以把緩沖區設置為一個互斥量,一次要麼生產者要麼消費者霸占它。
初始化鎖。在Linux下,線程的互斥量數據類型是pthread_mutex_t。在使用前,要對它進行初始化。
靜態分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
動態分配:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_attr_t *mutexattr);
加鎖。對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,調用線程會阻塞,直到互斥量被解鎖。
int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
銷毀鎖。鎖在是使用完成後,需要進行銷毀以釋放資源。
int pthread_mutex_destroy(pthread_mutex *mutex);
接下來我們來看看實現流程:
下面開始代碼實現:
#include <stdio.h> #include <pthread.h> #define LOOP_COUNT 5 //生產者和消費者各自循環次數 pthread_mutex_t mutex; //定義一個全局互斥量,在不同函數中 //初始化和使用 void *producer( void *arg ); //生產者線程 void *consumer( void *arg ); //消費者線程 int main(int argc , char *argv[]){ pthread_t thrd_prod , thrd_cons; pthread_mutex_init( &mutex , NULL ); //初始化互斥量 //創建生產者和消費者線程 if( pthread_create( &thrd_prod , NULL, producer , NULL ) != 0 ) oops( "thread create failed." ); sleep(1); //保證生產者線程先運行 if( pthread_create( &thrd_cons , NULL, consumer , NULL ) != 0 ) oops( "thread create failed." ); //等待線程結束 if( pthread_join( thrd_prod , NULL ) != 0 ) oops( " wait thread failed."); if( pthread_join( thrd_cons , NULL ) != 0 ) oops( " wait thread failed."); pthread_mutex_destroy( &mutex ); //關閉互斥量 return 0; } void *producer( void *arg){ int count = 0 ; //循環計數 while( count++ < LOOP_COUNT ){ pthread_mutex_lock( &mutex ); //加鎖 //成功占有互斥量,接下來可以對緩沖區(倉庫)進行生產 //操作 printf( " producer put a product to buffer.\n"); sleep(3); //休眠3秒, 便於程序觀察 pthread_mutex_unlock( &mutex ); //解鎖 sleep(1); //休眠一秒,防止它又馬上占據鎖 } } void *consumer( void *arg ){ int count = 0 ; //循環計數 while( count++ < LOOP_COUNT ){ // sleep(2); //休眠一秒, 便於程序觀察 pthread_mutex_lock( &mutex ); //加鎖 //成功占有互斥量,接下來可以對緩沖區(倉庫)進行取出 //操作 printf( " consumer get a product from buffer.\n"); pthread_mutex_unlock( &mutex ); //解鎖 sleep(1); //休眠一秒,防止它又馬上占據鎖 } }結果如下:
從結果可以看到,當生產者和消費者成功lock互斥量時,另一個就阻塞等待。
接下來我們改變一下生產者消費者問題:現在緩沖區或者倉庫無限大(生產者和消費者都可以生產和消費產品,而且產品初始化時候數量就是無限多,這裡我們主要體現),只有一個生產者(讀寫鎖也可以應用到多個生產者問題),但有多個消費者,我們這個時候就可以把為生產者設置一個寫鎖,為每個消費者設置一個讀鎖。
1.初始化讀寫鎖。
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);
2.加鎖。要在讀模式下鎖定讀寫鎖,需要調用pthread_rwlock_rdlock;要在寫模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock。當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是如果線程希望以寫模式對此鎖進行加鎖,它必須阻塞直到所有的線程釋放讀鎖。
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
3.解鎖。在完成了對共享資源的訪問後,要對讀寫鎖進行解鎖。
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
4.銷毀鎖。在釋放讀寫鎖占用的內存之前,需要調用pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init為讀寫鎖分配了資源,pthread_rwlock_destroy將釋放這些資源。如果在調用pthread_rwlock_destroy之前就釋放了讀寫鎖占用的內存空間,那麼分配給這個鎖的資源就丟失了。
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
#include <stdio.h> #include <pthread.h> #define LOOP_COUNT 2 //生產者和消費者各自循環次數 #define LOOP_THRD 5 //消費者線程個數 pthread_rwlock_t rwlock; //定義一個全局讀寫鎖,在不同函數中 //初始化和使用 void *producer( void *arg ); //生產者線程 void *consumer( void *arg ); //消費者線程 int main(int argc , char *argv[]){ int thrd_num ,thrd_id[LOOP_THRD] ; pthread_t thrd_prod , thrd_cons[LOOP_THRD]; pthread_rwlock_init( &rwlock , NULL ); //初始化互斥量 //創建一個生產者和多個消費者線程 if( pthread_create( &thrd_prod , NULL, producer , NULL ) != 0 ) oops( "thread create failed." ); for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){ thrd_id[thrd_num] = thrd_num; //線程id,注意線程共享變量 if( pthread_create( &thrd_cons[thrd_num], NULL, consumer , <span >(void *)( thrd_id+thrd_num)</span> ) != 0 ) oops( "thread %d create failed." , thrd_num ); } //等待線程結束 if( pthread_join( thrd_prod , NULL ) != 0 ) oops( " wait thread failed."); for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){ if( pthread_join( thrd_cons[thrd_num] , NULL ) != 0 ) oops( " wait thread %d failed." , thrd_num); // printf("wait %d thread.\n" , thrd_num); } pthread_rwlock_destroy( &rwlock ); //關閉互斥量 return 0; } void *producer( void *arg){ int count = 0 ; //循環計數 while( count++ < LOOP_COUNT ){ printf( "producer try to lock wrlock.\n"); pthread_rwlock_wrlock( &rwlock ); //加鎖 //成功占有互斥量,接下來可以對緩沖區(倉庫)進行生產 //操作 printf( "producer lock successful, producer put a product to buffer.\n"); /* 休眠3秒, 便於程序觀察,可以看到 其他讀取線程不能占據鎖而阻塞 */ sleep(3); printf("prducer finished ,unlock wrlock.\n"); pthread_rwlock_unlock( &rwlock ); //解鎖 sleep(1); //休眠一秒, 防止馬上又占據寫鎖 } } void *consumer( void *arg ){ int count = 0 ; //循環計數 int thrd_id = *( ( int*)arg ); // printf( "consumer %d ,%#x . \n" , thrd_id ,arg); while( count++ < LOOP_COUNT ){ // sleep( thrd_id+1 ); //休眠一秒, 便於程序觀察 printf( "consumer try to lock rdlock.\n" ); pthread_rwlock_rdlock( &rwlock ); //加鎖 //成功占有互斥量,接下來可以對緩沖區(倉庫)進行取出 //操作 printf( " consumer locked successful ,consumer %d get a product from buffer." "\n" , thrd_id); /* 休眠3秒, 便於程序觀察,可以看到 其他讀取線程能占據讀鎖 */ sleep(3); printf("consumer finished ,unlock rdlock.\n"); pthread_rwlock_unlock( &rwlock ); //解鎖 sleep(thrd_id+1); //休眠一秒, 防止馬上又占據讀鎖 } }結果如下:
可以看到當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是如果線程希望以寫模式對此鎖進行加鎖,它必須阻塞直到所有的線程釋放讀鎖。雖然讀寫鎖的實現各不相同,但當讀寫鎖處於讀模式鎖住狀態時,如果有另外的線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨後的讀模式鎖請求(貌似在程序裡面沒有體現出來)。這樣可以避免讀模式鎖長期占用,而等待的寫模式鎖請求一直得不到滿足。
另外我要說明的一點就是,傳遞參數 arg 為(void *)( thrd_id+thrd_num),我一開始並沒有定義一個數組thrd_cons[LOOP_THRD]來存儲線程編號的,
而是直接傳thrd_num的地址,但通過在線程
int
thrd_id = *( ( int*)arg );
// printf( "consumer %d ,%#x . \n" , thrd_id ,arg);
這兩句話就可以知道,當傳遞的是thrd_num地址時候,由於進程的所有信息對該進程的所有線程都是共享的,包括可執行的程序文本、程序的全局內存和堆內存、棧以及文件描述符。地址,
由於進程的所有信息對該進程的所有線程都是共享的,包括可執行的程序文本、程序的全局內存和堆內存、棧以及文件描述符。 thrd_num的值會隨著線程的執行而發生改變,系統調度頻率之快是我們無法想像的,所以thrd_num的值也是動態改變的。
1.初始化條件變量。
靜態態初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
動態初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
2.等待條件成立。釋放鎖,同時阻塞等待條件變量為真才行。timewait()設置等待時間,仍未signal,返回ETIMEOUT(加鎖保證只有一個線程wait)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
3.激活條件變量。pthread_cond_signal,pthread_cond_broadcast(激活所有等待線程)
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有線程的阻塞
4.清除條件變量。無線程等待,否則返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond);
接下來我們又改變一下生產者消費者問題:現在緩沖區或者倉庫大小為BUFSIZE,只有一個生產者和一個消費者(其實也適用於多個生產者和消費者),我們這個時候就可以把緩沖區設置為一個互斥量,一次要麼生產者要麼消費者霸占它。但接下來處理方式與互斥量有所不同:假如生產者成功占據鎖(緩沖區),這時它不能馬上開始往裡面生產東西, 要先判斷緩沖區是不是滿的,如果緩沖區滿了,那麼生產者就會把自己放到等待條件的線程列表上,然後對互斥量進行解鎖,這是一個原子操作。如果緩沖區不滿則可以生產產品,然後給消費者發送notempty信號,表示緩沖區有產品了,
你可以yy了。然後解鎖互斥量。假如是消費者成功占據鎖(緩沖區),同樣它要檢查緩沖區是不是空的,如果空,那麼消費者就會把自己放到等待條件的線程列表上,然後對互斥量進行解鎖。如果不空,消費者開始yy,然後給生產者發送nofull信號,
表示緩沖區有位置可以生產了, 你快生產吧。然後解鎖互斥量。就這樣, 生產者消費者和諧同步工作著。
流程圖我就不畫了,看代碼也能明白過程:
---producer過程:lock(mutex)->check notfull->(if notfull wait until notfull)->produce product->send notempty to consumer->unlock(mutex)
---consumer過程:lock(mutex)->check
notempty->(if
notempty wait until
notempty)->get product from buffer->send notfull to poducer->unlock(mutex)
#include <stdio.h> #include <pthread.h> #define LOOP_COUNT 20 //生產者和消費者各自循環次數,也可以說生產商品的總量 //#define LOOP_THRD 5 //消費者線程個數 #define BUFSIZE 5 //緩沖區大小,也就是最多能放多少個產品 pthread_mutex_t mutex; //定義一個全局互斥量,在不同函數中 //初始化和使用 pthread_cond_t notempty , notfull; //定義兩個條件變量,當作信號投放 unsigned int prod_pos = 3; //定義生產者在緩沖區開始生產的位置 unsigned int cons_pos = 0; //定義消費者在緩沖區開始消費的位置 void *producer( void *arg ); //生產者線程 void *consumer( void *arg ); //消費者線程 int main(int argc , char *argv[]){ pthread_t thrd_prod , thrd_cons; pthread_mutex_init( &mutex , NULL ); //初始化互斥量 //創建生產者和消費者線程 if( pthread_create( &thrd_prod , NULL, producer , NULL ) != 0 ) oops( "thread create failed." ); sleep(1); //保證生產者線程先運行 if( pthread_create( &thrd_cons , NULL, consumer , NULL ) != 0 ) oops( "thread create failed." ); //等待線程結束 if( pthread_join( thrd_prod , NULL ) != 0 ) oops( " wait thread failed."); if( pthread_join( thrd_cons , NULL ) != 0 ) oops( " wait thread failed."); pthread_mutex_destroy( &mutex ); //關閉互斥量 return 0; } void *producer( void *arg){ int count = 0 ; //循環計數 while( count++ < LOOP_COUNT ){ printf( "producer try to lock .\n"); pthread_mutex_lock( &mutex ); //加鎖 /* 成功占有互斥量,接著檢查緩沖區是不是滿了, */ if( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){ //緩沖區滿了 printf( "producer wait not full.\n"); pthread_cond_wait( ¬full , &mutex ); //等待條件滿足 } //如果沒滿,接下來可以對緩沖區(倉庫)進行生產 //操作 printf( "producer lock successful, producer put %d's " "product to buffer.\n" ,count); prod_pos = ( prod_pos +1 ) % BUFSIZE; //下標前進一個 pthread_cond_signal( ¬empty ); //向消費者發送信號 /* 休眠3秒, 便於程序觀察,可以看到 其他讀取線程不能占據鎖而阻塞 */ sleep( 1 ); printf("prducer finished ,unlock lock.\n"); pthread_mutex_unlock( &mutex ); //解鎖 sleep( 1 ); //休眠一秒, 防止馬上又占據寫鎖 } } void *consumer( void *arg ){ int count = 0 ; //循環計數 while( count++ < LOOP_COUNT ){ // sleep( thrd_id+1 ); //休眠一秒, 便於程序觀察 printf( "consumer try to lock .\n" ); pthread_mutex_lock( &mutex ); //解鎖 /* 成功占有互斥量,接下來檢查緩沖區是否為空 */ if( cons_pos == prod_pos ){ printf( "consumer wait not empty.\n"); pthread_cond_wait( ¬empty , &mutex ); } //緩沖區不空,可以對緩沖區(倉庫)進行取出操作 printf( " consumer locked successful ,consumer " "get %d product from buffer.\n" , count); cons_pos = ( cons_pos + 1) % BUFSIZE ; //下標前進一個 pthread_cond_signal( ¬full ); //向生產著發送信號 /* 休眠3秒, 便於程序觀察,可以看到 其他讀取線程能占據讀鎖 */ sleep( 1 ); printf("consumer finished ,unlock lock.\n"); pthread_mutex_unlock( &mutex ); //解鎖 sleep(1); //休眠一秒, 防止馬上又占據讀鎖 } }先不忙看結果, 想想結果跟你預想的是不是一樣,然後看結果:
死鎖了!!!! 萬萬沒想到!!!
然後排查,鎖定到pthread_cond_wait函數,查看其他資料,總結如下:
函數將解鎖mutex參數指向的互斥鎖,並使當前線程阻塞在cond參數指向的條件變量上。
被阻塞的線程可以被pthread_cond_signal函數,pthread_cond_broadcast函數喚醒,也可能在被信號中斷後被喚醒。
pthread_cond_wait函數的返回並不意味著條件的值一定發生了變化,必須重新檢查條件的值。
pthread_cond_wait函數返回時,相應的互斥鎖將被當前線程鎖定,即使是函數出錯返回。
一般一個條件表達式都是在一個互斥鎖的保護下被檢查。當條件表達式未被滿足時,線程將仍然阻塞在這個條件變量上。當另一個線程改變了條件的值並向條件變量發出信號時,等待在這個條件變量上的一個線程或所有線程被喚醒,接著都試圖再次占有相應的互斥鎖。
阻塞在條件變量上的線程被喚醒以後,直到pthread_cond_wait()函數返回之前條件的值都有可能發生變化。所以函數返回以後,在鎖定相應的互斥鎖之前,必須重新測試條件值。最好的測試方法是循環調用pthread_cond_wait函數,並把滿足條件的表達式置為循環的終止條件。
所以上述代碼應該用循環而不是if。具體修改如下:
consumer函數中: /* 成功占有互斥量,接下來循環檢查緩沖區是否為空. 這個while要特別 說明一下,單個pthread_cond_wait功能很完善,為何這裡要有一個 while (cons_pos >=prod_pos)呢?因為pthread_cond_wait裡的線程可 能會被意外喚醒返回了,mutex又被重新lock(不一定是本線程,有可能 是其他線程),此時情況是cons_pos >= prod_pos ,表示緩沖區空了, 不能再取product,也沒有product可取。這不是我們想要的結果。應該 讓線程繼續進入pthread_cond_wait */ while( cons_pos == prod_pos ){ printf( "consumer wait not empty.\n"); /* pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的 mutex,然後阻塞在等待對列裡休眠,直到再次被喚醒(大多數 情況下是等待的條件成立而被喚醒,喚醒後,該進程會先鎖定先 pthread_mutex_lock(&mutex);,再讀取資源,用這個流程是比較 清楚的 block-->unlock-->cond_wait() return-->lock */ pthread_cond_wait( ¬empty , &mutex ); }
produer函數中: /* 成功占有互斥量,接著循環檢查緩沖區是不是滿了, */ while( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){ //緩沖區滿了 printf( "producer wait not full.\n"); pthread_cond_wait( ¬full , &mutex ); //等待條件滿足 }這樣來看結果就對了
注:關於生產者和消費者操作緩沖區的操作,大家下來仔細揣摩一下,搞懂
while( ( prod_pos + 1 ) % BUFSIZE == cons_pos )
while( cons_pos == prod_pos )這兩個循環條件,大家就明白緩沖區操作了。
(未完待續)