概述
epoll是linux提供一種多路復用的技術,類似各個平台都支持的select,只是epoll在內核的實現做了 更多地優化,可以支持比select更多的文件描述符,當然也支持 socket這種網絡的文件描述符。linux上 的大並發的接入服務器,目前的實現方式肯定都通過epoll實現。
epoll和線程
有很多開發人員用epoll的時候,會開多個線程來進行數據通信,比如一個線程專門accept(我個人早 些年在FreeBSD用kqueue的時候,由於對內部機制沒有基本了解也這樣搞),一個線程收發,或者接收和發 送都用各自獨立的線程。
通常情況下,accept獨立線程是沒必要的,accept對於內核而言,就應用程序從內核的未完成的SYN隊 列讀取一點數據而已。具體參見 accept部分:
TCP三次握手過程與對應的Berkeley Socket APIs的介紹
收發獨立成兩個線程也沒有必要,因為大部分的應用服務器,通常情況下,啟動一個線程收發數據, 最大數據的收發量瓶頸在於網卡,而不是CPU;像網游接入服務器配置一個KM的網卡,很少有游戲會申請 1G的帶寬,那一台機器得有多少數據輸入和輸出。所以我們通信線程為epoll服務器就夠了。
epoll的基本原理
為了讓某些朋友能讀得更連慣,我還是說一下epoll基本原理。
epoll外部表現和select是一樣的。他提供READ, WRITE和ERROR等事件。
大致流程像下面這樣:
1. 應用注冊感興趣的事件到內核;
2. 內核在某種條件下,將事件通知應用程序;
3. 應用程序收到事件後,根據事件類型做對應的邏輯處理。
原理部分我再說一下,不容易理解的地方,包括水平觸發和邊緣觸發,WRITE事件的事件利用(這個可 以結合參考文獻1的kqueue的WRITE事件,原理一致的)和修改事件的細節。
水平觸發
READ事件,socket recv buff有數據,將一直向應用程序通知,直到buff為空。
WRITE事件,socket send buff從滿的狀態到可發送數據,將一直通知應用程序,直到buff滿。
邊緣觸發
READ事件,socket recv buff有數據了,只通知應用一次,不管應用程序有沒有調用read api,接下 來都不通知了。
WRITE事件,socket send buff從滿的狀態到可以發送數據,只通知一次。
上面這個解釋不知道大家能否理解,也只能這樣說了。有疑問的做一下試驗。另外,這些細節的東西 ,前幾年固定下來後,這幾年做的項目,是直接用的,也就很少在涉及細節,是憑理解和記憶寫下的文字 ,萬一有錯請指正^-^。
WRITE事件的利用
這個還一下不好描述。大概描述一下,詳細看參考文獻1。大致這樣:
1. 邏輯層寫數據到應用層的發送buff,向epoll注冊一下WRITE事件;
2. 這時epoll會通知應用程序一個WRITE事件;
3. 在WRITE事件響應函數裡,從應用層的發送buff讀數據,然後用socket send api發送。
因為我在很多實際項目中,看到大家沒有利用epoll的WRITE的事件來發數據,特意地說一下。大部分 的項目,是直接輪詢應用程序的發送隊列,我早期項目也是這麼干的。
epoll的修改事件
對於這個我的映像比較深刻。epoll的修改事件比較坑爹,不能單獨修改某個事件!怎麼說呢?比如 epoll裡已經注冊了READ&WRITE事件,你如果想單單重注冊一下WRITE事件而且READ事件不變,epoll 的epoll_ctl API是做不到的,你必須同時注冊READ&WRITE,這個在下面的代碼中可以看到。FreeBSD 的kqueue在這一點完全滿足我們程序員的要求。
抽象epoll API
我把herm socket epoll封裝部分貼出來,讓朋友們參考一下epoll的用法。大部分錯誤拋異常代碼被 我去掉了。
class Multiplexor { public: Multiplexor(int size, int timeout = -1, bool lt = true); ~Multiplexor(); void Run(); void Register(ISockHandler* eh, MultiplexorMask mask); void Remove(ISockHandler* eh); void EnableMask(ISockHandler* eh, MultiplexorMask mask); void DisableMask(ISockHandler* eh, MultiplexorMask mask); private: inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask) { struct epoll_event evt; evt.data.ptr = eh; evt.events = mask; return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1; } private: int m_epfd; struct epoll_event* m_evts; int m_size; int m_timeout; __uint32_t m_otherMasks; };
Multiplexor::Multiplexor(int size, int timeout, bool lt) { m_epfd = epoll_create(size); if (m_epfd == -1) throw HERM_SOCKET_EXCEPTION(ST_OTHER); m_size = size; m_evts = new struct epoll_event[size]; m_timeout = timeout; // sys/epoll.h is no EPOLLRDHUP(0X2000), don't add EPOLLRDHUP m_otherMasks = EPOLLERR | EPOLLHUP; if (!lt) m_otherMasks |= EPOLLET; } Multiplexor::~Multiplexor() { close(m_epfd); delete[] m_evts; } void Multiplexor::Run() { int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout); if (fds == -1) { if (errno == EINTR) return; } for (int i = 0; i < fds; ++i) { __uint32_t evts = m_evts[i].events; ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr); int stateType = ST_SUCCESS; if (evts & EPOLLIN) stateType = eh->OnReceive(); if (evts & EPOLLOUT) stateType = eh->OnSend(); if (evts & EPOLLERR || evts & EPOLLHUP) stateType = ST_EXCEPT_FAILED; if (stateType != ST_SUCCESS) eh->OnError(stateType, errno); } } void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask) { MultiplexorMask masks = mask | m_otherMasks; OperateHandler(EPOLL_CTL_ADD, eh, masks); } void Multiplexor::Remove(ISockHandler* eh) { // Delete fd from epoll, don't need masks OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK); } void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask) { MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK; OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks); } void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask) { MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask); if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks)) throw HERM_SOCKET_EXCEPTION(ST_OTHER); }
上面類就用到epoll_create(), epoll_ctl()和epoll_wait(),以及幾種事件。epoll用起來比select 清爽一些。
大致用法類似下面這樣:
先定義一個Handler
class StreamHandler : public Herm::ISockHandler { public: virtual Herm::Handle GetHandle() const; virtual int OnReceive(int); virtual int OnSend(int); };
在OnReceive()處理收到數據的動作,在OnSend()。。。。
在通信線程中,大概類似這樣的代碼,實際看情況。
Multiplexor multiplexor; StreamHandler sh; multiplexor.Register(&sh, READ_EVT); multiplexor.Run(...);