歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux基礎 >> 關於Linux

Linux下簡單線程池的實現

線程池的技術背景

在面向對象編程中,創建和銷毀對象是很費時間的,因為創建一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每一個對象,以便能夠在對象銷毀後進行垃圾回收。所以提高服務程序效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀。如何利用已有對象來服務(不止一個不同的任務)就是一個需要解決的關鍵問題,其實這就是一些"池化資源"技術產生的原因。比如大家所熟悉的數據庫連接池正是遵循這一思想而產生的,本文將介紹的線程池技術同樣符合這一思想。

目前,一些著名的大公司都特別看好這項技術,並早已經在他們的產品中應用該技術。比如IBM的WebSphere,IONA的Orbix 2000在SUN的 Jini中,Microsoft的MTS(Microsoft Transaction Server 2.0),COM+等。

現在您是否也想在服務器程序應用該項技術?

線程池技術如何提高服務器程序的性能

我所提到服務器程序是指能夠接受客戶請求並能處理請求的程序,而不只是指那些接受網絡客戶請求的網絡服務器程序。

多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。但如果對多線程應用不當,會增加對單個任務的處理時間。可以舉一個簡單的例子:

假設在一台服務器完成一項任務的時間為T

T1 創建線程的時間

T2 在線程中執行任務的時間,包括線程間同步所需時間

T3 線程銷毀的時間

顯然T = T1+T2+T3。注意這是一個極度簡化的假設。

可以看出T1,T3是多線程本身的帶來的開銷,我們渴望減少T1,T3所用的時間,從而減少T的時間。但一些線程的使用者並沒有注意到這一點,所以在程序中頻繁的創建或銷毀線程,這導致T1和T3在T中占有相當比例(在傳統的多線程服務器模型中是這樣實現的:一旦有個請求到達,就創建一個新的線程,由該線程執行任務,任務執行完畢之後,線程就退出。這就是"即時創建,即時銷毀"的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數非常頻繁,那麼服務器就將處於一個不停的創建線程和銷毀線程的狀態。這筆開銷是不可忽略的,尤其是線程執行的時間非常非常短的情況。)。顯然這是突出了線程的弱點(T1,T3),而不是優點(並發性)。

線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閒的時間段(在應用程序啟動之後,就馬上創建一定數量的線程,放入空閒的隊列中。這些線程都是處於阻塞狀態,這些線程只占一點內存,不占用CPU。當任務到來後,線程池將選擇一個空閒的線程,將任務傳入此線程中運行。當所有的線程都處在處理任務的時候,線程池將自動創建一定的數量的新線程,用於處理更多的任務。執行任務完成之後線程並不退出,而是繼續在線程池中等待下一次任務。當大部分線程處於阻塞狀態時,線程池將自動銷毀一部分的線程,回收系統資源),這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。

線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目。再看一個例子:

假設一個服務器一天要處理50000個請求,並且每個請求需要一個單獨的線程完成。我們比較利用線程池技術和不利於線程池技術的服務器處理這些請求時所產生的線程總數。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目或者上限(以下簡稱線程池尺寸),而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池尺寸是遠小於50000。所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。

簡單線程池的實現

1、線程池,顧名思義,就是要創建很多線程。創建線程的函數pthread_creat()應該是最容易被想到的。有線程創建就要有線程退出pthread_exit(),在線程退出前,如果線程沒有設置pthread_detach()屬性,那麼顯然要回收線程資源pthread_join()。當然咯,可能要獲取線程的ID值pthread_self()。 2、第一步創建了線程,剛開始線程是不做事情的,初始化好了,就等待。等待當然不會是while(1)這種函數,因為那樣太消耗CPU資源。容易想到的等待自然是使用條件變量的等待pthread_cond_wait(),這個函數干兩件事情,第一件是對解除與形參mutex對應的互斥鎖,然後是重新加鎖,為的是在線程將任務放入任務隊列的一個緩沖。任務放入完成後,再加鎖,這樣不會影響其他任務獲取加鎖的權利。因此,在調用該函數之前,會自然會想到加互斥鎖。初始化互斥鎖函數pthread_mutex_init(),反初始化互斥鎖函數pthread_mutex_destroy(),加鎖函數pthread_mutex_lock(),解鎖函數pthread_mutex_unlock(),稍微再細化一點,可能會用到嘗試解鎖pthread_mutex_trylock()。 3.、實現了上面二步,一個線程池的框架就初步搭起來了。當然沒法用,因為真正干事情的線程全部在等待中,注意不應該是超時的等待pthread_cond_timewait()。要使處於阻塞狀態的線程干事情,得用信號去喚醒它pthread_cond_signal(),“打鳥”的一個函數,開一槍,總會把這只鳥吵醒,但具體是那一只,看那只最先在那排隊了(上面已經說了pthread_cond_wait()函數的等待隊列問題)。當然也可以想到“打鳥驚群”的函數pthread_cond_broadcast(),打一槍,無論打沒打著,一群鳥都飛走了。 4、有了上面的基礎,接下來就重點關注任務部分了。當然線程數量有限,上面已經說了,是固定的數目。因此任務大於線程數時,排隊是難免的了。因此創建了一個任務隊列,隊列中的每一項代表一個任務。任務隊列的節點最簡單的模型就是一個處理任務的回掉函數void* (*callback_function)(void *arg)。指向函數的指針,參數是個指針,返回值也是個指針。具體的函數和參數則需要另外寫函數定義。沒次調用完線程處理完這個任務,就需要把它從任務隊列中刪除。進入任務隊列的任務數也不能無限多,因此也設為一個比線程數稍微大個幾個的一個固定值。 5、線程動態創建:一個線程退出後(在任務執行失敗時,就有可能會退出),主線程要能檢測到,然後動態創建一個新的線程,以維持線程池中線程總數不變。可以通過pthread_join()阻塞等待回收子線程資源,但是這就意味著主線程在阻塞狀態下干不了其他工作,因此考慮使用線程信號,在子線程結束時,給主線程用pthread_kill()發送一個SIGUSR1信號,在主線程接收到此信號時,通過調用注冊函數signal()或sigaction()函數注冊的函數創建一個新的線程。 下面只列出Threadpool核心的實現,封裝條件變量的類在這裡沒有列出。
//ThreadPool設計  
void *thread_routine(void *args);  
class ThreadPool  
{  
    friend void *thread_routine(void *args);  
private:  
    //回調函數類型  
    typedef void *(*callback_t)(void *);  
    //任務結構體  
    struct task_t  
    {  
        callback_t run; //任務回調函數  
        void *args;     //任務函數參數  
    };  
  
public:  
    ThreadPool(int _maxThreads = 36, unsigned int _waitSeconds = 2);  
    ~ThreadPool();  
    //添加任務接口  
    void addTask(callback_t run, void *args);  
  
private:  
    void startTask();  
  
private:  
    Condition ready;                //任務准備就緒或線程池銷毀通知  
    std::queue taskQueue; //任務隊列  
  
    unsigned int maxThreads;        //線程池最多允許的線程數  
    unsigned int counter;           //線程池當前線程數  
    unsigned int idle;              //線程池空閒線程數  
    unsigned int waitSeconds;       //線程可以等待的秒數  
    bool         quit;              //線程池銷毀標志  
};  

// 線程入口函數  
// 這其實就相當於一個消費者線程, 不斷的消費任務(執行任務)  
void *thread_routine(void *args)  
{  
    //將子線程設置成為分離狀態, 這樣主線程就可以不用jion  
    pthread_detach(pthread_self());  
    printf("*thread 0x%lx is starting...\n", (unsigned long)pthread_self());  
    ThreadPool *pool = (ThreadPool *)args;  
  
    //等待任務的到來, 然後執行任務  
    while (true)  
    {  
        bool timeout = false;  
  
        pool->ready.lock();  
        //當處於等待的時候, 則說明空閒的線程多了一個  
        ++ pool->idle;  
  
        //pool->ready中的條件變量有三個作用:  
        // 1.等待任務隊列中有任務到來  
        // 2.等待線程池銷毀通知  
        // 3.確保當等待超時的時候, 能夠將線程銷毀(線程退出)  
        while (pool->taskQueue.empty() && pool->quit == false)  
        {  
            printf("thread 0x%lx is waiting...\n", (unsigned long)pthread_self());  
            //等待waitSeconds  
            if (0 != pool->ready.timedwait(pool->waitSeconds))  
            {  
                //如果等待超時  
                printf("thread 0x%lx is wait timeout ...\n", (unsigned long)pthread_self());  
                timeout = true;  
                //break出循環, 繼續向下執行, 會執行到下面第1個if處  
                break;  
            }  
        }  
        //條件成熟(當等待結束), 線程開始執行任務或者是線程銷毀, 則說明空閒線程又少了一個  
        -- pool->idle;  
  
        // 狀態3.如果等待超時(一般此時任務隊列已經空了)  
        if (timeout == true && pool->taskQueue.empty())  
        {  
            -- pool->counter;  
            //解鎖然後跳出循環, 直接銷毀線程(退出線程)  
            pool->ready.unlock();  
            break;  
        }  
  
        // 狀態2.如果是等待到了線程的銷毀通知, 且任務都執行完畢了  
        if (pool->quit == true && pool->taskQueue.empty())  
        {  
            -- pool->counter;  
            //如果沒有線程了, 則給線程池發送通知  
            //告訴線程池, 池中已經沒有線程了  
            if (pool->counter == 0)  
                pool->ready.signal();  
            //解鎖然後跳出循環  
            pool->ready.unlock();  
            break;  
        }  
  
        // 狀態1.如果是有任務了, 則執行任務  
        if (!(pool->taskQueue.empty()))  
        {  
            //從隊頭取出任務進行處理  
            ThreadPool::task_t *t = pool->taskQueue.front();  
            pool->taskQueue.pop();  
  
            //執行任務需要一定的時間  
            //解鎖以便於其他的生產者可以繼續生產任務, 其他的消費者也可以消費任務  
            pool->ready.unlock();  
            //處理任務  
            t->run(t->args);  
            delete t;  
        }  
    }  
  
    //跳出循環之後, 打印退出信息, 然後銷毀線程  
    printf("thread 0x%lx is exiting...\n", (unsigned long)pthread_self());  
    pthread_exit(NULL);  
}  

//addTask函數  
//添加任務函數, 類似於一個生產者, 不斷的將任務生成, 掛接到任務隊列上, 等待消費者線程進行消費  
void ThreadPool::addTask(callback_t run, void *args)  
{  
    /** 1. 生成任務並將任務添加到"任務隊列"隊尾 **/  
    task_t *newTask = new task_t {run, args};  
  
    ready.lock();   //注意需要使用互斥量保護共享變量  
    taskQueue.push(newTask);  
  
    /** 2. 讓線程開始執行任務 **/  
    startTask();  
    ready.unlock();//解鎖以使任務開始執行  
}  

//線程啟動函數  
void ThreadPool::startTask()  
{  
    // 如果有等待線程, 則喚醒其中一個, 讓它來執行任務  
    if (idle > 0)  
        ready.signal();  
    // 沒有等待線程, 而且當前先線程總數尚未達到阈值, 我們就需要創建一個新的線程  
    else if (counter < maxThreads)  
    {  
        pthread_t tid;  
        pthread_create(&tid, NULL, thread_routine, this);  
        ++ counter;  
    }  
}  

//析構函數  
ThreadPool::~ThreadPool()  
{  
    //如果已經調用過了, 則直接返回  
    if (quit == true)  
        return;  
  
    ready.lock();  
    quit = true;  
    if (counter > 0)  
    {  
        //對於處於等待狀態, 則給他們發送通知,  
        //這些處於等待狀態的線程, 則會接收到通知,  
        //然後直接退出  
        if (idle > 0)  
            ready.broadcast();  
  
        //對於正處於執行任務的線程, 他們接收不到這些通知,  
        //則需要等待他們執行完任務  
        while (counter > 0)  
            ready.wait();  
    }  
    ready.unlock();  
}  
完整的代碼請到我的Github查看: https://github.com/Tachone/LinuxPorgDemo/tree/master/threadpool_C%2B%2B

關於高級線程池的探討

簡單線程池存在一些問題,比如如果有大量的客戶要求服務器為其服務,但由於線程池的工作線程是有限的,服務器只能為部分客戶服務,其它客戶提交的任務,只能在任務隊列中等待處理。一些系統設計人員可能會不滿這種狀況,因為他們對服務器程序的響應時間要求比較嚴格,所以在系統設計時可能會懷疑線程池技術的可行性,但是線程池有相應的解決方案。調整優化線程池尺寸是高級線程池要解決的一個問題。主要有下列解決方案:

 

方案一:動態增加工作線程

在一些高級線程池中一般提供一個可以動態改變的工作線程數目的功能,以適應突發性的請求。一旦請求變少了將逐步減少線程池中工作線程的數目。當然線程增加可以采用一種超前方式,即批量增加一批工作線程,而不是來一個請求才建立創建一個線程。批量創建是更加有效的方式。該方案還有應該限制線程池中工作線程數目的上限和下限。否則這種靈活的方式也就變成一種錯誤的方式或者災難,因為頻繁的創建線程或者短時間內產生大量的線程將會背離使用線程池原始初衷--減少創建線程的次數。

舉例:Jini中的TaskManager,就是一個精巧線程池管理器,它是動態增加工作線程的。SQL Server采用單進程(Single Process)多線程(Multi-Thread)的系統結構,1024個數量的線程池,動態線程分配,理論上限32767。

方案二:優化工作線程數目

如果不想在線程池應用復雜的策略來保證工作線程數滿足應用的要求,你就要根據統計學的原理來統計客戶的請求數目,比如高峰時段平均一秒鐘內有多少任務要求處理,並根據系統的承受能力及客戶的忍受能力來平衡估計一個合理的線程池尺寸。線程池的尺寸確實很難確定,所以有時干脆用經驗值。

舉例:在MTS中線程池的尺寸固定為100。

方案三:一個服務器提供多個線程池

在一些復雜的系統結構會采用這個方案。這樣可以根據不同任務或者任務優先級來采用不同線程池處理。

舉例:COM+用到了多個線程池。

這三種方案各有優缺點。在不同應用中可能采用不同的方案或者干脆組合這三種方案來解決實際問題。

 

線程池技術適用范圍及應注意的問題

下面是我總結的一些線程池應用范圍,可能是不全面的。

線程池的應用范圍:

(1)需要大量的線程來完成任務,且完成任務的時間比較短。 WEB服務器完成網頁請求這樣的任務,使用線程池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點擊次數。 但對於長時間的任務,比如一個Telnet連接請求,線程池的優點就不明顯了。因為Telnet會話時間比線程的創建時間大多了。

(2)對性能要求苛刻的應用,比如要求服務器迅速相應客戶請求。

(3)接受突發性的大量請求,但不至於使服務器因此產生大量線程的應用。突發性大量客戶請求,在沒有線程池情況下,將產生大量線程,雖然理論上大部分操作系統線程數目最大值不是問題,短時間內產生大量線程可能使內存到達極限,並出現"OutOfMemory"的錯誤。

參考:

《UNP》

IBM文檔: http://www.ibm.com/developerworks/cn/java/l-threadPool/

Copyright © Linux教程網 All Rights Reserved