很久以前做過ACE + MFC/QT 的中輕量級線程池應用,大概就是利用線程池執行客戶機上的運算需求,將結果返回。ACE是跨平台重量級的通信中間件,與常見的應用程序框架需要精心契合,才能不出問題。最近想到既然QT框架本身就已經具有各類功能,何不玩一玩呢,那就開搞!這個實驗的代碼可以從我的資源內下載。
第一步打算實現的模式,我們需要一個設置為CPU核心數的線程池,這個線程池可以異步接受N個數據生產者傳入的數據,均衡的分配處理任務,處理後的數據返回給某1個或者幾個消費者。有兩種均衡方法。一種是生產者粒度的均衡。同一個生產者的各批數據FIFO順序不被打破,這需要判斷,當處理線程隊列中還有該生產者的數據時,不改變當前處理線程。第二種是數據粒度的並行,某個生產者傳來的數據被分配到不同的線程,不保證後到的數據後被處理(也可能先到的處理的慢,後到的快)。
這種異步隊列機制如果在MFC、WinAPI中,需要手工使用 Mutex 同步隊列,更可惡的是分配的數據對象的生存期非常微妙,一不小心就會出紅叉叉。QT首先為我們提供了信號和槽的機制,且該機制原生支持跨線程。假設我們在16核心服務器上,則使用 15個 QThread對象管理15組工作線程(留一個給主界面)。但是,如果仔細看了QT的文檔,就會發現QThread的信號事件循環默認是在創建者中(很多時候就是主線程!),所以,要想讓槽在子線程運行,一般是派生一個QObject的類,並把對象MoveToThread到某個QThread管理的線程上去。這樣,信號和槽就是全異步FIFO了。其次,QT提供了引用計數的QByteArray封裝,這個東西在參數傳遞的時候,速度很快,很少出現memcpy,生存期也特別容易控制。雖然C++11裡有 shared_ptr<T>,但是那個東西還是需要在一開始new 一個int8型的存儲區,很討厭。
說了這麼多,上關鍵代碼。
先是線程池的封裝qghthreadengine.h
- #ifndef QGHTHREADENGINE_H
- #define QGHTHREADENGINE_H
-
- #include <QObject>
- #include <QThread>
- #include <QVector>
- #include <QList>
- #include <QMap>
- #include <QMutex>
- #include "qghthreadtaskitem.h"
- #include "qghthreadobject.h"
-
- //線程池引擎,幫助用戶進行動態平衡
- class QGHThreadEngine : public QObject
- {
- Q_OBJECT
- public:
- QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads = 2,bool bFIFOKeep = true);
- ~QGHThreadEngine();
- protected:
- QVector<QThread *> m_ThreadPool;
- QVector<QGHThreadObject *> m_ThreadObjs;
- QGHThreadTaskItem * m_pThreadTaskItem;
- int m_nThreads;
- bool m_bFIFOKeep;
- private:
- //各個m_ThreadPool\m_ThreadObjs的任務數
- QMap<QObject *,qint32> m_map_Tasks;
- //m_bFIFOKeep == true 時,下面兩個成員將保證非空閒的單個 data_source 將始終在單一線程處理
- //各個data_source 目前的處理線程
- QMap<QObject *,QObject *> m_map_busy_source_task;
- //各個data_source 目前的排隊數目
- QMap<QObject *,int> m_map_busy_source_counter;
- public:
- void SetThreadTaskItem(QGHThreadTaskItem * pTaskItem);
- QList<qint32> CurrentLoad()
- {
- return m_map_Tasks.values();
- }
- public slots:
- void append_new(QObject * data_source, const QByteArray & data);
- //捕獲QGHThreadObject::sig_process_finished, 以便管理data_source的 FIFO 順序
- void on_sig_process_finished(QObject * data_source);
- signals:
- //************************************
- // Method: do_task
- // FullName: QGHThreadEngine::do_task
- // Access: public
- // Returns: void
- // Qualifier:
- // Parameter: QObject * 任務來源 (相同任務源的任務,在隊列非空時會被安排到同一個線程處理,以確保對相同源的FIFO)
- // Parameter: QByteArray 任務體
- // Parameter: QObject * 處理任務的線程對象(QGHThreadObject)
- //************************************
- void do_task(QObject *, const QByteArray &,QObject *);
- };
-
- #endif // QGHTHREADENGINE_H
實現qghthreadengine.cpp:
- #include "qghthreadengine.h"
- #include <assert.h>
- QGHThreadEngine::QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads,bool bFIFOKeep)
- : QObject(parent),
- m_nThreads(nThreads),
- m_pThreadTaskItem(pTaskItem),
- m_bFIFOKeep(bFIFOKeep)
- {
- assert(nThreads>0 && nThreads<512 && pTaskItem!=NULL);
- //創建固定數目的線程
- for (int i=0;i<nThreads;i++)
- {
- QThread * pNewThread = new QThread(this);
- QGHThreadObject * pNewObject = new QGHThreadObject(0,pTaskItem);
- //記錄下來
- m_ThreadPool.push_back(pNewThread);
- m_ThreadObjs.push_back(pNewObject);
- m_map_Tasks[pNewObject] = 0;
- pNewThread->start();
- //把QGHThreadObject的信號、曹處理搬移到子線程內
- pNewObject->moveToThread(pNewThread);
- //連接處理完成消息
- connect(pNewObject,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));
- //連接處理新任務消息
- connect(this,SIGNAL(do_task(QObject *, const QByteArray &,QObject *)),pNewObject,SLOT(process(QObject *, const QByteArray &,QObject *)));
-
- }
- }
-
- QGHThreadEngine::~QGHThreadEngine()
- {
- foreach(QGHThreadObject * obj,m_ThreadObjs)
- {
- disconnect(obj,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));
- obj->deleteLater();
- }
- foreach(QThread * th ,m_ThreadPool)
- {
- disconnect(this,SIGNAL(do_task(QObject *, QByteArray,QObject *)),th,SLOT(process(QObject *, QByteArray,QObject *)));
- th->exit(0);
- th->wait();
- }
- }
-
- //負載均衡添加任務,生產者的信號要掛接到這個槽上
- void QGHThreadEngine::append_new(QObject * data_source, const QByteArray & data)
- {
- QObject * pMinObj = 0;
- //對一批來自同一數據源的數據,使用同樣的數據源處理,以免發生多線程擾亂FIFO對單個data_source的完整性
- if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end()&& m_bFIFOKeep==true)
- {
- m_map_busy_source_counter[data_source]++;
- pMinObj = m_map_busy_source_task[data_source];
- }
- else
- {
- qint32 nMinCost = 0x7fffffff;
- //尋找現在最空閒的一個線程
- for (QMap<QObject *,qint32>::iterator p = m_map_Tasks.begin();p!=m_map_Tasks.end();p++)
- {
- if (p.value()< nMinCost)
- {
- nMinCost = p.value();
- pMinObj = p.key();
- }
- }
- if (pMinObj)
- {
- m_map_busy_source_counter[data_source] = 1;
- m_map_busy_source_task[data_source] = pMinObj;
- }
- }
- if (pMinObj)
- {
- m_map_Tasks[pMinObj]++;
- emit do_task(data_source,data,pMinObj);
- }
- }
- void QGHThreadEngine::on_sig_process_finished(QObject * data_source)
- {
- if (m_map_Tasks.find(sender())!=m_map_Tasks.end())
- {
- m_map_Tasks[sender()]--;
- }
- if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end())
- {
- m_map_busy_source_counter[data_source]--;
- if (m_map_busy_source_counter[data_source]<=0)
- {
- m_map_busy_source_counter.remove(data_source);
- m_map_busy_source_task.remove(data_source);
- }
- }
- }
用於綁定的 qghthreadobject.h
- #ifndef QGHTHREADOBJECT_H
- #define QGHTHREADOBJECT_H
- #include <QObject>
- #include "qghthreadtaskitem.h"
- //用於在子線程內具體承擔事件循環的類,用戶無需重載
- class QGHThreadObject:public QObject
- {
- Q_OBJECT
-
- public:
- QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem);
- ~QGHThreadObject();
- public:
- void SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem);
- public slots:
- //************************************
- // Method: process
- // FullName: QGHThreadObject::process
- // Access: public
- // Returns: void
- // Qualifier:
- // Parameter: QObject * 任務來源 (相同任務源的任務,在隊列非空時會被安排到同一個線程處理,以確保對相同源的FIFO)
- // Parameter: QByteArray 任務體
- // Parameter: QObject * 處理任務的線程對象(QGHThreadObject)
- //************************************
- void process(QObject * data_source, const QByteArray &data,QObject * target);
- private:
- QGHThreadTaskItem * m_pThreadTaskItem;
- signals:
- //信號,表示一次處理已經完成。QGHThreadEngine捕獲該信號,管理data_source的 FIFO 順序
- void sig_process_finished(QObject * data_source);
- };
- #endif
相應實現qghthreadobject.cpp
- #include "qghthreadobject.h"
- #include <assert.h>
-
- QGHThreadObject::QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem)
- : QObject(parent),
- m_pThreadTaskItem(pThreadTaskItem)
- {
- assert(pThreadTaskItem!=NULL);
-
- }
-
- QGHThreadObject::~QGHThreadObject()
- {
- }
- void QGHThreadObject::process(QObject * data_source, const QByteArray &data,QObject * target)
- {
- if (target==this)
- {
- m_pThreadTaskItem->run(data_source,data);
- emit sig_process_finished(data_source);
- }
- }
-
- void QGHThreadObject::SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem)
- {
- assert(pThreadTaskItem!=NULL);
- m_pThreadTaskItem = pThreadTaskItem;
- }
最後,是供用戶重載的實際處理方法的純虛基類qghthreadtaskitem.h
- #ifndef QGHTHREADTASKITEM_H
- #define QGHTHREADTASKITEM_H
- #include <QObject>
- //用戶重載該類,實現自定義方法的線程池調用
- class QGHThreadTaskItem:public QObject
- {
- Q_OBJECT
-
- public:
- QGHThreadTaskItem(QObject *parent);
- ~QGHThreadTaskItem();
- public:
- virtual void run(QObject * task_source, const QByteArray & data_array) = 0;
-
- };
- #endif
下次,繼續寫如何實現一個TCP鏈路,讓這個線程池活起來。