本博文主要針對UNP一書中的第六章內容來聊聊I/O復用技術以及其在網絡編程中的實現
I/O多路復用是指內核一旦發現進程指定的一個或者多個I/O條件准備就緒,它就通知該進程。I/O復用適用於以下場合:
(1) 當客戶處理多個描述符(一般是交互式輸入或網絡套接字),必須適用I/O復用
(2) 當一個客戶處理多個套接字時,這種情況很少見,但也可能出現
(3) 當一個TCP服務器既要處理監聽套接字,又要處理已連接套接字,一般就要使用I/O復用
(4) 如果一個服務器既要適用TCP,又要適用UDP,一般就要使用I/O復用
(5) 如果一個服務器要處理多個服務或者多個協議,一般就要使用I/O復用
與多線程和多進程技術相比,I/O復用技術的最大優勢就是系統開銷小,系統不必創建進程/線程,也不必維護這些進程/進程,從而大大減小了系統的開銷。
Unix下常見的I/O模型有五種,分別是:阻塞式I/O,非阻塞式I/O,I/O復用,信號驅動式I/O和異步I/O。
Unix下對於一個輸入操作,通常包含兩個不同的階段:
(1) 等待數據准備好
(2) 從內核向進程復制數據
例如:對於一次read函數操作來說,數據先會被拷貝到操作系統內核的緩沖區去,然後才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。
再比如對於一次socket流傳輸來說,首先等待網絡上的數據到達,然後復制到內核的某個緩沖區,然後再把內核緩沖區的數據復制到進程緩沖區。
下面就以上述兩個階段來闡述五種I/O模型。
假定一個特定的場景,你的一個好朋友找你借錢,你身上沒有充足的現金,於是,你要去銀行取錢,銀行人多,你只能在那裡排隊,在這段時間內,你不能離開隊伍去干你自己的事情。時間都浪費在排隊上面了。這就是典型的阻塞式I/O模型。
默認情況下,所有的套接字都時阻塞的,以數據報套接字為例
vcTausuw0cr9vt2xqNe8sbi6w7rzo6y+zb2ryv2+3bTTxNq6y7i01sa1vdPDu6e9+LPMo6zL5rrz08O7p734s8zU2bbU1eLQqcr9vt29+NDQtKbA7aGjPC9wPg0KPHA+1eLW1sSj0M21xLrDtKa+zcrHo6zE3Lm7vLDKsbvxtcPK/b7do6zDu9PQ0dOz2aOstavKx77Nz/HJz8PmyKS94sSj0M3W0L2ytb2jrLbU08O7p8C0y7WjrNXits7Ksbzk0rvWsdKqtKbT2rXItb3XtMyso6yyu8TcyKXX9sbky/u1xMrCx+mjrNTa0NTE3Le9w+a4trP2wcu0+rzboaM8L3A+DQo8aDIgaWQ9"22-非阻塞式io模型">2.2 非阻塞式I/O模型
還是去取錢的例子,假設你無法忍受一直在那裡排隊,而是去旁邊的商場逛逛,然後隔一段時間回來看看還有在排隊沒,有的話再繼續去逛逛,直到有一次你回來看到沒有人排隊了為止。這就是非阻塞式I/O模型。
進程把一個套接字設置成非阻塞是在通知內核:當所請求的I/O操作非得把本進程投入睡眠才能完成時,不要把本進程投入睡眠,而是返回一個錯誤。
如上圖所示,前三次詢問都返回一個錯誤,即內核沒有數據報准備好,到第四次調用recvform函數時,數據被准備好了,它被復制到應用進程緩存區,於是recvform成功返回,應用進程隨後處理數據。
這種模型相對於阻塞式來說,
優點在於:應用進程不必阻塞在recvfrom調用中,而是可以去處理其他事情
缺點在於:如趣解模型中所說,你來回跑銀行帶來了很大的延時,可能在你來回的路上叫到了你的號。在網絡模型中即可以表現在任務完成的響應延遲增大了,隔一段時間輪詢一次recvform,數據報可能在兩次輪詢之間的任意時間內准備好,這將會導致整體數據吞吐量的降低。
現在,銀行都會按一個顯示屏,上面會顯示輪到幾號客戶了。這個時候,你就不用每次都去跑進去看還有排隊沒,而是遠遠的看看顯示屏上輪到你沒有,如果顯示了你的名字,你就去取錢就行了。這就是I/O復用模型。
有了I/O復用技術,我們可以調用select或poll函數,阻塞在這兩個系統調用中的某一個之上,而不是阻塞在真正的I/O系統調用上。
如上圖所示,進程受阻於select調用,等待可能多個套接字中的任一個變為可讀。當select返回套接字可讀這一條件時,應用進程就調用recvfrom把所讀的數據報復制到應用進程緩沖區。
進程阻塞在select,如果進程還有其他的任務的話就能體現到I/O復用技術的好處,那個任務先返回可讀條件,就去執行哪個任務。從單一的等待變成多個任務的同時等待。
這種模型較之前的模型來說,可以不必多次輪詢內核,而是等到內核的通知。
你還是不滿意銀行的服務,雖然不必排隊,但你在商場逛的也不放心啊,你還是要盯著顯示屏,深怕沒有看到顯示屏上面你的名字,於是,銀行也退出了全新的服務,你去銀行取錢的時候,銀行目前人多不能及時處理你的業務,而是叫你留下手機號,等到空閒的時候就短信通知你可以去取錢了。這就是信號驅動式I/O模型。
我們可以用信號,讓內核在描述符就緒時發送SIGIO信號告知我們。
如上圖所示,進程建立SIGIO的信號處理程序(就要趣解模型中的留下手機號),並通過sigaction系統調用安裝一個信號處理函數,該系統調用將立即返回,進程繼續工作,知道數據報准備好後,內核產生一個SIGIO信號,告知應用進程以及准備好,於是就在信號處理程序中調用recvfrom讀取數據報,並通知主循環數據已准備好待處理,也可以立即通知主循環讓他讀取數據報。
這種模型的好處就是,在數據報沒有准備好的期間,應用進程不必阻塞,繼續執行主循環,只要等待來自信號處理函數的通知即可。
你細細的想了想自己取錢時為了什麼,無非時借給你的朋友,銀行都退出了網上銀行服務,你只需要知道你的好朋友的銀行卡號,然後在網銀中申請轉賬,銀行後台會給你處理,然後把錢打到你朋友的賬戶下面,等這些都處理好後,銀行會給你發一條短信,告訴你轉賬成功,這個時候你就可以跟你的好朋友說,錢已經打給你了。這就是異步I/O模型,取錢借錢的繁瑣事就交給銀行後台給你處理吧。
POSIX規范中提供一些函數,這些函數的工作機制是:告知內核啟動某個操作,並讓內核在整個操作完成後通知我們。
如上圖所示,我們調用aio_read函數(POSIX異步I/O函數以aio_或lio_開頭),給內核傳遞描述符,緩沖區指針,緩沖區大小和文件偏移,並告訴內核完成整個操作後通知我們。
不同於信號驅動式I/O模型,信號是在數據已復制到進程緩沖區才產生的。
以一張圖來說明五種I/O操作的差異:
同步I/O操作:導致請求進程阻塞,直到I/O操作完成
異步I/O操作:不導致進程阻塞
可知,前四種都屬於同步I/O操作慢系統都會阻塞與recvfrom操作,而異步I/O不會。
select函數用於I/O復用,該函數允許進程指示內核等待多個事件中的任何一個發生,並只在有一個或多個事件發生或經歷一段指定的事件才喚醒它。
它的函數原型時:
int select(int maxfdp1, fd_set *readset, fd_set *writeset , fd_set *exceptset , const struct timeval *timeout);
對於timeout參數:
(1) timeout==NULL,表示要永遠等待下去,直到有一個描述符准備好I/O時才返回
(2) *timeout的值為0,表示不等待,檢查描述符就立即返回,這稱為輪詢。
(2) *timeout的值不為0,表示等待一段固定的時間,再有一個描述符准備好I/O時返回,但是不能超過由該參數制定的時間。
對於readset,writeset和exceptset三個參數:
這三個描述符說明了可讀,可寫和處於異常條件的描述符集合
對於描述集fd_set結構,提供了如下四個操作函數
#include
int FD_ISSET(int fd,fd_set *fdset); //設定描述集中的某個描述符
void FD_CLR(int fd,fd_set *fdset);//關掉描述集中的某個描述符
void FD_SET(int fd,fd_set *fdset);//打開描述集中的某個描述符
void FD_ZERO(fd_set *fdset);//清除集合內所有元素
對於maxfdp1參數:
指定待測試的描述符個數,它的值時待測試的最大描述符編號加1,即從上面三個描述符集中的最大描述符編號加1。
對於返回值:
select返回值有三種情況:
(1) 返回值為-1時,表示出錯,如果在指定的描述符一個都沒有准備好時捕捉一個信號,則返回-1
(2) 返回0,表示沒有描述符准備好,指定的時間就超過了。
(3) 返回正數,表示已經准備好的描述符個數,在這種情況下,三個描述符集中依舊打開的位對應於已准備好的描述符
3.2 使用select函數修改的str_cli函數
#include "unp.h"
void
str_cli(FILE *fp, int sockfd)
{
int maxfdp1;
fd_set rset;
char sendline[MAXLINE], recvline[MAXLINE];
FD_ZERO(&rset);
for ( ; ; ) {
FD_SET(fileno(fp), &rset);//標准輸入描述符
FD_SET(sockfd, &rset);//socket描述符
maxfdp1 = max(fileno(fp), sockfd) + 1;//最大描述符編號+1
Select(maxfdp1, &rset, NULL, NULL, NULL);//調用select,阻塞於此
//如果返回的套接字可讀,就用readline讀入回射文本
if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
if (Readline(sockfd, recvline, MAXLINE) == 0)
err_quit("str_cli: server terminated prematurely");
Fputs(recvline, stdout);
}
//如果標准輸入可讀,就先用fgets讀入一行文本
if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */
if (Fgets(sendline, MAXLINE, fp) == NULL)
return; /* all done */
Writen(sockfd, sendline, strlen(sendline));
}
}
}
3.3 批量輸入
在上一節提到的str_cli版本中,仍然存在一個問題。假設客戶在標准輸入中批量輸入數據,在輸入完最後一個數據後,碰到了EOF,str_cli返回到main函數,main函數隨後終止。但是,在這個過程中,標准輸入的EOF終止符並不意味著我們也同時完成了從套接字的讀入,可能仍有請求在去往服務器的路上,或者仍有應答在返回客戶的路上。
原因就處在於此:
if (Fgets(sendline, MAXLINE, fp) == NULL)
return; /* all done */
當碰到EOF終止符的時候,str_cli函數選擇了立即返回,而此時,我們更需要的是找到一個條件來判斷套接字的讀取是否完成。
3.4 shutdown函數
shutdown函數提供了關閉TCP連接其中一半的方法,也正是為了解決上一小節發現的問題。
假設在標准輸入碰到EOF終止符時,我們只關閉發送這一端,也就是給服務器發送一個FIN,告訴它我們已經完成了數據發送,但是仍然保持套接字描述打開以便讀取。
這點跟close函數有點像,但是考慮到close函數有如下兩個限制:
(1) close把描述符的引用計數減1,僅在該計數變為0時才關閉該套接字。但是使用shutdown可以不管引用計數就激發TCP的正常連接終止序列
(2) close終止讀和寫兩個方向的數據傳送。shutdown只是關閉單方向的讀或寫。
其函數原型如下:
int shutdown(int sockfd , int howto);//若成功則返回0,若出錯返回-1
關於該函數的第二個參數howto:
(1) SHUT_RD 關閉連接的讀這一半,套接字中不再有數據可接收,而且套接字接收緩沖區中的現有數據都被丟棄
(2) SHUT_WR 關閉連接的寫這一半,對於TCP套接字來說,這稱為半關閉,當前留在套接字發送緩沖區的數據將被發送,後跟TCP正常的連接終止序列。
(3) SHUT_RDWR 關閉讀半部和寫半部,這與調用shutdown兩次等效。
3.5 str_cli函數再修改版
#include "unp.h"
void
str_cli(FILE *fp, int sockfd)
{
int maxfdp1, stdineof;
fd_set rset;
char buf[MAXLINE];
int n;
stdineof = 0;
FD_ZERO(&rset);
for ( ; ; ) {
if (stdineof == 0)//套接字讀取完成標識符
FD_SET(fileno(fp), &rset);//關閉select描述符集中的標准輸入描述符
FD_SET(sockfd, &rset);
maxfdp1 = max(fileno(fp), sockfd) + 1;
Select(maxfdp1, &rset, NULL, NULL, NULL);
if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
if (stdineof == 1)
return; /* normal termination */
else
err_quit("str_cli: server terminated prematurely");
}
Write(fileno(stdout), buf, n);
}
if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */
if ( (n = Read(fileno(fp), buf, MAXLINE)) == 0) {//若讀取的字節數為0
stdineof = 1;//表明套接字讀取數據完成
Shutdown(sockfd, SHUT_WR); /* send FIN *///關閉讀這一半
FD_CLR(fileno(fp), &rset);
continue;
}
Writen(sockfd, buf, n);
}
}
}
4. TCP回射服務器程序(采用select函數)
在【unix網絡編程第三版】閱讀筆記(四):TCP客戶/服務器實例中我們采用fork生成子進程來處理每個客戶的需求。
如今,有了select函數,就不必創建那麼多子進程了,避免了為每一個客戶創建一個子進程的所有開銷,本節就將其改寫成任意個客戶的單進程版本。
select函數的描述符集中需要存儲每個客戶的連接套接字。於是我們很容易想到用采用一個數組client[FD_SETSIZE]來保存所有已連接的套接字。
每次有新客戶連接的時候,就在client數組中找到第一個可用項來保存該連接套接字。
具體解釋見代碼注釋:
#include "unp.h"
int
main(int argc, char **argv)
{
int i, maxi, maxfd, listenfd, connfd, sockfd;
int nready, client[FD_SETSIZE];
ssize_t n;
fd_set rset, allset;
char buf[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
listenfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
Listen(listenfd, LISTENQ);
maxfd = listenfd; //初始化maxfd,在傳入select函數時需要+1
maxi = -1; //記錄client數組中最後一個非-1數所占的序號
for (i = 0; i < FD_SETSIZE; i++)
client[i] = -1; //初始化client數組,為-1表示該項可用
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
for ( ; ; ) {
rset = allset; //初始化描述符集
nready = Select(maxfd+1, &rset, NULL, NULL, NULL);//注意此處為最大描述符編號+1,返回已准備好的描述符個數
if (FD_ISSET(listenfd, &rset)) { //檢測到有新客戶連接
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);//連接新客戶,獲得已連接套接字
#ifdef NOTDEF
printf("new client: %s, port %d\n",
Inet_ntop(AF_INET, &cliaddr.sin_addr, 4, NULL),
ntohs(cliaddr.sin_port));
#endif
for (i = 0; i < FD_SETSIZE; i++)
if (client[i] < 0) {//找到第一個可用項
client[i] = connfd; //存儲套接字描述符
break;
}
if (i == FD_SETSIZE)//限制最大連接個數
err_quit("too many clients");
FD_SET(connfd, &allset); /* add new descriptor to set */
if (connfd > maxfd)
maxfd = connfd; //重置maxfd為最大描述符編號+1
if (i > maxi)
maxi = i; //client數組中最後一個描述符所占的序號
if (--nready <= 0)
continue; //沒有已連接套接字了
}
for (i = 0; i <= maxi; i++) { /* check all clients for data */
if ( (sockfd = client[i]) < 0)
continue;
if (FD_ISSET(sockfd, &rset)) {
if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
/*connection closed by client */
Close(sockfd);//直接關閉套接字
FD_CLR(sockfd, &allset);
client[i] = -1;
} else
Writen(sockfd, buf, n);
if (--nready <= 0)
break; //沒有已連接套接字了
}
}
}
}
5. pselect函數
pselect函數由POSIX發明,是select的變種。
#include
int pselect(int maxfdp1,fd_set *restrict readfds,fd_set *restrict writefds,fd_set *restrict exceptfds,const struct timespec *restrict tsptr,const sigset_t *restrict sigmask);
相對於select函數,pselect函數有如下幾點不同:
(1) pselect使用timespec結構,新結構的tv_nsec指定納秒數,而原結構裡的tv_usec指定微妙級
(2) pselect增加了第六個參數:一個指向信號掩碼的指針。該參數允許程序先禁止遞交某些信號,再測試由這些當前被禁止的信號處理函數設置的全局變量,然後調用pselect,告訴它重新設置信號掩碼。
(3)pselect的超時值設為了const,保證了調用pselect不會修改此值。
6. poll函數
poll函數的功能與select相似,不過在處理流設備時,它能夠提供額外的信息。
#include
int poll(struct pollfd *fdarray,nfds_t nfds,int timeout);//若有就緒描述符就返回其數目,如超時則返回0,若出錯就返回-1
對於第一個參數:為指向一個結構數組第一個元素的指針,每個數組元素都是一個pollfd結構,用於指定測試某個給定描述符fd的條件。
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
要測試的條件由events成員指定,函數在相應的revents成員中返回該描述符的狀態。
這裡每個描述符都有兩個變量,一個為調用值,一個為返回結果,避免了使用值結果參數。
該結構中events和revents成員所用的常值如下表:
該表中,前四個處理輸入,中間三個處理輸出,最後三個處理異常。
就TCP/UDP而言,如下幾種情況引起poll返回特定的revent
(1) 所有正規TCP數據和所有UDP數據都被認為時普通數據
(2) TCP的帶外數據被認為時優先級帶數據
(3) 當TCP連接的讀半部關閉時,也被認為時普通數據,隨後的讀操作將返回0
(4) TCP連接存在錯誤既可認為是普通數據,也可認為時錯誤,無論哪種情況,隨後的讀操作都會返回-1,並把errno設為合適的值
(5) 在監聽套接字上有新的連接可用既可認為時普通數據,也可認為時優先級數據。
(6) 非阻塞式connect的完成被認為是使相應套接字可寫
對於第二個參數nfds:表示結構數組中元素的個數
對於第三個參數timeout:指定poll函數返回前等待多長時間。
|:timeout值:|:說明:|
|:–:|:–:|
|INFINT|永遠等待|
|0|立即返回,不阻塞進程|
|大於0|等待指定數目的毫秒數|
在select函數中,FD_SETSIZE以及每個描述符集中最大描述符數目這些都涉及到固定值。但是在poll函數中分配一個pollfd數組並把該數組中元素的數據通知內核成了調用者的責任,內核不再需要知道這些固定大小的數據類型。
7. TCP回射服務器再修改版
#include "unp.h"
#include /* for OPEN_MAX */
int
main(int argc, char **argv)
{
int i, maxi, listenfd, connfd, sockfd;
int nready;
ssize_t n;
char buf[MAXLINE];
socklen_t clilen;
struct pollfd client[OPEN_MAX];
struct sockaddr_in cliaddr, servaddr;
listenfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
Listen(listenfd, LISTENQ);
client[0].fd = listenfd;
client[0].events = POLLRDNORM;
for (i = 1; i < OPEN_MAX; i++)//由調用者指定OPEN_MAX
client[i].fd = -1; //初始化為-1,表示可用
maxi = 0; //client數組中已用項的最大序號
for ( ; ; ) {
nready = Poll(client, maxi+1, INFTIM);
if (client[0].revents & POLLRDNORM) {//新客戶連接
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);//返回已連接客戶套接字
#ifdef NOTDEF
printf("new client: %s\n", Sock_ntop((SA *) &cliaddr, clilen));
#endif
for (i = 1; i < OPEN_MAX; i++)//與select不同,這裡的最大值均由調用者指定
if (client[i].fd < 0) {//找到第一個可用項
client[i].fd = connfd; //保存已連接套接字描述符
break;
}
if (i == OPEN_MAX)
err_quit("too many clients");
client[i].events = POLLRDNORM;
if (i > maxi)
maxi = i; //更新已用項的最大序號值
if (--nready <= 0)
continue; //沒有已連接套接字了
}
for (i = 1; i <= maxi; i++) { //檢查client數組中所有項
if ( (sockfd = client[i].fd) < 0)
continue;
//有些實現在一個連接上接收到RST時返回的時POLLERR事件,而其他實現返回的只是POLLRDNORM事件
if (client[i].revents & (POLLRDNORM | POLLERR)) {//查看返回的revents狀態
if ( (n = read(sockfd, buf, MAXLINE)) < 0) {
if (errno == ECONNRESET) {
//由用戶來關閉該套接字
#ifdef NOTDEF
printf("client[%d] aborted connection\n", i);
#endif
Close(sockfd);
client[i].fd = -1;
} else
err_sys("read error");
} else if (n == 0) {
//由用戶來關閉該套接字
#ifdef NOTDEF
printf("client[%d] closed connection\n", i);
#endif
Close(sockfd);
client[i].fd = -1;
} else
Writen(sockfd, buf, n);
if (--nready <= 0)
break; //沒有已連接套接字了
}
}
}
}