歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux基礎 >> Linux技術

線程同步

在進行多線程編程時,難免還要碰到兩個問題,那就線程間的互斥與同步:

線程同步是指線程之間所具有的一種制約關系,一個線程的執行依賴另一個線程的消息,當它沒有得到另一個線程的消息時應等待,直到消息到達時才被喚醒。

線程互斥是指對於共享的進程系統資源,在各單個線程訪問時的排它性。當有若干個線程都要使用某一共享資源時,任何時刻最多只允許一個線程去使用,其它要使用該資源的線程必須等待,直到占用資源者釋放該資源。線程互斥可以看成是一種特殊的線程同步(下文統稱為同步)。

生產者消費者問題就是一個著名的線程同步問題,該問題描述如下:有一個生產者在生產產品,這些產品將提供給若干個消費者去消費,為了使生產者和消費者能並發執行,在兩者之間設置一個具有多個緩沖區的緩沖池,生產者將它生產的產品放入一個緩沖區中,消費者可以從緩沖區中取走產品進行消費,顯然生產者和消費者之間必須保持同步,即不允許消費者到一個空的緩沖區中取產品,也不允許生產者向一個已經放入產品的緩沖區中再次投放產品。

關於線程同步和互斥的詳細說明可以看:http://blog.csdn.net/big_bit/article/details/51356381這篇文章

線程間的同步方法大體可分為兩類:用戶模式和內核模式。顧名思義,內核模式就是指利用系統內核對象的單一性來進行同步,使用時需要切換內核態與用戶態,而用戶模式就是不需要切換到內核態,只在用戶態完成操作。

用戶模式下的方法有:原子操作(例如一個單一的全局變量),臨界區。內核模式下的方法有:事件,信號量,互斥量。下面我們來分別看一下這些方法:

一、互斥鎖或互斥量(mutex)

下面是用互斥量來解決生產者和消費者問題。為了現集中體現互斥量這個概念(就是一次只能有一個線程訪問,其他線程阻塞),我們先簡化一下問題:緩沖區或者倉庫無限大(生產者和消費者都可以生產和消費產品,而且產品初始化時候數量就是無限多,這裡我們主要體現),只有一個生產者和一個消費者, 我們這個時候就可以把緩沖區設置為一個互斥量,一次要麼生產者要麼消費者霸占它。

初始化鎖。在Linux下,線程的互斥量數據類型是pthread_mutex_t。在使用前,要對它進行初始化。

靜態分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

動態分配:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_attr_t *mutexattr);

加鎖。對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,調用線程會阻塞,直到互斥量被解鎖。

int pthread_mutex_lock(pthread_mutex *mutex);

int pthread_mutex_trylock(pthread_mutex_t *mutex);

解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。

int pthread_mutex_unlock(pthread_mutex_t *mutex);

銷毀鎖。鎖在是使用完成後,需要進行銷毀以釋放資源。

int pthread_mutex_destroy(pthread_mutex *mutex);

接下來我們來看看實現流程:

下面開始代碼實現:

#include <stdio.h>
#include <pthread.h>

#define LOOP_COUNT 5			//生產者和消費者各自循環次數
pthread_mutex_t mutex;			//定義一個全局互斥量,在不同函數中
								//初始化和使用

void *producer( void *arg );	//生產者線程
void *consumer( void *arg );	//消費者線程

int main(int argc , char *argv[]){
	pthread_t thrd_prod , thrd_cons;

	pthread_mutex_init( &mutex , NULL );	//初始化互斥量

	//創建生產者和消費者線程
	if( pthread_create( &thrd_prod , NULL, producer ,
				NULL ) != 0 )
		oops( "thread create failed." );
	sleep(1);								//保證生產者線程先運行

	if( pthread_create( &thrd_cons , NULL, consumer ,
				NULL ) != 0 )
		oops( "thread create failed." );

	//等待線程結束
	if( pthread_join( thrd_prod , NULL ) != 0 )
		oops( " wait thread failed.");
	if( pthread_join( thrd_cons , NULL ) != 0 )
		oops( " wait thread failed.");

	pthread_mutex_destroy( &mutex );		//關閉互斥量
	return 0;
}

void *producer( void *arg){
	int count = 0 ;				//循環計數

	while( count++ < LOOP_COUNT ){
		pthread_mutex_lock( &mutex );	//加鎖

		//成功占有互斥量,接下來可以對緩沖區(倉庫)進行生產
		//操作
		printf( " producer put a product to buffer.\n");
		sleep(3);				//休眠3秒, 便於程序觀察

		pthread_mutex_unlock( &mutex ); //解鎖
		sleep(1);				//休眠一秒,防止它又馬上占據鎖
	}
}
void *consumer( void *arg ){
	int count = 0 ;				//循環計數

	while( count++ < LOOP_COUNT ){
//		sleep(2);				//休眠一秒, 便於程序觀察
		pthread_mutex_lock( &mutex );	//加鎖

		//成功占有互斥量,接下來可以對緩沖區(倉庫)進行取出
		//操作
		printf( " consumer get a product from buffer.\n");

		pthread_mutex_unlock( &mutex ); //解鎖
		sleep(1);				//休眠一秒,防止它又馬上占據鎖
	}
}
結果如下:

從結果可以看到,當生產者和消費者成功lock互斥量時,另一個就阻塞等待。

二、讀寫鎖

讀寫鎖也叫做共享-獨占鎖,當讀寫鎖以讀模式鎖住時,它是以共享模式鎖住的;當它以寫模式鎖住時,它是以獨占模式鎖住的。

接下來我們改變一下生產者消費者問題:現在緩沖區或者倉庫無限大(生產者和消費者都可以生產和消費產品,而且產品初始化時候數量就是無限多,這裡我們主要體現),只有一個生產者(讀寫鎖也可以應用到多個生產者問題),但有多個消費者,我們這個時候就可以把為生產者設置一個寫鎖,為每個消費者設置一個讀鎖。

1.初始化讀寫鎖。

#include <pthread.h>

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);

2.加鎖。要在讀模式下鎖定讀寫鎖,需要調用pthread_rwlock_rdlock;要在寫模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock。當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是如果線程希望以寫模式對此鎖進行加鎖,它必須阻塞直到所有的線程釋放讀鎖。

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

3.解鎖。在完成了對共享資源的訪問後,要對讀寫鎖進行解鎖。

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

4.銷毀鎖。在釋放讀寫鎖占用的內存之前,需要調用pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init為讀寫鎖分配了資源,pthread_rwlock_destroy將釋放這些資源。如果在調用pthread_rwlock_destroy之前就釋放了讀寫鎖占用的內存空間,那麼分配給這個鎖的資源就丟失了。

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

#include <stdio.h>
#include <pthread.h>

#define LOOP_COUNT 2			//生產者和消費者各自循環次數
#define LOOP_THRD 5				//消費者線程個數
pthread_rwlock_t rwlock;		//定義一個全局讀寫鎖,在不同函數中
								//初始化和使用

void *producer( void *arg );	//生產者線程
void *consumer( void *arg );	//消費者線程

int main(int argc , char *argv[]){
	int thrd_num ,thrd_id[LOOP_THRD]  ;
	pthread_t thrd_prod , thrd_cons[LOOP_THRD];

	pthread_rwlock_init( &rwlock , NULL );	//初始化互斥量

	//創建一個生產者和多個消費者線程
	if( pthread_create( &thrd_prod , NULL, producer ,
				NULL ) != 0 )
		oops( "thread create failed." );

	for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){
		thrd_id[thrd_num] = thrd_num;		//線程id,注意線程共享變量
		if( pthread_create( &thrd_cons[thrd_num], NULL,	consumer 
					, <span >(void *)( thrd_id+thrd_num)</span> ) != 0 )
			oops( "thread %d create failed." , thrd_num );
	}

	//等待線程結束
	if( pthread_join( thrd_prod , NULL ) != 0 )
		oops( " wait thread failed.");
	for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){
		if( pthread_join( thrd_cons[thrd_num] , NULL ) != 0 )
			oops( " wait thread %d failed." , thrd_num);
//		printf("wait %d thread.\n" , thrd_num);
	}
	pthread_rwlock_destroy( &rwlock );		//關閉互斥量
	return 0;
}

void *producer( void *arg){
	int count = 0 ;				//循環計數

	while( count++ < LOOP_COUNT ){
		printf( "producer try to lock wrlock.\n");
		pthread_rwlock_wrlock( &rwlock );	//加鎖

		//成功占有互斥量,接下來可以對緩沖區(倉庫)進行生產
		//操作
		printf( "producer lock successful, producer put a product to buffer.\n");
		
		/*
			休眠3秒, 便於程序觀察,可以看到
			其他讀取線程不能占據鎖而阻塞
		*/		
		sleep(3);				
		printf("prducer finished ,unlock wrlock.\n");
		pthread_rwlock_unlock( &rwlock ); //解鎖
		sleep(1);							//休眠一秒, 防止馬上又占據寫鎖
	}
}
void *consumer( void *arg ){
	int count = 0 ;							//循環計數
	int thrd_id = *( ( int*)arg );

//	printf( "consumer %d ,%#x . \n" , thrd_id ,arg);
	while( count++ < LOOP_COUNT ){
//		sleep( thrd_id+1 );					//休眠一秒, 便於程序觀察
		printf( "consumer try to lock rdlock.\n" );
		pthread_rwlock_rdlock( &rwlock );	//加鎖

		//成功占有互斥量,接下來可以對緩沖區(倉庫)進行取出
		//操作
		printf( " consumer locked successful ,consumer %d get a product from buffer."
				"\n" , thrd_id);
		/*
			休眠3秒, 便於程序觀察,可以看到
			其他讀取線程能占據讀鎖
		*/
		sleep(3);
		printf("consumer finished ,unlock rdlock.\n");
		pthread_rwlock_unlock( &rwlock );	//解鎖
		sleep(thrd_id+1);							//休眠一秒, 防止馬上又占據讀鎖
	}
}
結果如下:

可以看到當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是如果線程希望以寫模式對此鎖進行加鎖,它必須阻塞直到所有的線程釋放讀鎖。雖然讀寫鎖的實現各不相同,但當讀寫鎖處於讀模式鎖住狀態時,如果有另外的線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨後的讀模式鎖請求(貌似在程序裡面沒有體現出來)。這樣可以避免讀模式鎖長期占用,而等待的寫模式鎖請求一直得不到滿足。

另外我要說明的一點就是,傳遞參數 arg 為(void *)( thrd_id+thrd_num),我一開始並沒有定義一個數組thrd_cons[LOOP_THRD]來存儲線程編號的,

而是直接傳thrd_num的地址,但通過在線程

int

thrd_id = *( ( int*)arg );

// printf( "consumer %d ,%#x . \n" , thrd_id ,arg);

這兩句話就可以知道,當傳遞的是thrd_num地址時候,由於進程的所有信息對該進程的所有線程都是共享的,包括可執行的程序文本、程序的全局內存和堆內存、棧以及文件描述符。地址,

由於進程的所有信息對該進程的所有線程都是共享的,包括可執行的程序文本、程序的全局內存和堆內存、棧以及文件描述符。 thrd_num的值會隨著線程的執行而發生改變,系統調度頻率之快是我們無法想像的,所以thrd_num的值也是動態改變的。

三、條件變量(cond)

與互斥鎖不同,條件變量是用來等待而不是用來上鎖的。條件變量用來自動阻塞一個線程,直到某特殊情況發生為止。通常條件變量和互斥鎖同時使用。條件變量分為兩部分: 條件和變量。條件本身是由互斥量保護的。線程在改變條件狀態前先要鎖住互斥量。條件變量使我們可以睡眠等待某種條件出現。條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起;另一個線程使"條件成立"(給出條件成立信號)。條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個線程自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個線程改變了條件,它發信號給關聯的條件變量,喚醒一個或多個等待它的線程,重新獲得互斥鎖,重新評價條件。如果兩進程共享可讀寫的內存,條件變量可以被用來實現這兩進程間的線程同步。

1.初始化條件變量。

靜態態初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;

動態初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

2.等待條件成立。釋放鎖,同時阻塞等待條件變量為真才行。timewait()設置等待時間,仍未signal,返回ETIMEOUT(加鎖保證只有一個線程wait)

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);

3.激活條件變量。pthread_cond_signal,pthread_cond_broadcast(激活所有等待線程)

int pthread_cond_signal(pthread_cond_t *cond);

int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有線程的阻塞

4.清除條件變量。無線程等待,否則返回EBUSY

int pthread_cond_destroy(pthread_cond_t *cond);

接下來我們又改變一下生產者消費者問題:現在緩沖區或者倉庫大小為BUFSIZE,只有一個生產者和一消費者(其實也適用於多個生產者和消費者),我們這個時候就可以把緩沖區設置為一個互斥量,一次要麼生產者要麼消費者霸占它。但接下來處理方式與互斥量有所不同:假如生產者成功占據鎖(緩沖區),這時它不能馬上開始往裡面生產東西, 要先判斷緩沖區是不是滿的,如果緩沖區滿了,那麼生產者就會把自己放到等待條件的線程列表上,然後對互斥量進行解鎖,這是一個原子操作。如果緩沖區不滿則可以生產產品,然後給消費者發送notempty信號,表示緩沖區有產品了,

你可以yy了。然後解鎖互斥量。假如是消費者成功占據鎖(緩沖區),同樣它要檢查緩沖區是不是空的,如果空,那麼消費者就會把自己放到等待條件的線程列表上,然後對互斥量進行解鎖。如果不空,消費者開始yy,然後給生產者發送nofull信號,

表示緩沖區有位置可以生產了, 你快生產吧。然後解鎖互斥量。就這樣, 生產者消費者和諧同步工作著。

流程圖我就不畫了,看代碼也能明白過程:

---producer過程:lock(mutex)->check notfull->(if notfull wait until notfull)->produce product->send notempty to consumer->unlock(mutex)

---consumer過程:lock(mutex)->check

notempty->(if

notempty wait until

notempty)->get product from buffer->send notfull to poducer->unlock(mutex)

#include <stdio.h>
#include <pthread.h>

#define LOOP_COUNT 20				//生產者和消費者各自循環次數,也可以說生產商品的總量
//#define LOOP_THRD 5				//消費者線程個數
#define BUFSIZE 5					//緩沖區大小,也就是最多能放多少個產品

pthread_mutex_t mutex;				//定義一個全局互斥量,在不同函數中
									//初始化和使用
pthread_cond_t notempty , notfull;	//定義兩個條件變量,當作信號投放
unsigned int prod_pos = 3;			//定義生產者在緩沖區開始生產的位置
unsigned int cons_pos = 0;			//定義消費者在緩沖區開始消費的位置

void *producer( void *arg );		//生產者線程
void *consumer( void *arg );		//消費者線程

int main(int argc , char *argv[]){
	pthread_t thrd_prod , thrd_cons;

	pthread_mutex_init( &mutex , NULL );	//初始化互斥量

	//創建生產者和消費者線程
	if( pthread_create( &thrd_prod , NULL, producer ,
				NULL ) != 0 )
		oops( "thread create failed." );
	sleep(1);								//保證生產者線程先運行

	if( pthread_create( &thrd_cons , NULL, consumer ,
				NULL ) != 0 )
		oops( "thread create failed." );

	//等待線程結束
	if( pthread_join( thrd_prod , NULL ) != 0 )
		oops( " wait thread failed.");
	if( pthread_join( thrd_cons , NULL ) != 0 )
		oops( " wait thread failed.");

	pthread_mutex_destroy( &mutex );		//關閉互斥量
	return 0;
}

void *producer( void *arg){
	int count = 0 ;				//循環計數

	while( count++ < LOOP_COUNT ){
		printf( "producer try to lock .\n");
		pthread_mutex_lock( &mutex );	//加鎖

		/*
		   成功占有互斥量,接著檢查緩沖區是不是滿了,
		*/
		if( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){
			//緩沖區滿了
			printf( "producer wait not full.\n");
			pthread_cond_wait( ¬full , &mutex );	//等待條件滿足
		}
		//如果沒滿,接下來可以對緩沖區(倉庫)進行生產
		//操作
		printf( "producer lock successful, producer put %d's "
				"product to buffer.\n" ,count);
		prod_pos = ( prod_pos +1 ) % BUFSIZE;		//下標前進一個
		pthread_cond_signal( ¬empty );			//向消費者發送信號
		/*
			休眠3秒, 便於程序觀察,可以看到
			其他讀取線程不能占據鎖而阻塞
		*/		
		sleep( 1 );				
		printf("prducer finished ,unlock lock.\n");
		pthread_mutex_unlock( &mutex ); //解鎖
		sleep( 1 );							//休眠一秒, 防止馬上又占據寫鎖
	}
}
void *consumer( void *arg ){
	int count = 0 ;							//循環計數

	while( count++ < LOOP_COUNT ){
//		sleep( thrd_id+1 );					//休眠一秒, 便於程序觀察
		printf( "consumer try to lock .\n" );
		pthread_mutex_lock( &mutex ); //解鎖

		/*
		   成功占有互斥量,接下來檢查緩沖區是否為空
		*/
		if( cons_pos == prod_pos ){
			printf( "consumer wait not empty.\n");
			pthread_cond_wait( ¬empty , &mutex );
		}

		//緩沖區不空,可以對緩沖區(倉庫)進行取出操作
		printf( " consumer locked successful ,consumer  "
				"get %d product from buffer.\n" , count);
		cons_pos = ( cons_pos + 1) % BUFSIZE ;	//下標前進一個
		pthread_cond_signal( ¬full );		//向生產著發送信號

		/*
			休眠3秒, 便於程序觀察,可以看到
			其他讀取線程能占據讀鎖
		*/
		sleep( 1 );
		printf("consumer finished ,unlock lock.\n");
		pthread_mutex_unlock( &mutex );			//解鎖
		sleep(1);						//休眠一秒, 防止馬上又占據讀鎖
	}
}
先不忙看結果, 想想結果跟你預想的是不是一樣,然後看結果:

死鎖了!!!! 萬萬沒想到!!!

然後排查,鎖定到pthread_cond_wait函數,查看其他資料,總結如下:

函數將解鎖mutex參數指向的互斥鎖,並使當前線程阻塞在cond參數指向的條件變量上。

被阻塞的線程可以被pthread_cond_signal函數,pthread_cond_broadcast函數喚醒,也可能在被信號中斷後被喚醒。

pthread_cond_wait函數的返回並不意味著條件的值一定發生了變化,必須重新檢查條件的值。

pthread_cond_wait函數返回時,相應的互斥鎖將被當前線程鎖定,即使是函數出錯返回。

一般一個條件表達式都是在一個互斥鎖的保護下被檢查。當條件表達式未被滿足時,線程將仍然阻塞在這個條件變量上。當另一個線程改變了條件的值並向條件變量發出信號時,等待在這個條件變量上的一個線程或所有線程被喚醒,接著都試圖再次占有相應的互斥鎖。

阻塞在條件變量上的線程被喚醒以後,直到pthread_cond_wait()函數返回之前條件的值都有可能發生變化。所以函數返回以後,在鎖定相應的互斥鎖之前,必須重新測試條件值。最好的測試方法是循環調用pthread_cond_wait函數,並把滿足條件的表達式置為循環的終止條件。

所以上述代碼應該用循環而不是if。具體修改如下:

consumer函數中:	/*
		   成功占有互斥量,接下來循環檢查緩沖區是否為空. 這個while要特別
		   說明一下,單個pthread_cond_wait功能很完善,為何這裡要有一個
		   while (cons_pos >=prod_pos)呢?因為pthread_cond_wait裡的線程可
		   能會被意外喚醒返回了,mutex又被重新lock(不一定是本線程,有可能
		   是其他線程),此時情況是cons_pos >= prod_pos ,表示緩沖區空了,
		   不能再取product,也沒有product可取。這不是我們想要的結果。應該
		   讓線程繼續進入pthread_cond_wait  
		*/

		while( cons_pos == prod_pos ){
			printf( "consumer wait not empty.\n");
			/*
			   pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的
			   mutex,然後阻塞在等待對列裡休眠,直到再次被喚醒(大多數
			   情況下是等待的條件成立而被喚醒,喚醒後,該進程會先鎖定先
			   pthread_mutex_lock(&mutex);,再讀取資源,用這個流程是比較
			   清楚的 block-->unlock-->cond_wait() return-->lock  
			*/
			pthread_cond_wait( ¬empty , &mutex );
		}
produer函數中: /*
		   成功占有互斥量,接著循環檢查緩沖區是不是滿了,
		*/
		while( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){
			//緩沖區滿了
			printf( "producer wait not full.\n");
			pthread_cond_wait( ¬full , &mutex );	//等待條件滿足
		}
這樣來看結果就對了

注:關於生產者和消費者操作緩沖區的操作,大家下來仔細揣摩一下,搞懂

while( ( prod_pos + 1 ) % BUFSIZE == cons_pos )

while( cons_pos == prod_pos )
這兩個循環條件,大家就明白緩沖區操作了。

(未完待續)

Copyright © Linux教程網 All Rights Reserved