net/rpc是golang標准庫提供的rpc框架,下面我們重點看下net/rpc是如何實現的。 我本機源碼安裝路徑在/usr/local/go,這net/rpc(golang 1.4版本)涉及到的相關代碼主要有:
client.go
server.go
首先我們先從client.go,客戶端入手看:
type ClientCodecinterface{// WriteRequest must be safe for concurrent use by multiple goroutines. WriteRequest(*Request,interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
type Callstruct{ServiceMethodstring// The name of the service and method to call. Argsinterface{}// The argument to the function (*struct). Replyinterface{}// The reply from the function (*struct). Error error // After completion, the error status. Done chan *Call// Strobes when call is complete. Tracer*Trace// tracer }
type Clientstruct{
codec ClientCodec
reqMutex sync.Mutex// protects following
request Request
mutex sync.Mutex// protects following
seq uint64
pending map[uint64]*Call
closing bool// user has called Close
shutdown bool// server has told us to stop }
func (client *Client) send(call *Call){// client要想復用,保證線程安全,加上請求鎖reqMutex是必須的。
client.reqMutex.Lock()
defer client.reqMutex.Unlock()// 這其實是針對map的另外一把鎖,這樣可以更細粒度的操作
client.mutex.Lock()// client如果外部調用關閉,那麼call也是結束狀態,之後我們再分析call.done() if client.shutdown || client.closing {
call.Error=ErrShutdown
client.mutex.Unlock()
call.done()return}// 重點來了!seq序號自增在把call請求暫存在pennding的map中,鎖釋放
seq := client.seq
client.seq++
client.pending[seq]= call
client.mutex.Unlock()// 這一塊代碼屬於編碼請求了,因為rpc涉及到調用具體是誰,所以需要把method傳給rpc server// 這裡的Seq是用於當server response的時候,seq從client->server,再從server->client,然後反查map,定位call對象使用的。
client.request.Seq= seq
client.request.ServiceMethod= call.ServiceMethod// inject tracer,這個請忽視。。。
client.request.Tracer= call.Tracer
err := client.codec.WriteRequest(&client.request, call.Args)if err !=nil{
client.mutex.Lock()
call = client.pending[seq]delete(client.pending, seq)
client.mutex.Unlock()if call !=nil{
call.Error= err
call.done()}}}
我們使用rpc的時候,都知道client是線程安全的,client其實是基於單個socket連接來,依賴channel來實現復用連接以及並行的。而臨時的調用對象Call都是保存在Client的map中的,至於每個call怎麼查找,也是根據seq序列號在請求server時候轉發過去,之後response的時候,client根據返回的seq再反查結果的。不難看出,實現了ClientCodec之後就可以自定義rpc協議請求頭和內容了。那麼send函數中的Call對象是從哪裡來的?
// 我們rpc請求的時候,調用就是這個方法,傳入方法名,參數,獲取返回等
func (client *Client)Call(serviceMethod string, args interface{}, reply interface{}) error {// Call裡面調用了client.Go,然後返回一個chan,之後阻塞等待,這是基本的同步調用
call :=<-client.Go(serviceMethod, args, reply, make(chan *Call,1)).Donereturn call.Error}
func (client *Client)Go(serviceMethod string, args interface{}, reply interface{},done chan *Call)*Call{// 構建call對象
call :=new(Call)
call.ServiceMethod= serviceMethod
call.Args= args
call.Reply= reply
// 如果非外部傳入call,自己構建 ifdone==nil{done= make(chan *Call,10)// buffered. }else{// If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap(done)==0{
log.Panic("rpc: done channel is unbuffered")}}
call.Done=done// 發送請求
client.send(call)return call
}
在初始化client的時候,我們會指定ip,port等
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string)(*Client, error){
conn, err := net.Dial(network, address)if err !=nil{returnnil, err
}returnNewClient(conn),nil}// 我們看到其實NewClient內部使用的默認的gob編碼,gobClientCodes實現了Codec的接口
func NewClient(conn io.ReadWriteCloser)*Client{
encBuf := bufio.NewWriter(conn)
client :=&gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}returnNewClientWithCodec(client)}// 當然也提供自定義的codec,你可以使用thrift協議、messagepack等來擴展 // codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec)*Client{
client :=&Client{
codec: codec,
pending: make(map[uint64]*Call),}
go client.input()return client
}
type gobClientCodec struct{
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer}
最後,NewClient會後台開啟一枚goroutine,就是接受server返回然後轉發具體調用者了。
func (client *Client) input(){var err error
var response Responsefor err ==nil{// 二話不說先獲取Response的頭
response =Response{}
err = client.codec.ReadResponseHeader(&response)if err !=nil{break}// 頭部中包含了序列號,用於定位pending map使用的
seq := response.Seq// 小粒度鎖刪除map,獲取call對象
client.mutex.Lock()
call := client.pending[seq]delete(client.pending, seq)
client.mutex.Unlock()switch{// 如果pending找不到,那麼肯定是異常了 case call ==nil:// We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)if err !=nil{
err = errors.New("reading error body: "+ err.Error())}// rpc 報錯了,解不開什麼的都有可能 case response.Error!="":// We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one.
call.Error=ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)if err !=nil{
err = errors.New("reading error body: "+ err.Error())}
call.done()default:// 默認還是正常的處理,獲取Body給Reply,讓調用者可見
err = client.codec.ReadResponseBody(call.Reply)if err !=nil{
call.Error= errors.New("reading body "+ err.Error())}
call.done()}}// 如果有啥不可逆的異常,那麼只能shutdown client了。全部退出吧 // Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown =true
closing := client.closing
if err == io.EOF {if closing {
err =ErrShutdown}else{
err = io.ErrUnexpectedEOF}}// 之前pending的也一個個結束吧,避免調用者都等待 for _, call := range client.pending {
call.Error= err
call.done()}
client.mutex.Unlock()
client.reqMutex.Unlock()if debugLog && err != io.EOF &&!closing {
log.Println("rpc: client protocol error:", err)}}
最後call.done做了什麼了,相比你也猜到:
// 把call對象傳遞給調用者,主要是獲取內部的Error
func (call *Call)done(){select{case call.Done<- call:// ok default:// We don't want to block here. It is the caller's responsibility to make// sure the channel has enough buffer space. See comment in Go(). if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")}}}
大致的分析就結束了,但是完整的rpc框架,還應該包括,服務發現,服務降級,服務追蹤,服務容錯等, 服務發現:可以使用zk,以及配合client定制的方式實現
服務降級:可以在zk中指定服務質量,以及根據回饋系統來drop request
服務追蹤:最近我在看Twitter的Zipkin和Google的Dapper,對核心rpc庫修改的方式避免大量植入代碼,但是golang要做到這點有點困難,一是AOP不好支持,所以現在只能考慮用侵入代碼,有更好思路的可以聯系我!
服務容錯:因為input本身單連接請求獲取server,有可能<-call一直不返回,導致業務大量hang,這個可以考慮加上一些channel的timeout特性來實現,只不過浪費了一些內存。
總體來說net/rpc還是一個不錯的框架,但是幾個地方需要考慮,一個是全局大鎖reqMutex,另外是call對象會大量創建(可否考慮call pool等)
Golang 1.4 net/rpc server源碼解析 http://www.linuxidc.com/Linux/2015-04/116467.htm
Ubuntu 14.04 上搭建 Golang 開發環境配置 http://www.linuxidc.com/Linux/2015-02/113977.htm
Linux系統入門學習-在Linux中安裝Go語言 http://www.linuxidc.com/Linux/2015-02/113159.htm
Ubuntu 安裝Go語言包 http://www.linuxidc.com/Linux/2013-05/85171.htm
《Go語言編程》高清完整版電子書 http://www.linuxidc.com/Linux/2013-05/84709.htm
Go語言並行之美 -- 超越 “Hello World” http://www.linuxidc.com/Linux/2013-05/83697.htm
我為什麼喜歡Go語言 http://www.linuxidc.com/Linux/2013-05/84060.htm
Go語言內存分配器的實現 http://www.linuxidc.com/Linux/2014-01/94766.htm