在處理多個socket套接字的時候,會很自然的遇到一個問題:某個套接字什麼時候可讀?什麼時候可寫?哪些套接字是需要關閉的?我們可以回憶一下,一般我們在最開始編寫socket程序的時候,send,recv都是同步的,send完後就傻等著recv。這種模式的一個很大的問題是,recv會占用一整個線程,單個線程裡沒法處理第二個socket。怎麼辦呢?加線程,每個socket分配一個線程?顯然不合適,1000個客戶端難道要1000個線程麼。select提供了一種方式同時監控多個套接字,執行過程大致為:首先將感興趣的套接字加入到select的集合中,select函數執行,監控這些個套接字,當其中有套接字產生了事件(可讀,可寫,異常)或者select超時後,select返回告知調用者,哪些個套接字發送了事件,調用者就對發生了事件的套接字挨個處理,然後繼續執行select函數,下個循環開始。
通過這樣的一種方式,在同一個線程裡面就實現了對多個套接字的讀寫操作。當然需要注意的是,select調用針對的是文件描述符,不管是socket,pipe,file都是可以被select監控的。
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
當前linux下selet的每個fd_set最多可以監控1024個文件描述符
假設對於一個文件描述符fd,我們想監控其讀事件,就將加入到讀監控集合中。
fd_set read_set;
FD_ZERO(&read_set);
FD_SET(fd, &read_set);
select(maxFd +1, &read_set, NULL, NULL, NULL)
同理,對於寫事件,異常事件也是一樣的處理方式。
當select調用返回後,如果select返回>0 則可以對指定的文件描述符進行操作。
if (FD_ISSET(fd, &read_set)) {
recv(fd)
}
在程序設計中,一般將select結合while使用。
使用select構建的一個tcp echo程序,程序可以接受多個客戶端的鏈接,並將客戶端發送過來的字符串發送回去。
測試環境:CentOS7.1
server端截圖:
cleint1截圖:
cleint2截圖:
/* author: bymzy
* 2017/2/12
* */
#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <strings.h>
#include <stdlib.h>
#define PORT 3333
#define IP "127.0.0.1"
#define BACKLOG 5
#define BUFSIZE 1025
void SetFDSet(int *clientFd, int listenFd, fd_set *read, fd_set *except, int *maxFd)
{
int i = listenFd + 1;
FD_ZERO(read);
FD_ZERO(except);
for (;i <= *maxFd; ++i) {
if (clientFd[i] != 0) {
FD_SET(clientFd[i], read);
FD_SET(clientFd[i], except);
}
}
}
void CheckFDSet(int *clientFd, int listenFd, fd_set *read, fd_set *except, int *maxFd)
{
int i = listenFd + 1;
int tempMaxFd = *maxFd;
char buf[1024];
int recved = 0;
bool needClose = false;
bzero(buf, 1024);
for (;i <= tempMaxFd; ++i) {
bzero(buf, recved);
if (FD_ISSET(clientFd[i], read)) {
recved = recv(clientFd[i], buf, BUFSIZE -1 , 0);
if (recved <= 0) {
perror("Recv error");
close(clientFd[i]);
if (i == tempMaxFd) {
*maxFd = -1;
}
clientFd[i] = 0;
continue;
}
printf("Recv: %s \n", buf);
send(clientFd[i], buf, recved, 0);
}
bzero(buf, recved);
if (FD_ISSET(clientFd[i], except)) {
recved = recv(clientFd[i], buf, BUFSIZE - 1, MSG_OOB);
if (recved <= 0) {
perror("Recv error");
close(clientFd[i]);
if (i == tempMaxFd) {
*maxFd = -1;
}
clientFd[i] = 0;
continue;
}
printf("OOB: %s \n", buf);
send(clientFd[i], buf, recved, MSG_OOB);
}
}
}
void ChooseMaxFd(int *clientFd, int listenFd, int total, int *maxFd)
{
int i = listenFd;
for (;i < total; ++i) {
if(clientFd[i] != 0) {
*maxFd = clientFd[i];
}
}
}
void CloseFd(int *clientFd, int listenFd, int total)
{
int i = listenFd + 1;
for (;i < total; ++i) {
if(clientFd[i] != 0) {
close(clientFd[i]);
clientFd[i] = 0;
}
}
}
int main(int argc, char* argv[])
{
int err = 0;
int listenFd = -1;
do {
listenFd = socket(AF_INET, SOCK_STREAM, 0);
if (listenFd < 0) {
err = errno;
perror("Create listen socket failed");
break;
}
struct sockaddr_in bindaddr;
bzero(&bindaddr, 0);
bindaddr.sin_addr.s_addr = inet_addr(IP);
bindaddr.sin_port = htons(PORT);
bindaddr.sin_family = AF_INET;
socklen_t socklen = sizeof(bindaddr);
int reuse = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
err = bind(listenFd, (sockaddr*)&bindaddr, socklen);
if (err != 0) {
err = errno;
perror("Listen socket bind failed!");
break;
}
err = listen(listenFd, BACKLOG);
if (err != 0) {
err = errno;
perror("Listen socket listen failed!");
break;
}
/* use select read data */
fd_set read_set;
fd_set exception_set;
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
int *clientFd = (int*)malloc(sizeof(int) * (FD_SETSIZE + listenFd + 1));
clientFd[listenFd] = listenFd;
int maxFd = -1;
while (1) {
if (maxFd == -1) {
ChooseMaxFd(clientFd, listenFd, FD_SETSIZE + listenFd + 1, &maxFd);
}
SetFDSet(clientFd, listenFd, &read_set, &exception_set, &maxFd);
if (maxFd != (FD_SETSIZE + listenFd)) {
FD_SET(listenFd, &read_set);
}
tv.tv_sec = 5;
tv.tv_usec = 0;
err = select(maxFd + 1, &read_set, NULL, &exception_set, &tv);
if (err < 0) {
perror("Select failed");
err = errno;
break;
} else if (err == 0) {
printf("Select timeout!\n");
} else {
if (FD_ISSET(listenFd, &read_set)) {
struct sockaddr_in clientaddr;
socklen_t clientlen;
int tempFd= -1;
tempFd = accept(listenFd, (sockaddr*)&clientaddr, &clientlen);
if (tempFd < 0) {
err = errno;
perror("accept failed!");
break;
}
clientFd[tempFd] = tempFd;
if (tempFd > maxFd) {
maxFd = tempFd;
}
printf("Accept client fd: %d\n", tempFd);
}
CheckFDSet(clientFd, listenFd, &read_set, &exception_set, &maxFd);
}
}
CloseFd(clientFd, listenFd, FD_SETSIZE + listenFd + 1);
free(clientFd);
} while(0);
if (listenFd != -1) {
close(listenFd);
listenFd = -1;
}
return err;
}