nginx epoll 事件模型
nginx作為一個異步、高效的事件驅動型web服務器,在linux平台中當系統支持epoll時,默認采用epoll來高效地處理事件。nginx中使用ngx_event_t結構來表示一個事件,下面先介紹ngx_event_t結構體中各成員的含義:
/*
 * ngx_event_s: one nginx event (read or write side of a connection, a
 * timer, ...).  The per-field notes below are translated from the article's
 * annotations; statements about which backends use a flag describe nginx
 * 1.5.x as reported by the article and are not verifiable from this excerpt.
 */
struct ngx_event_s {
    /* object associated with the event; usually the owning ngx_connection_t */
    void            *data;

    /* write flag: 1 means this is the write event of the connection */
    unsigned         write:1;

    /* 1 means the connection is a listening one, i.e. it accepts new connections */
    unsigned         accept:1;

    /* used to detect the stale events in kqueue, rtsig, and epoll */
    unsigned         instance:1;

    /*
     * the event was passed or would be passed to a kernel;
     * in aio mode - operation was posted.
     *
     * For epoll: 1 means the event has been added to the epoll set.
     */
    unsigned         active:1;

    /* not used by the epoll backend (per the article) */
    unsigned         disabled:1;

    /* the ready event; in aio mode 0 means that no operation can be posted */
    unsigned         ready:1;

    /* not used by the epoll backend (per the article) */
    unsigned         oneshot:1;

    /* aio operation is complete */
    unsigned         complete:1;

    /* 1 means end of stream was reached, e.g. recv() returned 0 */
    unsigned         eof:1;

    /* 1 means an error occurred while processing the event */
    unsigned         error:1;

    /*
     * 1 means the event timed out; per the article the http module simply
     * closes the request in that case
     */
    unsigned         timedout:1;

    /* 1 means the event is currently registered in the timer tree */
    unsigned         timer_set:1;

    /* 1 means processing must be delayed; per the article used for rate limiting */
    unsigned         delayed:1;

    /*
     * deferred accept: per the article the connection is only really
     * established once the peer has sent data
     */
    unsigned         deferred_accept:1;

    /*
     * the pending eof reported by kqueue, epoll or in aio chain operation
     *
     * For epoll this is set when EPOLLRDHUP is reported, i.e. the peer
     * closed the connection or shut down its writing half.
     */
    unsigned         pending_eof:1;

#if !(NGX_THREADS)
    /*
     * per the article, in the 1.5.5 source this bit is only set in
     * ngx_epoll_process_events and not read anywhere else
     */
    unsigned         posted_ready:1;
#endif

#if (NGX_WIN32)
    /* setsockopt(SO_UPDATE_ACCEPT_CONTEXT) was successful */
    unsigned         accept_context_updated:1;
#endif

#if (NGX_HAVE_KQUEUE)
    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;
#endif

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     */

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              available;
#else
    /*
     * 1 means accept as many connections as possible per accept cycle;
     * per the article this corresponds to the multi_accept directive
     */
    unsigned         available:1;
#endif

    /* callback invoked when the event fires */
    ngx_event_handler_pt  handler;


#if (NGX_HAVE_AIO)

#if (NGX_HAVE_IOCP)
    ngx_event_ovlp_t ovlp;
#else
    struct aiocb     aiocb;
#endif

#endif

    /* not used by the epoll backend (per the article) */
    ngx_uint_t       index;

    /* log object used when reporting on this event */
    ngx_log_t       *log;

    /* red-black tree node linking the event into the timer tree */
    ngx_rbtree_node_t   timer;

    /* 1 means the event has been closed */
    unsigned         closed:1;

    /* to test on worker exit */
    /* per the article, only set in ngx_add_channel_event */
    unsigned         channel:1;

    /* presumably used by the resolver (the article itself is unsure) */
    unsigned         resolver:1;

#if (NGX_THREADS)

    unsigned         locked:1;

    unsigned         posted_ready:1;
    unsigned         posted_timedout:1;
    unsigned         posted_eof:1;

#if (NGX_HAVE_KQUEUE)
    /* the pending errno reported by kqueue */
    int              posted_errno;
#endif

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              posted_available;
#else
    unsigned         posted_available:1;
#endif

    ngx_atomic_t    *lock;
    ngx_atomic_t    *own_lock;

#endif

    /* the links of the posted queue */
    ngx_event_t     *next;
    ngx_event_t    **prev;


#if 0

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */

    void            *thr_ctx;

#if (NGX_EVENT_T_PADDING)

    /* event should not cross cache line in SMP */

    uint32_t         padding[NGX_EVENT_T_PADDING];
#endif
#endif
};


#if (NGX_HAVE_FILE_AIO)

/*
 * ngx_event_aio_s: state of one file AIO operation.  It embeds an
 * ngx_event_t plus the file, descriptor and aiocb the operation refers to;
 * which result fields exist depends on NGX_HAVE_EVENTFD.
 */
struct ngx_event_aio_s {
    void                      *data;
    ngx_event_handler_pt       handler;
    ngx_file_t                *file;

    ngx_fd_t                   fd;

#if (NGX_HAVE_EVENTFD)
    int64_t                    res;
#if (NGX_TEST_BUILD_EPOLL)
    ngx_err_t                  err;
    size_t                     nbytes;
#endif
#else
    ngx_err_t                  err;
    size_t                     nbytes;
#endif

#if (NGX_HAVE_AIO_SENDFILE)
    off_t                      last_offset;
#endif

    ngx_aiocb_t                aiocb;
    ngx_event_t                event;
};

#endif
nginx中使用ngx_epoll_module模塊來封裝epoll機制處理事件,ngx_epoll_module模塊只對兩個配置項感興趣,其ngx_command_t結構如下:
/*
 * Configuration directives understood by the epoll module.  Both take one
 * argument and are stored through ngx_conf_set_num_slot into the matching
 * field of ngx_epoll_conf_t.
 */
static ngx_command_t  ngx_epoll_commands[] = {

    {
      /*
       * epoll_events: the maximum number of events one epoll_wait() call
       * may return; ngx_epoll_init preallocates that many epoll_event
       * structures.
       */
      ngx_string("epoll_events"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, events),
      NULL },

    {
      /*
       * worker_aio_requests: how many asynchronous operations the created
       * aio context can handle concurrently; per the article this is the
       * first argument of io_setup() (that call is not shown here).
       */
      ngx_string("worker_aio_requests"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, aio_requests),
      NULL },

      ngx_null_command
};
ngx_epoll_module的ngx_event_module_t結構如下:
/*
 * Event-module context of the epoll module: its name, the configuration
 * create/init hooks, and the table of event actions that ngx_epoll_init
 * installs into ngx_event_actions.
 */
ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        /* called to add an event to epoll */
        ngx_epoll_add_event,             /* add an event */
        /* called to remove an event from epoll */
        ngx_epoll_del_event,             /* delete an event */
        /*
         * epoll has no separate enable/disable notion, so these two slots
         * reuse the add/delete functions
         */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        /*
         * called to add a connection to epoll; each connection object has
         * one read event and one write event
         */
        ngx_epoll_add_connection,        /* add an connection */
        /* called to remove a connection from epoll */
        ngx_epoll_del_connection,        /* delete an connection */
        NULL,                            /* process the changes */
        /* the epoll event processing function */
        ngx_epoll_process_events,        /* process the events */
        /* the epoll module initialization function */
        ngx_epoll_init,                  /* init the events */
        /*
         * the epoll module cleanup function; per the article it is only
         * called in the multi-threaded model (not verifiable from here)
         */
        ngx_epoll_done,                  /* done the events */
    }
};
ngx_epoll_create_conf在配置項解析前調用,用來初始化配置結構;ngx_epoll_init_conf函數在配置項解析完後調用,如果配置文件中不存在epoll_events或worker_aio_requests配置項,默認將epoll_events設置為512,worker_aio_requests設置為32。ngx_epoll_module_ctx結構體中後十個函數對應於ngx_event_actions_t結構,它是事件模塊獨有的結構。ngx_epoll_init函數在什麼時候被調用呢?它在nginx啟動過程中每個worker進程啟動後被調用(由ngx_event_core_module的ngx_event_process_init函數調用)。
ngx_epoll_module源碼分析
ngx_epoll_init函數:
/*
 * Initialize the epoll event backend.
 *
 * Creates the epoll descriptor if it does not exist yet, makes sure the
 * event_list buffer can hold epcf->events entries, and installs the epoll
 * I/O methods, action table and event flags into the global event state.
 *
 * cycle: supplies the configuration context, the connection count used as
 *        the epoll_create() size hint, and the log.
 * timer: not used by this function.
 *
 * Returns NGX_OK on success, NGX_ERROR if epoll_create() or the event list
 * allocation fails.
 */
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    int                size_hint;
    size_t             list_bytes;
    ngx_uint_t         mode;
    ngx_epoll_conf_t  *conf;

    /* configuration structure of the epoll module */
    conf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    /* ep == -1 means the epoll descriptor has not been created yet */
    if (ep == -1) {
        size_hint = cycle->connection_n / 2;

        ep = epoll_create(size_hint);
        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

#if (NGX_HAVE_FILE_AIO)
        /* file AIO is set up together with a freshly created descriptor */
        ngx_epoll_aio_init(cycle, conf);
#endif
    }

    /* grow the event buffer when the configured size exceeds the current one */
    if (nevents < conf->events) {
        if (event_list != NULL) {
            ngx_free(event_list);
        }

        list_bytes = sizeof(struct epoll_event) * conf->events;

        event_list = ngx_alloc(list_bytes, cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = conf->events;

    /* clear (edge-style) notification when available, level otherwise */
#if (NGX_HAVE_CLEAR_EVENT)
    mode = NGX_USE_CLEAR_EVENT;
#else
    mode = NGX_USE_LEVEL_EVENT;
#endif

    /* I/O methods and the action table used by the rest of the event code */
    ngx_io = ngx_os_io;
    ngx_event_actions = ngx_epoll_module_ctx.actions;
    ngx_event_flags = mode|NGX_USE_GREEDY_EVENT|NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}
ngx_epoll_add_event函數:
/*
 * Register one event (read or write) of a connection with epoll.
 *
 * ev:    the event to add; ev->data must point to its ngx_connection_t.
 * event: NGX_READ_EVENT to add the read side; any other value is treated
 *        as the write side.
 * flags: extra epoll flags OR-ed into the registered event mask.
 *
 * If the opposite event of the same connection is already active, the
 * descriptor is already in the epoll set, so the registration is modified
 * (EPOLL_CTL_MOD) and the opposite event's mask is preserved; otherwise the
 * descriptor is added (EPOLL_CTL_ADD).
 *
 * Returns NGX_OK and marks ev active on success, NGX_ERROR if epoll_ctl()
 * fails.
 */
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    /* the connection this event belongs to */
    c = ev->data;

    events = (uint32_t) event;

    /*
     * "e" is the opposite event of the connection and "prev" the epoll mask
     * that must be kept if that opposite event is already registered.
     */
    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;

        /*
         * The mask is parenthesized on purpose: "!=" binds tighter than
         * "|", so without the parentheses the comparison is made against
         * EPOLLIN alone and its result is then OR-ed with EPOLLRDHUP.
         * Either way "events" ends up as EPOLLIN|EPOLLRDHUP.
         */
#if (NGX_READ_EVENT != (EPOLLIN|EPOLLRDHUP))
        events = EPOLLIN|EPOLLRDHUP;
#endif

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
        events = EPOLLOUT;
#endif
    }

    if (e->active) {
        op = EPOLL_CTL_MOD;
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD;
    }

    ee.events = events | (uint32_t) flags;

    /*
     * The connection pointer is stored with the event's instance bit in
     * bit 0; ngx_epoll_process_events masks it off again.  This assumes
     * ngx_connection_t objects are at least 2-byte aligned.
     */
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    /* the event is now known to the kernel */
    ev->active = 1;
#if 0
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

    return NGX_OK;
}
ngx_epoll_del_event函數:
/*
 * Unregister one event (read or write) of a connection from epoll.
 *
 * ev:    the event to remove; ev->data must point to its ngx_connection_t.
 * event: NGX_READ_EVENT to remove the read side; any other value is treated
 *        as the write side.
 * flags: with NGX_CLOSE_EVENT set, only the active flag is cleared and no
 *        epoll_ctl() call is made; otherwise the flags are OR-ed into the
 *        mask kept for the opposite event.
 *
 * Returns NGX_OK and clears ev->active on success, NGX_ERROR if epoll_ctl()
 * fails.
 */
static ngx_int_t
ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  ctl_op;
    uint32_t             sibling_mask;
    ngx_event_t         *sibling;
    ngx_connection_t    *conn;
    struct epoll_event   ee;

    /*
     * when the file descriptor is closed, the epoll automatically deletes
     * it from its queue, so we do not need to delete explicitly the event
     * before the closing the file descriptor
     */

    if (flags & NGX_CLOSE_EVENT) {
        ev->active = 0;
        return NGX_OK;
    }

    conn = ev->data;

    /* the other event of the same connection and the mask it registers */
    sibling = (event == NGX_READ_EVENT) ? conn->write : conn->read;
    sibling_mask = (event == NGX_READ_EVENT) ? EPOLLOUT
                                             : (EPOLLIN|EPOLLRDHUP);

    if (!sibling->active) {
        /* nothing else is registered for the descriptor: remove it */
        ctl_op = EPOLL_CTL_DEL;
        ee.events = 0;
        ee.data.ptr = NULL;

    } else {
        /* keep the descriptor registered for the sibling event only */
        ctl_op = EPOLL_CTL_MOD;
        ee.events = sibling_mask | (uint32_t) flags;
        ee.data.ptr = (void *) ((uintptr_t) conn | ev->instance);
    }

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll del event: fd:%d op:%d ev:%08XD",
                   conn->fd, ctl_op, ee.events);

    if (epoll_ctl(ep, ctl_op, conn->fd, &ee) != -1) {
        ev->active = 0;
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                  "epoll_ctl(%d, %d) failed", ctl_op, conn->fd);

    return NGX_ERROR;
}
ngx_epoll_add_connection及ngx_epoll_del_connection函數
這兩個函數的實現很簡單,同樣是通過調用epoll_ctl來添加或刪除事件,只是會將連接的讀/寫事件一起處理,這裡不再列出源碼。
ngx_epoll_process_events函數:
/*
 * Wait for epoll events and dispatch them.
 *
 * cycle: supplies the log used for diagnostics.
 * timer: timeout passed to epoll_wait(); NGX_TIMER_INFINITE means no timeout.
 * flags: NGX_UPDATE_TIME forces a time update after the wait;
 *        NGX_POST_EVENTS queues ready events instead of calling their
 *        handlers; NGX_POST_THREAD_EVENTS sets posted_ready instead of ready.
 *
 * Returns NGX_OK when events were handled, on a timeout, or when the wait
 * was interrupted while ngx_event_timer_alarm was set; NGX_ERROR when
 * epoll_wait() fails otherwise or returns no events without a timeout.
 */
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev, **queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    /* fetch the events that are ready */
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    /* capture errno right away, before any other call can overwrite it */
    err = (events == -1) ? ngx_errno : 0;

    /*
     * Per the article: NGX_UPDATE_TIME is in effect when timer_resolution
     * is not configured (time is updated after every epoll_wait() return),
     * while ngx_event_timer_alarm is set to 1 every timer_resolution
     * interval when that directive is configured.  Neither mechanism is
     * visible in this excerpt.
     */
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    /* a non-zero err means epoll_wait() failed */
    if (err) {
        if (err == NGX_EINTR) {

            /* interrupted while the alarm flag was set: not an error */
            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    if (events == 0) {
        /* an ordinary timeout */
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }

    /* per the article this lock only has an effect in threaded builds */
    ngx_mutex_lock(ngx_posted_events_mutex);

    /* handle every ready event */
    for (i = 0; i < events; i++) {
        /*
         * data.ptr holds the connection pointer with the instance bit that
         * was stored in bit 0 when the event was added
         */
        c = event_list[i].data.ptr;

        /* extract the instance bit, then clear it to recover the pointer */
        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        /*
         * The read event's instance bit is compared first because the read
         * side is handled first; fd == -1 also marks the connection stale.
         */
        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        /* the event types reported for this connection */
        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        /*
         * Error or hang-up on the descriptor; only logged here.  Per the
         * article unread data may still be pending on the connection.
         */
        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);
        }

#if 0
        if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "strange epoll_wait() events fd:%d ev:%04XD",
                          c->fd, revents);
        }
#endif

        /*
         * Without EPOLLIN/EPOLLOUT neither branch below would run, so an
         * error-only report would never reach a handler.
         */
        if ((revents & (EPOLLERR|EPOLLHUP))
             && (revents & (EPOLLIN|EPOLLOUT)) == 0)
        {
            /*
             * if the error events were returned without EPOLLIN or EPOLLOUT,
             * then add these flags to handle the events at least in one
             * active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

        /* readable and the read event is registered */
        if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
            /*
             * EPOLLRDHUP: the peer closed the connection or shut down its
             * writing half
             */
            if (revents & EPOLLRDHUP) {
                rev->pending_eof = 1;
            }
#endif

            /* per the article NGX_POST_THREAD_EVENTS is never passed */
            if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
                rev->posted_ready = 1;

            } else {
                /* mark the event ready */
                rev->ready = 1;
            }

            /*
             * NGX_POST_EVENTS: defer processing by queueing the event;
             * accept events go to their own queue
             */
            if (flags & NGX_POST_EVENTS) {
                queue = (ngx_event_t **) (rev->accept ?
                               &ngx_posted_accept_events : &ngx_posted_events);

                ngx_locked_post_event(rev, queue);

            } else {
                /* run the event's callback immediately */
                rev->handler(rev);
            }
        }

        wev = c->write;

        /* writable and the write event is registered */
        if ((revents & EPOLLOUT) && wev->active) {

            /*
             * re-check for staleness: the read handler above may have
             * closed the connection
             */
            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            if (flags & NGX_POST_THREAD_EVENTS) {
                wev->posted_ready = 1;

            } else {
                wev->ready = 1;
            }

            if (flags & NGX_POST_EVENTS) {
                ngx_locked_post_event(wev, &ngx_posted_events);

            } else {
                wev->handler(wev);
            }
        }
    }

    ngx_mutex_unlock(ngx_posted_events_mutex);

    return NGX_OK;
}