歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux基礎 >> Linux技術

C++協程(2):使用ucontext實現Linux下的協程池

/*====================================

* file: ws_thread_proto.h 文件

* anchor: wensheng

* date: 2016-05-16

* info: 協程

* log: 1. 2016-05-16 wensheng create

======================================*/

#ifndef _WS_THREAD_PROTO_H_

#define _WS_THREAD_PROTO_H_

#include "../h/types.h"

#include "../ws_output_log_d/ws_log_output.h"

#include "ws_thread_pool.h"

using namespace ws_log_output;

using namespace std;

#ifdef WS_WINDOWS // Windows 使用swatch 奇技淫巧實現

#else // linux 使用ucontext庫實現

#include "ucontext.h"

// linux的線程形式

namespace ws_thread

{

#define WS_THREAD_PROTO_TASK_MAX (9999999) // 同時允許存在的任務數為9999999個

#define WS_DEFAULT_STACK_SZIE (1024*256*2) // 512k (注意:用戶函數內的局部變量申請空間總和不能大於等於該值(或者要小於該值4000個字節以下),否則會棧溢出),或者上調該值

#define WS_THREAD_PROTO_MAX_SIZE (160)

// 一個線程允許的最大協程個數 消耗的空間為 WS_THREAD_PROTO_MAX_SIZE*WS_DEFAULT_STACK_SZIE

// 協程狀態

enum ThreadPotoState

{

WS_THREAD_POTO_FREE = 0,

// 空閒狀態

WS_THREAD_POTO_RUNNABLE = 1,

// 就緒狀態

WS_THREAD_POTO_RUNING = 2,

// 執行狀態

WS_THREAD_POTO_SUSPEND = 3

// 掛起狀態

};

// 任務類型

enum ThreadPotoTaskType

{

WS_THREAD_PROTO_TASK_TYPE_LOGIN = 0,

// 正常業務邏輯

WS_THREAD_PROTO_TASK_TYPE_SUB = 1,

// 控制邏輯,消減協程數量

};

// 從外部設置線程執行的函數

typedef void* (*WS_THREAD_PROTO_FUNCTION)(void* pObj);

// 協程信息結構體

typedef struct ws_thread_proto_t

{

ucontext_t m_ctx;

// 線程上下文對象

WS_THREAD_PROTO_FUNCTION m_func;

// 任務函數

void *m_arg;

// 任務參數

ThreadPotoState m_state;

// 協程狀態

char m_stack[WS_DEFAULT_STACK_SZIE]; // 協程獨立棧空間,linux線程默認棧空間大小是8M,所以一個linux默認線程允許同時最大協程個數是40個

}THREAD_PROTO_T;

//線程控制信息

typedef struct ws_thread_proto_control_t

{

ucontext_t m_main;

// 主線程上下文(調度協程)

int32_t m_runningId;

// 當前運行的協程id

vector<THREAD_PROTO_T*> m_protos; // 協程集合

ws_thread_proto_control_t() :m_runningId(-1) { m_protos.clear(); }

}WS_T_P_CONTROL_T;

// Task(任務)結構體

struct ProtoTask

{

uint32_t m_type; // 任務類型(0表示業務任務,1表示控制人物--消減協程)

WS_THREAD_PROTO_FUNCTION m_fun; // 函數

void* m_para; // 參數

ProtoTask(WS_THREAD_PROTO_FUNCTION afun = NULL, void *pPara = NULL):m_type(WS_THREAD_PROTO_TASK_TYPE_LOGIN), m_fun(afun), m_para(pPara) {}

};

// 協程池(在線程池的基礎上實現)

// WS_DEFAULT_STACK_SZIE(注意:用戶函數內的局部變量申請空間總和不能大於等於該值(或者要小於該值4000個字節以下),否則會棧溢出),或者上調該值

class ThreadProtoPool

{

public:

//dwNum 線程池規模

ThreadProtoPool(uint32_t thread_num = 1); // thread_num 線程個數,都允許動態調整

virtual ~ThreadProtoPool();

// 添加協程任務: (注:不承諾銷毀pObj,需要用戶自己去銷毀)

bool addProtoTask(WS_THREAD_PROTO_FUNCTION pFun, void* pObj);

//調整線程規模

uint32_t updateThreadNum(int32_t aNum);

// 所有異步I/O的地方都需要調用該函數

bool yieldIO();

// 結束協程池

bool endPool();

public:

std::map<uint32_t, WS_T_P_CONTROL_T*> m_thread_info; // 所有線程控制信息<線程id, 控制信息>

std::queue<ProtoTask*> m_TaskQueue; // 任務隊列

WS_LOCK_CRITICAL m_TaskLock; // 任務隊列鎖

WS_LOCK_CRITICAL m_PoolLock; // 線程隊列鎖

bool m_ustopPool;

// false表示結束協程池

uint32_t m_stopNum; // 關閉線程(軟關閉)(可能不生效)

};

}

#endif

#endif /* _WS_THREAD_PROTO_H_ */

//===================================================分割線============================================//

/*====================================

* file: ws_thread_proto.cpp 文件

* anchor: wensheng

* date: 2016-05-16

* info: 協程

* log: 1. 2016-05-16 wensheng create

======================================*/

#include "ws_thread_proto.h"

#include "../ws_output_log_d/ws_log_output.h"

#include <string.h>

using namespace ws_log_output;

using namespace ws_thread;

// 協程處理函數

static void* thread_proto_runing(void* a_pObj)

{

// 獲取協程管理對象

WS_T_P_CONTROL_T* pObj = (WS_T_P_CONTROL_T*)a_pObj;

if (NULL == pObj) { return NULL; }

if ((pObj->m_runningId < 0) || (pObj->m_runningId >= pObj->m_protos.size()))

{

pObj->m_runningId++;

}

// 獲取當前正在執行的協程

THREAD_PROTO_T* pProto = pObj->m_protos[pObj->m_runningId];

if (NULL == pProto) { return NULL; }

if (NULL != pProto->m_func)

{

WS_LOG(Info, "thread_proto_runing !thread_id:%u, proto_num:%u", pthread_self(), pObj->m_runningId);

// 執行協程函數

pProto->m_func(pProto->m_arg);

}

// 執行結束,清理棧空間,重置狀態

pProto->m_func = NULL; // 處理函數

pProto->m_arg = NULL; // 參數

pProto->m_state = WS_THREAD_POTO_FREE; // 起始空閒狀態

//memset(&(pProto->m_stack), 0, WS_DEFAULT_STACK_SZIE - 2); // 這個地方不能清理,保存的有寄存器信息

++pObj->m_runningId;

// 切回到主協程

swapcontext(&(pProto->m_ctx), &(pObj->m_main));

return NULL;

}

// 主協程去取任務,然後添加到分協程中

static void* main_thread_proto_runing(void* pObj)

{

// 獲取對象

ThreadProtoPool* pProtoPool = static_cast<ThreadProtoPool*>(pObj);

assert(NULL != pProtoPool);

// 獲取線程id

uint32_t pid = pthread_self();

// 創建協程管理對象

WS_T_P_CONTROL_T* proto_control = new WS_T_P_CONTROL_T();

assert(NULL != proto_control);

// 獲取當前的主協程上下文

getcontext(&(proto_control->m_main)); // 獲取當前上下文

// 加入管理池中

pthread_mutex_lock(&pProtoPool->m_PoolLock);

pProtoPool->m_thread_info.insert(make_pair(pid, proto_control));

pthread_mutex_unlock(&pProtoPool->m_PoolLock);

ProtoTask* pJob = NULL;

// 是否結束

while (pProtoPool->m_ustopPool)

{

usleep(10000);

// 檢查是不是關閉線程

if (pProtoPool->m_stopNum > 0)

{

// 不再接收新任務

bool can_stop = true;

for (uint32_t index = 0; index < proto_control->m_protos.size(); ++index)

{

if (WS_THREAD_POTO_FREE != proto_control->m_protos[index]->m_state)

{

can_stop = false;

break; // 跳出第一循環執行剩余的協程任務

}

}

if (can_stop)

{

// 刪除數量

__sync_sub_and_fetch(&pProtoPool->m_stopNum, 1);

break; // 直接跳出線程循環

}

}

else

{

pJob = NULL;

// 鎖, 獲取任務

pthread_mutex_lock(&pProtoPool->m_TaskLock);

if (!pProtoPool->m_TaskQueue.empty())

{

pJob = pProtoPool->m_TaskQueue.front(); // 獲取任務

pProtoPool->m_TaskQueue.pop();

}

pthread_mutex_unlock(&pProtoPool->m_TaskLock);

if (NULL != pJob)

{

uint32_t poroSize = proto_control->m_protos.size();

// 檢查任務類型(若是協程控制任務,就執行控制屬性)

switch (pJob->m_type)

{

case WS_THREAD_PROTO_TASK_TYPE_SUB: //控制邏輯,消減協程數量

{

vector<THREAD_PROTO_T*>::iterator it = proto_control->m_protos.begin();

for (; it != proto_control->m_protos.end(); ++it)

{

if ((*it)->m_state == WS_THREAD_POTO_FREE)// 空閒狀態才能刪除

{

THREAD_PROTO_T* pProto = *it;

proto_control->m_protos.erase(it++);

delete[] pProto; // 銷毀協程對象

break; // 跳出第一層循環

}

}

break;

}

case WS_THREAD_PROTO_TASK_TYPE_LOGIN: // 正常業務邏輯

{

// 先查找是否有空閒協程

THREAD_PROTO_T* pProto = NULL;

uint32_t index = 0;

for (; index < poroSize; ++index)

{

assert(NULL != proto_control->m_protos[index]);

if (WS_THREAD_POTO_FREE == proto_control->m_protos[index]->m_state)

{

pProto = proto_control->m_protos[index];

break;

}

}

// 如果沒找到就創建一個協程

if (NULL == pProto)

{

if (WS_THREAD_PROTO_MAX_SIZE > poroSize) // 協程個數不能太大

{

THREAD_PROTO_T* newProto = new THREAD_PROTO_T(); // 創建一個新協程

newProto->m_func = NULL; // 處理函數

newProto->m_arg = NULL; // 參數

newProto->m_state = WS_THREAD_POTO_FREE; // 起始空閒狀態

memset(&newProto->m_stack, 0, WS_DEFAULT_STACK_SZIE); // 清理棧空間

proto_control->m_protos.push_back(newProto); // 產生了拷貝

pProto = proto_control->m_protos[index];

}

}

// 添加任務

if (NULL != pProto)

{

pProto->m_func = pJob->m_fun; // 處理函數

pProto->m_arg = pJob->m_para; // 參數

pProto->m_state = WS_THREAD_POTO_RUNNABLE; // 就緒狀態

delete pJob; // 刪除任務原語

}

else // 任務沒有被執行,回插入任務隊列

{

pthread_mutex_lock(&pProtoPool->m_TaskLock);

pProtoPool->m_TaskQueue.push(pJob); // 插入任務

pthread_mutex_unlock(&pProtoPool->m_TaskLock);

}

break;

}

default:

{

break;

}

}

}

}

// 執行協程

{

if ((proto_control->m_runningId < 0) || (proto_control->m_runningId >= proto_control->m_protos.size()))

{

proto_control->m_runningId = 0;

}

// 獲取協程對象

THREAD_PROTO_T *pProto = NULL;

if (proto_control->m_protos.size() > proto_control->m_runningId)

{

pProto = proto_control->m_protos[proto_control->m_runningId];

// 檢查執行狀態

switch (pProto->m_state)

{

case WS_THREAD_POTO_RUNNABLE: // 就緒狀態

{

getcontext(&(pProto->m_ctx)); // 獲取當前上下文

pProto->m_ctx.uc_stack.ss_sp = pProto->m_stack; // 指定運行棧空間

pProto->m_ctx.uc_stack.ss_size = WS_DEFAULT_STACK_SZIE - 2; // 指定運行棧空間大小

pProto->m_ctx.uc_stack.ss_flags = 0;

pProto->m_ctx.uc_link = &proto_control->m_main;//設置後繼上下文

pProto->m_state = WS_THREAD_POTO_RUNING; // 執行狀態

//proto_control->m_runningId = index; // 當前只在執行的線程id

//設置上下文執行函數

makecontext(&(pProto->m_ctx), (void(*)(void))(thread_proto_runing), 1, proto_control);

// 這裡不需要break

}

case WS_THREAD_POTO_SUSPEND: // 掛起狀態(就是執行了一部分)

// 跳轉到指定協程,保留當前主協程信息

swapcontext(&(proto_control->m_main), &(pProto->m_ctx));

break;

default:

break;

}

}

}

}

// 關閉協程池

proto_control->m_protos.clear();

// 刪除記錄

pthread_mutex_lock(&pProtoPool->m_PoolLock);

pProtoPool->m_thread_info.erase(pProtoPool->m_thread_info.find(pid));

pthread_mutex_unlock(&pProtoPool->m_PoolLock);

return NULL;

}

//dwNum 線程池規模

//thread_num 線程個數

ThreadProtoPool::ThreadProtoPool(uint32_t thread_num)

{

assert(0 == pthread_mutex_init(&m_TaskLock, NULL));

assert(0 == pthread_mutex_init(&m_PoolLock, NULL));

m_ustopPool = true;

m_stopNum = 0;

// 創建線程

updateThreadNum(thread_num);

}

ThreadProtoPool::~ThreadProtoPool()

{

endPool();

// 刪除剩余任務

for (uint32_t i = 0; i < m_TaskQueue.size(); ++i)

{

ProtoTask* pJob = m_TaskQueue.front(); // 獲取任務

m_TaskQueue.pop();

delete pJob;

}

pthread_mutex_destroy(&m_TaskLock);

pthread_mutex_destroy(&m_PoolLock);

}

// 添加協程任務

bool ThreadProtoPool::addProtoTask(WS_THREAD_PROTO_FUNCTION pFun, void* pObj)

{

assert(pFun);

if (m_TaskQueue.size() <= WS_THREAD_PROTO_TASK_MAX)

{

ProtoTask* task = new ProtoTask(pFun, pObj);

pthread_mutex_lock(&m_TaskLock);

m_TaskQueue.push(task);

pthread_mutex_unlock(&m_TaskLock);

//WS_LOG(Info, "addProtoTask size:%u", m_TaskQueue.size());

return true;

}

WS_LOG(Error, "addProtoTask have too match! size:%u", m_TaskQueue.size());

return false;

}

//調整線程規模

uint32_t ThreadProtoPool::updateThreadNum(int32_t aNum)

{

if (aNum > 0)

{

// 創建線程

for (uint32_t i = 0; i < aNum; ++i)

{

// 創建線程

Thread* pNewThread = new Thread;

uint32_t pid = (uint32_t)pNewThread->CreateThread((void*)this, &main_thread_proto_runing);

assert(pid != 0);

}

}

else

{

// 不需要鎖

m_stopNum = -1 * aNum;

}

}

// 所有異步I/O的地方都需要調用該函數

bool ThreadProtoPool::yieldIO()

{

uint32_t pid = pthread_self();

std::map<uint32_t, WS_T_P_CONTROL_T*>::iterator it = m_thread_info.find(pid);

if (it != m_thread_info.end())

{

WS_T_P_CONTROL_T* pControl = it->second;

assert(NULL != pControl);

THREAD_PROTO_T *pProto = pControl->m_protos[pControl->m_runningId];

pProto->m_state = WS_THREAD_POTO_SUSPEND; // 掛起

++pControl->m_runningId; // 下一個協程

// 切回到主協程

swapcontext(&(pProto->m_ctx), &(pControl->m_main));

return true;

}

return false;

}

// 結束協程池

bool ThreadProtoPool::endPool()

{

m_ustopPool = false;

// 等待銷毀

while (m_thread_info.size() > 0)

{

sleep(1);

}

}

//============================================分割線 測試test.cpp==================================//

// thread_test.cpp : 定義控制台應用程序的入口點。

//

#include "ws_thread_pool.h"

#include "iostream"

#include "ws_thread_proto.h"

#include <string.h>

using namespace ws_thread;

using namespace std;

struct someThin

{

public:

uint32_t i;

};

ThreadProtoPool one_proto_pool(2);

#ifdef WS_WINDOWS

static unsigned int __stdcall RUN(void* pObj)

{

for (uint32_t i = 0; i < 10; ++i)

{

someThin* pST = ((someThin*)pObj);

pST->i = pST->i + 1;

WS_LOG(Info, "線程值 :%d thread_id:%d", pST->i, GetCurrentThreadId());

}

return NULL;

}

#else

static void* RUN(void* pObj)

{

someThin* pST = ((someThin*)pObj);

int* pone = new int[65535];

//int one[65536] = {0};

memset(pone, 0, 65535*sizeof(int));

pST->i = pST->i + 1;

WS_LOG(Info, "1: num :%d thread_id:%d %d", pST->i, pthread_self(), pone[0]);

one_proto_pool.yieldIO();

pST->i = pST->i + 1;

pone[0]++;

WS_LOG(Info, "2: num :%d thread_id:%d %d", pST->i, pthread_self(), pone[0]);

delete[] pone;

return NULL;

}

#endif

int main()

{

int i = 0;

//ThreadPool one_pool(40);

someThin one;

one.i = 0;

for (uint32_t j = 0; j < 100000; ++j)

{

//one.i = j;

WS_LOG(Info, "yuanshizhi :%d ", j);

//one_pool.addTheadTask(&RUN, &one);

one_proto_pool.addProtoTask(&RUN, &one);

usleep(10000);

}

cin >> i;

return 0;

}

Copyright © Linux教程網 All Rights Reserved