TCP粘包問題的產生
由於TCP協議是基於字節流並且無邊界的傳輸協議, 因此很有可能產生粘包問題。此外,發送方引起的粘包是由TCP協議本身造成的,TCP為提高傳輸效率,發送方往往要收集到足夠多的數據後才發送一個TCP段。若連續幾次需要send的數據都很少,通常TCP會根據優化算法把這些數據合成一個TCP段後一次發送出去,但是接收方並不知道要一次接收多少字節的數據,這樣接收方就收到了粘包數據。具體可以見下圖:
假設主機A send了兩條消息M1和M2 各10k 給主機B,由於主機B一次提取的字節數是不確定的,接收方提取數據的情況可能是:
? 一次性提取20k 數據粘包問題產生的多種原因:
1、SQ_SNDBUF 套接字本身有緩沖區大小的限制 (發送緩沖區、接受緩沖區)
2、TCP傳送的端 MSS大小限制
3、鏈路層也有MTU大小限制,如果數據包大於>MTU要在IP層進行分片,導致數據分割。
4、TCP的流量控制和擁塞控制,也可能導致粘包
5、文章開始提到的TCP延遲確認機制等
注: 關於MTU和MSS
MSS指的是TCP中的一個概念。MTU是一個沒有固定到特定OSI層的概念,不受其他特定協議限制。也就是說第二層會有MTU,第三層會有MTU,像MPLS這樣的第2.5層協議,也有自己的MTU值。並且不同層之間存在關聯關系。舉個例子:如果你要搬家,需要把東西打包,用車運走。這樣的情況下,車的大小受路的寬度限制;箱子的大小受車限制;能夠搬運的東西的大小受箱子的限制。這時可以將路的寬度理解成第二層的MTU,車的大小理解成第三層的MTU,箱子的大小理解成第四層的MTU,搬運的東西理解成MSS。
粘包問題的解決方案(本質上是要在應用層維護消息和消息之間的邊界)
(1)定長包
該方式並不實用: 如果所定義的長度過長, 則會浪費網絡帶寬,增加網絡負擔;而又如果定義的長度過短, 則一條消息又會拆分成為多條, 僅在TCP的應用一層就增加了合並的開銷。
(2)包尾加\r\n(FTP使用方案)
如果消息本身含有\r\n字符,則也分不清消息的邊界;
(3)報文長度+報文內容,自定義包結構
(4)更復雜的應用層協議
注:簡單的使用 setsockopt 設置開啟TCP_NODELAY禁用 Nagle’s Algorithm可以解決上述第5個問題(延遲確認機制)。
static void _set_tcp_nodelay(int fd) { int enable = 1; setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void*)&enable, sizeof(enable)); }著名的Nginx服務器 默認是開啟了這個選項的.....
因為TCP協議是面向流的,read和write調用的返回值往往小於參數指定的字節數。對於read調用(套接字標志為阻塞),如果接收緩沖區中有20字節,請求讀100個字節,就會返回20;對於write調用,如果請求寫100個字節,而發送緩沖區中只有20個字節的空閒位置,那麼write會阻塞,直到把100個字節全部交給發送緩沖區才返回;還有信號中斷之後需要處理為 繼續讀寫;為避免這些情況干擾主程序的邏輯,確保讀寫我們所請求的字節數,我們實現了兩個包裝函數readn和writen,如下所示。
/**實現: 這兩個函數只是按需多次調用read和write系統調用直至讀/寫了count個數據 **/ /**返回值說明: == count: 說明正確返回, 已經真正讀取了count個字節 == -1 : 讀取出錯返回 < count: 讀取到了末尾 **/ ssize_t readn(int fd, void *buf, size_t count) { size_t nLeft = count; ssize_t nRead = 0; char *pBuf = (char *)buf; while (nLeft > 0) { if ((nRead = read(fd, pBuf, nLeft)) < 0) { //如果讀取操作是被信號打斷了, 則說明還可以繼續讀 if (errno == EINTR) continue; //否則就是其他錯誤 else return -1; } //讀取到末尾 else if (nRead == 0) return count-nLeft; //正常讀取 nLeft -= nRead; pBuf += nRead; } return count; }
/**返回值說明: == count: 說明正確返回, 已經真正寫入了count個字節 == -1 : 寫入出錯返回 **/ ssize_t writen(int fd, const void *buf, size_t count) { size_t nLeft = count; ssize_t nWritten = 0; char *pBuf = (char *)buf; while (nLeft > 0) { if ((nWritten = write(fd, pBuf, nLeft)) < 0) { //如果寫入操作是被信號打斷了, 則說明還可以繼續寫入 if (errno == EINTR) continue; //否則就是其他錯誤 else return -1; } //如果 ==0則說明是什麼也沒寫入, 可以繼續寫 else if (nWritten == 0) continue; //正常寫入 nLeft -= nWritten; pBuf += nWritten; } return count; }
發報文時:前四個字節長度+報文內容一次性發送;
收報文時:先讀前四個字節,求出報文內容長度;根據長度讀數據
自定義包結構:struct Packet { unsigned int msgLen; //數據部分的長度(注:這是網絡字節序) char text[1024]; //報文的數據部分 };
//echo 回射client端發送與接收代碼 ... struct Packet buf; memset(&buf, 0, sizeof(buf)); while (fgets(buf.text, sizeof(buf.text), stdin) != NULL) { /**寫入部分**/ unsigned int lenHost = strlen(buf.text); buf.msgLen = htonl(lenHost); if (writen(sockfd, &buf, sizeof(buf.msgLen)+lenHost) == -1) err_exit("writen socket error"); /**讀取部分**/ memset(&buf, 0, sizeof(buf)); //首先讀取首部 ssize_t readBytes = readn(sockfd, &buf.msgLen, sizeof(buf.msgLen)); if (readBytes == -1) err_exit("read socket error"); else if (readBytes != sizeof(buf.msgLen)) { cerr << "server connect closed... \nexiting..." << endl; break; } //然後讀取數據部分 lenHost = ntohl(buf.msgLen); readBytes = readn(sockfd, buf.text, lenHost); if (readBytes == -1) err_exit("read socket error"); else if (readBytes != lenHost) { cerr << "server connect closed... \nexiting..." << endl; break; } //將數據部分打印輸出 cout << buf.text; memset(&buf, 0, sizeof(buf)); } ...
//server端echo部分的改進代碼 void echo(int clientfd) { struct Packet buf; int readBytes; //首先讀取首部 while ((readBytes = readn(clientfd, &buf.msgLen, sizeof(buf.msgLen))) > 0) { //網絡字節序 -> 主機字節序 int lenHost = ntohl(buf.msgLen); //然後讀取數據部分 readBytes = readn(clientfd, buf.text, lenHost); if (readBytes == -1) err_exit("readn socket error"); else if (readBytes != lenHost) { cerr << "client connect closed..." << endl; return ; } cout << buf.text; //然後將其回寫回socket if (writen(clientfd, &buf, sizeof(buf.msgLen)+lenHost) == -1) err_exit("write socket error"); memset(&buf, 0, sizeof(buf)); } if (readBytes == -1) err_exit("read socket error"); else if (readBytes != sizeof(buf.msgLen)) cerr << "client connect closed..." << endl; }注:網絡字節序和本機字節序之間是必要的轉換。 按行讀取(由\r\n判斷)
ssize_t recv(int sockfd, void *buf, size_t len, int flags); ssize_t send(int sockfd, const void *buf, size_t len, int flags);與read相比,recv只能用於套接字文件描述符,但是多了一個flags,這個flags能夠幫助我們實現解決粘包問題的操作。
MSG_PEEK(可以讀數據,但不從緩存區中讀走[僅僅是一瞥],利用此特點可以方便的實現按行讀取數據;一個一個字符的讀,多次調用系統調用read方法,效率不高,但是可以判斷'\n')。
This flag causes the receive operation to return data from the beginning of
the receive queue without removing that data from the queue. Thus, a subsequent
receive call will return the same data.
readline實現思想:
在readline函數中,我們先用recv_peek”偷窺” 一下現在緩沖區有多少個字符並讀取到pBuf,然後查看是否存在換行符'\n'。如果存在,則使用readn連同換行符一起讀取(作用相當於清空socket緩沖區); 如果不存在,也清空一下緩沖區, 且移動pBuf的位置,回到while循環開頭,再次窺看。注意,當我們調用readn讀取數據時,那部分緩沖區是會被清空的,因為readn調用了read函數。還需注意一點是,如果第二次才讀取到了'\n',則先用returnCount保存了第一次讀取的字符個數,然後返回的ret需加上原先的數據大小。
/**示例: 通過MSG_PEEK封裝一個recv_peek函數(僅查看數據, 但不取走)**/ ssize_t recv_peek(int sockfd, void *buf, size_t len) { while (true) { int ret = recv(sockfd, buf, len, MSG_PEEK); //如果recv是由於被信號打斷, 則需要繼續(continue)查看 if (ret == -1 && errno == EINTR) continue; return ret; } } /**使用recv_peek實現按行讀取readline(只能用於socket)**/ /** 返回值說明: == 0: 對端關閉 == -1: 讀取出錯 其他: 一行的字節數(包含'\n') **/ ssize_t readline(int sockfd, void *buf, size_t maxline) { int ret; int nRead = 0; int returnCount = 0; char *pBuf = (char *)buf; int nLeft = maxline; while (true) { ret = recv_peek(sockfd, pBuf, nLeft); //如果查看失敗或者對端關閉, 則直接返回 if (ret <= 0) return ret; nRead = ret; for (int i = 0; i < nRead; ++i) //在當前查看的這段緩沖區中含有'\n', 則說明已經可以讀取一行了 if (pBuf[i] == '\n') { //則將緩沖區內容讀出 //注意是i+1: 將'\n'也讀出 ret = readn(sockfd, pBuf, i+1); if (ret != i+1) exit(EXIT_FAILURE); return ret + returnCount; } // 如果在查看的這段消息中沒有發現'\n', 則說明還不滿足一條消息, // 在將這段消息從緩沖中讀出之後, 還需要繼續查看 ret = readn(sockfd, pBuf, nRead);; if (ret != nRead) exit(EXIT_FAILURE); pBuf += nRead; nLeft -= nRead; returnCount += nRead; } //如果程序能夠走到這裡, 則說明是出錯了 return -1; }
... char buf[512] = {0}; memset(buf, 0, sizeof(buf)); while (fgets(buf, sizeof(buf), stdin) != NULL) { if (writen(sockfd, buf, strlen(buf)) == -1) err_exit("writen error"); memset(buf, 0, sizeof(buf)); int readBytes = readline(sockfd, buf, sizeof(buf)); if (readBytes == -1) err_exit("readline error"); else if (readBytes == 0) { cerr << "server connect closed..." << endl; break; } cout << buf; memset(buf, 0, sizeof(buf)); } ...
void echo(int clientfd) { char buf[512] = {0}; int readBytes; while ((readBytes = readline(clientfd, buf, sizeof(buf))) > 0) { cout << buf; if (writen(clientfd, buf, readBytes) == -1) err_exit("writen error"); memset(buf, 0, sizeof(buf)); } if (readBytes == -1) err_exit("readline error"); else if (readBytes == 0) cerr << "client connect closed..." << endl; }