歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Linux下的C++線程池實現

我設計這個線程池的初衷是為了與socket對接的。線程池的實現千變萬化,我得這個並不一定是最好的,但卻是否和我心目中需求模型的。現把部分設計思路和代碼貼出,以期拋磚引玉。個人比較喜歡搞開源,所以大家如果覺得有什麼需要改善的地方,歡迎給予評論。思前想後,也沒啥設計圖能表達出設計思想,就把類圖貼出來吧。

類圖設計如下:

Command類是我們的業務類。這個類裡只能存放簡單的內置類型,這樣方便與socket的直接傳輸。我定義了一個cmd_成員用於存放命令字,arg_用於存放業務的參數。這個參數可以使用分隔符來分隔各個參數。我設計的只是簡單實現,如果有序列化操作了,完全不需要使用我這種方法啦。

ThreadProcess就是業務處理類,這裡邊定義了各個方法用於進行業務處理,它將在ThreadPool中的Process函數中調用。
ThreadPool就是我們的線程池類。其中的成員變量都是靜態變量,Process就是線程處理函數。
#define MAX_THREAD_NUM 50 // 該值目前需要設定為初始線程數的整數倍
#define ADD_FACTOR 40 // 該值表示一個線程可以處理的最大任務數
#define THREAD_NUM 10 // 初始線程數
bshutdown_:用於線程退出。
command_:用於存放任務隊列
command_cond_:條件變量
command_mutex_:互斥鎖
icurr_thread_num_:當前線程池中的線程數
thread_id_map_:這個map用於存放線程對應的其它信息,我只存放了線程的狀態,0為正常,1為退出。還可以定義其它的結構來存放更多的信息,例如存放套接字。
InitializeThreads:用於初始化線程池,先創建THREAD_NUM個線程。後期擴容也需要這個函數。
Process:線程處理函數,這裡邊會調用AddThread和DeleteThread在進行線程池的伸縮。
AddWork:往隊列中添加一個任務。
ThreadDestroy:線程銷毀函數。
AddThread:擴容THREAD_NUM個線程
DeleteThread:如果任務隊列為空,則將原來的線程池恢復到THREAD_NUM個。這裡可以根據需要進行修改。
 
以下貼出代碼以供大家參考。
command.h

#ifndef COMMAND_H_
#define COMMAND_H_

class Command
{
public:
    int get_cmd();
    char* get_arg();
    void set_cmd(int cmd);
    void set_arg(char* arg);
private:
    int cmd_;
    char arg_[65];
};

#endif /* COMMAND_H_ */

command.cpp

#include <string.h>
#include "command.h"


int Command::get_cmd()
{
    return cmd_;
}

char* Command::get_arg()
{
    return arg_;
}

void Command::set_cmd(int cmd)
{
    cmd_ = cmd;
}

void Command::set_arg(char* arg)
{
    if(NULL == arg)
    {
        return;
    }
    strncpy(arg_,arg,64);
    arg_[64] = '\0';
}

thread_process.h

#ifndef THREAD_PROCESS_H_
#define THREAD_PROCESS_H_

class ThreadProcess
{
public:
    void Process0(void* arg);
    void Process1(void* arg);
    void Process2(void* arg);
};

#endif /* THREAD_PROCESS_H_ */

thread_process.cpp

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include "thread_process.h"


void ThreadProcess::Process0(void* arg)
{
    printf("thread %u is starting process %s\n",pthread_self(),arg);
    usleep(100*1000);
}
void ThreadProcess::Process1(void* arg)
{
    printf("thread %u is starting process %s\n",pthread_self(),arg);
    usleep(100*1000);
}

void ThreadProcess::Process2(void* arg)
{
    printf("thread %u is starting process %s\n",pthread_self(),arg);
    usleep(100*1000);
}

thread_pool.h

#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_

#include <map>
#include <vector>
#include "command.h"

#define MAX_THREAD_NUM 50 // 該值目前需要設定為初始線程數的整數倍
#define ADD_FACTOR 40 // 該值表示一個線程可以處理的最大任務數
#define THREAD_NUM 10 // 初始線程數

class ThreadPool
{
public:
    ThreadPool() {};
    static void InitializeThreads();
    void AddWork(Command command);
    void ThreadDestroy(int iwait = 2);
private:
    static void* Process(void* arg);
    static void AddThread();
    static void DeleteThread();
    static bool bshutdown_;
    static int icurr_thread_num_;
    static std::map<pthread_t,int> thread_id_map_;
    static std::vector<Command> command_;
    static pthread_mutex_t command_mutex_;
    static pthread_cond_t command_cond_;
};


#endif /* THREAD_POOL_H_ */

thread_pool.cpp

#include <pthread.h>
#include <stdlib.h>
#include "thread_pool.h"
#include "thread_process.h"
#include "command.h"

bool ThreadPool::bshutdown_ = false;
int ThreadPool::icurr_thread_num_ = THREAD_NUM;
std::vector<Command> ThreadPool::command_;
std::map<pthread_t,int> ThreadPool::thread_id_map_;
pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;

void ThreadPool::InitializeThreads()
{
    for (int i = 0; i < THREAD_NUM ; ++i)
    {
        pthread_t tempThread;
        pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);
        thread_id_map_[tempThread] = 0;
    }
}

void* ThreadPool::Process(void* arg)
{
    ThreadProcess threadprocess;
    Command command;
    while (true)
    {
        pthread_mutex_lock(&command_mutex_);
        // 如果線程需要退出,則此時退出
        if (1 == thread_id_map_[pthread_self()])
        {
            pthread_mutex_unlock(&command_mutex_);
            printf("thread %u will exit\n", pthread_self());
            pthread_exit(NULL);
        }
        // 當線程不需要退出且沒有需要處理的任務時,需要縮容的則縮容,不需要的則等待信號
        if (0 == command_.size() && !bshutdown_)
        {
            if(MAX_THREAD_NUM != THREAD_NUM)
            {
                DeleteThread();
                if (1 == thread_id_map_[pthread_self()])
                {
                    pthread_mutex_unlock(&command_mutex_);
                    printf("thread %u will exit\n", pthread_self());
                    pthread_exit(NULL);
                }
            }
            pthread_cond_wait(&command_cond_,&command_mutex_);
        }
        // 線程池需要關閉,關閉已有的鎖,線程退出
        if(bshutdown_)
        {
            pthread_mutex_unlock (&command_mutex_);
            printf ("thread %u will exit\n", pthread_self ());
            pthread_exit (NULL);
        }
        // 如果線程池的最大線程數不等於初始線程數,則表明需要擴容
        if(MAX_THREAD_NUM != THREAD_NUM)
        {
            AddThread();
        }
        // 從容器中取出待辦任務
        std::vector<Command>::iterator iter = command_.begin();
        command.set_arg(iter->get_arg());
        command.set_cmd(iter->get_cmd());
        command_.erase(iter);
        pthread_mutex_unlock(&command_mutex_);
        // 開始業務處理
        switch(command.get_cmd())
        {
        case 0:
            threadprocess.Process0(command.get_arg());
            break;
        case 1:
            threadprocess.Process1(command.get_arg());
            break;
        case 2:
            threadprocess.Process2(command.get_arg());
            break;
        default:
            break;
        }
    }
    return NULL; // 完全為了消除警告(eclipse編寫的代碼,警告很煩人)
}

void ThreadPool::AddWork(Command command)
{
    bool bsignal = false;
    pthread_mutex_lock(&command_mutex_);
    if (0 == command_.size())
    {
        bsignal = true;
    }
    command_.push_back(command);
    pthread_mutex_unlock(&command_mutex_);
    if (bsignal)
    {
        pthread_cond_signal(&command_cond_);
    }
}

void ThreadPool::ThreadDestroy(int iwait)
{
    while(0 != command_.size())
    {
        sleep(abs(iwait));
    }
    bshutdown_ = true;
    pthread_cond_broadcast(&command_cond_);
    std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();
    for (; iter!=thread_id_map_.end(); ++iter)
    {
        pthread_join(iter->first,NULL);
    }
    pthread_mutex_destroy(&command_mutex_);
    pthread_cond_destroy(&command_cond_);
}

void ThreadPool::AddThread()
{
    if(((icurr_thread_num_*ADD_FACTOR) < command_.size())
            && (MAX_THREAD_NUM != icurr_thread_num_))
    {
        InitializeThreads();
        icurr_thread_num_ += THREAD_NUM;
    }
}

void ThreadPool::DeleteThread()
{
    int size = icurr_thread_num_ - THREAD_NUM;
    std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();
    for(int i=0; i<size; ++i,++iter)
    {
        iter->second = 1;
    }
}

main.cpp

#include "thread_pool.h"
#include "command.h"

int main()
{
    ThreadPool thread_pool;
    thread_pool.InitializeThreads();
    Command command;
    char arg[8] = {0};
    for(int i=1; i<=1000; ++i)
    {
        command.set_cmd(i%3);
        sprintf(arg,"%d",i);
        command.set_arg(arg);
        thread_pool.AddWork(command);
    }
    sleep(10); // 用於測試線程池縮容
    thread_pool.ThreadDestroy();
    return 0;
}

代碼是按照google的開源c++編碼規范編寫。大家可以通過改變那幾個宏的值來調整線程池。有問題大家一起討論。

Copyright © Linux教程網 All Rights Reserved