我們已經知道,異步處理是應對高並發的根本。接下來的問題是:如何讓一個就緒的socket與它所屬的業務邏輯連接對應起來?這個問題在同步模型下並不存在,因為一個線程只處理一個連接。早期的事件機制(如select、poll)只能告訴你socket n就緒了,你不得不自己通過數據結構來維護socket n與該連接信息之間的關係,典型的做法如下:
/* Per-connection record that select/poll-style code has to manage itself:
 * those APIs only report "descriptor n is ready", so the application must
 * map the ready descriptor back to its connection state on its own. */
struct conn {
int sd;          /* socket descriptor */
void *others;    /* remaining per-connection state (opaque in this sketch) */
};
/* Pseudo-code: `list` stands for whatever container the application uses to
 * hold every struct conn and search it by descriptor. */
list conns;
epoll則不同:調用epoll_ctl將一個socket加入epoll時,傳入的struct epoll_event中有一個data字段,可以存放一個指針,從而把socket描述符和一個指針直接綁定起來;一旦socket就緒,epoll_wait返回的epoll_event結構體中就包含了與該socket對應的指針,因此你便可以這麼做:
/* Tag the socket with its connection record; epoll stores ev.data and
 * hands it back unchanged from epoll_wait(). */
conn.sd = sd;
conn.others = all;
ev.events = EPOLLIN;
ev.data.ptr = &conn;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, sd, &ev);
while (1) {
    /* NOTE(review): maxevents (10000) must not exceed the number of
     * entries in `events`, whose declaration is not shown — confirm. */
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        /* data.ptr is the &conn registered above, so no lookup is needed.
         * It is a pointer: read it into a struct conn *, not into the
         * struct object itself. */
        struct conn *c = events[n].data.ptr;
        recv(c->sd, ....);
        ....
    }
}
/* Register the listening socket; its tag is the address of `context`. */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = &context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);
/* Register the TUN device with a typed tag so the loop can tell it apart
 * from client sockets. tun_fd is the opened TUN descriptor (not shown). */
tun.sd = tun_fd;
tun.others = dont_care;
entry.ptr = &tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = &entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun_fd, &tun_ev);
while (1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == &context) {
            /* New TCP connection: create its instance and register it. */
            child_sd = accept(context.sd, remote_addr....);
            multi_instance *mi = create_mi(child_sd, remote_addr, ...);
            /* Each connection needs its own tag object: reusing the shared
             * `entry` would overwrite the TUN registration's tag.
             * NOTE(review): assumes multi_instance embeds a `struct entry`
             * member named `entry` — confirm. */
            mi->entry.ptr = mi;
            mi->entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = &mi->entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            ....
        } else {
            /* Every other registration carries a typed tag.
             * NOTE(review): `struct entry` is the assumed type of `entry`. */
            struct entry *e = events[n].data.ptr;
            if (e->type == SOCKET) {
                /* Client -> TUN: the tag yields the instance directly. */
                multi_instance *mi = e->ptr;
                data = read_from_socket(mi);
                /* Simplified: not every packet needs decryption; control-
                 * channel packets arrive here as well. */
                decrypt(mi, data);
                write_to_tun(data);
            } else {
                /* TUN -> client: the packet must be mapped to its instance
                 * through the shared hash table. */
                tun *t = e->ptr;
                packet = read_from_tun(t);
                lock(mi_hashtable);
                multi_instance *mi = lookup_multi_instance_from(packet);
                unlock(mi_hashtable);
                encrypt(packet);
                write_to_socket(packet, mi);
            }
        }
    }
    ...
}
/* Register the single UDP socket shared by all clients. */
context.sd = udp_sd;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = &context;
/* Register udp_sd itself: it is the descriptor `context` describes. */
epoll_ctl(kdpfd, EPOLL_CTL_ADD, udp_sd, &listen_ev);
/* Register the TUN device. tun_fd is the opened TUN descriptor (not shown). */
tun.sd = tun_fd;
tun.others = dont_care;
entry.ptr = &tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = &entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun_fd, &tun_ev);
while (1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) { /* only two descriptors are registered, so nfds is at most 2 */
        if (events[n].data.ptr == &context) {
            data = recvfrom(context.sd, remote_addr....);
            /* With multiple threads this lock becomes the bottleneck,
             * even if it is an RW lock. */
            lock(mi_hashtable);
            /* Every datagram shares one socket, so the instance must be
             * looked up by source address; no hash function makes that free. */
            multi_instance *mi = lookup_mi(remote_addr, ...);
            unlock(mi_hashtable);
            /* Simplified: not every packet needs decryption; control-channel
             * packets arrive here as well. */
            decrypt(mi, data);
            write_to_tun(data);
            ....
        } else {
            /* The only other registration is the TUN device.
             * NOTE(review): `struct entry` is the assumed type of `entry`. */
            struct entry *e = events[n].data.ptr;
            tun *t = e->ptr;
            packet = read_from_tun(t);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

/*
 * Demo: a UDP "listener" that emulates accept(). When a datagram from a new
 * peer arrives on the listening socket, udp_accept() creates another socket
 * bound to the same local address (SO_REUSEPORT) and connect()s it to that
 * peer; the new socket is added to epoll and drained by read_data().
 * NOTE(review): whether later datagrams from the peer are delivered to the
 * connected socket rather than the listener depends on the kernel's UDP
 * socket lookup — confirm on the target kernel.
 */

/* Fallback for old headers; 15 is the asm-generic Linux value. */
#ifndef SO_REUSEPORT
#define SO_REUSEPORT 15
#endif

#define MAXBUF 10240
#define MAXEPOLLSIZE 100

/*
 * Read one datagram from sd and print it.
 * Returns recvfrom()'s result: the datagram length (0 is a valid, empty
 * datagram) or -1 on error.
 */
int read_data(int sd)
{
    char recvbuf[MAXBUF + 1];
    struct sockaddr_in client_addr;
    socklen_t cli_len = sizeof(client_addr);
    int ret;

    /* The spare zero byte keeps the payload NUL-terminated for "%s". */
    memset(recvbuf, 0, sizeof(recvbuf));
    ret = (int)recvfrom(sd, recvbuf, MAXBUF, 0,
                        (struct sockaddr *)&client_addr, &cli_len);
    if (ret >= 0) {
        printf("read[%d]: %s from %d\n", ret, recvbuf, sd);
    } else {
        printf("read err:%s %d\n", strerror(errno), ret);
    }
    fflush(stdout);
    return ret;
}

/*
 * Emulate accept() on the UDP listening socket sd: take the sender of the
 * next pending datagram, then create a socket bound to my_addr and
 * connect()ed to that sender.
 * Returns the new descriptor, or -1 if no datagram could be read.
 * Exits the process if setting up the new socket fails (demo policy).
 */
int udp_accept(int sd, struct sockaddr_in my_addr)
{
    char buf[16];
    struct sockaddr_in peer_addr;
    socklen_t peer_len = sizeof(peer_addr);
    int reuse = 1;
    int new_sd;

    /* Only the sender address is needed: the payload is discarded (and
     * truncated to sizeof buf). */
    if (recvfrom(sd, buf, sizeof(buf), 0,
                 (struct sockaddr *)&peer_addr, &peer_len) < 0) {
        /* Without a datagram peer_addr is unset: nothing to connect to. */
        perror("udp_accept recvfrom");
        return -1;
    }

    if ((new_sd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("child socket");
        exit(1);
    }
    printf("parent:%d new:%d\n", sd, new_sd);

    /* Mirror the listener's options so new_sd may bind the same address. */
    if (setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))) {
        perror("child SO_REUSEADDR");
        exit(1);
    }
    if (setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse))) {
        perror("child SO_REUSEPORT");
        exit(1);
    }
    if (bind(new_sd, (struct sockaddr *)&my_addr, sizeof(my_addr))) {
        perror("child bind");
        exit(1);
    }

    peer_addr.sin_family = AF_INET;
    printf("aaa:%s\n", inet_ntoa(peer_addr.sin_addr));
    /* A connected datagram socket only receives from the connected address. */
    if (connect(new_sd, (struct sockaddr *)&peer_addr, sizeof(peer_addr)) == -1) {
        perror("child connect");
        exit(1);
    }
    return new_sd;
}

int main(int argc, char **argv)
{
    struct sockaddr_in my_addr;
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    unsigned int port = 1234;
    int listener, kdpfd, nfds, n;
    int opt = 1;

    if ((listener = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    }
    printf("socket OK\n");

    if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
        perror("SO_REUSEADDR");
        exit(1);
    }
    if (setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("SO_REUSEPORT");
        exit(1);
    }

    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(listener, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {
        perror("bind");
        exit(1);
    }
    printf("IP bind OK\n");

    if ((kdpfd = epoll_create(MAXEPOLLSIZE)) == -1) {
        perror("epoll_create");
        exit(1);
    }

    /* Level-triggered on purpose: each wakeup consumes only one datagram in
     * udp_accept(), so with EPOLLET further datagrams already queued would
     * not be reported until yet another one arrived. */
    ev.events = EPOLLIN;
    ev.data.fd = listener;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%d\n", listener);
        return -1;
    }
    printf("ep add OK\n");

    while (1) {
        /* maxevents must not exceed the capacity of events[]. */
        nfds = epoll_wait(kdpfd, events, MAXEPOLLSIZE, -1);
        if (nfds == -1) {
            if (errno == EINTR)
                continue;
            perror("epoll_wait");
            break;
        }
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                struct epoll_event child_ev;
                int new_sd;

                printf("listener:%d\n", n);
                new_sd = udp_accept(listener, my_addr);
                if (new_sd == -1)
                    continue;
                memset(&child_ev, 0, sizeof(child_ev));
                child_ev.events = EPOLLIN;
                child_ev.data.fd = new_sd;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%d\n", new_sd);
                    return -1;
                }
            } else {
                read_data(events[n].data.fd);
            }
        }
    }
    close(kdpfd);
    close(listener);
    return 0;
}
int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
		   int proto)
{
	......................
	sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);
	if (sk != NULL) {
		/*
		 * Zero-initialised: the get_port() failure path jumps to out_go
		 * without queueing the skb, and out_go reads ret.
		 */
		int ret = 0;
#if 1
		/* UDP_LISTEN is set on the socket through setsockopt(). */
		if (sk->sk_state == UDP_LISTEN) {
			/* The datagram hit a UDP listener: clone a per-peer data socket. */
			struct sock *newsk = inet_udp_clone_lock(sk, skb, GFP_ATOMIC);

			if (newsk) {
				struct inet_sock *newinet;

				/* Fill the new socket's 4-tuple and rx state from the skb. */
				newinet = inet_sk(newsk);
				newinet->inet_daddr = ip_hdr(skb)->saddr;
				newinet->inet_rcv_saddr = ip_hdr(skb)->daddr;
				newinet->inet_saddr = ip_hdr(skb)->daddr;
				rcu_assign_pointer(newinet->inet_opt, NULL);
				newinet->mc_index = inet_iif(skb);
				newinet->mc_ttl = ip_hdr(skb)->ttl;
				newinet->rcv_tos = ip_hdr(skb)->tos;
				newinet->inet_id = 0xffffffff ^ jiffies;
				inet_sk_rx_dst_set(newsk, skb);
				/* Hash the new data socket into the global UDP socket table. */
				if (newsk->sk_prot->get_port(newsk, newinet->inet_num)) {
					printk("[UDP listen] get port error\n");
					/*
					 * NOTE(review): the right way to dispose of the clone
					 * depends on inet_udp_clone_lock() (not shown) — confirm
					 * release_sock() is correct here.
					 */
					release_sock(newsk);
					/*
					 * The skb was never queued; free it so it does not leak.
					 * Assumes inet_udp_clone_lock() did not take ownership
					 * of skb — confirm.
					 */
					kfree_skb(skb);
					goto out_go;
				}
				/*
				 * csk is a field added to struct sock, similar in spirit to
				 * TCP's accept queue but holding a single child: a new peer
				 * arriving before the previous child is fetched overwrites
				 * it. It is published only after get_port() succeeded, so the
				 * listener never points at a socket that failed setup.
				 */
				sk->csk = newsk;
				ret = udp_queue_rcv_skb(newsk, skb);
				/* Wake waiters (epoll) so the UDP listener is reported ready. */
				sk->sk_data_ready(sk, 0);
				/*
				 * NOTE(review): sk->csk still points at newsk after this
				 * reference is dropped — confirm another reference keeps it
				 * alive until user space fetches it.
				 */
				sock_put(newsk);
			} else {
				printk("[UDP listen] create new error\n");
				sock_put(sk);
				/*
				 * A negative return asks the IPv4 input path to resubmit the
				 * skb as protocol -ret (-1 would re-deliver it as ICMP), so
				 * drop the packet instead.
				 */
				kfree_skb(skb);
				return 0;
			}
out_go:
			sock_put(sk);
			/* Positive ret from udp_queue_rcv_skb() means "resubmit";
			 * the caller expects it as -protocol. */
			if (ret > 0)
				return -ret;
			return 0;
		}
#endif
		ret = udp_queue_rcv_skb(sk, skb);
		sock_put(sk);
	......................
}
/* Create the UDP listener and mark it as listening. SO_UDPLISTEN is not a
 * standard option; presumably it is provided by a patched kernel.
 * bind() to the service address is omitted from this sketch. */
listen = 1;
listener = socket(PF_INET, SOCK_DGRAM, 0);
/* The option must be set on the listener itself. */
setsockopt(listener, SOL_SOCKET, SO_UDPLISTEN, &listen, sizeof(listen));
/* Register the listening socket; its tag is the address of `context`. */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = &context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);
/* Register the TUN device. tun_fd is the opened TUN descriptor (not shown). */
tun.sd = tun_fd;
tun.others = dont_care;
entry.ptr = &tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = &entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun_fd, &tun_ev);
while (1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == &context) {
            /* A new peer: fetch the child socket the kernel created for it. */
            getsockopt(context.sd, SOL_SOCKET, &newsock_info....);
            child_sd = newsock_info.sd;
            multi_instance *mi = create_mi(child_sd, newsock_info.remote_addr, ...);
            /* Each connection needs its own tag object: reusing the shared
             * `entry` would overwrite the TUN registration's tag.
             * NOTE(review): assumes multi_instance embeds a `struct entry`
             * member named `entry` — confirm. */
            mi->entry.ptr = mi;
            mi->entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = &mi->entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            /* This is UDP: besides waking the listener, the kernel also queued
             * the triggering datagram on child_sd, so read it now (compare
             * TCP Fast Open). */
            data = recvfrom(child_sd, ....);
            ....
        } else {
            /* Every other registration carries a typed tag.
             * NOTE(review): `struct entry` is the assumed type of `entry`. */
            struct entry *e = events[n].data.ptr;
            if (e->type == SOCKET) {
                /* Client -> TUN: the tag yields the instance directly. */
                multi_instance *mi = e->ptr;
                data = read_from_socket(mi);
                /* Simplified: not every packet needs decryption; control-
                 * channel packets arrive here as well. */
                decrypt(mi, data);
                write_to_tun(data);
            } else {
                /* TUN -> client: map the packet to its instance via the
                 * shared hash table. */
                tun *t = e->ptr;
                packet = read_from_tun(t);
                lock(mi_hashtable);
                multi_instance *mi = lookup_multi_instance_from(packet);
                unlock(mi_hashtable);
                encrypt(packet);
                write_to_socket(packet, mi);
            }
        }
    }
    ...
}