/*====================================
* file: ws_thread_proto.h 文件
* anchor: wensheng
* date: 2016-05-16
* info: 協程
* log: 1. 2016-05-16 wensheng create
======================================*/
#ifndef _WS_THREAD_PROTO_H_
#define _WS_THREAD_PROTO_H_
#include "../h/types.h"
#include "../ws_output_log_d/ws_log_output.h"
#include "ws_thread_pool.h"
using namespace ws_log_output;
using namespace std;
#ifdef WS_WINDOWS // Windows 使用swatch 奇技淫巧實現
#else // linux 使用ucontext庫實現
#include "ucontext.h"
// linux的線程形式
namespace ws_thread
{
#define WS_THREAD_PROTO_TASK_MAX (9999999) // 同時允許存在的任務數為9999999個
#define WS_DEFAULT_STACK_SZIE (1024*256*2) // 512k (注意:用戶函數內的局部變量申請空間總和不能大於等於該值(或者要小於該值4000個字節以下),否則會棧溢出),或者上調該值
#define WS_THREAD_PROTO_MAX_SIZE (160)
// 一個線程允許的最大協程個數 消耗的空間為 WS_THREAD_PROTO_MAX_SIZE*WS_DEFAULT_STACK_SZIE
// 協程狀態
enum ThreadPotoState
{
WS_THREAD_POTO_FREE = 0,
// 空閒狀態
WS_THREAD_POTO_RUNNABLE = 1,
// 就緒狀態
WS_THREAD_POTO_RUNING = 2,
// 執行狀態
WS_THREAD_POTO_SUSPEND = 3
// 掛起狀態
};
// 任務類型
enum ThreadPotoTaskType
{
WS_THREAD_PROTO_TASK_TYPE_LOGIN = 0,
// 正常業務邏輯
WS_THREAD_PROTO_TASK_TYPE_SUB = 1,
// 控制邏輯,消減協程數量
};
// 從外部設置線程執行的函數
typedef void* (*WS_THREAD_PROTO_FUNCTION)(void* pObj);
// 協程信息結構體
typedef struct ws_thread_proto_t
{
ucontext_t m_ctx;
// 線程上下文對象
WS_THREAD_PROTO_FUNCTION m_func;
// 任務函數
void *m_arg;
// 任務參數
ThreadPotoState m_state;
// 協程狀態
char m_stack[WS_DEFAULT_STACK_SZIE]; // 協程獨立棧空間,linux線程默認棧空間大小是8M,所以一個linux默認線程允許同時最大協程個數是40個
}THREAD_PROTO_T;
//線程控制信息
typedef struct ws_thread_proto_control_t
{
ucontext_t m_main;
// 主線程上下文(調度協程)
int32_t m_runningId;
// 當前運行的協程id
vector<THREAD_PROTO_T*> m_protos; // 協程集合
ws_thread_proto_control_t() :m_runningId(-1) { m_protos.clear(); }
}WS_T_P_CONTROL_T;
// Task(任務)結構體
struct ProtoTask
{
uint32_t m_type; // 任務類型(0表示業務任務,1表示控制人物--消減協程)
WS_THREAD_PROTO_FUNCTION m_fun; // 函數
void* m_para; // 參數
ProtoTask(WS_THREAD_PROTO_FUNCTION afun = NULL, void *pPara = NULL):m_type(WS_THREAD_PROTO_TASK_TYPE_LOGIN), m_fun(afun), m_para(pPara) {}
};
// 協程池(在線程池的基礎上實現)
// WS_DEFAULT_STACK_SZIE(注意:用戶函數內的局部變量申請空間總和不能大於等於該值(或者要小於該值4000個字節以下),否則會棧溢出),或者上調該值
class ThreadProtoPool
{
public:
//dwNum 線程池規模
ThreadProtoPool(uint32_t thread_num = 1); // thread_num 線程個數,都允許動態調整
virtual ~ThreadProtoPool();
// 添加協程任務: (注:不承諾銷毀pObj,需要用戶自己去銷毀)
bool addProtoTask(WS_THREAD_PROTO_FUNCTION pFun, void* pObj);
//調整線程規模
uint32_t updateThreadNum(int32_t aNum);
// 所有異步I/O的地方都需要調用該函數
bool yieldIO();
// 結束協程池
bool endPool();
public:
std::map<uint32_t, WS_T_P_CONTROL_T*> m_thread_info; // 所有線程控制信息<線程id, 控制信息>
std::queue<ProtoTask*> m_TaskQueue; // 任務隊列
WS_LOCK_CRITICAL m_TaskLock; // 任務隊列鎖
WS_LOCK_CRITICAL m_PoolLock; // 線程隊列鎖
bool m_ustopPool;
// false表示結束協程池
uint32_t m_stopNum; // 關閉線程(軟關閉)(可能不生效)
};
}
#endif
#endif /* _WS_THREAD_PROTO_H_ */
//===================================================分割線============================================//
/*====================================
* file: ws_thread_proto.cpp 文件
* anchor: wensheng
* date: 2016-05-16
* info: 協程
* log: 1. 2016-05-16 wensheng create
======================================*/
#include "ws_thread_proto.h"
#include "../ws_output_log_d/ws_log_output.h"
#include <string.h>
using namespace ws_log_output;
using namespace ws_thread;
// 協程處理函數
static void* thread_proto_runing(void* a_pObj)
{
// 獲取協程管理對象
WS_T_P_CONTROL_T* pObj = (WS_T_P_CONTROL_T*)a_pObj;
if (NULL == pObj) { return NULL; }
if ((pObj->m_runningId < 0) || (pObj->m_runningId >= pObj->m_protos.size()))
{
pObj->m_runningId++;
}
// 獲取當前正在執行的協程
THREAD_PROTO_T* pProto = pObj->m_protos[pObj->m_runningId];
if (NULL == pProto) { return NULL; }
if (NULL != pProto->m_func)
{
WS_LOG(Info, "thread_proto_runing !thread_id:%u, proto_num:%u", pthread_self(), pObj->m_runningId);
// 執行協程函數
pProto->m_func(pProto->m_arg);
}
// 執行結束,清理棧空間,重置狀態
pProto->m_func = NULL; // 處理函數
pProto->m_arg = NULL; // 參數
pProto->m_state = WS_THREAD_POTO_FREE; // 起始空閒狀態
//memset(&(pProto->m_stack), 0, WS_DEFAULT_STACK_SZIE - 2); // 這個地方不能清理,保存的有寄存器信息
++pObj->m_runningId;
// 切回到主協程
swapcontext(&(pProto->m_ctx), &(pObj->m_main));
return NULL;
}
// 主協程去取任務,然後添加到分協程中
static void* main_thread_proto_runing(void* pObj)
{
// 獲取對象
ThreadProtoPool* pProtoPool = static_cast<ThreadProtoPool*>(pObj);
assert(NULL != pProtoPool);
// 獲取線程id
uint32_t pid = pthread_self();
// 創建協程管理對象
WS_T_P_CONTROL_T* proto_control = new WS_T_P_CONTROL_T();
assert(NULL != proto_control);
// 獲取當前的主協程上下文
getcontext(&(proto_control->m_main)); // 獲取當前上下文
// 加入管理池中
pthread_mutex_lock(&pProtoPool->m_PoolLock);
pProtoPool->m_thread_info.insert(make_pair(pid, proto_control));
pthread_mutex_unlock(&pProtoPool->m_PoolLock);
ProtoTask* pJob = NULL;
// 是否結束
while (pProtoPool->m_ustopPool)
{
usleep(10000);
// 檢查是不是關閉線程
if (pProtoPool->m_stopNum > 0)
{
// 不再接收新任務
bool can_stop = true;
for (uint32_t index = 0; index < proto_control->m_protos.size(); ++index)
{
if (WS_THREAD_POTO_FREE != proto_control->m_protos[index]->m_state)
{
can_stop = false;
break; // 跳出第一循環執行剩余的協程任務
}
}
if (can_stop)
{
// 刪除數量
__sync_sub_and_fetch(&pProtoPool->m_stopNum, 1);
break; // 直接跳出線程循環
}
}
else
{
pJob = NULL;
// 鎖, 獲取任務
pthread_mutex_lock(&pProtoPool->m_TaskLock);
if (!pProtoPool->m_TaskQueue.empty())
{
pJob = pProtoPool->m_TaskQueue.front(); // 獲取任務
pProtoPool->m_TaskQueue.pop();
}
pthread_mutex_unlock(&pProtoPool->m_TaskLock);
if (NULL != pJob)
{
uint32_t poroSize = proto_control->m_protos.size();
// 檢查任務類型(若是協程控制任務,就執行控制屬性)
switch (pJob->m_type)
{
case WS_THREAD_PROTO_TASK_TYPE_SUB: //控制邏輯,消減協程數量
{
vector<THREAD_PROTO_T*>::iterator it = proto_control->m_protos.begin();
for (; it != proto_control->m_protos.end(); ++it)
{
if ((*it)->m_state == WS_THREAD_POTO_FREE)// 空閒狀態才能刪除
{
THREAD_PROTO_T* pProto = *it;
proto_control->m_protos.erase(it++);
delete[] pProto; // 銷毀協程對象
break; // 跳出第一層循環
}
}
break;
}
case WS_THREAD_PROTO_TASK_TYPE_LOGIN: // 正常業務邏輯
{
// 先查找是否有空閒協程
THREAD_PROTO_T* pProto = NULL;
uint32_t index = 0;
for (; index < poroSize; ++index)
{
assert(NULL != proto_control->m_protos[index]);
if (WS_THREAD_POTO_FREE == proto_control->m_protos[index]->m_state)
{
pProto = proto_control->m_protos[index];
break;
}
}
// 如果沒找到就創建一個協程
if (NULL == pProto)
{
if (WS_THREAD_PROTO_MAX_SIZE > poroSize) // 協程個數不能太大
{
THREAD_PROTO_T* newProto = new THREAD_PROTO_T(); // 創建一個新協程
newProto->m_func = NULL; // 處理函數
newProto->m_arg = NULL; // 參數
newProto->m_state = WS_THREAD_POTO_FREE; // 起始空閒狀態
memset(&newProto->m_stack, 0, WS_DEFAULT_STACK_SZIE); // 清理棧空間
proto_control->m_protos.push_back(newProto); // 產生了拷貝
pProto = proto_control->m_protos[index];
}
}
// 添加任務
if (NULL != pProto)
{
pProto->m_func = pJob->m_fun; // 處理函數
pProto->m_arg = pJob->m_para; // 參數
pProto->m_state = WS_THREAD_POTO_RUNNABLE; // 就緒狀態
delete pJob; // 刪除任務原語
}
else // 任務沒有被執行,回插入任務隊列
{
pthread_mutex_lock(&pProtoPool->m_TaskLock);
pProtoPool->m_TaskQueue.push(pJob); // 插入任務
pthread_mutex_unlock(&pProtoPool->m_TaskLock);
}
break;
}
default:
{
break;
}
}
}
}
// 執行協程
{
if ((proto_control->m_runningId < 0) || (proto_control->m_runningId >= proto_control->m_protos.size()))
{
proto_control->m_runningId = 0;
}
// 獲取協程對象
THREAD_PROTO_T *pProto = NULL;
if (proto_control->m_protos.size() > proto_control->m_runningId)
{
pProto = proto_control->m_protos[proto_control->m_runningId];
// 檢查執行狀態
switch (pProto->m_state)
{
case WS_THREAD_POTO_RUNNABLE: // 就緒狀態
{
getcontext(&(pProto->m_ctx)); // 獲取當前上下文
pProto->m_ctx.uc_stack.ss_sp = pProto->m_stack; // 指定運行棧空間
pProto->m_ctx.uc_stack.ss_size = WS_DEFAULT_STACK_SZIE - 2; // 指定運行棧空間大小
pProto->m_ctx.uc_stack.ss_flags = 0;
pProto->m_ctx.uc_link = &proto_control->m_main;//設置後繼上下文
pProto->m_state = WS_THREAD_POTO_RUNING; // 執行狀態
//proto_control->m_runningId = index; // 當前只在執行的線程id
//設置上下文執行函數
makecontext(&(pProto->m_ctx), (void(*)(void))(thread_proto_runing), 1, proto_control);
// 這裡不需要break
}
case WS_THREAD_POTO_SUSPEND: // 掛起狀態(就是執行了一部分)
// 跳轉到指定協程,保留當前主協程信息
swapcontext(&(proto_control->m_main), &(pProto->m_ctx));
break;
default:
break;
}
}
}
}
// 關閉協程池
proto_control->m_protos.clear();
// 刪除記錄
pthread_mutex_lock(&pProtoPool->m_PoolLock);
pProtoPool->m_thread_info.erase(pProtoPool->m_thread_info.find(pid));
pthread_mutex_unlock(&pProtoPool->m_PoolLock);
return NULL;
}
//dwNum 線程池規模
//thread_num 線程個數
ThreadProtoPool::ThreadProtoPool(uint32_t thread_num)
{
assert(0 == pthread_mutex_init(&m_TaskLock, NULL));
assert(0 == pthread_mutex_init(&m_PoolLock, NULL));
m_ustopPool = true;
m_stopNum = 0;
// 創建線程
updateThreadNum(thread_num);
}
ThreadProtoPool::~ThreadProtoPool()
{
endPool();
// 刪除剩余任務
for (uint32_t i = 0; i < m_TaskQueue.size(); ++i)
{
ProtoTask* pJob = m_TaskQueue.front(); // 獲取任務
m_TaskQueue.pop();
delete pJob;
}
pthread_mutex_destroy(&m_TaskLock);
pthread_mutex_destroy(&m_PoolLock);
}
// 添加協程任務
bool ThreadProtoPool::addProtoTask(WS_THREAD_PROTO_FUNCTION pFun, void* pObj)
{
assert(pFun);
if (m_TaskQueue.size() <= WS_THREAD_PROTO_TASK_MAX)
{
ProtoTask* task = new ProtoTask(pFun, pObj);
pthread_mutex_lock(&m_TaskLock);
m_TaskQueue.push(task);
pthread_mutex_unlock(&m_TaskLock);
//WS_LOG(Info, "addProtoTask size:%u", m_TaskQueue.size());
return true;
}
WS_LOG(Error, "addProtoTask have too match! size:%u", m_TaskQueue.size());
return false;
}
//調整線程規模
uint32_t ThreadProtoPool::updateThreadNum(int32_t aNum)
{
if (aNum > 0)
{
// 創建線程
for (uint32_t i = 0; i < aNum; ++i)
{
// 創建線程
Thread* pNewThread = new Thread;
uint32_t pid = (uint32_t)pNewThread->CreateThread((void*)this, &main_thread_proto_runing);
assert(pid != 0);
}
}
else
{
// 不需要鎖
m_stopNum = -1 * aNum;
}
}
// 所有異步I/O的地方都需要調用該函數
bool ThreadProtoPool::yieldIO()
{
uint32_t pid = pthread_self();
std::map<uint32_t, WS_T_P_CONTROL_T*>::iterator it = m_thread_info.find(pid);
if (it != m_thread_info.end())
{
WS_T_P_CONTROL_T* pControl = it->second;
assert(NULL != pControl);
THREAD_PROTO_T *pProto = pControl->m_protos[pControl->m_runningId];
pProto->m_state = WS_THREAD_POTO_SUSPEND; // 掛起
++pControl->m_runningId; // 下一個協程
// 切回到主協程
swapcontext(&(pProto->m_ctx), &(pControl->m_main));
return true;
}
return false;
}
// 結束協程池
bool ThreadProtoPool::endPool()
{
m_ustopPool = false;
// 等待銷毀
while (m_thread_info.size() > 0)
{
sleep(1);
}
}
//============================================分割線 測試test.cpp==================================//
// thread_test.cpp : 定義控制台應用程序的入口點。
//
#include "ws_thread_pool.h"
#include "iostream"
#include "ws_thread_proto.h"
#include <string.h>
using namespace ws_thread;
using namespace std;
struct someThin
{
public:
uint32_t i;
};
ThreadProtoPool one_proto_pool(2);
#ifdef WS_WINDOWS
static unsigned int __stdcall RUN(void* pObj)
{
for (uint32_t i = 0; i < 10; ++i)
{
someThin* pST = ((someThin*)pObj);
pST->i = pST->i + 1;
WS_LOG(Info, "線程值 :%d thread_id:%d", pST->i, GetCurrentThreadId());
}
return NULL;
}
#else
static void* RUN(void* pObj)
{
someThin* pST = ((someThin*)pObj);
int* pone = new int[65535];
//int one[65536] = {0};
memset(pone, 0, 65535*sizeof(int));
pST->i = pST->i + 1;
WS_LOG(Info, "1: num :%d thread_id:%d %d", pST->i, pthread_self(), pone[0]);
one_proto_pool.yieldIO();
pST->i = pST->i + 1;
pone[0]++;
WS_LOG(Info, "2: num :%d thread_id:%d %d", pST->i, pthread_self(), pone[0]);
delete[] pone;
return NULL;
}
#endif
int main()
{
int i = 0;
//ThreadPool one_pool(40);
someThin one;
one.i = 0;
for (uint32_t j = 0; j < 100000; ++j)
{
//one.i = j;
WS_LOG(Info, "yuanshizhi :%d ", j);
//one_pool.addTheadTask(&RUN, &one);
one_proto_pool.addProtoTask(&RUN, &one);
usleep(10000);
}
cin >> i;
return 0;
}