epoll是Linux下高性能的IO復用技術,是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量並發連接中只有少量活躍的情況下的系統CPU利用率。另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。epoll除了提供select/poll那種IO事件的水平觸發(Level Triggered)外,還提供了邊緣觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。
為什麼會出現IO復用技術呢,比如在Web應用中,大量的請求連接事件,如果采用多進程方式處理,也就是一個連接對應一個fork來處理,這樣開銷太大了,畢竟創建進程還是很耗資源的;如果采用多線程方式處理,也就是一個連接對應一個線程來處理,當請求並發量上去的話,系統中就會充斥著很多處理線程,畢竟一個系統創建線程是有一定上限的。這時,就需要我們的IO復用技術了。常見的網絡模型中,有多進程+IO復用編程模型,也有多線程+IO復用編程模型,比如大名鼎鼎的nginx默認采用的就是多進程+IO復用技術來處理網絡請求的;開源網絡庫libevent也是基於IO復用技術來完成網絡數據處理的。
epoll是Linux特有的IO復用函數,它在實現和使用上與select和poll有很大差異,首先,epoll使用一組函數來完成操作,而不是單個函數。其次,epoll把用戶關心的文件描述符上的事件放在內核上的一個事件表中,從而無須像select和poll那樣每次調用都要重復傳入文件描述符集合事件表。但epoll需要使用一個額外的文件描述符,來唯一標識內核中這個事件表,這個文件描述符使用如下epoll_create函數創建:
#include <sys/epoll.h> int epoll_create(int size); // 返回:成功返回創建的內核事件表對應的描述符,出錯-1
size參數現在並不起作用,只是給內核一個提示,告訴它內核表需要多大,該函數返回的文件描述符將用作其他所有epoll函數的第一個參數,以指定要訪問的內核事件表。用epoll_ctl函數操作內核事件表
#include <sys/epoll.h> int epoll_ctl(int opfd, int op, int fd, struct epoll_event *event); // 返回:成功返回創建的內核事件表對應的描述符,出錯-1
fd參數是要操作的文件描述符,op指定操作類型,操作類型有3種
event指定事件類型,它是epoll_event結構指針類型:
struct epoll_event { __uint32_t events; /* epoll事件 */ epoll_data_t data; /* 用戶數據 */ };
其中events描述事件類型,epoll支持的事件類型和poll基本相同,表示epoll事件類型的宏是在poll對應的宏加上”E”,比如epoll的數據可讀事件是EPOLLIN,但epoll有兩個額外的事件類型-EPOLLET和EPOLLONESHOT,它們對於高效運作非常關鍵,data用於存儲用戶數據,其類型epoll_data_t定義如下:
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; }epoll_data_t;
epoll_data_t是一個聯合體,其4個成員最多使用的是fd,它指定事件所從屬的目標文件描述符,ptr成員可用來指定fd相關的用戶數據,但由於opoll_data_t是一個聯合體,我們不能同時使用fd和ptr,如果要將文件描述符嗯哼用戶數據關聯起來,以實現快速的數據訪問,則只能使用其他手段,比如放棄使用fd成員,而在ptr指針指向的用戶數據中包含fd。
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); // 返回:成功返回就緒的文件描述符個數,出錯-1
timeout參數的含義與poll接口的timeout參數相同,maxevents參數指定最多監聽多少個事件,它必須大於0。
epoll_wait如果檢測到事件,就將所有就緒的事件從內核事件表(由epfd指定)中復制到events指定的數組中,這個數組只用來輸epoll_wait檢測到的就緒事件,而不像select和poll的參數數組既傳遞用於用戶注冊的事件,有用於輸出內核檢測到就緒事件,這樣極大提高了應用程序索引就緒文件描述符的效率。
epoll是怎麼實現的呢?其實很簡單,從這3個方法就可以看出,它比select聰明的避免了每次頻繁調用“哪些連接已經處在消息准備好階段”的 epoll_wait時,是不需要把所有待監控連接傳入的。這意味著,它在內核態維護了一個數據結構保存著所有待監控的連接。這個數據結構就是一棵紅黑樹,它的結點的增加、減少是通過epoll_ctrl來完成的。
圖中左下方的紅黑樹由所有待監控的連接構成。左上方的鏈表,同是目前所有活躍的連接。於是,epoll_wait執行時只是檢查左上方的鏈表,並返回左上方鏈表中的連接給用戶。這樣,epoll_wait的執行效率能不高嗎?
最後,再看看epoll提供的2種玩法ET和LT,即翻譯過來的邊緣觸發和水平觸發。其實這兩個中文名字倒也有些貼切。這2種使用方式針對的仍然是效率問題,只不過變成了epoll_wait返回的連接如何能夠更准確些。 例如,我們需要監控一個連接的寫緩沖區是否空閒,滿足“可寫”時我們就可以從用戶態將響應調用write發送給客戶端 。但是,或者連接可寫時,我們的“響應”內容還在磁盤上呢,此時若是磁盤讀取還未完成呢?肯定不能使線程阻塞的,那麼就不發送響應了。但是,下一次epoll_wait時可能又把這個連接返回給你了,你還得檢查下是否要處理。可能,我們的程序有另一個模塊專門處理磁盤IO,它會在磁盤IO完成時再發送響應。那麼,每次epoll_wait都返回這個“可寫”的、卻無法立刻處理的連接,是否符合用戶預期呢? 於是,ET和LT模式就應運而生了。LT是每次滿足期待狀態的連接,都得在epoll_wait中返回,所以它一視同仁,都在一條水平線上。ET則不然,它傾向更精確的返回連接。在上面的例子中,連接第一次變為可寫後,若是程序未向連接上寫入任何數據,那麼下一次epoll_wait是不會返回這個連接的。ET叫做 邊緣觸發,就是指,只有連接從一個狀態轉到另一個狀態時,才會觸發epoll_wait返回它。可見,ET的編程要復雜不少,至少應用程序要小心的防止epoll_wait的返回的連接出現:可寫時未寫數據後卻期待下一次“可寫”、可讀時未讀盡數據卻期待下一次“可讀”。 當然,從一般應用場景上它們性能是不會有什麼大的差距的,ET可能的優點是,epoll_wait的調用次數會減少一些,某些場景下連接在不必要喚醒時不會被喚醒(此喚醒指epoll_wait返回)。但如果像我上面舉例所說的,有時它不單純是一個網絡問題,跟應用場景相關。當然,大部分開源框架都是基於ET寫的,框架嘛,它追求的是純技術問題,當然力求盡善盡美。
網絡事件庫封裝了底層IO復用函數,同時提供給外部使用的接口,提供的接口可以多種多樣,但是一般有添加事件、刪除事件、開始事件循環等接口。為了展示下網絡事件庫的是如何封裝IO復用函數,同時學習epoll的使用,"迷你"網絡事件庫-tomevent今天誕生了 :) (ps:tomevent采用C++語言實現)。
既然是網絡事件庫,那首先需要定義一個事件的結構,LZ這裡就使用Event結構體了,事件結構體中包含監聽的文件描述符、事件類型、回調函數、傳遞給回調函數的參數,當然,這只是一個簡單的事件結構,如果還需要其他信息可另外添加。
/** * event struct. */ struct Event { int fd; /* the fd want to monitor */ short event; /* the event you want to monitor */ void *(*callback)(int fd, void *arg); /* the callback function */ void *arg; /* the parameter of callback function */ };
定義一個事件處理接口IEvent,該接口定義了3個基本的事件操作函數,也就是添加事件、刪除事件、開始事件循環。定義IEvent接口,與具體的底層IO技術解耦,使用具體的IO復用類來實現該接口,比如對應select的SelectEvent,或者是對應poll的PollEvent,當然,這裡就用epoll對應的EpollEvent來實現IEvent接口(ps:c++中接口貌似應該稱為抽象類,不過這裡稱為接口更合適一點)。
/** * the interface of event. */ class IEvent { public: virtual int addEvent(const Event &event) = 0; virtual int delEvent(const Event &event) = 0; virtual int dispatcher() = 0; virtual ~IEvent() { } };
IEvent的實現類EpollEvent,其中封裝了epoll相關的函數。EpollEvent有3個成員,分別是pollCreateSize、epollFd、events,pollCreateSize表示調用epoll_create時傳遞的參數值,epollFd表示epoll_create的返回值,events是記錄事件的map,events中記錄了監聽事件的信息,當事件來臨時被用到。
class EpollEvent : public IEvent { public: EpollEvent() : EpollEvent(16) { } EpollEvent(int createSize) { if (createSize < 16) { createSize = 16; } epollCreateSize = createSize; initEvent(); } virtual int addEvent(const Event &event); virtual int delEvent(const Event &event); virtual int dispatcher(); private: int initEvent() { int epollFd = epoll_create(this->epollCreateSize); if (epollFd <= 0) { perror("create_create error:"); return epollFd; /* here epollFd is -1 */ } this->epollFd = epollFd;return 0; } int epollCreateSize; int epollFd; //Event event; map<int, Event> events; };-------------------------------------------------------------------------
int EpollEvent::addEvent(const Event &event) { struct epoll_event epollEvent; epollEvent.data.fd = event.fd; epollEvent.events = event.event; int retCode = epoll_ctl(this->epollFd, EPOLL_CTL_ADD, event.fd, &epollEvent); if (retCode < 0) { perror("epoll_ctl error:"); return retCode; } /* add event to this->events */ this->events[event.fd] = event;return 0; } int EpollEvent::delEvent(const Event &event) { struct epoll_event epollEvent; epollEvent.data.fd = event.fd; epollEvent.events = event.event; int retCode = epoll_ctl(this->epollFd, EPOLL_CTL_DEL, event.fd, &epollEvent); if (retCode < 0) { perror("epoll_ctl error:"); return retCode; } this->events.erase(event.fd); return 0; } int EpollEvent::dispatcher() { struct epoll_event epollEvents[32]; //cout << "epoll_wait before" << endl; int nEvents = epoll_wait(epollFd, epollEvents, 32, -1); if (nEvents <= 0) { perror("epoll_wait error:"); return -1; } //cout << "epoll_wait after nEvent" << endl; for (int i = 0; i < nEvents; i++) { int fd = epollEvents[i].data.fd; Event event = this->events[fd]; if (event.callback) { event.callback(fd, event.arg); } } return 0; }
到這裡整個tomevent的框架代碼就結束了,那麼該如何使用呢,以下是一個測試用例。使用tomevent來同時監聽2個文件描述符,一個是標准輸入(fd為0),另一個是提供UDP服務的一個文件描述符。
void *test(int fd, void *arg) { cout << "****************test(): fd=" << fd << endl; char buff[256]; int len = recvfrom(fd, buff, sizeof(buff), 0, NULL, NULL); if (len > 0) { buff[len] = '\0'; cout << buff << endl; } else { perror("recvfrom error:"); } cout << "****************test()**********" << endl; } void *inTest(int fd, void *arg) { cout << "****************inTest(): fd=" << fd << endl; char buff[256]; int len = read(fd, buff, sizeof(buff)); if (len > 0) { buff[len] = '\0'; cout << buff << endl; } else { perror("read stdin error:"); } cout << "****************inTest()**********" << endl; } int main(int argc, char **argv) { int listenFd = -1; int connFd = -1; struct sockaddr_in servAddr; listenFd = socket(AF_INET, SOCK_DGRAM, 0); memset(&servAddr, 0, sizeof(servAddr)); servAddr.sin_family = AF_INET; servAddr.sin_port = htons(8080); servAddr.sin_addr.s_addr = INADDR_ANY; bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr)); listen(listenFd, 5); Event event, inEvent; EpollEvent eventBase; event.fd = listenFd; event.event = EPOLLIN; event.arg = NULL; event.callback = test; inEvent.fd = 0; inEvent.event = EPOLLIN; inEvent.arg = NULL; inEvent.callback = inTest; eventBase.addEvent(event); eventBase.addEvent(inEvent); for (; ;) { eventBase.dispatcher(); } return 0; }
以下是測試結果 ,同時提供UDP服務和響應鍵盤輸入。