歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux管理 >> Linux網絡

linux網絡編程之posix 線程(三)

posix 信號量與互斥鎖 示例生產者--消費者問題

一、posix 信號量

信號量的概念參見這裡(http://www.bianceng.cn/OS/Linux/201308/37243.htm)。前面也講過system v 信號量,現在來說說posix 信號量。

system v 信號量只能用於進程間同步,而posix 信號量除了可以進程間同步,還可以線程間同步。system v 信號量每次PV操作可以是N,但Posix 信號量每次PV只能是1。除此之外,posix 信號量還有命名和匿名之分(man 7 sem_overview):

1、命名信號量

名字以/somename 形式分辨,只能有一個/ ,且總長不能超過NAME_MAX - 4(一般是251)。

需要用sem_open 函數創建或打開,PV操作分別是sem_wait 和 sem_post,可以使用sem_close 關閉,刪除用sem_unlink。

2、匿名信號量

存放在一塊共享內存中,如果是線程共享,這塊區域可以是全局變量;如果是進程共享,可以是system v 共享內存(shmget 創建,shmat 映射),也可以是 posix 共享內存(shm_open 創建,mmap 映射)。

匿名信號量必須用sem_init 初始化,sem_init 函數其中一個參數pshared決定了線程共享還是進程共享,也可以用sem_post 和sem_wait 進行操作,在共享內存釋放前,匿名信號量要先用sem_destroy 銷毀。

有關這些函數的具體參數可以man 一下。

二、互斥鎖

對於多線程的程序,訪問沖突的問題是很普遍的,解決的辦法是引入互斥鎖(Mutex,MutualExclusive Lock),獲得鎖的線程可以完成“讀-修改-寫”的操作,然後釋放鎖給其它線程,沒有獲得鎖的線程只能等待而不能訪問共享數據,這樣“讀-修改-寫”三步操作組成一個原子操作,要麼都執行,要麼都不執行,不會執行到中間被打斷,也不會在其它處理器上並行做這個操作。

Mutex用pthread_mutex_t類型的變量表示,pthread_mutex_init函數對Mutex做初始化,參數attr設定Mutex的屬性,如果attr為NULL則表示缺省屬性,具體看結構體:

struct pthread_mutexattr_t
{
    enum lock_type    // 使用pthread_mutexattr_settype來更改
    {
         PTHREAD_MUTEX_TIMED_NP [default]//當一個線程加鎖後,其余請求鎖的線程形成等待隊列,在解鎖後按優先級獲得鎖。
         PTHREAD_MUTEX_ADAPTIVE_NP       // 動作最簡單的鎖類型,解鎖後所有線程重新競爭。
         PTHREAD_MUTEX_RECURSIVE_NP      // 允許同一線程對同一鎖成功獲得多次。當然也要解鎖多次。其余線程在解鎖時重新競爭。
         PTHREAD_MUTEX_ERRORCHECK_NP     // 若同一線程請求同一鎖,返回EDEADLK,否則與PTHREAD_MUTEX_TIMED_NP動作相同。
    } type;
} attr;

用pthread_mutex_init函數初始化的Mutex可以用pthread_mutex_destroy銷毀。如果Mutex變量是靜態分配的(全局變量或static變量),也可以用宏定義PTHREAD_MUTEX_INITIALIZER來初始化,相當於用pthread_mutex_init初始化並且attr參數為NULL。

一個線程可以調用pthread_mutex_lock獲得Mutex,如果這時另一個線程已經調用pthread_mutex_lock獲得了該Mutex,則當前線程需要掛起等待,直到另一個線程調用pthread_mutex_unlock釋放Mutex,當前線程被喚醒,才能獲得該Mutex並繼續執行。

上面的具體函數可以man 一下。

三、生產者消費者問題

生產者消費者問題概念參見這裡(http://www.bianceng.cn/OS/Linux/201308/37248.htm)。下面使用posix 信號量和互斥鎖一起來演示:

#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>
    
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
    
#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)
    
#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 1
#define BUFFSIZE 10
    
int g_buffer[BUFFSIZE];
    
unsigned short in = 0;
unsigned short out = 0;
unsigned short produce_id = 0;
unsigned short consume_id = 0;
    
sem_t g_sem_full;
sem_t g_sem_empty;
pthread_mutex_t g_mutex;
    
pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];
    
void *consume(void *arg)
{
    int i;
    int num = (int)arg;
    while (1)
    {
        printf("%d wait buffer not empty\n", num);
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);
    
        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);
    
            if (i == out)
                printf("\t<--consume");
    
            printf("\n");
        }
        consume_id = g_buffer[out];
        printf("%d begin consume product %d\n", num, consume_id);
        g_buffer[out] = -1;
        out = (out + 1) % BUFFSIZE;
        printf("%d end consume product %d\n", num, consume_id);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_full);
        sleep(1);
    }
    return NULL;
}
    
void *produce(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not full\n", num);
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);
    
            if (i == in)
                printf("\t<--produce");
    
            printf("\n");
        }
    
        printf("%d begin produce product %d\n", num, produce_id);
        g_buffer[in] = produce_id;
        in = (in + 1) % BUFFSIZE;
        printf("%d end produce product %d\n", num, produce_id++);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_empty);
        sleep(5);
    }
    return NULL;
}
    
int main(void)
{
    int i;
    for (i = 0; i < BUFFSIZE; i++)
        g_buffer[i] = -1;
    
    sem_init(&g_sem_full, 0, BUFFSIZE);
    sem_init(&g_sem_empty, 0, 0);
    
    pthread_mutex_init(&g_mutex, NULL);
    
    
    for (i = 0; i < CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void *)i);
    
    for (i = 0; i < PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)i);
    
    for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);
    
    sem_destroy(&g_sem_full);
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);
    
    return 0;
}

與這裡(http://www.bianceng.cn/OS/Linux/201308/37248.htm)的程序相比,程序邏輯沒太大變化,只是用pthread_mutex_lock 替代了 sem_mutex,其次這裡是演示線程間同步,現在上述程序生產者消費者各一個線程,但生產者睡眠時間是消費者的5倍,故消費者會經常阻塞在sem_wait(&g_sem_empty) 上面,因為緩沖區經常為空,可以將PRODUCTORS_COUNT 改成5,即有5個生產者線程和1個消費者線程,而且生產者睡眠時間還是消費者的5倍,從動態輸出可以看出,基本上就動態平衡了,即5個生產者一下子生產了5份東西,消費者1s消費1份,剛好在生產者繼續生產前消費完。

Copyright © Linux教程網 All Rights Reserved