歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux基礎 >> 關於Linux

Linux下套接字詳解(九)---poll模式下的IO多路復用服務器

參照

poll調用深入解析-從poll的實現來講poll多路復用模型,很有深度

poll多路復用


poll的機制與select類似,與select在本質上沒有多大差別,管理多個描述符也是進行輪詢,根據描述符的狀態進行處理,但是poll沒有最大文件描述符數量的限制。

poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨著文件描述符數量的增加而線性增大。

poll編程模型


這裡寫圖片描述

函數原型


函數格式如下所示:

# include 
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);

參數說明


fds

是一個struct pollfd結構類型的數組,用於存放需要檢測其狀態的Socket描述符;每當調用這個函數之後,系統不會清空這個數組,操作起來比較方便;特別是對於socket連接比較多的情況下,在一定程度上可以提高處理的效率;這一點與select()函數不同,調用select()函數之後,select()函數會清空它所檢測的socket描述符集合,導致每次調用select()之前都必須把socket描述符重新加入到待檢測的集合中;因此,select()函數適合於只檢測一個socket描述符的情況,而poll()函數適合於大量socket描述符的情況;

nfds

nfds_t類型的參數,用於標記數組fds中的結構體元素的總數量;

timeout

是poll函數調用阻塞的時間,單位:毫秒;

和 select 一樣,最後一個參數 timeout 指定 poll() 將在超時前等待一個事件多長事件。這裡有 3 種情況:

timeout 為 -1
這會造成 poll 永遠等待。poll() 只有在一個描述符就緒時返回,或者在調用進程捕捉到信號時返回(在這裡,poll 返回 -1),並且設置 errno 值為 EINTR 。-1 可以用宏定義常量 INFTIM 來代替(在 pth.h 中有定義) 。

timeout 等於0
在這種情況下,測試所有的描述符,並且 poll() 立刻返回。這允許在 poll 中沒有阻塞的情況下找出多個文件描述符的狀態。

time > 0
這將以毫秒為單位指定 timeout 的超時周期。poll() 只有在超時到期時返回,除非一個描述符變為就緒,在這種情況下,它立刻返回。如果超時周期到齊,poll() 返回 0。這裡也可能會因為某個信號而中斷該等待。

和 select 一樣,文件描述符是否阻塞對 poll 是否阻塞沒有任何影響。

返回值和錯誤代碼


成功時,poll()返回結構體中revents域不為0的文件描述符個數;如果在超時前沒有任何事件發生,poll()返回0;失敗時,poll()返回-1

>0

數組fds中准備好讀、寫或出錯狀態的那些socket描述符的總數量;

==0

數組fds中沒有任何socket描述符准備好讀、寫,或出錯;此時poll超時,超時時間是timeout毫秒;換句話說,如果所檢測的socket描述符上沒有任何事件發生的話,那麼poll()函數會阻塞timeout所指定的毫秒時間長度之後返回,如果timeout==0,那麼poll() 函數立即返回而不阻塞,如果timeout==INFTIM,那麼poll() 函數會一直阻塞下去,直到所檢測的socket描述符上的感興趣的事件發生是才返回,如果感興趣的事件永遠不發生,那麼poll()就會永遠阻塞下去;

-1

poll函數調用失敗,同時會自動設置全局變量errno為下列值之一

errno 描述 EBADF 一個或多個結構體中指定的文件描述符無效 EFAULTfds 指針指向的地址超出進程的地址空間 EINTR 請求的事件之前產生一個信號,調用可以重新發起 EINVALnfds 參數超出PLIMIT_NOFILE值 ENOMEM 可用內存不足,無法完成請求

pollfd結構體


struct pollfd
{
    int fd;         /* 文件描述符 */
    short events;         /* 等待的事件 */
    short revents;       /* 實際發生了的事件 */
} ; 

fd 成員表示感興趣的,且打開了的文件描述符;

events 成員是位掩碼,用於指定針對這個文件描述符感興趣的事件;

revents 成員是位掩碼,用於指定當 poll 返回時,在該文件描述符上已經發生了哪些事情。

每一個pollfd結構體指定了一個被監視的文件描述符,可以傳遞多個結構體,指示poll()監視多個文件描述符。

事件


在poll返回時,我們可以檢查revents中的標志,對應於文件描述符請求的events結構體。如果POLLIN事件被設置,則文件描述符可以被讀取而不阻塞。

如果POLLOUT被設置,則文件描述符可以寫入而不導致阻塞。這些標志並不是互斥的:它們可能被同時設置,表示這個文件描述符的讀取和寫入操作都會正常返回而不阻塞。

timeout參數指定等待的毫秒數,無論I/O是否准備好,poll都會返回。timeout指定為負數值表示無限超時,使poll()一直掛起直到一個指定事件發生;timeout為0指示poll調用立即返回並列出准備好I/O的文件描述符,但並不等待其它的事件。這種情況下,poll()就像它的名字那樣,一旦選舉出來,立即返回。

event注冊的事件,通過revents返回


每個結構體的events域是監視該文件描述符的事件掩碼,由用戶來設置這個域。

revents域是文件描述符的操作結果事件掩碼,內核在調用返回時設置這個域。

events域中請求的任何事件都可能在revents域中返回。

例如fds[0].events = POLLIN; /將測試條件設置成普通或優先級帶數據可讀/

然後 int pollresult = poll(fds,xx,xx); //這樣就可以監聽fds裡面文件描述符了,當滿足特定條件就返回,並將結果保存在revents中。

事件描述符概述


合法的事件如下:

事件 描述 POLLIN 有數據可讀 POLLRDNORM 有普通數據可讀 POLLRDBAND 有優先數據可讀。 POLLPRI 有緊迫數據可讀。 POLLOUT 寫數據不會導致阻塞 POLLWRNORM 寫普通數據不會導致阻塞 POLLWRBAND 寫優先數據不會導致阻塞 POLLMSGSIGPOLL 消息可用。

此外,revents域中還可能返回下列事件:

事件 描述 POLLER 指定的文件描述符發生錯誤 POLLHUP 指定的文件描述符掛起事件 POLLNVAL 指定的文件描述符非法

這些事件在events域中無意義,因為它們在合適的時候總是會從revents中返回。

事件使用技巧

POLLIN

events 中使用該宏常數,能夠在折本文件的可讀情況下,結束 poll() 函數。相反,revents 上使用該宏常數,在檢查 poll() 函數結束後,可依此判斷設備文件是否處於可讀狀態(即使消息長度是 0)。

POLLPRI

在 events 域中使用該宏常數,能夠在設備文件的高優先級數據讀取狀態下,結束 poll() 函數。相反,revents 上使用該宏常數,在檢查 poll() 函數結束後,可依此判斷設備文件是否處於可讀高優先級數據的狀態(即使消息長度是 0)。該宏常數用於處理網絡信息包(packet) 的數據傳遞。

POLLOUT

在 events 域中使用該宏常數,能夠在設備文件的寫入狀態下,結束 poll() 函數。相反,revents 域上使用該宏常數,在檢查 poll() 結束後,可依此判斷設備文件是否處於可寫狀態。

POLLERR

在 events 域中使用該宏常數,能夠在設備文件上發生錯誤時,結束 poll() 函數。相反,revents 域上使用該宏函數,在檢查 poll() 函數結束後,可依此判斷設備文件是否出錯。

POLLHUP

在 events域中使用該宏常數,能夠在設備文件中發生 hungup 時,結束 poll() 函數 。相反,在檢查 poll() 結束後,可依此判斷設備文件是否發生 hungup 。

POLLNVAL

在 events 域中使用該宏函數,能夠在文件描述符的值無效時,結束 poll() 。相反,在 revents 域上使用該宏函數時,在檢查 poll() 函數後,文件描述符是否有效。可用於處理網絡信息時,檢查 socket handler 是否已經無效。

poll與select的區別與聯系


使用poll()和select()不一樣,你不需要顯式地請求異常情況報告。

POLLIN | POLLPRI等價於select()的讀事件,
POLLOUT |POLLWRBAND等價於select()的寫事件。
POLLIN等價於POLLRDNORM |POLLRDBAND,
而POLLOUT則等價於POLLWRNORM。

例如,要同時監視一個文件描述符是否可讀和可寫,我們可以設置 events為POLLIN |POLLOUT。

示例


poll的本質是輪訓,就是監聽我們所有的文件描述符的所注冊的事件,當有事件請求時,poll返回,然後我們輪詢所有的描述符,找到有時間請求的那個即可

服務器server


#define _GNU_SOURCE 1
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include 
#include 

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535

typedef struct client_data                  //  客戶端的數據結構
{
    struct sockaddr_in  address;            //
    char*               write_buf;                //  發送數據緩沖區
    char                buf[ BUFFER_SIZE ];        //  接收數據緩沖區
}client_data;

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;

    fcntl( fd, F_SETFL, new_option );

    return old_option;
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int                 listenfd;
    int                 ret = 0;
    struct sockaddr_in  address;

    client_data         *users = NULL;

    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    //
    //  創建服務器的監聽套接字
    //
    if( ( listenfd = socket( PF_INET, SOCK_STREAM, 0 ) ) < 0)
    {
        perror("create socket error...\n");
        exit(-1);
    }
    else
    {
        printf("create socket success...\n");
    }

    //
    //  命名服務器的監聽套接字
    //
    if((ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address))) < 0 )
    {

        perror("bind socket error...\n");
        exit(-1);
    }
    else
    {
        printf("bind socket success...\n");
    }

    if((ret = listen(listenfd, 5)) < 0)
    {
        perror("listen error...\n");
    }
    else
    {
        printf("start listen...\n");
    }
    assert( ret != -1 );

    if((users = (client_data *)malloc(sizeof(client_data) * FD_LIMIT)) == NULL)
    {
        perror("malloc client_data error...");
    }
    else
    {
        printf("malloc client_data success...");
    }

    struct pollfd          fds[USER_LIMIT + 1];
    /*  Data structure describing a polling request.
        struct pollfd
        {
            int fd;                      poll 的文件描述符.
            short int events;            fd 上感興趣的事件(等待的事件).
            short int revents;           fd 上實際發生的事件.
        };
    */

    /// 初始化poll的
    int user_counter = 0;
    for( int i = 1; i <= USER_LIMIT; ++i )
    {
        fds[i].fd = -1;
        fds[i].events = 0;
    }
    fds[0].fd = listenfd;
    fds[0].events = POLLIN | POLLERR;           //  POLLIN表示有數據可讀, POLLERR表示出錯
    fds[0].revents = 0;

    while( 1 )
    {
        ret = poll( fds,                        //  准備輪訓的套接字文件描述符
                    user_counter + 1,           //
                    -1);                        //   poll 永遠等待。poll() 只有在一個描述符就緒時返回,或者在調用進程捕捉到信號時返回
        if ( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }

        ///
        /// poll模型的本質就是輪訓, 在pull返回時,輪詢所有的文件描述符, 查找到有事情請求的那個文件
        ///
        for( int i = 0; i < user_counter + 1; ++i )
        {
            if((fds[i].fd == listenfd)                  /*  監聽的是服務器套接字, 此時如果有數據可讀,說明有客戶端請求鏈接*/
             && (fds[i].revents & POLLIN))              /*  有數據可讀取  */
            {
                struct sockaddr_in  client_address;
                socklen_t           client_addrlength = sizeof( client_address );

                //  開始接收客戶端的鏈接
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d\n", errno );
                    continue;
                }
                if( user_counter >= USER_LIMIT )
                {
                    const char* info = "too many users\n";
                    printf( "%s", info );
                    send( connfd, info, strlen( info ), 0 );
                    close( connfd );
                    continue;
                }

                user_counter++;
                users[connfd].address = client_address;
                setnonblocking( connfd );
                fds[user_counter].fd = connfd;
                fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
                fds[user_counter].revents = 0;
                printf( "comes a new user, now have %d users\n", user_counter );
            }
            else if( fds[i].revents & POLLERR )                     //  數據出錯
            {
                printf( "get an error from %d\n", fds[i].fd );
                char errors[ 100 ];
                memset( errors, '\0', 100 );
                socklen_t length = sizeof( errors );
                if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )
                {
                    printf( "get socket option failed\n" );
                }
                continue;
            }
            else if( fds[i].revents & POLLRDHUP )                   //  被掛起---斷開
            {
                users[fds[i].fd] = users[fds[user_counter].fd];
                close( fds[i].fd );
                fds[i] = fds[user_counter];
                i--;
                user_counter--;
                printf( "a client left\n" );
            }
            else if( fds[i].revents & POLLIN )                      //  客戶端套接字有數據可寫
            {
                int connfd = fds[i].fd;
                memset( users[connfd].buf, '\0', BUFFER_SIZE );
                ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 );
                printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd );
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        close( connfd );
                        users[fds[i].fd] = users[fds[user_counter].fd];
                        fds[i] = fds[user_counter];
                        i--;
                        user_counter--;
                    }
                }
                else if( ret == 0 )
                {
                    printf( "code should not come to here\n" );
                }
                else
                {
                    for( int j = 1; j <= user_counter; ++j )
                    {
                        if( fds[j].fd == connfd )
                        {
                            continue;
                        }

                        fds[j].events |= ~POLLIN;
                        fds[j].events |= POLLOUT;
                        users[fds[j].fd].write_buf = users[connfd].buf;
                    }
                }
            }
            else if( fds[i].revents & POLLOUT )                     //  服務器向外發送數據
            {
                int connfd = fds[i].fd;
                if( ! users[connfd].write_buf )
                {
                    continue;
                }
                ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 );
                users[connfd].write_buf = NULL;
                fds[i].events |= ~POLLOUT;
                fds[i].events |= POLLIN;
            }
        }
    }

    free(users);
    close( listenfd );
    return 0;
}

服務器client


#define _GNU_SOURCE 1
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define BUFFER_SIZE 64

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    struct sockaddr_in server_address;
    bzero( &server_address, sizeof( server_address ) );
    server_address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &server_address.sin_addr );
    server_address.sin_port = htons( port );

    int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( sockfd >= 0 );

    if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
    {
        printf( "connection failed\n" );
        close( sockfd );
        return 1;
    }

    struct pollfd fds[2];
    //  添加標准輸入
    fds[0].fd = STDIN_FILENO;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    //  添加套接字描述符
    fds[1].fd = sockfd;
    fds[1].events = POLLIN | POLLRDHUP;
    fds[1].revents = 0;

    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret = pipe( pipefd );
    assert( ret != -1 );

    while( 1 )
    {
        ret = poll( fds, 2, -1 );
        if( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }

        if( fds[1].revents & POLLRDHUP )
        {
            printf( "server close the connection\n" );
            break;
        }
        else if( fds[1].revents & POLLIN )
        {
            memset( read_buf, '\0', BUFFER_SIZE );
            recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
            printf( "%s\n", read_buf );
        }

        if( fds[0].revents & POLLIN )
        {
            ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
            ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
        }
    }

    close( sockfd );
    return 0;
}
Copyright © Linux教程網 All Rights Reserved