使用I/O multiplexing 的網絡編程中,一般需要采用非阻塞網絡編程的風格,防止服務端在連接量大的時候阻塞在某個文件描述符上。比如某個socket 有大量的數據需要寫,但是內核發送緩沖區已經填滿,無法在一次write中將需要發送的數據全部發送出去,程序就會阻塞在該處,導致select/poll/epoll_wait() 此時不能處理其它到來的請求。同樣,read或者accept也可能出現阻塞的情況:比如客戶端發起connect並完成三次握手之後立刻以RST中止該連接,而此時服務端尚未調用accept,這個連接可能會被內核從已完成連接隊列中移除;等到服務端後來調用accept時,已完成連接隊列中已經沒有完成三次握手的連接,阻塞的accept就會導致進程睡眠(詳細情況可以參見UNPv1中關於非阻塞accept的描述)。因此I/O multiplexing 一般采用非阻塞網絡編程的風格。
對於read/write 操作來說,如果采用了非阻塞編程,則需要為每個connection配備應用層緩沖區:讀緩沖區主要用於應對一次到來的數據太多的情況,寫緩沖區主要用於防止出現阻塞,可以把沒有發送完成的數據寫入緩沖區,等到socket 可寫之後繼續發送。如果在新一次write請求到來的時候,應用層寫緩沖區中還有之前未發送完的數據,則應該先將上次未寫入內核的數據寫入內核緩沖區,保證發送的順序性。下面給出一個簡單的例子。
#include <sys/types.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

#include <iostream>
#include <map>
#include <string>
#include <vector>
// TCP port the echo server binds to (identifier spelling kept: the rest of the file uses it).
static const int SEVER_PORT = 1314;
// Length of the primary readv() buffer in Read(); larger reads spill into its overflow buffer.
static const int MAX_LINE_LEN = 1024;
// The remaining code uses unqualified standard-library names (vector, map, string),
// so this file-scope directive has to stay.
using namespace std;
// Accepts one pending connection on the listening socket `fd` and puts the new
// socket into non-blocking mode. On success the peer address is written to *addr.
//
// Returns the connected descriptor, or -1 on failure. On failure the error is
// printed and errno still holds the code reported by accept() (or by fcntl() if
// O_NONBLOCK could not be set). EWOULDBLOCK is expected here when a client
// aborted its connection before we accepted it (see UNPv1, non-blocking accept).
int Accept(int fd, struct sockaddr_in *addr)
{
    socklen_t addr_len = static_cast<socklen_t>(sizeof *addr);
    int connfd = accept(fd, reinterpret_cast<struct sockaddr *>(addr), &addr_len);
    if (connfd < 0)
    {
        // Check the result BEFORE touching the descriptor: calling fcntl(-1, ...)
        // would overwrite accept()'s errno with EBADF. Save errno across printf too.
        int saved = errno;
        printf("Accept Error: %s\n", strerror(saved));
        errno = saved;
        return connfd;
    }

    int flags = fcntl(connfd, F_GETFL, 0);
    if (flags < 0 || fcntl(connfd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        // A blocking client socket could stall the whole select() loop, so refuse it.
        int saved = errno;
        printf("Accept set O_NONBLOCK Error: %s\n", strerror(saved));
        close(connfd);
        errno = saved;
        return -1;
    }
    return connfd;
}
// Reads whatever is currently available on socket `fd` and appends it to the
// application-level buffer bufMap[fd].
//
// readv() scatters the data into a small primary buffer plus a large stack
// overflow buffer, so a single call can drain up to MAX_LINE_LEN + 65535 bytes.
//
// Returns the readv() result: > 0 bytes appended, 0 when the peer closed the
// connection, < 0 on error (errno as set by readv; bufMap is left untouched).
int Read(int fd, std::map<int, std::string> &bufMap)
{
    char buf[MAX_LINE_LEN];
    char exbuf[65535];  // overflow buffer, only filled when a lot of data arrives at once
    struct iovec iov[2];
    iov[0].iov_base = buf;
    iov[0].iov_len = sizeof buf;
    iov[1].iov_base = exbuf;
    iov[1].iov_len = sizeof exbuf;

    ssize_t nrcv = readv(fd, iov, 2);
    if (nrcv <= 0)
    {
        return static_cast<int>(nrcv);
    }

    // The buffers are raw bytes, NOT NUL-terminated C strings: always append with
    // an explicit length (this also keeps embedded '\0' bytes intact).
    std::string &pending = bufMap[fd];
    size_t total = static_cast<size_t>(nrcv);
    if (total > sizeof buf)
    {
        pending.append(buf, sizeof buf);
        pending.append(exbuf, total - sizeof buf);
        printf("extrabuf in use! \n");
    }
    else
    {
        pending.append(buf, total);
    }
    return static_cast<int>(nrcv);
}
// Queries the pending error of socket `fd` through getsockopt(SO_ERROR).
// If the getsockopt() call itself fails, its errno is returned instead, so the
// caller always gets an errno-style code it can pass to strerror().
int getSocketError(int fd)
{
    int pendingError;
    socklen_t len = static_cast<socklen_t>(sizeof pendingError);
    const bool queryFailed = getsockopt(fd, SOL_SOCKET, SO_ERROR, &pendingError, &len) < 0;
    return queryFailed ? errno : pendingError;
}
// Writes as much of `pending` to the non-blocking socket `fd` as the kernel will
// accept right now and removes the written prefix, so unsent bytes stay queued
// in order for the next writable event.
// Returns false only on a hard write error (the caller should close the socket);
// EWOULDBLOCK/EAGAIN/EINTR leave the data queued and return true.
static bool flushPending(int fd, std::string &pending)
{
    if (pending.empty())
    {
        return true;
    }
    ssize_t nwrt = write(fd, pending.data(), pending.size());
    if (nwrt < 0)
    {
        if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR)
        {
            return true;
        }
        printf("Write error: %s\n", strerror(errno));
        return false;
    }
    printf("nwrt = %d \n", static_cast<int>(nwrt));
    // Only a non-negative count may reach erase(): -1 would convert to npos and
    // silently wipe the whole buffer.
    pending.erase(0, static_cast<std::string::size_type>(nwrt));
    return true;
}

// Releases client slot `ix` holding socket `fd`: removes it from both select()
// interest sets, discards its buffered data and closes the descriptor.
static void closeClient(int ix, int fd, std::vector<int> &client,
                        fd_set &allrset, fd_set &allwset,
                        std::map<int, std::string> &bufMap)
{
    client[ix] = -1;
    FD_CLR(fd, &allrset);
    FD_CLR(fd, &allwset);
    bufMap.erase(fd);
    close(fd);
}

// Single-threaded select() echo server on port SEVER_PORT.
//
// All sockets are non-blocking. Data read from a client is appended to its
// application-level buffer in bufMap and echoed back immediately; whatever the
// kernel does not take stays buffered and the socket is watched for writability
// until the backlog drains. New data is always appended behind unsent data, so
// the echoed byte order is preserved.
int main()
{
    struct sockaddr_in cli_addr, server_addr;
    std::vector<int> client(FD_SETSIZE, -1);  // connected sockets; -1 marks a free slot
    std::map<int, std::string> bufMap;        // simple per-connection application buffer
    // select() overwrites the sets it is given, so the interest sets
    // (allrset/allwset) are kept separately and copied before every call.
    fd_set rset, wset, allrset, allwset;
    char addr_str[INET_ADDRSTRLEN];
    int accepted = 0;
    int one = 1;

    // Writing to a connection the peer has reset raises SIGPIPE, whose default
    // action kills the whole server; ignore it so write() fails with EPIPE instead.
    signal(SIGPIPE, SIG_IGN);

    memset(&server_addr, 0, sizeof server_addr);
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(SEVER_PORT);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0)
    {
        printf("socket error: %s\n", strerror(errno));
        return 1;
    }
    // Non-blocking listener: accept() must not sleep when a connection that
    // select() reported was aborted before we got to it.
    int flags = fcntl(listenfd, F_GETFL, 0);
    fcntl(listenfd, F_SETFL, flags | O_NONBLOCK);
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    if (bind(listenfd, reinterpret_cast<struct sockaddr *>(&server_addr), sizeof server_addr) < 0)
    {
        printf("socket bind error: %s\n", strerror(errno));
        close(listenfd);
        return 1;
    }
    if (listen(listenfd, 10) < 0)
    {
        printf("socket listen error: %s\n", strerror(errno));
        close(listenfd);
        return 1;
    }

    FD_ZERO(&allrset);
    FD_ZERO(&allwset);
    FD_SET(listenfd, &allrset);
    int maxfd = listenfd;
    int maxid = -1;  // highest client[] index ever used
    while (1)
    {
        rset = allrset;
        wset = allwset;
        int nready = select(maxfd + 1, &rset, &wset, NULL, NULL);
        if (nready < 0)
        {
            if (errno == EINTR)
            {
                continue;  // interrupted by a signal, just retry
            }
            printf("select error: %s\n", strerror(errno));
            exit(1);
        }

        if (FD_ISSET(listenfd, &rset))
        {
            --nready;
            int connfd = Accept(listenfd, &cli_addr);
            if (connfd >= 0)  // otherwise Accept() has already reported the failure
            {
                printf("recieve from : %s at port %d\n",
                       inet_ntop(AF_INET, &cli_addr.sin_addr, addr_str, INET_ADDRSTRLEN),
                       ntohs(cli_addr.sin_port));
                int ix = 0;
                while (ix < FD_SETSIZE && client[ix] >= 0)
                {
                    ++ix;
                }
                // FD_SET on a descriptor >= FD_SETSIZE is undefined behaviour, so
                // such a connection is rejected like one that finds no free slot.
                if (ix == FD_SETSIZE || connfd >= FD_SETSIZE)
                {
                    printf("too many client! \n");
                    close(connfd);
                }
                else
                {
                    client[ix] = connfd;
                    printf("client[%d] = %d\n", ix, connfd);
                    FD_SET(connfd, &allrset);
                    if (connfd > maxfd)
                    {
                        maxfd = connfd;
                    }
                    if (ix > maxid)
                    {
                        maxid = ix;
                    }
                    accepted++;
                    printf("accepted: %d\n", accepted);
                }
            }
        }

        // Visit clients until every ready bit reported by select() is consumed.
        // select() counts readable and writable bits separately, so one socket
        // may account for two of them.
        for (int ix = 0; ix <= maxid && nready > 0; ix++)
        {
            int sockfd = client[ix];
            if (sockfd < 0)
            {
                continue;
            }
            bool readable = FD_ISSET(sockfd, &rset) != 0;
            bool writable = FD_ISSET(sockfd, &wset) != 0;
            if (!readable && !writable)
            {
                continue;
            }
            if (readable)
            {
                --nready;
            }
            if (writable)
            {
                --nready;
            }

            if (readable)
            {
                int nrcv = Read(sockfd, bufMap);
                if (nrcv == 0)
                {
                    printf("close! \n");
                    closeClient(ix, sockfd, client, allrset, allwset, bufMap);
                    continue;
                }
                if (nrcv < 0)
                {
                    int err = errno;
                    if (err != EWOULDBLOCK && err != EAGAIN && err != EINTR)
                    {
                        printf("SocketError: %s\n", strerror(err));
                        closeClient(ix, sockfd, client, allrset, allwset, bufMap);
                        continue;
                    }
                }
                else
                {
                    printf("nrcv = %d \n", nrcv);
                    // Echo everything buffered; older unsent bytes go out first.
                    if (!flushPending(sockfd, bufMap[sockfd]))
                    {
                        closeClient(ix, sockfd, client, allrset, allwset, bufMap);
                        continue;
                    }
                    if (!bufMap[sockfd].empty())
                    {
                        printf("write left \n");
                    }
                }
            }

            std::string &pending = bufMap[sockfd];
            if (writable && !pending.empty())
            {
                printf("write again: nrcv left = %d \n", static_cast<int>(pending.size()));
                if (!flushPending(sockfd, pending))
                {
                    closeClient(ix, sockfd, client, allrset, allwset, bufMap);
                    continue;
                }
                if (pending.empty())
                {
                    printf("Write complete! \n");
                }
            }

            // Watch for writability only while a backlog exists; an idle socket
            // is almost always writable and would make select() spin.
            if (pending.empty())
            {
                FD_CLR(sockfd, &allwset);
            }
            else
            {
                FD_SET(sockfd, &allwset);
            }
        }
    }
    return 0;
}