一、epoll 系列函數簡介
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
* epoll_create(2) creates an epoll instance and returns a file descriptor referring to that instance. (The more recent
epoll_create1(2) extends the functionality of epoll_create(2).)
* Interest in particular file descriptors is then registered via epoll_ctl(2). The set of file descriptors currently
registered on an epoll instance is sometimes called an epoll set.
* epoll_wait(2) waits for I/O events, blocking the calling thread if no events are currently available.
1、epoll_create1 產生一個epoll 實例,返回的是實例 的句柄。flag 可以設置為0 或者EPOLL_CLOEXEC,為0時函數表現與epoll_create一致,EPOLL_CLOEXEC標志與open 時的 O_CLOEXEC 標志類似,即進程被替換時會關閉文件描述符。
2、epoll_ctl :
(1)epfd:epoll 實例句柄;
(2)op:對文件描述符fd 的操作,主要有EPOLL_CTL_ADD、 EPOLL_CTL_DEL等;
(3)fd:需要操作的目標 文件描述符;
(4)event:結構體指針
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events 參數主要有EPOLLIN、EPOLLOUT、EPOLLET 、EPOLLLT等;一般data 共同體我們設置其成員fd即可,也就是epoll_ctl 函數的第三個參數。
3、epoll_wait:
(1)epfd:epoll 實例句柄;
(2)events:結構體指針
(3)maxevents:事件的最大個數
(4)timeout:超時時間,設為-1表示永不超時
下面我們使用c++ 來實現一個服務器端程序:
#include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #include <fcntl.h> #include <sys/wait.h> #include <sys/epoll.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <string.h> #include <vector> #include <algorithm> #include "read_write.h" #include "sysutil.h" typedef std::vector<struct epoll_event> EventList; /* 相比於select與poll,epoll最大的好處是不會隨著關心的fd數目的增多而降低效率 */ int main(void) { int count = 0; int listenfd; if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) ERR_EXIT("socket"); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(5188); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); int on = 1; if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) ERR_EXIT("setsockopt"); if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) ERR_EXIT("bind"); if (listen(listenfd, SOMAXCONN) < 0) ERR_EXIT("listen"); std::vector<int> clients; int epollfd; epollfd = epoll_create1(EPOLL_CLOEXEC); //epoll實例句柄 struct epoll_event event; event.data.fd = listenfd; event.events = EPOLLIN | EPOLLET; //邊沿觸發 epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event); EventList events(16); struct sockaddr_in peeraddr; socklen_t peerlen; int conn; int i; int nready; while (1) { nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1); if (nready == -1) { if (errno == EINTR) continue; ERR_EXIT("epoll_wait"); } if (nready == 0) continue; if ((size_t)nready == events.size()) events.resize(events.size() * 2); for (i = 0; i < nready; i++) { if (events[i].data.fd == listenfd) { peerlen = sizeof(peeraddr); conn = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen); if (conn == -1) ERR_EXIT("accept"); printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port)); printf("count = %d\n", ++count); clients.push_back(conn); activate_nonblock(conn); event.data.fd = conn; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &event); } else if (events[i].events & EPOLLIN) { conn = events[i].data.fd; if (conn < 0) continue; char recvbuf[1024] = {0}; int ret = readline(conn, recvbuf, 1024); if (ret == -1) ERR_EXIT("readline"); if (ret == 0) { printf("client close\n"); close(conn); event = events[i]; epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &event); clients.erase(std::remove(clients.begin(), clients.end(), conn), clients.end()); } fputs(recvbuf, stdout); writen(conn, recvbuf, strlen(recvbuf)); } } } return 0; }
在程序的最開始定義一個新類型EventList,內部裝著struct epoll_event 結構體的容器。
接下面的 socket,bind,listen 都跟以前說的一樣,不述。接著使用epoll_create1 創建一個epoll 實例,再來看下面四行代碼:
struct epoll_event event;
event.data.fd = listenfd;
event.events = EPOLLIN | EPOLLET; //邊沿觸 發
epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event);
根據前面的函數分析,這四句意思就是將監 聽套接字listenfd 加入關心的套接字序列。
在epoll_wait 函數中的第二個參數,其實events.begin() 是個迭代器 ,但其具體實現也是struct epoll_event* 類型,雖然 &*events.begin() 得到的也是struct epoll_event* ,但不能 直接使用events.begin() 做參數,因為類型不匹配,編譯會出錯。
EventList events(16); 即初始化容器的大小為 16,當返回的事件個數nready 已經等於16時,需要增大容器的大小,使用events.resize 函數即可,容器可以動態增大, 這也是我們使用c++實現的其中一個原因。
當監聽套接字有可讀事件,accept 返回的conn也需要使用epoll_ctl 函 數將其加入關心的套接字隊列。
還需要調用 activate_nonblock(conn); 將conn 設置為非阻塞,man 7 epoll 裡有 這樣一句話:
An application that employs the EPOLLET flag should use nonblocking file descriptors to avoid having a blocking read or
write starve a task that is handling multiple file descriptors.
當下次循環回來某個已連接套接字有可讀事件,則讀取數 據,若read 返回0表示對方關閉,需要使用epoll_ctl 函數將conn 從隊列中清除,我們使用 std::vector<int> clients; 來保存每次accept 返回的conn,所以現在也需要將其擦除掉,調用clients.erase() 函數。
我們可以使 用前面寫的conntest 客戶端程序測試一下,先運行服務器程序,再運行客戶端,輸出如下:
simba@ubuntu:~/Documents/code/linux_programming/UNP/socket$ ./echoser_epoll
................................
count = 1015
ip=127.0.0.1 port=60492
count = 1016
ip=127.0.0.1 port=60493
count = 1017
ip=127.0.0.1 port=60494
count = 1018
ip=127.0.0.1 port=60495
count = 1019
accept: Too many open files
simba@ubuntu:~/Documents/code/linux_programming/UNP/socket$ ./conntest
.........................................................
count = 1015
ip=127.0.0.1 port=60492
count = 1016
ip=127.0.0.1 port=60493
count = 1017
ip=127.0.0.1 port=60494
count = 1018
ip=127.0.0.1 port=60495
count = 1019
connect: Connection reset by peer
為什麼服務器端的 count 只有1019呢,因為除去012,一個監聽套接字還有一個epoll 實例句柄,所以1024 - 5 = 1019。
為什麼客戶 端的錯誤提示跟這裡的不一樣呢?這正說明epoll 處理效率比poll和select 都高,因為處理得快,來一個連接就accept一 個,當服務器端accept 完第1019個連接,再次accept 時會因為文件描述符總數超出限制,打印錯誤提示,而此時客戶端雖 然已經創建了第1020個sock,但在connect 過程中發現對等方已經退出了,故打印錯誤提示,連接被對等方重置。如果服務 器端處理得慢的話,那麼客戶端會connect 成功1021個連接,然後在創建第1022個sock 的時候出錯,打印錯誤提示: socket: Too many open files,當然因為文件描述符的限制,服務器端也只能從已完成連接隊列中accept 成功1019個連接 。
二、epoll與select、poll區別
1、相比於select與poll,epoll最大的好處在於它不會隨著監聽fd數目的 增長而降低效率。內核中的select與poll的實現是采用輪詢來處理的,輪詢的fd數目越多,自然耗時越多。
2、epoll的 實現是基於回調的,如果fd有期望的事件發生就通過回調函數將其加入epoll就緒隊列中,也就是說它只關心“活躍”的fd, 與fd數目無關。
3、內核 / 用戶空間 內存拷貝問題,如何讓內核把 fd消息通知給用戶空間呢?在這個問題上 select/poll采取了內存拷貝方法。而epoll采用了共享內存的方式。
4、epoll不僅會告訴應用程序有I/0 事件到來,還 會告訴應用程序相關的信息,這些信息是應用程序填充的,因此根據這些信息應用程序就能直接定位到事件,而不必遍歷整 個fd集合。
三、epoll 的EPOLLLT (電平觸發,默認)和 EPOLLET(邊沿觸發)模式的區別
1、EPOLLLT:完 全靠kernel epoll驅動,應用程序只需要處理從epoll_wait返回的fds,這些fds我們認為它們處於就緒狀態。此時epoll可 以認為是更快速的poll。
2、EPOLLET:此模式下,系統僅僅通知應用程序哪些fds變成了就緒狀態,一旦fd變成就緒 狀態,epoll將不再關注這個fd的任何狀態信息,(從epoll隊列移除)直到應用程序通過讀寫操作(非阻塞)觸發EAGAIN狀 態,epoll認為這個fd又變為空閒狀態,那麼epoll又重新關注這個fd的狀態變化(重新加入epoll隊列)。隨著epoll_wait 的返回,隊列中的fds是在減少的,所以在大並發的系統中,EPOLLET更有優勢,但是對程序員的要求也更高,因為有可能會 出現數據讀取不完整的問題,舉例如下:
假設現在對方發送了2k的數據,而我們先讀取了1k,然後這時調用了 epoll_wait,如果是邊沿觸發,那麼這個fd變成就緒狀態就會從epoll 隊列移除,很可能epoll_wait 會一直阻塞,忽略尚 未讀取的1k數據,與此同時對方還在等待著我們發送一個回復ack,表示已經接收到數據;如果是電平觸發,那麼 epoll_wait 還會檢測到可讀事件而返回,我們可以繼續讀取剩下的1k 數據。