先說第一點,線程(進程)間通信有很多種方式(pipe,socketpair),為什麼這裡選擇eventfd?
eventfd 是一個比 pipe 更高效的線程間事件通知機制,一方面它比 pipe 少用一個 file descripor,節省了資源;另一方面,eventfd 的緩沖區管理也簡單得多,全部“buffer” 只有定長8 bytes,不像 pipe 那樣可能有不定長的真正 buffer。
最重要的一點:當我們想要編寫並發型服務器的時候,eventfd 可以完美取代 pipe去通知(喚醒)其他的進程(線程)。比如經典的異步IO reactor/selector 應用場景,去喚醒select的調用。可以和事件通知機制完美的的結合。
(一)eventfd
#includeint eventfd(unsigned int initval, intflags);
簡單的應用示例:
#include#include #include #include #include /* Definition of uint64_t */ #define handle_error(msg) \ do { perror(msg); exit(EXIT_FAILURE); } while (0) int main(int argc, char *argv[]) { uint64_t u; int efd = eventfd(10, 0); if (efd == -1) handle_error("eventfd"); int ret = fork(); if(ret == 0) { for (int j = 1; j < argc; j++) { printf("Child writing %s to efd\n", argv[j]); u = atoll(argv[j]); ssize_t s = write(efd, &u, sizeof(uint64_t)); if (s != sizeof(uint64_t)) handle_error("write"); } printf("Child completed write loop\n"); exit(EXIT_SUCCESS); } else { sleep(2); ssize_t s = read(efd, &u, sizeof(uint64_t)); if (s != sizeof(uint64_t)) handle_error("read"); printf("Parent read %llu from efd\n",(unsigned long long)u); exit(EXIT_SUCCESS); } }
先看一下這四個函數總體的流程圖:


依次解釋:
// 該函數可以跨線程調用
void EventLoop::quit()
{
quit_ = true;
if (!isInLoopThread())
{
wakeup();
}
}
//使用eventfd喚醒
void EventLoop::wakeup()
{
uint64_t one = 1;
//ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
ssize_t n = ::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
如果不是當前IO線程調用quit,則需要喚醒(wakeup())當前IO線程,因為它可能還阻塞在poll的位置(EventLoop::loop()),這樣再次循環判斷 while (!quit_) 才能退出循環。
// 事件循環,該函數不能跨線程調用
// 只能在創建該對象的線程中調用
void EventLoop::loop()
{// 斷言當前處於創建該對象的線程中
assertInLoopThread();
while (!quit_)
{
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
}
// 為了使IO線程在空閒時也能處理一些計算任務
// 在I/O線程中執行某個回調函數,該函數可以跨線程調用
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread())
{
// 如果是當前IO線程調用runInLoop,則同步調用cb
cb();
}
else
{
// 如果是其它線程調用runInLoop,則異步地將cb添加到隊列,讓IO線程處理
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
// 調用queueInLoop的線程不是當前IO線程則需要喚醒當前IO線程,才能及時執行doPendingFunctors();
// 或者調用queueInLoop的線程是當前IO線程(比如在doPendingFunctors()中執行functors[i]() 時又調用了queueInLoop())
// 並且此時正在調用pending functor,需要喚醒當前IO線程
// 因為在此時doPendingFunctors() 過程中又添加了任務,故循環回去poll的時候需要被喚醒返回,進而繼續執行doPendingFunctors()
// 只有當前IO線程的事件回調中調用queueInLoop才不需要喚醒
// 即在handleEvent()中調用queueInLoop 不需要喚醒,因為接下來馬上就會執行doPendingFunctors();
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
// 該函數只會被當前IO線程調用
void EventLoop::doPendingFunctors()
{
std::vector functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
關於doPendingFunctors 的補充說明: