一、線程池概述
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然後在創建線程後自動啟動這些任務。線程池線程都是後台線
程。每個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。如果某個線程在托管代碼中空閒(如正在等待
某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起
的工作,則線程池將在一段時間後創建另一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程可以排隊,但他們
要等到其他線程完成後才啟動。應用程序可以有多個線程,這些線程在休眠狀態中需要耗費大量時間來等待事件發生。其他線程
可能進入睡眠狀態,並且僅定期被喚醒以輪循更改或更新狀態信息,然後再次進入休眠狀態。為了簡化對這些線程的管理,為每
個進程提供了一個線程池,一個線程池有若干個等待操作狀態,當一個等待操作完成時,線程池中的輔助線程會執行回調函數。
線程池中的線程由系統管理,程序員不需要費力於線程管理,可以集中精力處理應用程序任務。
使用線程池的好處:
1、減少在創建和銷毀線程上所花的時間以及系統資源的開銷;
2、如不使用線程池,有可能造成系統創建大量線程而導致消耗完系統內存以及”過度切換”。
在什麼情況下使用線程池:
1、單個任務處理的時間比較短;
2、將需處理的任務的數量大。
在什麼情況下不使用線程池線程:
1、如果需要使一個任務具有特定優先級;
2、如果具有可能會長時間運行(並因此阻塞其他任務)的任務;
3、如果需要將線程放置到單線程單元中(線程池中的線程均處於多線程單元中);
4、如果需要永久標識來標識和控制線程,比如想使用專用線程來終止該線程,將其掛起或按名稱發現它。
二、線程池程序
main.c
#include <stdio.h>#include <unistd.h>#include "thread_pool.h"/**********************************************************************功能: 任務處理函數*參數: 無*返回值: NULL*********************************************************************/void *task_test(void *arg){ printf("/t/tworking on task %d/n", (int)arg); sleep(1); /*休息一秒,延長任務的執行時間*/ return NULL;}int main (int argc, char *argv[]){ pool_t pool; int i = 0; pool_init(&pool, 2);//初始化線程池 sleep(1); for(i = 0; i < 5; i++){ sleep(1); pool_add_task(&pool, task_test, (void *)i);//向線程池中添加一個任務 } sleep(4); pool_uninit(&pool);//銷毀線程池 return 0;}
thread_pool.c
#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <assert.h>#include "thread_pool.h"static void *pool_thread_server(void *arg);/**********************************************************************功能: 初始化線程池結構體並創建線程*參數: pool:線程池句柄* threads_limit:線程池中線程的數量*返回值: 無*********************************************************************/void pool_init(pool_t *pool, int threads_limit){ pool->threads_limit = threads_limit; pool->queue_head = NULL; pool->task_in_queue = 0; pool->destroy_flag = 0; /*創建存放線程ID的空間*/ pool->threadid = (pthread_t *)calloc(threads_limit, sizeof(pthread_t)); int i = 0; /*初始化互斥鎖和條件變量*/ pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); /*循環創建threads_limit個線程*/ for (i = 0; i < threads_limit; i++){ pthread_create(&(pool->threadid[i]), NULL, pool_thread_server, pool); } return;}/**********************************************************************功能: 銷毀線程池,等待隊列中的任務不會再被執行,* 但是正在運行的線程會一直,把任務運行完後再退出*參數: 線程池句柄*返回值: 成功:0,失敗非0*********************************************************************/int pool_uninit(pool_t *pool){ pool_task *head = NULL; int i; pthread_mutex_lock(&(pool->queue_lock)); if(pool->destroy_flag)/* 防止兩次調用 */ return -1; pool->destroy_flag = 1; pthread_mutex_unlock(&(pool->queue_lock)); /* 喚醒所有等待線程,線程池要銷毀了 */ pthread_cond_broadcast(&(pool->queue_ready)); /* 阻塞等待線程退出,否則就成僵屍了 */ for (i = 0; i < pool->threads_limit; i++) pthread_join(pool->threadid[i], NULL); free(pool->threadid); /* 銷毀等待隊列 */ pthread_mutex_lock(&(pool->queue_lock)); while(pool->queue_head != NULL){ head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } pthread_mutex_unlock(&(pool->queue_lock)); /*條件變量和互斥量也別忘了銷毀*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); return 0;}/**********************************************************************功能: 向任務隊列中添加一個任務*參數: pool:線程池句柄* process:任務處理函數* arg:任務參數*返回值: 無*********************************************************************/static void enqueue_task(pool_t *pool, pool_task_f process, void *arg){ pool_task *task = NULL; pool_task *member = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->task_in_queue >= pool->threads_limit){ printf("task_in_queue > threads_limit!/n"); pthread_mutex_unlock (&(pool->queue_lock)); return; } task = (pool_task *)calloc(1, sizeof(pool_task)); assert(task != NULL); task->process = process; task->arg = arg; task->next = NULL; pool->task_in_queue++; member = pool->queue_head; if(member != NULL){ while(member->next != NULL) /* 將任務加入到任務鏈連的最後位置. */ member = member->next; member->next = task; }else{ pool->queue_head = task; /* 如果是第一個任務的話,就指向頭 */ } printf("/ttasks %d/n", pool->task_in_queue); /* 等待隊列中有任務了,喚醒一個等待線程 */ pthread_cond_signal (&(pool->queue_ready)); pthread_mutex_unlock (&(pool->queue_lock));}/**********************************************************************功能: 從任務隊列中取出一個任務*參數: 線程池句柄*返回值: 任務句柄*********************************************************************/static pool_task *dequeue_task(pool_t *pool){ pool_task *task = NULL; pthread_mutex_lock(&(pool->queue_lock)); /* 判斷線程池是否要銷毀了 */ if(pool->destroy_flag){ pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%lx will be destroyed/n", pthread_self()); pthread_exit(NULL); } /* 如果等待隊列為0並且不銷毀線程池,則處於阻塞狀態 */ if(pool->task_in_queue == 0){ while((pool->task_in_queue == 0) && (!pool->destroy_flag)){ printf("thread 0x%lx is leisure/n", pthread_self()); /* 注意:pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒後會加鎖 */ pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } }else{ /* 等待隊列長度減去1,並取出隊列中的第一個元素 */ pool->task_in_queue--; task = pool->queue_head; pool->queue_head = task->next; printf("thread 0x%lx received a task/n", pthread_self()); } pthread_mutex_unlock(&(pool->queue_lock)); return task;}/**********************************************************************功能: 向線程池中添加一個任務*參數: pool:線程池句柄* process:任務處理函數* arg:任務參數*返回值: 0*********************************************************************/int pool_add_task(pool_t *pool, pool_task_f process, void *arg){ enqueue_task(pool, process, arg); return 0;}/**********************************************************************功能: 線程池服務程序*參數: 略*返回值: 略*********************************************************************/static void *pool_thread_server(void *arg){ pool_t *pool = NULL; pool = (pool_t *)arg; while(1){ pool_task *task = NULL; task = dequeue_task(pool); /*調用回調函數,執行任務*/ if(task != NULL){ printf ("thread 0x%lx is busy/n", pthread_self()); task->process(task->arg); free(task); task = NULL; } } /*這一句應該是不可達的*/ pthread_exit(NULL); return NULL;}
thread_pool.h
#ifndef __THREAD_POOL_H__#define __THREAD_POOL_H__#include <pthread.h>/********************************************************************** 任務回調函數,也可根據需要自行修改*********************************************************************/typedef void *(*pool_task_f)(void *arg);/********************************************************************** 任務句柄*********************************************************************/typedef struct _task{ pool_task_f process;/*回調函數,任務運行時會調用此函數,注意也可聲明成其它形式*/ void *arg; /*回調函數的參數*/ struct _task *next;}pool_task;/********************************************************************** 線程池句柄*********************************************************************/typedef struct{ pthread_t *threadid; /* 線程號 */ int threads_limit; /* 線程池中允許的活動線程數目 */ int destroy_flag; /* 是否銷毀線程池 , 0銷毀,1不銷毀*/ pool_task *queue_head; /* 鏈表結構,線程池中所有等待任務 */ int task_in_queue; /* 當前等待隊列的任務數目 */ pthread_mutex_t queue_lock; /* 鎖 */ pthread_cond_t queue_ready; /* 條件變量 */}pool_t;/**********************************************************************功能: 初始化線程池結構體並創建線程*參數: pool:線程池句柄* threads_limit:線程池中線程的數量*返回值: 無*********************************************************************/void pool_init(pool_t *pool, int threads_limit);/**********************************************************************功能: 銷毀線程池,等待隊列中的任務不會再被執行,* 但是正在運行的線程會一直,把任務運行完後再退出*參數: 線程池句柄*返回值: 成功:0,失敗非0*********************************************************************/int pool_uninit(pool_t *pool);/**********************************************************************功能: 向線程池中添加一個任務*參數: pool:線程池句柄* process:任務處理函數* arg:任務參數*返回值: 0*********************************************************************/int pool_add_task(pool_t *pool, pool_task_f process, void *arg);#endif