APUE學習之多線程編程(二):線程同步,apue多線程編程
APUE學習之多線程編程(二):線程同步,apue多線程編程
為了保證臨界資源的安全性和可靠性,線程不得不使用鎖,同一時間只允許一個或幾個線程訪問變量。常用的鎖有互斥量,讀寫鎖,條件變量
一、互斥量
互斥量是用pthread_mutex_t數據類型表示的,在使用之前,必須對其進行初始化,可以把它設置為PTHREAD_MUTEX_INITIALIZER(只適於靜態分配的互斥量),也可以通過調用pthread_mutex_init函數進行初始化,最後還要調用pthread_mutex_destroy進行釋放。
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
要用默認的屬性初始化互斥量,只需把attr設為NULL,後面在討論互斥量屬性。
對互斥量進行加鎖,使用pthread_mutex_lock,如果互斥量已經上鎖,調用線程將阻塞至互斥量解鎖,對互斥量解鎖,使用pthread_mutex_unlock,如果線程不希望被阻塞,它可以調用pthread_mutex_trylock嘗試對互斥量進行加鎖,如果互斥量未鎖住,則成功加鎖,如果互斥量已鎖住,pthread_mutex_trylock就會失敗,返回EBUSY。
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
例子:

![]()
#include <stdio.h>
#include <pthread.h>
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
};
struct foo * foo_alloc(int id)
{
struct foo *fp = NULL;
if ((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return NULL;
}
}
return fp;
}
void foo_hold(struct foo *fp)
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
void foo_rele(struct foo *fp)
{
pthread_mutex_lock(&fp->f_lock);
if (--fp->f_count == 0)
{
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
pthread_mutex_unlock(&fp->f_lock);
}
}
View Code
上面的例子描述了用於保護某個數據結構的互斥量,我們在對象中嵌入引用計數,確保在所有使用該對象的線程完成數據訪問之前,該對象的內存空間不會被釋放。
如果線程對同一個互斥量加鎖兩次,那麼它自身將陷入死鎖狀態。如果有一個以上的互斥量,且允許一個線程一直占有第一個互斥量,並且試圖鎖住第二個互斥量時處於阻塞狀態,但是擁有第二個互斥量的線程也在試圖鎖住第一個互斥量,也阻塞,就死鎖了。
可以通過仔細控制互斥量加鎖的順序來避免死鎖的發生,譬如要求所有線程必須先鎖住互斥量A才能鎖住互斥量B。另一種辦法是當線程無法獲得下一個互斥量的時候,就釋放自己已占有的互斥量,過一段時間再試。
例子:

![]()
#include "apue.h"
#include <pthread.h>
#define NMASH 29
#define HASH(id) (((unsigned long)id) % NMASH)
struct foo *fh[NMASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next;
};
struct foo *foo_alloc(int id)
{
struct foo *fp = NULL;
int idx = 0;
if ((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = if;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return NULL;
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
}
return fp;
}
void foo_hold(struct foo *fp)
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo *foo_find(int id)
{
struct foo *fp = NULL;
pthread_mutex_lock(&hashlock);
for (fp = fh[HASH(id)]; fp != NULL; fp = fp->next)
{
if (fp->f_id = id)
{
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return fp;
}
void foo_rele(struct foo *fp)
{
struct foo *tfp = NULL;
int idx = 0;
pthread_mutex_lock(&fp->f_lock);
if (fp->f_count == 1)
{
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_lock(&hashlock);
pthread_mutex_lock(&fp->f_lock);
if (fp->f_count != 1)
{
fp->f_count--;
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
return;
}
idx = HASH(fp->f_id);
tfp = fh[idx];
if (tfp = fp)
{
fh[idx] = fp->f_next
}
else
{
while(tfp->next != fp)
{
tfp = tfp->next;
}
tfp->next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
}
}
View Code
這個例子比上一個例子多了一個散列表和一個保護散列表的互斥量,加鎖的順序是先hashlock,再f_lock,注意這個順序,就不會發生死鎖,不過這樣也導致代碼太繁瑣,最後一個函數解鎖f_lock後重新加鎖f_lock,需要重新考察f_count的值,因為可能在這期間被其他線程修改。
這樣的方式太復雜,讓hashlock也保護f_cout,事情會簡單很多。
例子:

![]()
#include "apue.h"
#include <pthread.h>
#define NMASH 29
#define HASH(id) (((unsigned long)id) % NMASH)
struct foo *fh[NMASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next;
};
struct foo *foo_alloc(int id)
{
struct foo *fp = NULL;
int idx = 0;
if ((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = if;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return NULL;
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
}
return fp;
}
void foo_hold(struct foo *fp)
{
pthread_mutex_lock(&hashlock);
fp->f_count++;
pthread_mutex_unlock(&hashlock);
}
struct foo *foo_find(int id)
{
struct foo *fp = NULL;
pthread_mutex_lock(&hashlock);
for (fp = fh[HASH(id)]; fp != NULL; fp = fp->next)
{
if (fp->f_id = id)
{
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return fp;
}
void foo_rele(struct foo *fp)
{
struct foo *tfp = NULL;
int idx = 0;
pthread_mutex_lock(&hashlock);
if (fp->f_count == 1)
{
idx = HASH(fp->f_id);
tfp = fh[idx];
if (tfp = fp)
{
fh[idx] = fp->f_next
}
else
{
while(tfp->next != fp)
{
tfp = tfp->next;
}
tfp->next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
fp->f_count--;
pthread_mutex_unlock(&hashlock);
}
}
View Code
當線程試圖獲取一個已加鎖的互斥量時,pthread_mutex_timedlock互斥量原語允許綁定線程阻塞時間。pthread_mutex_timedlock和pthread_mutex_lock是基本等價的,但是達到超時時間後,pthread_mutex_timedlock會返回。超時時間指原意等待的絕對時間。這個超時時間是用timespec來表示的
#include <pthread.h>
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);
二、讀寫鎖
讀寫鎖與互斥量相似,不過讀寫鎖允許更高的並行性,一次只有一個線程可以占有寫模式的讀寫鎖,但是多個線程可以同時占有讀模式的讀寫鎖,簡單地來說,就說支持一個寫者,多個讀者。
當讀寫鎖是寫加鎖狀態時,所以試圖對這個鎖加鎖的線程都會被阻塞,當讀寫鎖在讀加鎖狀態時,所以試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是希望以寫模式加鎖的線程會被阻塞。不過當有一個線程企圖以寫模式獲取鎖時,讀寫鎖會阻塞後面的讀模式鎖請求,防止讀模式鎖長期占用。
可知,讀寫鎖適用於對數據結構讀的次數遠大於寫的情況,又稱共享互斥鎖,讀共享,寫互斥。
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
讀寫鎖調用phtread_rwlock_init進行初始化,如果希望讀寫鎖有默認的屬性,傳null給attr即可。
讀的模式下鎖定讀寫鎖,需要調用phtread_rwlock_rdlock,寫的模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock,不過以何種方式鎖定讀寫鎖,都可以調用pthread_rwlock_unlock解鎖。
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
例子:

![]()
#include <stdio.h>
#include <pthread.h>
struct job
{
struct job *j_next;
struct job *j_prev;
pthread_t j_id;
};
struct queue
{
struct job *q_head;
struct job *q_tail;
pthread_rwlock_t q_lock;
};
int queue_init(struct queue *qp)
{
int err;
qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qb->q_lock, NULL);
if (err != 0)
{
return err;
}
return 0
}
void job_insert(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qb->q_lock);
jp->next = qp->head;
jp->j_prev = NULL;
if (qp->q_head != NULL)
{
qp->q_head->j_prev = jp;
}
else
{
qp->tail = jp;
}
qp->head = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
void job_append(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = NULL;
jp->j_prev = qp->tail;
if (qp->q_tail != NULL)
{
qp->q_tail->j_next = jp;
}
qp->q_tail = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
void job_remove(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
if (jp == qp->q_head)
{
qp->q_head = jp->j_next;
if (qp->q_tail == jp)
{
qp->tail = NULL;
}
else
{
jp->next->j_prev = jp->j_prev;
}
}
else if (jp == qp->q_tail)
{
qp->q_tail = jp->j_prev;
jp->j_prev->j_next = NULL;
}
else
{
jp->j_prev->j_next = jp->j_next;
jp->j_next->j_prev = jp->j_prev;
}
pthread_rwlock_unlock(&qp->q_lock);
}
struct job *job_find(struct queue *qp, pthread_t id)
{
struct job *jp;
if (pthread_rwlock_rdlock(&qp->q_lock) != 0)
{
return NULL;
}
for (jp = qb->q_head; jp != NULL; jp = jp->j_next)
{
if (pthread_equal(jp->j_id, id))
{
break;
}
}
pthread_rwlock_unlock(&qp->q_lock);
return jp;
}
View Code
與互斥量一樣,讀寫鎖也有帶超時的讀寫鎖函數,避免陷入永久的阻塞。
#include <pthread.h>
#include <time.h>
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
三、條件變量
條件變量與互斥量一起使用時,允許線程以無競爭的方式等待特定的條件發生。
條件本身由互斥量保護,線程在改變條件狀態之前必須鎖定互斥量。在使用條件變量之前,必須把它初始化,可以把常量PTHREAD_CON_INITIALIZE賦給靜態分配的條件變量,也可用pthread_cond_init函數進行初始化。使用pthread_cond_destroy釋放。
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_con_t *cond);
如果需要一個默認屬性的條件變量,把null給attr即可。
我們使用pthread_cond_wait等待條件變量為真,如果在給定時間內不能滿足,則返回錯誤碼。
#include<pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex)
int pthread_cond_timedwait(pthread_cond_t *restrict cond, phtread_mutex_t *restrict mutex, const struct timespec *restrict tsptr)
調用者把鎖定的互斥量傳給函數,函數自動把調用線程放到等待條件的線程列表上,對互斥量解鎖,當pthread_cond_wait返回時,互斥量再次被鎖住。pthread_cond_timedwait多了原意等待的時間。
有兩個函數可用於通知線程條件已滿足,pthread_cond_signal函數至少喚醒一個,pthread_cond_broadcast喚醒等待該條件的所有線程。
#include<phtread.h>
int pthread_cond_signal(pthread_cond_t *cond)
int pthread_cond_broadcast(pthread_cond_t *cond)
例子:

![]()
#include <pthread.h>
struct msg
{
struct msg *m_next;
};
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void process_msg(void)
{
struct msg *mp;
for(;;)
{
pthread_mutex_lock(&qlock);
while (workq == NULL)
{
pthread_cond_wait(&qready, &qlock);
}
mp = workq;
workq = mp->m_next;
pthread_mutex_unlock(&qlock);
}
}
void enqueue_msg(struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
pthread_cond_signal(&qready);
}
View Code
http://xxxxxx/Linuxjc/1150460.html TechArticle