之前有講過進程及多進程服務端的實現,現在我們來看看更為廣泛而且實用的線程及多線程服務端的實現。
那麼什麼是線程呢?
線程是操作系統能夠進行運算調度的最小單位,它被包涵在進程之中,是行程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並行多個線程,每條線程並行執行不同的任務。
這是比較正式的解釋,簡單點來說,線程就是進程的更進一步的細化。
由於進程代表的是一個正在運行的程序,所以進程的切換其實也就是程序的切換,而在切換過程中要移除當前運行進程在內存中的數據,移入將要運行的進程所需要的數據,所以進程的切換會消耗很多的系統資源,尤其是請求量大時,cpu必須來回切換不同的進程。而線程的出現解決了這一問題,線程是比進程更小的執行單元,一個進程中可以包含若干個線程,每一個線程有一個任務執行的順序。這樣,線程本身並不保有資源,而是在需要時向所在的進程進行申請,以滿足運行需要。這樣我們就可以用線程來實現多客戶端的訪問,每個訪問都放到一個線程中去執行。
attr(參數):用於創建線程時指定一些參數,傳入null時使用默認參數
start_routine(函數指針):用於保存新線程所執行的函數的地址,相當於新線程中main()函數的地址
arg(變量指針):第三個參數中函數所傳入的參數的地址值,即新線程中main()函數中參數的地址值
成功時返回0,失敗時返回其他值
int pthread_detach(pthread_t thread):線程銷毀thread(線程ID):要銷毀的線程id
成功時返回0,失敗時返回其他值
int pthread_join(pthread_t thread, void ** status):線程聯合 thread(線程):該參數值id的線程終止時才會從該函數返回
status(結果):用於保存線程main()函數返回值的地址
成功時返回0,失敗時返回其他值
pthread_join和pthread_detach的區別:調用pthread_join函數的線程會進入等待狀態,直到指定id的線程結束,而且可以得到結束線程的返回值
而調用pthread_detach函數不會使得調用該函數的線程進入等待狀態,起作用也是可以釋放已經運行結束的線程所占用的內存空間
互斥量:如名字所示,互斥,即不允許多個線程同時對臨界區進行訪問,當一個線程在訪問時,另一個必須要進行等待。只有當另一個線程釋放互斥量時,等待中的線程才能訪問資源。
信號量:可以用來保證兩個或多個關鍵代碼段不被並發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那麼該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。
我們可以通過這兩種機制來很好的保證線程執行的同步。
int pthread_mutex_init(pthread_mutex_t * mutex , const pthread_mutexattr_t * attr):初始化互斥量mutex(型號量):創建互斥信號量時傳遞保存互斥量的變量地址。
attr(互斥量屬性):傳入null時創建默認的互斥量
成功時返回0,失敗時返回其他值
int pthread_mutex_destroy(pthread_mutex_t * mutex)銷毀互斥量mutex(互斥量):要銷毀的互斥量
成功時返回0,失敗時返回其他值
int pthread_mutex_lock(pthread_mutex_t * mutex):鎖定互斥量mutex(信號量):要操作的互斥量
成功時返回0,失敗時返回其他值
int pthread_mutex_unlock(pthread_mutex_t * mutex):解鎖互斥量mutex(信號量):要操作的互斥量
成功時返回0,失敗時返回其他值
例如:
[code]pthread_mutex_lock(&mutex); //臨界區開始 ...... ..... .. //臨界區結束 pthread_mutex_unlock(&mutex);int sem_init(sem_t * sem , int pthread , unsigned int value):初始化信號量
sem(信號量):創建信號量時保存信號量的變量地址值
pthread(控制參數):若傳入0,則只能在一個進程中使用,若是其他值,則可以由多個信號量共享
value(初始值):指定創建的信號量的初始值
成功時返回0,失敗時返回其他值
int sem_destroy(sem_t * sem):銷毀信號量sem(信號量):要銷毀的信號量
成功時返回0,失敗時返回其他值
int sem_post(sem_t * sem)釋放信號量sem(信號量):要釋放的信號量
成功時返回0,失敗時返回其他值
int sem_wait(sem_t * sem)申請信號量sem(信號量):要申請的信號量
成功時返回0,失敗時返回其他值
例如:
[code]sem_wait(&sem)//信號量減為0 //臨界區域開始 ........ ....... .... ... //臨界區域結束 sem_post(&sem)//信號量增1在了解了線程以及線程使用中需要注意的問題之後可以改寫一下服務端的程序,使其通過多線程的方式來實現並發訪問。
[code]#include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<semaphore.h> #include<unistd.h> #include<arpa/inet.h> #include<sys/socket.h> #define BUFF_SIZE 100 #define MAX_CLNT 256 //客戶端處理函數 void *handle_clent(void *arg); //消息發送函數 void send_message(char * msg,int len); //出錯處理函數 void error_handling(char *msg); //定義全局的客戶端socket變量 int clnt_cnt=0; //定義全局的socket數組,保存客戶端套接字 int clnt_socks[MAX_CLNT]; //定義的全局的互斥信息號量 pthread_mutex_t mutx; int main(int argc,char *argv[]){ int server_sock,client_sock; struct sockaddr_in server_addr; struct sockaddr_in client_addr; int client_addr_size; //用於記錄線程id pthread_t t_id; if(argc!=2){ printf("Usage : %s <port>\n",argv[0]); exit(1); } //初始化一個互斥信號量 pthread_mutex_init(&mutx,NULL); server_sock = socket(PF_INET,SOCK_STREAM,0); memset(&server_addr,0,sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(atoi(argv[1])); if(bind(server_sock,(struct sockaddr *)&server_addr,sizeof(server_addr)) == -1){ error_handling("bind() error"); } if(listen(server_sock,5) == -1){ error_handling("listen() error"); } while(1){ client_addr_size = sizeof(client_addr); client_sock = accept(server_sock,(struct sockaddr *)&client_addr,&client_addr_size); //上鎖 pthread_mutex_lock(&mutx); //將接收到的客戶端socket保存到全局變量中 clnt_socks[clnt_cnt++] = client_sock; //創建一個新的線程,用於處理接收到的客戶端 pthread_create(&t_id,NULL,handle_clent,(void *)&client_sock); //處理線程的結束 pthread_detach(t_id); printf("Connected client IP: %s \n",inet_ntoa(client_addr.sin_addr)); } close(server_sock); return 0; } /** 用於處理接收到的客戶端 **/ void *handle_clent(void *arg){ //將參數強轉成int型 int clnt_sock = *((int *)arg); int str_len = 0 ,i; char msg[BUFF_SIZE]; while((str_len = read(clnt_sock,msg,sizeof(msg))) != 0){ send_message(msg,str_len); } //上鎖,修改全局的socket數組,去除當前的socket客戶端 pthread_mutex_lock(&mutx); for(i=0;i<clnt_cnt;i++){ if(clnt_sock == clnt_socks[i]){ while(i++<clnt_cnt-1){ clnt_socks[i] = clnt_socks[i+1]; } break; } } //客戶端數量修改 clnt_cnt--; pthread_mutex_unlock(&mutx); close(clnt_sock); return NULL; } /** 用於消息處理,給所有已連接的客戶端發送消息 **/ void send_message(char *msg,int len){ int i; //上鎖,使用全局的clnt_socks變量來輸出數據 pthread_mutex_lock(&mutx); //遍歷客戶端數組 for(i = 0;i<clnt_cnt;i++){ write(clnt_socks[i],msg,len); } //解鎖 pthread_mutex_unlock(&mutx); } void error_handling(char * message){ fputs(message,stderr); fputc('\n',stderr); exit(1); }
[code]#include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<semaphore.h> #include<unistd.h> #include<arpa/inet.h> #include<sys/socket.h> #define BUFF_SIZE 100 #define NAME_SIZE 20 void * send_msg(void *arg); void * recv_msg(void *arg); void error_handling(char *msg); char name[NAME_SIZE]="[DEFAULT]"; char msg[BUFF_SIZE]; int main(int argc ,char * argv[]){ int sock; struct sockaddr_in server_addr; pthread_t send_thread; pthread_t recv_thread; void *thread_return; if(argc != 4){ printf("Usage : %s <port> <name>\n",argv[0]); exit(1); } sprintf(name,"[%s]",argv[3]); sock = socket(PF_INET,SOCK_STREAM,0); memset(&server_addr,0,sizeof(server_addr)); server_addr.sin_family=AF_INET; server_addr.sin_addr.s_addr = inet_addr(argv[1]); server_addr.sin_port = htons(atoi(argv[2])); if(connect(sock,(struct sockaddr *)&server_addr,sizeof(server_addr)) == -1){ error_handling("connect() error"); } pthread_create(&send_thread,NULL,send_msg,(void *)&sock); pthread_create(&recv_thread,NULL,recv_msg,(void *)&sock); pthread_join(send_thread,&thread_return); pthread_join(recv_thread,&thread_return); close(sock); return 0; } /** 用於發送消息的線程 **/ void * send_msg(void *arg){ int sock=*(((int *)arg)); char name_msg[NAME_SIZE+BUFF_SIZE]; while(1){ fgets(msg,BUFF_SIZE,stdin); if(!strcmp(msg,"q\n") || !strcmp(msg,"Q\n")){ close(sock); exit(0); } sprintf(name_msg," %s %s",name,msg); write(sock,name_msg,strlen(name_msg)); } return NULL; } /** 用於接收數據的線程 **/ void * recv_msg(void *arg){ int sock=*(((int *)arg)); char name_msg [NAME_SIZE+BUFF_SIZE]; int str_len; while(1){ str_len=read(sock,name_msg,NAME_SIZE+BUFF_SIZE-1); if(str_len == -1){ return (void *) -1; } name_msg[str_len] = 0; fputs(name_msg,stdout); } return NULL; } void error_handling(char * message){ fputs(message,stderr); fputc('\n',stderr); exit(1); }
最後編譯運行:
編譯客戶端:gcc chat_client.c -DREENTRANT -o chat_client -lpthread
編譯服務端: gcc chat_server.c -D_REENTRANT -o chat_server -lpthread
運行服務端:./chat_server 9090
運行客戶端:./chat_client 127.0.0.1 9090 wei
這樣就完成了一個簡易基於多線程的服務端/客戶端程序