在面向對象的系統中,當一個對象接收到一條消息時,可能會發生一系列的事件。通常,這些事件是以 同步(synchronous) 模式處理的:調用進程或向這個對象發送消息的線程在發送消息調用完成之前都會接收並處理一系列事件。然而,如果產生這些事件的對象是由多個進程進行共享並且保存在共享內存中時,情況就稍微有些不同了。
本文將使用兩種 C++ 設計模式來詳細介紹這種情況,並使用一些樣例代碼來展示這種解決方案(這些樣例代碼可以從本文 下載 一節中獲得):
我們將簡要介紹不使用共享內存的樣例代碼。 使用第一種設計模式來修改這些代碼,讓其使用共享內存。 然後闡述如何使用第二種設計模式來實現進程間通信(IPC)。您可以在任何機器體系架構、操作系統和編譯器上應用這兩種設計模式中的概念。對於本文來說,我們使用的是 RedHat Linux 7.1 for 32-bit x86 Intel® 體系架構的發行版;使用 GNU C++ compiler version 3.2.3 編譯器及其相關工具來編譯並測試樣例程序。
不使用共享內存
下面讓我們開始介紹這個樣例程序,首先是不使用共享內存的程序:
清單 1. common.h#ifndef __COMMON_H__ #define __COMMON_H__ class IObjectWithEvents { public: class IEventSink { public: virtual void OnEvent(pid_t pid, const char * msg) = 0; }; static IObjectWithEvents * getInstance(); virtual bool AddEventHandler(IEventSink * pEI) = 0; virtual void SendMessage() = 0; }; #endif //__COMMON_H__
接口類 IObjectWithEvents 包含了一個內嵌的接口類 IEventSink,它定義了 OnEvent() 方法。這個事件處理程序接收一個發送者的 id 和一個字符串消息。getInstance() 方法返回對共享內存中對象的引用,AddEventHandler() 注冊一個事件處理程序,SendMessage() 向這個對象發送一條消息。由於不需要引用共享內存,所以可以像清單 2 中那樣來使用 IObjectWithEvents:
清單 2. shm-client1.cpp#include <iostream> #include <sys/types.h> #include <unistd.h> #include "common.h" #define HERE __FILE__ << ":" << __LINE__ << " " using namespace std; class EventSink : public IObjectWithEvents::IEventSink { public: void OnEvent(pid_t pid, const char * msg) { cout << HERE << "Message from pid(" << pid << ")\t : " << msg << endl; } }; int main() { IObjectWithEvents * powe = IObjectWithEvents::getInstance(); EventSink sink; powe->AddEventHandler(&sink); powe->SendMessage(); return 0; }
類 EventSink 提供了這個事件處理程序的實現。主函數中給出了用於發送消息和處理事件的標准序列。
ObjectWithEvents 的典型實現如清單 3、4 所示:
清單 3. ObjectWithEvents.h#include "common.h" class ObjectWithEvents : public IObjectWithEvents { public: // We assume singleton design pattern for illustration static ObjectWithEvents * ms_pObjectWithEvents; ObjectWithEvents(); //the implementation for IObjectWithEvents void FireEvent(); virtual bool AddEventHandler(IEventSink * pEI); virtual void SendMessage(); //Collection for maintaining events enum { MAX_EVENT_HANDLERS = 16, }; long m_npEI; IEventSink * m_apEI[MAX_EVENT_HANDLERS]; pid_t m_alPID[MAX_EVENT_HANDLERS]; };
清單 4. ObjectWithEvents.cpp
#include <iostream> #include <sys/types.h> #include <sys/shm.h> #include <unistd.h> #include <pthread.h> #include "ObjectWithEvents.h" using namespace std; ObjectWithEvents * ObjectWithEvents::ms_pObjectWithEvents = NULL; IObjectWithEvents * IObjectWithEvents::getInstance() { // the following commented code is for illustration only. /* if (NULL == ObjectWithEvents::ms_pObjectWithEvents) { ObjectWithEvents::ms_pObjectWithEvents = new ObjectWithEvents(); } */ return ObjectWithEvents::ms_pObjectWithEvents; } ObjectWithEvents::ObjectWithEvents() : m_npEI(0) { } void ObjectWithEvents::FireEvent() { // iterate through the collection for (long i = 0; i < m_npEI; i++) { //Recheck for NULL if (0 != m_apEI[i]) { // Fire the event m_apEI[i]->OnEvent(m_alPID[i], ""); } } return; } bool ObjectWithEvents::AddEventHandler(IEventSink * pEI) { // NULL check if (NULL == pEI) { return false; } // check if there is space for this event handler if (MAX_EVENT_HANDLERS == m_npEI) { return false; } // Add this event handler to the collection m_alPID[m_npEI] = getpid(); m_apEI[m_npEI++] = pEI; return true; } void ObjectWithEvents::SendMessage() { //Some processing //And then fire the event FireEvent(); return; }
清單 4 中的代碼可以使用下面的腳本來編譯:
g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp
在運行 shm_client 時,應該可以看到下面的輸出:
$ ./shm_client shm_client1.cpp:16 Message from pid(3920) :
使用共享內存:沒有事件緩存
現在,為了在共享內存中對 ObjectWithEvents 進行實例化,我們需要修改 ObjectWithEvents 的實現。
清單 5. 修改 ObjectWithEvents.cpp// To add a declaration for the "new" operator: class ObjectWithEvents : public IObjectWithEvents { public: void * operator new(unsigned int); }; // To include an additional header for the Initializer class: #include "Initializer.h" // To overload the operator "new": void * ObjectWithEvents::operator new(unsigned int) { return ms_pObjectWithEvents; } // Then, FireEvent is completely changed: void ObjectWithEvents::FireEvent() { // We need to serialize all Access to the collection by more than one process int iRetVal = Initializer::LockMutex(); if (0 != iRetVal) { return; } pid_t pid = getpid(); // iterate through the collection and fire only events belonging to the current process for (long i = 0; i < m_npEI; i++) { // Check whether the handler belongs to the current process. if (pid != m_alPID[i]) { continue; } //Recheck for NULL if (0 != m_apEI[i]) { m_apEI[i]->OnEvent(pid, ""); } } // release the mutex if ((0 == iRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. } return; } // The following are changes to ObjectWithEvents::AddEventHandler(): // 1. Before accessing the collection, we lock the mutex: int bRetVal = Initializer::LockMutex(); if (0 != bRetVal) { return false; } // 2. After accessing the collection, we release the mutex: if ((0 == bRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. }
要對共享內存中的對象進行實例化,請定義另外一個類 Initializer。
清單 6. Initializer.h#ifndef __Initializer_H__ #define __Initializer_H__ class Initializer { public : int m_shmid; static Initializer ms_Initializer; Initializer(); static pthread_mutex_t ms_mutex; static int LockMutex(); static int UnlockMutex(); }; #endif // __Initializer_H__
Initializer 定義了共享內存的 id m_shmid 和一個用來處理同步事件的信號量 ms_mutex。
函數 LockMutex() 負責對這個互斥體進行加鎖,UnlockMutex() 對這個互斥體進行解鎖。
Initializer 的實現如清單 7 所示:
清單 7. Initializer.cpp#include <iostream> #include <sys/types.h> #include <sys/shm.h> #include <unistd.h> #include <pthread.h> #include "Initializer.h" #include "ObjectWithEvents.h" using namespace std; Initializer Initializer::ms_Initializer; pthread_mutex_t Initializer::ms_mutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP; Initializer::Initializer() : m_shmid(-1) { bool bCreated = false; key_t key = 0x1234; m_shmid = shmget(key,sizeof(ObjectWithEvents), 0666); if (-1 == m_shmid) { if(ENOENT != errno) { cerr<<"Critical Error"<<endl; return; } m_shmid = shmget(key, sizeof(ObjectWithEvents), IPC_CREAT0666); if (-1 == m_shmid ) { cout << " Critical Error " << errno<< endl; return; } bCreated = true; } ObjectWithEvents::ms_pObjectWithEvents = (ObjectWithEvents*)shmat(m_shmid,NULL,0); if (NULL == ObjectWithEvents::ms_pObjectWithEvents) { cout << " Critical Error " << errno << endl; return; } if (true == bCreated) { ObjectWithEvents * p = new ObjectWithEvents(); } // Create a mutex with no initial owner. pthread_mutex_init(&ms_mutex, NULL); } int Initializer::LockMutex() { // Request ownership of mutex. pthread_mutex_lock(&ms_mutex); if(EDEADLK == errno) { cout << "DeadLock" << endl; return -1; } return 0; } int Initializer::UnlockMutex() { return pthread_mutex_unlock(&ms_mutex); }
如果共享內存尚不存在,就創建共享內存;並在其中創建對象。如果共享內存已經存在了,就跳過對象的構建。Initializer::m_shmid 記錄了這個標識符,ObjectWithEvents::ms_pObjectWithEvents 記錄了對這個共享對象的引用。
即使在所有進程都與之脫離之後,這個共享內存也不會被銷毀。這樣您就可以使用 ipcrm 顯式地刪除,或者使用 ipcs 命令進行查看。測試程序的編譯方式如下:
g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp
控制台上運行這個程序的結果如下所示:
清單 8. 控制台結果$ ./shm_client shm_client1.cpp:16 Message from pid(4332) : $ ipcs ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00001234 327686 sachin 666 136 0 $ ./shm_client shm_client1.cpp:16 Message from pid(4333) : $ ipcrm -m 327686
ObjectWithEvents 實例中匯集了來自各個進程的事件。它可以只釋放出當前進程所注冊的事件。這種設計模式闡述了兩點:
任何對一組事件的訪問都由一個互斥對象來保護。 在事件發出前,使用進程 ID 進行過濾。用於 IPC 的共享內存和事件緩存
現在讓我們來看一下如何使用共享內存和事件的緩存進行進程間通信。如果事件是在共享對象中進行緩存的,那麼它們可能會稍後才發出。接收進程必須要查詢這個共享對象到底發生了什麼事件。因此,通過采用一個同步模型,就可以實現進程間通信。這就是開發下面這種設計模式的動機。
給 IObjectWithEvents 添加兩個方法,如下所示:
清單 9. 給 IObjectWithEvents 添加方法class IObjectWithEvents { public: virtual bool EnqueueEvent(const char * msg) = 0; virtual bool PollForEvents() = 0; };
EnqueueEvent() 簡單地在共享對象中添加事件的緩存,PollForEvents() 則會對這些緩存數據進行檢索。
shm_client1 將使用 EnqueueEvent() 方法,如下所示:
powe->EnqueueEvent("Message from shm_client1");
shm_client2(實際上是 shm_client1 的一個拷貝)會使用 PollForEvents() 方法,如下所示:
powe->EnqueueEvent("Message from shm_client2"); powe->PollForEvents();
另外,我們給 ObjectWithEvents 增加了點東西,如下所示:
清單 10. 對 ObjectWithEvents 的改動class ObjectWithEvents : public IObjectWithEvents { public: virtual bool EnqueueEvent(const char * msg); virtual bool PollForEvents(); //The event cache enum { MAX_EVENTS = 16, MAX_EVENT_MSG = 256, }; long m_nEvents; pid_t m_alPIDEvents[MAX_EVENTS]; char m_aaMsgs[MAX_EVENTS][MAX_EVENT_MSG]; };
這些生成了新的構造函數:
ObjectWithEvents::ObjectWithEvents() : m_npEI(0), m_nEvents(0) { }
EnqueueEvent() 將事件(例如每個發出的事件的消息和進程 id)加入一個隊列中。 PollForEvents() 負責遍歷這個隊列,並逐一對隊列中的事件調用 OnEvent()。
清單 11. EnqueueEventbool ObjectWithEvents::EnqueueEvent(const char * msg) { if (NULL == msg) { return false; } if (MAX_EVENTS == m_nEvents) { //IEventSink collection full return false; } int bRetVal = Initializer::LockMutex(); if (0 != bRetVal) { return false; } m_alPIDEvents[m_nEvents] = getpid(); strncpy(m_aaMsgs[m_nEvents++], msg, MAX_EVENT_MSG - 1); if ((0 == bRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. } return true; } bool ObjectWithEvents::PollForEvents() { if (0 == m_nEvents) { return true; } int bRetVal = Initializer::LockMutex(); if (0 != bRetVal) { return false; } pid_t pid = getpid(); for (long i = 0; i < m_npEI; i++) { // Does the handler belongs to current process ? if (pid != m_alPID[i]) { continue; } //Recheck for NULL if (0 == m_apEI[i]) { continue; } for (long j = 0; j < m_nEvents; j++) { m_apEI[i]->OnEvent(m_alPIDEvents[j], m_aaMsgs[j]); } } if ((0 == bRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. } return true; }
現在試著運行一下編譯腳本:
g++ -g -o shm_client1 shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp g++ -g -o shm_client2 shm_client2.cpp ObjectWithEvents.cpp Initializer.cpp
控制台上的輸出應該如下所示:
清單 12. shm_client1 和 shm_client2 的輸出$ ./shm_client1 $ ./ipcs ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00001234 360454 sachin 666 4300 0 $ ./shm_client2 shm_client2.cpp:16 Message from pid(4454) : Message from shm_client1 shm_client2.cpp:16 Message from pid(4456) : Message from shm_client2