先說第一點,線程(進程)間通信有很多種方式(pipe,socketpair),為什麼這裡選擇eventfd?
eventfd 是一個比 pipe 更高效的線程間事件通知機制,一方面它比 pipe 少用一個 file descripor,節省了資源;另一方面,eventfd 的緩沖區管理也簡單得多,全部“buffer” 只有定長8 bytes,不像 pipe 那樣可能有不定長的真正 buffer。
最重要的一點:當我們想要編寫並發型服務器的時候,eventfd 可以完美取代 pipe去通知(喚醒)其他的進程(線程)。比如經典的異步IO reactor/selector 應用場景,去喚醒select的調用。可以和事件通知機制完美的的結合。
(一)eventfd
#includeint eventfd(unsigned int initval, intflags);
簡單的應用示例:
#include#include #include #include #include /* Definition of uint64_t */ #define handle_error(msg) \ do { perror(msg); exit(EXIT_FAILURE); } while (0) int main(int argc, char *argv[]) { uint64_t u; int efd = eventfd(10, 0); if (efd == -1) handle_error("eventfd"); int ret = fork(); if(ret == 0) { for (int j = 1; j < argc; j++) { printf("Child writing %s to efd\n", argv[j]); u = atoll(argv[j]); ssize_t s = write(efd, &u, sizeof(uint64_t)); if (s != sizeof(uint64_t)) handle_error("write"); } printf("Child completed write loop\n"); exit(EXIT_SUCCESS); } else { sleep(2); ssize_t s = read(efd, &u, sizeof(uint64_t)); if (s != sizeof(uint64_t)) handle_error("read"); printf("Parent read %llu from efd\n",(unsigned long long)u); exit(EXIT_SUCCESS); } }
先看一下這四個函數總體的流程圖:
依次解釋:
// 該函數可以跨線程調用 void EventLoop::quit() { quit_ = true; if (!isInLoopThread()) { wakeup(); } } //使用eventfd喚醒 void EventLoop::wakeup() { uint64_t one = 1; //ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); ssize_t n = ::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } }
如果不是當前IO線程調用quit,則需要喚醒(wakeup())當前IO線程,因為它可能還阻塞在poll的位置(EventLoop::loop()),這樣再次循環判斷 while (!quit_) 才能退出循環。
// 事件循環,該函數不能跨線程調用 // 只能在創建該對象的線程中調用 void EventLoop::loop() {// 斷言當前處於創建該對象的線程中 assertInLoopThread(); while (!quit_) { pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); eventHandling_ = true; for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { currentActiveChannel_ = *it; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; doPendingFunctors(); } }
// 為了使IO線程在空閒時也能處理一些計算任務 // 在I/O線程中執行某個回調函數,該函數可以跨線程調用 void EventLoop::runInLoop(const Functor& cb) { if (isInLoopThread()) { // 如果是當前IO線程調用runInLoop,則同步調用cb cb(); } else { // 如果是其它線程調用runInLoop,則異步地將cb添加到隊列,讓IO線程處理 queueInLoop(cb); } }
void EventLoop::queueInLoop(const Functor& cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(cb); } // 調用queueInLoop的線程不是當前IO線程則需要喚醒當前IO線程,才能及時執行doPendingFunctors(); // 或者調用queueInLoop的線程是當前IO線程(比如在doPendingFunctors()中執行functors[i]() 時又調用了queueInLoop()) // 並且此時正在調用pending functor,需要喚醒當前IO線程 // 因為在此時doPendingFunctors() 過程中又添加了任務,故循環回去poll的時候需要被喚醒返回,進而繼續執行doPendingFunctors() // 只有當前IO線程的事件回調中調用queueInLoop才不需要喚醒 // 即在handleEvent()中調用queueInLoop 不需要喚醒,因為接下來馬上就會執行doPendingFunctors(); if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } }
// 該函數只會被當前IO線程調用 void EventLoop::doPendingFunctors() { std::vector關於doPendingFunctors 的補充說明:functors; callingPendingFunctors_ = true; { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } callingPendingFunctors_ = false; }