接觸到GO之後,GO的網絡支持非常令人喜歡。GO實現了在語法層面上可以保持同步語義,但是卻又沒有犧牲太多性能,底層一樣使用了IO路徑復用,比如在LINUX下用了EPOLL,在WINDOWS下用了IOCP。
但是在開發服務端程序的時候,很多都是被動觸發的,都是客戶端發送來的請求需要處理。天生就是一個event-based的程序。而在GO下,因為並發是作為語言的一部分,goroutine, channel等特性則很容易的使程序員在實現功能時從容的在同步與異步之間進行轉換。
因為自己的需要,我針對event-based場景的服務端做了簡易的封裝。具體代碼見這裡.
因為GO的IO機制和並發原語的原生支持,再加上對網絡API的封裝,程序員可以簡單的實現一個高效的服務端或者客戶端程序。一般的實現就是調用net.Listen(“tcp4”, address)得到一個net.Listener,然後無限循環調用net.Listener.Accept,之後就可以得到一個net.Conn,可以調用net.Conn的接口設置發送和接收緩沖區大小,可以設置KEEPALIVE等。因為TCP的雙工特性,所以可以針對一個net.Conn可以專門啟動一個goroutine去無限循環接收對端發來的數據,然後解包等。
我的想法是在這個簡單實現的基礎上做一層薄薄的封裝,使其盡量的精簡,但是又不失靈活。希望能夠適應不同的協議,對使用者造成盡量小的約束。
該對象就是對net.Conn的一個簡易封裝,可以通過swnet.Server.AcceptLoop得到,也可以通過swnet.NewSession創建新的對象,這種一般是客戶端情境下使用。得到Session對象後,可以調用Start方法開始工作。之所以還暴露出一個方法叫Start是因為在服務端下,可能會有某些需求,比如針對IP設置了ACL,那麼,把Start行為交給使用者決定如何調用。但是這裡需要注意的是,如果使用者不想Start,使用者有責任自己Close掉,否則會造成資源洩露。
Start後,會啟動兩個goroutine,一個用於專門接收對端發來的數據,一個專門用來發送數據到對端。想發送數據到對端,可以用AsyncSend方法,該方法會把要發送的數據排隊到發送通道。這裡使用通道的原因是因為在服務端情境下,有必要對發送的數據進行排隊,防止發送很快,但是對端接收很慢,或者過多的調用AsyncSend方法,導致堆積了太多的數據,增加了內存的壓力。通過channel來控制發送速率我認為是比較合理的。同時,還提供了方法可以用來修改channel的長度,一是調用NewSession時傳入指定大小,二是調用Session.SetSendChannelSize設置大小,但是要注意的是,調用此方法時必須在Start之前完成,否則會產生錯誤。這樣做的原因也是因為沒必要動態更改發送通道大小。
如果發送channel滿了,AsyncSend方法會返回ErrSendChanBlocking。增加這個錯誤類型也是因為上面的設計導致的。不返回這個錯誤,就沒有辦法讓使用者得到處理該問題的機會。使用者如果拿到該錯誤,可以自己試著分析問題的原因,或者可以嘗試循環發送,或者直接丟棄該次的發送數據。總之能夠讓使用者得到自己處理的機會。
如果Session對象已經Close了,那麼調用AsyncSend會返回ErrStoped錯誤。除此之外,因為AsyncSend是把數據排隊到發送channel中,那麼使用者有責任確保發送的數據在發送完成前不會修改。
如果數據發送失敗,或者其他原因,我的實現是直接粗暴的Close掉該Session。
還有就是,可能有些用例情景下,會發送比較大的數據包,比如64K大小,或者32K大小的數據等,未了避免反復申請內存,特此為Session增加了SetSendCallback方法。可以設置一個回調函數,用於在發送完成後可以調用該回調,給予使用者回收數據對象的機會,比如可以配合sync.Pool使用。雖然我自己測試時並沒有太大的效果。
為了方便使用者設置一些net.Conn參數,增加了一個RawConn方法,可以獲取到net.Conn 的實例。這裡其實是挺糾結的。因為暴露出這個內部資源後,會給予使用者一個非常大的靈活度。它可以直接繞過Session的發送channel,自己玩自己的。不過出於方便使用者使用的目的,我還是這麼做了。使用者自己承擔相應的責任。其實這裡還可以像net.HTTP那樣增加一個Hijack方法,讓使用者自己接管net.Conn,自己玩自己的。
Session中的很多SET/GET方法都是沒有加鎖的。一方面是因為很多操作在Start前一次完成,或者是GET的數據不是那麼緊密的。
有些時候,如果一個Session被關閉了,可能需要知道這個行為。所以提供了SetCloseCallback方法,可以設置該方法。不設置也沒有關系。調用closeCallback時會確保只調用一次。
因為目標之一就是能夠隔離具體協議格式。所以對協議做了抽象。只需要實現PacketProtocol接口即可:
// PacketReader is used to unmarshal a complete packet from buff type PacketReader interface { // Read data from conn and build a complete packet. // How to read from conn is up to you. You can set read timeout or other option. // If buff's capacity is small, you can make a new buff, then return it, // so can reuse to reduce memory overhead. ReadPacket(conn net.Conn, buff []byte) (interface{}, []byte, error) } // PacketWriter is used to marshal packet into buff type PacketWriter interface { // Build a complete packet. If buff's capacity is too small, you can make a new one // and return it to reuse. BuildPacket(packet interface{}, buff []byte) ([]byte, error) // How to write data to conn is up to you. So you can set write timeout or other option. WritePacket(conn net.Conn, buff []byte) error } // PacketProtocol just a composite interface type PacketProtocol interface { PacketReader PacketWriter }
也就是實現PacketReader/PacketWriter兩個接口。為了讓內存盡量的復用,減少內存壓力,所以在ReadPacket方法和BuildPacket方法的返回值中需要返回一個切片。框架會在第一次調用時傳入一個默認大小的切片到這兩個方法中,如果容量不夠,使用者可以自己重新建立切片,然後寫入數據後返回該切片。下一次再實用時就使用這個返回出來的切片。
其中ReadPacket方法是在一個專門用於接收數據的goroutine中調用。實現者可以自己根據自己的策略進行讀取,因為傳入了net.Conn,所以使用者可以自己設置I/O Timeout。實現者有責任返回一個完整的請求包。如果中間出了錯誤,有必要返回一個error。當發現有error後,會關閉該Session。這樣做的原因是當讀取或者構建一個請求包失敗時,可能是數據錯誤,可能是鏈路錯誤,或者其他原因,總之,個人認為這種情況下沒有必要繼續處理,直接關閉鏈接。而且這裡還有一個需要注意的事項,返回出來的請求包中的數據如果有包含切片類型的數據,建議重新分配一個切片,然後從buff中拷貝進去,盡量不要對buff切片做復用,否則可能會產生額外的BUG。
BuildPacket方法是在一個專門處理發送的goroutine中調用。當發送goroutine收到數據包後,會調用BuildPacket,實現者就可以按照自己的私有格式進行序列化。同樣的,buff不夠,就自己重新構造一個buff,然後填充數據,並返回這個buff。
WritePacket是給予實現者自己個性化發送的需求。可能實現者需要設置I/O Timeout.
基於event-based的實現,總是少不了要做的事情就是把一個請求包轉發到對應的處理函數中。但是具體怎麼轉,怎麼做是取決於具體的用例情景和實現的。所以我這裡做的非常簡單,就是定義了一個PacketHandler接口:
// PacketHandler is used to process packet that recved from remote session type PacketHandler interface { // When got a valid packet from PacketReader, you can dispatch it. Handle(s *Session, packet interface{}) }
使用者自己實現對應的Handle方法即可。當接收數據的goroutine收到對端發來的數據並調用PacketReader.ReadPacket後,會調用Handle方法 ,傳入該Session實例與請求包。傳入Session的目的是方便使用者不用去維護一個Session的實例。因為有的程序員要實現的邏輯可能比較簡單,他僅僅用Session就滿足了他的需求,他只需要實現對應的處理函數就好了。處理完成後,就調用Session.AsyncSend發送回應包。
這裡其實可以提供一個簡單的默認版本的實現的。但是考慮到協議的不同,那麼就導致調度的key的不同,所以還是讓使用者自己發揮吧。
使用者其實在這裡有很大的自由度,他可以做基於map關系的回調分發邏輯,也可以做一個簡單的實現邏輯,然後通過type assert做相應的實現。具體也是看各自的口味而定。我是比較喜歡後者,可以減少很多的Register,實現出Actor Model + Pattern Match味道的東西。
這裡還要說一下對服務端的一個簡易封裝。Server的實現非常簡單,就是反復的去Accept,然後構造一個Session,之後就是調用用戶傳入的回調函數,就完活了。使用者可以自己傳入net.Listener,可以傳入PacketProtocol, PacketHandler以及SendChanSize。這些參數會在構造Session時傳入進去,可以減少重復的代碼實現。Server.AcceptLoop不會關閉構造出來的Session,使用者負責完成這件事情!
整體非常簡陋,只是搭了一個模制。在我自己未公開的代碼裡,其實是實現了我所在公司的協議,實現了PacketProtocol。為此還專門寫了個代碼生成器。
還有就是NewServer需要傳入一個net.Listener,比較蛋疼。後面再決定是否干掉。NewSession需要傳入net.Conn,其實是妥協的產物,因為net.Listener返回的就是net.Conn,這個實例需要交給Session使用,不得已而為之,但是這裡囧的是,客戶端使用的時候,需要自己去net.Dial,得到一個net.Conn,也許該提供一個swnet.Dial方法。
我這個發布的代碼是在原有的代碼基礎上進行了修改,從達達的https://github.com/funny/link中得到了一些啟發,但是又有很多的不同。再次感謝達達的貢獻。
Ubuntu 14.04 上搭建 Golang 開發環境配置 http://www.linuxidc.com/Linux/2015-02/113977.htm
Linux系統入門學習-在Linux中安裝Go語言 http://www.linuxidc.com/Linux/2015-02/113159.htm
Ubuntu 安裝Go語言包 http://www.linuxidc.com/Linux/2013-05/85171.htm
《Go語言編程》高清完整版電子書 http://www.linuxidc.com/Linux/2013-05/84709.htm
Go語言並行之美 -- 超越 “Hello World” http://www.linuxidc.com/Linux/2013-05/83697.htm
我為什麼喜歡Go語言 http://www.linuxidc.com/Linux/2013-05/84060.htm
Go語言內存分配器的實現 http://www.linuxidc.com/Linux/2014-01/94766.htm