我們已經知道,異步處理是搞定大並發的根本,接下來的問題是,如何讓一個就緒的socket和一個業務邏輯連接對應起來。這個問題在同步模型下並不存在,因為一個線程只處理一個連接。曾經的event機制比如select、poll,它們只能告訴你socket n就緒了,你不得不自己通過數據結構來組織socket n和該連接信息之間的關係,典型的如下:
/*
 * Per-connection record: pairs a socket descriptor (sd) with an opaque
 * pointer (others) to whatever else belongs to that connection.
 * `conns` is the collection of such records; the `list` type is not
 * defined in this snippet (article pseudo-code).
 */
struct conn { int sd; void *others; }; list conns;
在調用epoll_ctl將一個socket加入到epoll中時,API會為你提供一個指針(epoll_event的data.ptr字段),讓你直接綁定一個socket描述符和一個指針,一旦socket就緒,取出的是一個結構體,其中包含了與該socket對應的指針,因此你便可以這麼做:
/*
 * Sketch: bind a connection record to its socket through epoll's
 * data.ptr, so a ready event leads straight back to the record.
 * Pseudo-code: "...." marks elided arguments and statements.
 */
conn.sd = sd;
conn.others = all;
ev.events = EPOLLIN;
ev.data.ptr = &conn; /* handed back unchanged when sd becomes ready */
epoll_ctl(kdpfd, EPOLL_CTL_ADD, sd, &ev);

while (1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        /*
         * data.ptr holds the address stored at registration time, so it
         * must be read back as a pointer and dereferenced with "->".
         */
        struct conn *ready = events[n].data.ptr;

        recv(ready->sd, ....);
        ....
    }
}
/* 加入偵聽socket */ context.sd = listener; context.others = dont_care; listen_ev.events = EPOLLIN; listen_ev.data.ptr = context; epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev); /* 加入TUN網卡 */ tun.sd = tun; tun.others = dont_care; entry.ptr = tun; entry.type = TUN; tun_ev.events = EPOLLIN; tun_ev.data.ptr = entry; epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev); while(1) { nfds = epoll_wait(kdpfd, events, 10000, -1); for (n = 0; n < nfds; ++n) { if (events[n].data.ptr == context) { child_sd = accept(context.sd, remote_addr....); multi_instance *mi = create_mi(child_sd, remote_addr, ...); entry.ptr = mi; entry.type = SOCKET; new_ev.events = EPOLLIN; new_ev.data.ptr = entry; epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev); .... } else if (events[n].data.ptr.type == SOCKET){ multi_instance *mi = events[n].data.ptr; data = read_from_socket(mi); // 這裡簡化了處理,因為並不是每一個數據包都是需要加密解密的,還有控制通道的包 decrypt(mi, data); write_to_tun(data); } else { tun *tun = events[n].data.ptr.ptr; packet = read_from_tun(tun); lock(mi_hashtable); multi_instance *mi = lookup_multi_instance_from(packet); unlock(mi_hashtable); encrypt(packet); write_to_socket(packet, mi); } } ... }
/* 加入唯一的UDP socket */ context.sd = udp_sd; context.others = dont_care; listen_ev.events = EPOLLIN; listen_ev.data.ptr = context; epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev); /* 加入TUN網卡 */ tun.sd = tun; tun.others = dont_care; entry.ptr = tun; entry.type = TUN; tun_ev.events = EPOLLIN; tun_ev.data.ptr = entry; epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev); while(1) { nfds = epoll_wait(kdpfd, events, 10000, -1); for (n = 0; n < nfds; ++n) { //實際上nfds最多也就是2 if (events[n].data.ptr == context) { data = recvfrom(context.sd, remote_addr....); lock(mi_hashtable); //如果多線程,這個鎖將會成為瓶頸,即便是RW鎖也一樣 multi_instance *mi = lookup_mi(child_sd, remote_addr, ...); //再好的hash算法,也不是0成本的! unlock(mi_hashtable); // 這裡簡化了處理,因為並不是每一個數據包都是需要加密解密的,還有控制通道的包 decrypt(mi, data); write_to_tun(data); .... } else { tun *tun = events[n].data.ptr.ptr; packet = read_from_tun(tun); lock(mi_hashtable); multi_instance *mi = lookup_multi_instance_from(packet); unlock(mi_hashtable); encrypt(packet); write_to_socket(packet, mi); } } ... }
/*
 * Userspace emulation of "accept" for UDP on Linux.
 *
 * A listener socket is bound to a fixed port with SO_REUSEADDR and
 * SO_REUSEPORT.  When a datagram arrives on it, udp_accept() creates a
 * second socket bound to the same local address and connect()s it to the
 * sender; that per-peer socket is then added to the same epoll set.
 *
 * NOTE: the header names of the original #include lines were lost; the
 * list below is the set this program actually needs.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

/* Older libc headers may not expose SO_REUSEPORT; 15 is the Linux value. */
#ifndef SO_REUSEPORT
#define SO_REUSEPORT 15
#endif
#define MAXBUF 10240
#define MAXEPOLLSIZE 100

int flag = 0;

/*
 * Read one datagram from sd and print it.
 *
 * Returns the recvfrom() result: number of bytes read, or -1 on error
 * (the error is printed, not propagated further).
 */
int read_data(int sd)
{
    char recvbuf[MAXBUF + 1];
    int ret;
    struct sockaddr_in client_addr;
    socklen_t cli_len = sizeof(client_addr);

    /* Zeroed buffer of MAXBUF + 1 bytes keeps the payload NUL-terminated. */
    memset(recvbuf, 0, sizeof(recvbuf));

    ret = recvfrom(sd, recvbuf, MAXBUF, 0,
                   (struct sockaddr *)&client_addr, &cli_len);
    if (ret > 0) {
        printf("read[%d]: %s from %d\n", ret, recvbuf, sd);
    } else {
        printf("read err:%s %d\n", strerror(errno), ret);
    }
    fflush(stdout);

    return ret;
}

/*
 * Create a per-peer socket for the sender of the datagram pending on sd.
 *
 * sd       listener socket with a datagram ready; that datagram is consumed
 *          here (only its source address is used).
 * my_addr  local address the new socket is bound to (same as the listener).
 *
 * Returns the new connected socket, or -1 if no datagram could be read
 * from sd.  Failures while creating or configuring the new socket
 * terminate the process, as in the original demo.
 */
int udp_accept(int sd, struct sockaddr_in my_addr)
{
    int new_sd = -1;
    int ret = 0;
    int reuse = 1;
    char buf[16];
    struct sockaddr_in peer_addr;
    socklen_t cli_len = sizeof(peer_addr);

    ret = recvfrom(sd, buf, sizeof(buf), 0,
                   (struct sockaddr *)&peer_addr, &cli_len);
    if (ret < 0) {
        /* Without a sender address there is nothing to connect to. */
        perror("listener recvfrom");
        return -1;
    }

    if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("child socket");
        exit(1);
    } else {
        printf("parent:%d new:%d\n", sd, new_sd);
    }

    /* Both options are needed so the bind to the listener's port succeeds. */
    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    if (ret) {
        perror("child setsockopt SO_REUSEADDR");
        exit(1);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
    if (ret) {
        perror("child setsockopt SO_REUSEPORT");
        exit(1);
    }

    ret = bind(new_sd, (struct sockaddr *)&my_addr, sizeof(my_addr));
    if (ret) {
        perror("chid bind");
        exit(1);
    }

    peer_addr.sin_family = AF_INET;
    printf("aaa:%s\n", inet_ntoa(peer_addr.sin_addr));
    if (connect(new_sd, (struct sockaddr *)&peer_addr,
                sizeof(peer_addr)) == -1) {
        perror("chid connect");
        exit(1);
    }

    return new_sd;
}

/*
 * Bind the UDP listener on port 1234 and run the epoll loop: events on the
 * listener create a per-peer socket via udp_accept(); events on any other
 * descriptor are handed to read_data().
 */
int main(int argc, char **argv)
{
    int listener, kdpfd, nfds, n;
    struct sockaddr_in my_addr;
    unsigned int port = 1234;
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    int opt = 1;
    int ret = 0;

    (void)argc;
    (void)argv;

    if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    } else {
        printf("socket OK\n");
    }

    ret = setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    if (ret) {
        perror("setsockopt SO_REUSEADDR");
        exit(1);
    }

    ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    if (ret) {
        perror("setsockopt SO_REUSEPORT");
        exit(1);
    }

    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(listener, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {
        perror("bind");
        exit(1);
    } else {
        printf("IP bind OK\n");
    }

    kdpfd = epoll_create(MAXEPOLLSIZE);
    if (kdpfd == -1) {
        perror("epoll_create");
        exit(1);
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listener;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%d\n", listener);
        return -1;
    } else {
        printf("ep add OK\n");
    }

    while (1) {
        /* maxevents must not exceed the capacity of events[]. */
        nfds = epoll_wait(kdpfd, events, MAXEPOLLSIZE, -1);
        if (nfds == -1) {
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            break;
        }

        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                int new_sd;
                struct epoll_event child_ev;

                printf("listener:%d\n", n);
                new_sd = udp_accept(listener, my_addr);
                if (new_sd < 0) {
                    /* Nothing could be read from the listener; skip. */
                    continue;
                }

                memset(&child_ev, 0, sizeof(child_ev));
                child_ev.events = EPOLLIN;
                child_ev.data.fd = new_sd;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%d\n",
                            new_sd);
                    return -1;
                }
            } else {
                read_data(events[n].data.fd);
            }
        }
    }

    close(kdpfd);
    close(listener);
    return 0;
}
int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable, int proto) { ...................... sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable); if (sk != NULL) { int ret; #if 1 // 這個UDP_LISTEN,通過setsockopt來設置 if (sk->sk_state == UDP_LISTEN) { // 如果是UDP的listener,創建一個數據socket struct sock *newsk = inet_udp_clone_lock(sk, skb, GFP_ATOMIC); if (newsk) { struct inet_sock *newinet; // 為這個數據傳輸socket根據skb來填充4元組信息 newinet = inet_sk(newsk); newinet->inet_daddr = ip_hdr(skb)->saddr; newinet->inet_rcv_saddr = ip_hdr(skb)->daddr; newinet->inet_saddr = ip_hdr(skb)->daddr; rcu_assign_pointer(newinet->inet_opt, NULL); newinet->mc_index = inet_iif(skb); newinet->mc_ttl = ip_hdr(skb)->ttl; newinet->rcv_tos = ip_hdr(skb)->tos; newinet->inet_id = 0xffffffff ^ jiffies; inet_sk_rx_dst_set(newsk, skb); // sock結構體新增csk變量,類似TCP的accept queue,但是為了簡單,目前每個Listen socket只能持有一個csk,即child sock。 sk->csk = newsk; // 將新的數據傳輸socket排入全局的UDP socket hash表 if (newsk->sk_prot->get_port(newsk, newinet->inet_num)) { printk("[UDP listen] get port error\n"); release_sock(newsk); err = -2; goto out_go; } ret = udp_queue_rcv_skb(newsk, skb); // 喚醒epoll,讓epoll返回UDP Listener sk->sk_data_ready(sk, 0); sock_put(newsk); } else { printk("[UDP listen] create new error\n"); sock_put(sk); return -1; } out_go: sock_put(sk); if (ret > 0) return -ret; return 0; } #endif ret = udp_queue_rcv_skb(sk, skb); sock_put(sk); ...................... }
listen = 1; listener = socket(PF_INET, SOCK_DGRAM, 0); setsockopt(new_sd, SOL_SOCKET, SO_UDPLISTEN, &listen,sizeof(listen)); /* 加入偵聽socket */ context.sd = listener; context.others = dont_care; listen_ev.events = EPOLLIN; listen_ev.data.ptr = context; epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev); /* 加入TUN網卡 */ tun.sd = tun; tun.others = dont_care; entry.ptr = tun; entry.type = TUN; tun_ev.events = EPOLLIN; tun_ev.data.ptr = entry; epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev); while(1) { nfds = epoll_wait(kdpfd, events, 10000, -1); for (n = 0; n < nfds; ++n) { if (events[n].data.ptr == context) { getsockopt(context.sd, SOL_SOCKET, &newsock_info....); child_sd = newsock_info.sd; multi_instance *mi = create_mi(child_sd, newsock_info.remote_addr, ...); entry.ptr = mi; entry.type = SOCKET; new_ev.events = EPOLLIN; new_ev.data.ptr = entry; epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev); // 這是UDP,內核除了通知Listener之外,還會將數據排入child_sd,因此需要去讀取,可以參考TCP的Fastopen邏輯 data = recvfrom(child_sd, ....); .... } else if (events[n].data.ptr.type == SOCKET){ multi_instance *mi = events[n].data.ptr; data = read_from_socket(mi); // 這裡簡化了處理,因為並不是每一個數據包都是需要加密解密的,還有控制通道的包 decrypt(mi, data); write_to_tun(data); } else { tun *tun = events[n].data.ptr.ptr; packet = read_from_tun(tun); lock(mi_hashtable); multi_instance *mi = lookup_multi_instance_from(packet); unlock(mi_hashtable); encrypt(packet); write_to_socket(packet, mi); } } ... }