歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux基礎 >> 關於Linux

Muduo網絡庫源碼分析(三)線程間使用eventfd通信和EventLoop::runInLoop系列函數

先說第一點,線程(進程)間通信有很多種方式(pipe,socketpair),為什麼這裡選擇eventfd?

eventfd 是一個比 pipe 更高效的線程間事件通知機制,一方面它比 pipe 少用一個 file descripor,節省了資源;另一方面,eventfd 的緩沖區管理也簡單得多,全部“buffer” 只有定長8 bytes,不像 pipe 那樣可能有不定長的真正 buffer。

最重要的一點:當我們想要編寫並發型服務器的時候,eventfd 可以完美取代 pipe去通知(喚醒)其他的進程(線程)。比如經典的異步IO reactor/selector 應用場景,去喚醒select的調用。可以和事件通知機制完美的的結合。

(一)eventfd

 

#include 
int eventfd(unsigned int initval, intflags);

 

 

簡單的應用示例:
#include   
#include   
#include   
#include   
#include              /* Definition of uint64_t */  
  
#define handle_error(msg) \  
   do { perror(msg); exit(EXIT_FAILURE); } while (0)  
  
int  
main(int argc, char *argv[])  
{  
   uint64_t u;  
      
   int efd = eventfd(10, 0);  
   if (efd == -1)  
       handle_error("eventfd");  
     
   int ret = fork();  
   if(ret == 0)  
   {  
       for (int j = 1; j < argc; j++) {  
           printf("Child writing %s to efd\n", argv[j]);  
           u = atoll(argv[j]);  
           ssize_t s = write(efd, &u, sizeof(uint64_t));  
           if (s != sizeof(uint64_t))  
               handle_error("write");  
       }  
       printf("Child completed write loop\n");  
  
       exit(EXIT_SUCCESS);  
   }  
   else  
   {  
       sleep(2);  
  
       ssize_t s = read(efd, &u, sizeof(uint64_t));  
       if (s != sizeof(uint64_t))  
           handle_error("read");  
       printf("Parent read %llu from efd\n",(unsigned long long)u);  
       exit(EXIT_SUCCESS);  
   }  
}  

(二)EventLoop::loop、runInLoop、queueInLoop、doPendingFunctors

 

先看一下這四個函數總體的流程圖:

\

\

依次解釋:

 

// 該函數可以跨線程調用
void EventLoop::quit()
{
    quit_ = true;
    if (!isInLoopThread())
    {
        wakeup();
    }
}

//使用eventfd喚醒
void EventLoop::wakeup()
{
  uint64_t one = 1;
  //ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

 

如果不是當前IO線程調用quit,則需要喚醒(wakeup())當前IO線程,因為它可能還阻塞在poll的位置(EventLoop::loop()),這樣再次循環判斷 while (!quit_) 才能退出循環。
// 事件循環,該函數不能跨線程調用
// 只能在創建該對象的線程中調用
void EventLoop::loop()
{// 斷言當前處於創建該對象的線程中
  assertInLoopThread();
    while (!quit_)
    {
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);

        eventHandling_ = true;
        for (ChannelList::iterator it = activeChannels_.begin();
                it != activeChannels_.end(); ++it)
        {
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;
        doPendingFunctors();
    }
}

// 為了使IO線程在空閒時也能處理一些計算任務
// 在I/O線程中執行某個回調函數,該函數可以跨線程調用
void EventLoop::runInLoop(const Functor& cb)
{
  if (isInLoopThread())
  {
    // 如果是當前IO線程調用runInLoop,則同步調用cb
    cb();
  }
  else
  {
    // 如果是其它線程調用runInLoop,則異步地將cb添加到隊列,讓IO線程處理
    queueInLoop(cb);
  }
}

void EventLoop::queueInLoop(const Functor& cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(cb);
  }

  // 調用queueInLoop的線程不是當前IO線程則需要喚醒當前IO線程,才能及時執行doPendingFunctors();

  // 或者調用queueInLoop的線程是當前IO線程(比如在doPendingFunctors()中執行functors[i]() 時又調用了queueInLoop())
  // 並且此時正在調用pending functor,需要喚醒當前IO線程
  // 因為在此時doPendingFunctors() 過程中又添加了任務,故循環回去poll的時候需要被喚醒返回,進而繼續執行doPendingFunctors()

  // 只有當前IO線程的事件回調中調用queueInLoop才不需要喚醒
 //  即在handleEvent()中調用queueInLoop 不需要喚醒,因為接下來馬上就會執行doPendingFunctors();
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}

// 該函數只會被當前IO線程調用
void EventLoop::doPendingFunctors()
{
  std::vector functors;
  callingPendingFunctors_ = true;

  {
  MutexLockGuard lock(mutex_);
  functors.swap(pendingFunctors_);
  }

  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}
關於doPendingFunctors 的補充說明:
1、不是簡單地在臨界區內依次調用Functor,而是把回調列表swap到functors中,這樣一方面減小了臨界區的長度(意味著不會阻塞其它線程的queueInLoop()),另一方面,也避免了死鎖(因為Functor可能再次調用queueInLoop()) 2、由於doPendingFunctors()調用的Functor可能再次調用queueInLoop(cb),這時,queueInLoop()就必須wakeup(),否則新增的cb可能就不能及時調用了 3、muduo沒有反復執行doPendingFunctors()直到pendingFunctors_為空而是每次poll 返回就執行一次,這是有意的,否則IO線程可能陷入死循環,無法處理IO事件。
總結一下就是: 假設我們有這樣的調用:loop->runInLoop(run),說明想讓IO線程執行一定的計算任務,此時若是在當前的IO線程,就馬上執行run();如果是其他線程調用的,那麼就執行queueInLoop(run),將run異步添加到隊列,當loop內處理完事件後,就執行doPendingFunctors(),也就執行到了run();最後想要結束線程的話,執行quit。

參考: 《linux多線程服務端編程》 http://www.2cto.com/os/201604/498859.html
Copyright © Linux教程網 All Rights Reserved