歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

利用epoll寫一個“迷你”的網絡事件庫

  epoll是Linux下高性能的IO復用技術,是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量並發連接中只有少量活躍的情況下的系統CPU利用率。另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。epoll除了提供select/poll那種IO事件的水平觸發(Level Triggered)外,還提供了邊緣觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。

  為什麼會出現IO復用技術呢,比如在Web應用中,大量的請求連接事件,如果采用多進程方式處理,也就是一個連接對應一個fork來處理,這樣開銷太大了,畢竟創建進程還是很耗資源的;如果采用多線程方式處理,也就是一個連接對應一個線程來處理,當請求並發量上去的話,系統中就會充斥著很多處理線程,畢竟一個系統創建線程是有一定上限的。這時,就需要我們的IO復用技術了。常見的網絡模型中,有多進程+IO復用編程模型,也有多線程+IO復用編程模型,比如大名鼎鼎的nginx默認采用的就是多進程+IO復用技術來處理網絡請求的;開源網絡庫libevent也是基於IO復用技術來完成網絡數據處理的。

epoll系列函數

  epoll是Linux特有的IO復用函數,它在實現和使用上與select和poll有很大差異,首先,epoll使用一組函數來完成操作,而不是單個函數。其次,epoll把用戶關心的文件描述符上的事件放在內核上的一個事件表中,從而無須像select和poll那樣每次調用都要重復傳入文件描述符集合事件表。但epoll需要使用一個額外的文件描述符,來唯一標識內核中這個事件表,這個文件描述符使用如下epoll_create函數創建:

#include <sys/epoll.h>  
int epoll_create(int size);  // 返回:成功返回創建的內核事件表對應的描述符,出錯-1  

  size參數現在並不起作用,只是給內核一個提示,告訴它內核表需要多大,該函數返回的文件描述符將用作其他所有epoll函數的第一個參數,以指定要訪問的內核事件表。用epoll_ctl函數操作內核事件表

#include <sys/epoll.h>  
int epoll_ctl(int opfd, int op, int fd, struct epoll_event *event);  // 返回:成功返回創建的內核事件表對應的描述符,出錯-1  

  fd參數是要操作的文件描述符,op指定操作類型,操作類型有3種

  • EPOLL_CTL_ADD:往事件表中注冊fd上的事件
  • EPOLL_CTL_MOD:修改fd上的注冊事件
  • EPOLL_CTL_DEL:刪除fd上的注冊事件

  event指定事件類型,它是epoll_event結構指針類型:

struct epoll_event  
{  
    __uint32_t events;  /* epoll事件 */  
    epoll_data_t data;  /* 用戶數據 */  
};  

  其中events描述事件類型,epoll支持的事件類型和poll基本相同,表示epoll事件類型的宏是在poll對應的宏加上”E”,比如epoll的數據可讀事件是EPOLLIN,但epoll有兩個額外的事件類型-EPOLLET和EPOLLONESHOT,它們對於高效運作非常關鍵,data用於存儲用戶數據,其類型epoll_data_t定義如下:

typedef union epoll_data  
{  
    void *ptr;  
    int fd;  
    uint32_t u32;  
    uint64_t u64;  
}epoll_data_t;  

  epoll_data_t是一個聯合體,其4個成員最多使用的是fd,它指定事件所從屬的目標文件描述符,ptr成員可用來指定fd相關的用戶數據,但由於opoll_data_t是一個聯合體,我們不能同時使用fd和ptr,如果要將文件描述符嗯哼用戶數據關聯起來,以實現快速的數據訪問,則只能使用其他手段,比如放棄使用fd成員,而在ptr指針指向的用戶數據中包含fd。

#include <sys/epoll.h>  
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);  // 返回:成功返回就緒的文件描述符個數,出錯-1  

  timeout參數的含義與poll接口的timeout參數相同,maxevents參數指定最多監聽多少個事件,它必須大於0。

  epoll_wait如果檢測到事件,就將所有就緒的事件從內核事件表(由epfd指定)中復制到events指定的數組中,這個數組只用來輸epoll_wait檢測到的就緒事件,而不像select和poll的參數數組既傳遞用於用戶注冊的事件,有用於輸出內核檢測到就緒事件,這樣極大提高了應用程序索引就緒文件描述符的效率。

 

epoll原理與實現

  epoll是怎麼實現的呢?其實很簡單,從這3個方法就可以看出,它比select聰明的避免了每次頻繁調用“哪些連接已經處在消息准備好階段”的 epoll_wait時,是不需要把所有待監控連接傳入的。這意味著,它在內核態維護了一個數據結構保存著所有待監控的連接。這個數據結構就是一棵紅黑樹,它的結點的增加、減少是通過epoll_ctrl來完成的。

  圖中左下方的紅黑樹由所有待監控的連接構成。左上方的鏈表,同是目前所有活躍的連接。於是,epoll_wait執行時只是檢查左上方的鏈表,並返回左上方鏈表中的連接給用戶。這樣,epoll_wait的執行效率能不高嗎?

  最後,再看看epoll提供的2種玩法ET和LT,即翻譯過來的邊緣觸發和水平觸發。其實這兩個中文名字倒也有些貼切。這2種使用方式針對的仍然是效率問題,只不過變成了epoll_wait返回的連接如何能夠更准確些。 例如,我們需要監控一個連接的寫緩沖區是否空閒,滿足“可寫”時我們就可以從用戶態將響應調用write發送給客戶端 。但是,或者連接可寫時,我們的“響應”內容還在磁盤上呢,此時若是磁盤讀取還未完成呢?肯定不能使線程阻塞的,那麼就不發送響應了。但是,下一次epoll_wait時可能又把這個連接返回給你了,你還得檢查下是否要處理。可能,我們的程序有另一個模塊專門處理磁盤IO,它會在磁盤IO完成時再發送響應。那麼,每次epoll_wait都返回這個“可寫”的、卻無法立刻處理的連接,是否符合用戶預期呢?   於是,ET和LT模式就應運而生了。LT是每次滿足期待狀態的連接,都得在epoll_wait中返回,所以它一視同仁,都在一條水平線上。ET則不然,它傾向更精確的返回連接。在上面的例子中,連接第一次變為可寫後,若是程序未向連接上寫入任何數據,那麼下一次epoll_wait是不會返回這個連接的。ET叫做 邊緣觸發,就是指,只有連接從一個狀態轉到另一個狀態時,才會觸發epoll_wait返回它。可見,ET的編程要復雜不少,至少應用程序要小心的防止epoll_wait的返回的連接出現:可寫時未寫數據後卻期待下一次“可寫”、可讀時未讀盡數據卻期待下一次“可讀”。    當然,從一般應用場景上它們性能是不會有什麼大的差距的,ET可能的優點是,epoll_wait的調用次數會減少一些,某些場景下連接在不必要喚醒時不會被喚醒(此喚醒指epoll_wait返回)。但如果像我上面舉例所說的,有時它不單純是一個網絡問題,跟應用場景相關。當然,大部分開源框架都是基於ET寫的,框架嘛,它追求的是純技術問題,當然力求盡善盡美。

 

基於epoll的"迷你"網絡事件庫

  網絡事件庫封裝了底層IO復用函數,同時提供給外部使用的接口,提供的接口可以多種多樣,但是一般有添加事件、刪除事件、開始事件循環等接口。為了展示下網絡事件庫的是如何封裝IO復用函數,同時學習epoll的使用,"迷你"網絡事件庫-tomevent今天誕生了 :) (ps:tomevent采用C++語言實現)。

  既然是網絡事件庫,那首先需要定義一個事件的結構,LZ這裡就使用Event結構體了,事件結構體中包含監聽的文件描述符、事件類型、回調函數、傳遞給回調函數的參數,當然,這只是一個簡單的事件結構,如果還需要其他信息可另外添加。

/**
 * event struct.
 */
struct Event {
    int fd;                                 /* the fd want to monitor */
    short event;                            /* the event you want to monitor */
    void *(*callback)(int fd, void *arg);   /* the callback function */
    void *arg;                               /* the parameter of callback function */
};

  定義一個事件處理接口IEvent,該接口定義了3個基本的事件操作函數,也就是添加事件、刪除事件、開始事件循環。定義IEvent接口,與具體的底層IO技術解耦,使用具體的IO復用類來實現該接口,比如對應select的SelectEvent,或者是對應poll的PollEvent,當然,這裡就用epoll對應的EpollEvent來實現IEvent接口(ps:c++中接口貌似應該稱為抽象類,不過這裡稱為接口更合適一點)。

/**
 * the interface of event.
 */
class IEvent {
public:
    virtual int addEvent(const Event &event) = 0;
    virtual int delEvent(const Event &event) = 0;
    virtual int dispatcher() = 0;

    virtual ~IEvent() { }
};

  IEvent的實現類EpollEvent,其中封裝了epoll相關的函數。EpollEvent有3個成員,分別是pollCreateSize、epollFd、events,pollCreateSize表示調用epoll_create時傳遞的參數值,epollFd表示epoll_create的返回值,events是記錄事件的map,events中記錄了監聽事件的信息,當事件來臨時被用到。

class EpollEvent : public IEvent {
public:
    EpollEvent() : EpollEvent(16) {

    }
    EpollEvent(int createSize) {
        if (createSize < 16) {
            createSize = 16;
        }

        epollCreateSize = createSize;
        initEvent();
    }

    virtual int addEvent(const Event &event);
    virtual int delEvent(const Event &event);
    virtual int dispatcher();

private:
    int initEvent() {
        int epollFd = epoll_create(this->epollCreateSize);
        if (epollFd <= 0) {
            perror("create_create error:");
            return epollFd; /* here epollFd is -1 */
        }

        this->epollFd = epollFd;return 0;
    }

    int epollCreateSize;
    int epollFd;

    //Event event;
    map<int, Event> events;
};
-------------------------------------------------------------------------  
int EpollEvent::addEvent(const Event &event) {
    struct epoll_event epollEvent;

    epollEvent.data.fd = event.fd;
    epollEvent.events = event.event;
    int retCode = epoll_ctl(this->epollFd, EPOLL_CTL_ADD, event.fd, &epollEvent);
    if (retCode < 0) {
        perror("epoll_ctl error:");
        return retCode;
    }

    /* add event to this->events */
    this->events[event.fd] = event;return 0;
}

int EpollEvent::delEvent(const Event &event) {
    struct epoll_event epollEvent;

    epollEvent.data.fd = event.fd;
    epollEvent.events = event.event;
    int retCode = epoll_ctl(this->epollFd, EPOLL_CTL_DEL, event.fd, &epollEvent);
    if (retCode < 0) {
        perror("epoll_ctl error:");
        return retCode;
    }

    this->events.erase(event.fd);
    return 0;
}

int EpollEvent::dispatcher() {
    struct epoll_event epollEvents[32];

    //cout << "epoll_wait before" << endl;
    int nEvents = epoll_wait(epollFd, epollEvents, 32, -1);
    if (nEvents <= 0) {
        perror("epoll_wait error:");
        return -1;
    }
    //cout << "epoll_wait after nEvent" << endl;
    for (int i = 0; i < nEvents; i++) {
        int fd = epollEvents[i].data.fd;
        Event event = this->events[fd];
        if (event.callback) {
            event.callback(fd, event.arg);
        }
    }
    return 0;
}

  到這裡整個tomevent的框架代碼就結束了,那麼該如何使用呢,以下是一個測試用例。使用tomevent來同時監聽2個文件描述符,一個是標准輸入(fd為0),另一個是提供UDP服務的一個文件描述符。

void *test(int fd, void *arg) {
    cout << "****************test(): fd=" << fd << endl;

    char buff[256];
    int len = recvfrom(fd, buff, sizeof(buff), 0, NULL, NULL);
    if (len > 0) {
        buff[len] = '\0';
        cout << buff << endl;
    }
    else {
        perror("recvfrom error:");
    }
    cout << "****************test()**********" << endl;
}

void *inTest(int fd, void *arg) {
    cout << "****************inTest(): fd=" << fd << endl;

    char buff[256];
    int len = read(fd, buff, sizeof(buff));
    if (len > 0) {
        buff[len] = '\0';
        cout << buff << endl;
    }
    else {
        perror("read stdin error:");
    }
    cout << "****************inTest()**********" << endl;
}

int main(int argc, char **argv) {
    int listenFd = -1;
    int connFd = -1;
    struct sockaddr_in servAddr;

    listenFd = socket(AF_INET, SOCK_DGRAM, 0);

    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_port = htons(8080);
    servAddr.sin_addr.s_addr = INADDR_ANY;
    bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr));

    listen(listenFd, 5);

    Event event, inEvent;
    EpollEvent eventBase;

    event.fd = listenFd;
    event.event = EPOLLIN;
    event.arg = NULL;
    event.callback = test;

    inEvent.fd = 0;
    inEvent.event = EPOLLIN;
    inEvent.arg = NULL;
    inEvent.callback = inTest;

    eventBase.addEvent(event);
    eventBase.addEvent(inEvent);

    for (; ;) {
        eventBase.dispatcher();
    }

    return 0;
}

  以下是測試結果 ,同時提供UDP服務和響應鍵盤輸入。

Copyright © Linux教程網 All Rights Reserved