tcp的粘包問題觊觎了很久,但是一直都沒有動手實現。
直到今天才動手開始實現:先貼原理(steal) :)
一般所謂的TCP粘包是在一次接收數據不能完全地體現一個完整的消息數據。TCP通訊為何存在粘包呢?主要原因是TCP是以流的方式來處理數據,再加上網絡上MTU的往往小於在應用處理的消息數據,所以就會引發一次接收的數據無法滿足消息的需要,導致粘包的存在。處理粘包的唯一方法就是制定應用層的數據通訊協議,通過協議來規范現有接收的數據是否滿足消息數據的需要。在應用中處理粘包的基礎方法主要有兩種分別是以4節字描述消息大小或以結束符,實際上也有兩者相結合的如HTTP,redis的通訊協議等。
在平時交流過程發現一些朋友即使做了這些協議的處理,但有時在處理數據的時候也會出現數據不對的情況。這主要原因他們在一些個別情況下沒有處理好。因為當一系列的消息發送過來的時候,對於4節字頭或結束符分布位置都是不確定的。一種簡單的情況就是當前消息處理完成後,緊接著就是處理一下個消息的4節字描述,但在實際情況下當前接收的buffer剩下的內容有可能不足4節字的。如果你想通過通訊的程序來測這情況相對來說觸發的機率性不高,所以對於協議分析的功能最好通過單元測試來模擬。
通過下面這個圖可以更清晰地了解協議標記數據分布的情況 (http://www.cnblogs.com/smark/p/3284756.html)盜圖要留名
關於tcp粘包的處理感覺還是用定長作為包頭比較方面,也更加高效。在實際環境中,每次接收數據或者發送數據,我們都需要一個buffer來數據。
於是我寫了這樣一個buffer類。負責粘包與拆包。
對於服務器的接收程序:
vector<char> buffer; int packIndex; int packSize; //when encode is low water level int bufferIndex; //when encode is high water level用每個buffer的前packIndex用來表示包頭。這裡用了int 不過總感覺uint32是一個更好的選擇(一開始,我用了size_t。突然想起來size_t在不同位的操作系統位數不同。。。。。)
packsize就是包體的大小,通過解析包頭來獲得(PS : 包頭解析這一塊做得不夠好,希望有大神看到後,能給指條明路)
bufferIndex則表示收到的消息
在解析端,每次有消息通過appenddecode存放到buffer的同時做一個存儲空間充足的判斷,空間不足則繼續分配。
關鍵在於unpack,每次有消息追加進來時,都判斷一下包體是否做過解析,即packsize是否為0
如此一來,通過getdecode輕輕松松就可以獲取真+++包的內容了。
對於服務端的發送程序
對於encode系列的程序,相比較decode,我感覺不怎麼理想。。一開始我是准備將二者合而為一,這樣每次只需要調用buffer,但後來發現不太現實,二者的需求太多地方不同。
對於encode系列的程序,packsize就沒有什麼用了。因為我們每次只需要加個包頭,然後將消息投遞出去就好了,或許有更好的解決方法————見重點關注。其實packindex在這裡也作用不大,不過好歹起一個標記包頭長度的作用。
一開始的時候,我天真的覺得主要一個bufferindex就足夠了。直到我想到,每次發送端發送的能力有限。這個沒有用的packsize就又有了重新利用的價值(三寶再次合體)。
於是將其視作是一個發送緩沖區的低水位,bufferindex則作為發送緩沖區的高水位,這樣就可以通過getencode函數將一小塊數據取出去,或許可以寫一個while循環getsize為0時緩沖區就清空了。
還有一點vector不如指針那樣容易分配和釋放內存,於是我寫了一個clearbuffer來幫助釋放內存,雖然一般情況下不使用它。
重點關注:
void unPack(){ if(packSize==0){ if(bufferIndex>=packIndex){ char ps[packIndex]; std::copy(begin(),begin()+packIndex,ps); for(int i=packIndex-1;i>=0;i--){ packSize=packSize*256+ps[i]; } //packSize=*(int*)ps; }else{ packSize=0; } } }這裡總感覺做得不夠高效,一個for循環老是感覺很別扭。
剛開始的時候想到用諸如 *(T*)的強制轉換來做,但總是感覺怪怪的,而且大小端的情況會導致一系列的錯誤,在包頭很小的情況下用for效率還行。
這裡突然發現c++也能支持變量:)
好了,說了那麼多代碼奉上求大神訂正:
#ifndef _buffer_當然還有個小小的測試程序:#define _buffer_
#include <stdio.h>
#include <assert.h>
#include <string>
#include <vector>
#include <iostream>
using namespace std;
class Buffer
{
private:
vector<char> buffer; int packIndex; int packSize; //when encode is low water level int bufferIndex; //when encode is high water level
char* begin(){
return &*buffer.begin();
}
const char* begin()const{
return &*buffer.begin();
}
void makeSpace(int len){
if(writableSize()<len){
buffer.resize(bufferIndex+len);
}
}
void moveDelete(int len){
assert(len<=bufferIndex);
std::copy(begin()+len,begin()+bufferIndex,begin());
bufferIndex-=len;
packSize=0;
}
public:
static const int InitSize=1024;
static const int MaxPack=8;
static const int pInitIndex=2;
explicit Buffer(int packin=pInitIndex,int initsize=InitSize):buffer(initsize){
assert(packin<=MaxPack);
assert(packin>0);
packIndex=packin;
packSize=0;
bufferIndex=0;
}
int readableSize()const{
if(bufferIndex<packIndex){
return 0;
}
return bufferIndex-packIndex;
}
int writableSize()const{
return static_cast<int>(buffer.size()-bufferIndex);
}
char* readBegin(){
return begin()+packIndex;
}
const char* readBegin()const{
return begin()+packIndex;
}
char* writeBegin(){
return begin()+bufferIndex;
}
const char* writeBegin()const{
return begin()+bufferIndex;
}
bool hldiff(){
if(packSize<bufferIndex){
return true;
}
return false;
}
bool decodable(){
if(packSize!=0&&packSize<readableSize()){
return true;
}
return false;
}
//test func
int packNow(){
return packSize;
}
int bufferNow(){
return bufferIndex;
}
void clearBuffer(){
printf("%lu",buffer.size());
vector<char> item;
item.swap(buffer);
printf("%lu",buffer.size());
}
void unPack(){ if(packSize==0){ if(bufferIndex>=packIndex){ char ps[packIndex]; std::copy(begin(),begin()+packIndex,ps); for(int i=packIndex-1;i>=0;i--){ packSize=packSize*256+ps[i]; } //packSize=*(int*)ps; }else{ packSize=0; } } }
string getDecode(){
string ret(readBegin(),packSize);
if(packSize!=0){
moveDelete(packSize+packIndex);
}
if(bufferNow()>=packIndex){
unPack();
}
return ret;
}
string getEncode(int getsize){
if(getsize+packSize<=bufferIndex){
string ret(begin()+packSize,getsize);
packSize+=getsize;
return ret;
}else{
assert(packSize<=bufferIndex);
string ret(begin()+packSize,bufferIndex-packSize);
packSize=0;
bufferIndex=0;
return ret;
}
}
string getAll(){
string ret(begin(),bufferIndex);
bufferIndex=0;
packSize=0;
return ret;
}
void hasWritten(int len){
assert(len<=writableSize());
bufferIndex+=len;
}
void ensureWriteableSize(int len){
if(writableSize()<len){
makeSpace(len);
}
assert(writableSize()>=len);
}
void appendDecode(const char* data,int len){
ensureWriteableSize(len);
std::copy(data,data+len,writeBegin());
hasWritten(len);
unPack();
}
void appendDecode(const void* data, int len)
{
appendDecode(static_cast<const char*>(data), len);
}
void appendDecode(string data){
appendDecode(data.c_str(),static_cast<int>(data.size()));
}
void appendEncode(const char* data,int len){
ensureWriteableSize(len+packIndex);
char ps[packIndex];
int len_tem=len;
for(int i=0;i<packIndex;i++){
ps[i]=len_tem%256;
len_tem=len_tem>>8;
}
//char ps[4];
//strcpy(ps,(char*)&len);
std::copy(ps,ps+packIndex,writeBegin());
std::copy(data,data+len,writeBegin()+packIndex);
hasWritten(len+packIndex);
}
void appendEncode(const void* data, int len)
{
appendEncode(static_cast<const char*>(data), len);
}
void appendEncode(string data){
appendEncode(data.c_str(),static_cast<int>(data.size()));
}
};
#endif
#include "buffer.h" #include <iostream> #include <ctype.h> int main(){ string h="hello!"; Buffer buff; buff.appendEncode(h); cout<<buff.packNow()<<" "<<buff.bufferNow()<<endl; buff.appendEncode("world",6); cout<<buff.packNow()<<" "<<buff.bufferNow()<<endl; string ge=buff.getEncode(2); cout<<ge<<" "<<ge.size()<<endl; string ga=buff.getAll(); cout<<ga<<" "<<ga.size()<<endl; for(int i=0;i<1000;i++){ buff.appendDecode(ga); cout<<buff.packNow()<<" "<<buff.bufferNow()<<endl; } for(int j=0;j<1000;j++){ string gd=buff.getDecode(); cout<<gd<<" "<<gd.size()<<endl; } buff.clearBuffer(); }