歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Golang 1.4 net/rpc server源碼解析

上一篇(http://www.linuxidc.com/Linux/2015-04/116466.htm)文章我們講了net/rpc中client部分的實現,我本機源碼安裝路徑在/usr/local/go,這net/rpc(golang 1.4版本)涉及到的相關代碼主要有:

server.go

方法注冊:

因為從client我們知道是復用的socket來實現並發調用rpc方法,我們先從方法注冊來看源碼部分:

// Server對象大都是保存方法存根,保證對象互斥的
type Serverstruct{                                                            
    mu         sync.RWMutex// protects the serviceMap                          
    serviceMap map[string]*service                                              
    reqLock    sync.Mutex// protects freeReq                                   
    freeReq    *Request                                                         
    respLock   sync.Mutex// protects freeResp                                  
    freeResp   *Response}

func NewServer()*Server{return&Server{serviceMap: make(map[string]*service)}}// rpc.Register默認使用了一個Server,只對serviceMap進行了初始化                          varDefaultServer=NewServer()// rpc的service包括方法名、方法反射,類型等
type service struct{                                                           
    name   string// name of service                            
    rcvr   reflect.Value// receiver of methods for the service        
    typ    reflect.Type// type of the receiver                       
    method map[string]*methodType // registered methods                         }// 無論是RegisterName、Register最終都調用了register的內部方法
func (server *Server)register(rcvr interface{}, name string, useName bool) error {// 保證注冊服務安全,先加鎖
    server.mu.Lock()                                                               
    defer server.mu.Unlock()// 如果服務為空,默認注冊一個                                                      if server.serviceMap ==nil{                                                  
        server.serviceMap = make(map[string]*service)}// 獲取注冊服務的反射信息                                                                              
    s :=new(service)                                                              
    s.typ = reflect.TypeOf(rcvr)                                                   
    s.rcvr = reflect.ValueOf(rcvr)// 可以使用自定義名稱                                               
    sname := reflect.Indirect(s.rcvr).Type().Name()if useName {                                                                   
        sname = name                                                               
    }if sname ==""{                                                               
        s :="rpc.Register: no service name for type "+ s.typ.String()            
        log.Print(s)return errors.New(s)}// 方法必須是暴露的,既服務名首字符大寫                                                                             if!isExported(sname)&&!useName {                                            
        s :="rpc.Register: type "+ sname +" is not exported"                    
        log.Print(s)return errors.New(s)}// 不允許重復注冊                                                                         if _, present := server.serviceMap[sname]; present {return errors.New("rpc: service already defined: "+ sname)}                                                                           
    s.name = sname                                                              

    // 開始注冊rpc struct內部的方法存根                                                      
    s.method = suitableMethods(s.typ,true)// 如果struct內部一個方法也沒,那麼直接報錯,錯誤信息還非常詳細                                                                            if len(s.method)==0{                                                     
        str :=""// To help the user, see if a pointer receiver would work.              
        method := suitableMethods(reflect.PtrTo(s.typ),false)if len(method)!=0{                                                   
            str ="rpc.Register: type "+ sname +" has no exported methods of suitable type (hint: pass a pointer to value of that type)"}else{                                                                
            str ="rpc.Register: type "+ sname +" has no exported methods of suitable type"}                                                                       
        log.Print(str)return errors.New(str)}// 保存在server的serviceMap中                                                                           
    server.serviceMap[s.name]= s                                               
    returnnil}// 上文提到了服務還需要方法存根的注冊
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {// 根據方法名創建保存內部方法map
    methods := make(map[string]*methodType)// 獲取rpc struct內部的方法                                     for m :=0; m < typ.NumMethod(); m++{                                      
        method := typ.Method(m)                                                 
        mtype := method.Type                                                    
        mname := method.Name// 之前對這行代碼覺得比較奇葩,方法是否是暴露,是看是否有PkgPath的,如果是私有方法,PkgPath顯示包名                       if method.PkgPath!=""{continue}// 判斷是否是三個參數:第一個是結構本身,第二個是參數,第三個是返回值                                                    // Method needs three ins: receiver, *args, *reply.                     if mtype.NumIn()!=3{if reportErr {                                                      
                log.Println("method", mname,"has wrong number of ins:", mtype.NumIn())}continue}// args是指針類型                                                                     // First arg need not be a pointer.                                     
        argType := mtype.In(1)if!isExportedOrBuiltinType(argType){if reportErr {                                                      
                log.Println(mname,"argument type not exported:", argType)}continue}// reply是指針類型                                                                   // Second arg must be a pointer.                                        
        replyType := mtype.In(2)if replyType.Kind()!= reflect.Ptr{if reportErr {                                                      
                log.Println("method", mname,"reply type not a pointer:", replyType)}continue}// Reply type must be exported.       // reply必須是可暴露的                                  if!isExportedOrBuiltinType(replyType){if reportErr {                                                      
                log.Println("method", mname,"reply type not exported:", replyType)}continue}// Method needs one out.  // 必須有一個返回值,而且要是error                                              if mtype.NumOut()!=1{if reportErr {                                                      
                log.Println("method", mname,"has wrong number of outs:", mtype.NumOut())}continue}// The return type of the method must be error.                         if returnType := mtype.Out(0); returnType != typeOfError {if reportErr {                                                      
                log.Println("method", mname,"returns", returnType.String(),"not error")}continue}                                                                       
        methods[mname]=&methodType{method: method,ArgType: argType,ReplyType: replyType}}return methods                                                              
}

請求調用:

方法已經被注冊成功,接下來我們看看是如何客戶端發送請求調用的:

func (server *Server)Accept(lis net.Listener){for{                                                                       
        conn, err := lis.Accept()if err !=nil{                                                         
            log.Fatal("rpc.Serve: accept:", err.Error())// TODO(r): exit?      }// accept連接以後,打開一個goroutine處理請求                                                                       
        go server.ServeConn(conn)}} 

func (server *Server)ServeConn(conn io.ReadWriteCloser){                      
    buf := bufio.NewWriter(conn)                                                
    srv :=&gobServerCodec{                                                     
        rwc:    conn,                                                           
        dec:    gob.NewDecoder(conn),                                           
        enc:    gob.NewEncoder(buf),                                            
        encBuf: buf,}// 根據指定的codec進行協議解析                                                                          
    server.ServeCodec(srv)} 

func (server *Server)ServeCodec(codec ServerCodec){                           
    sending :=new(sync.Mutex)for{// 解析請求                                                                     
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)if err !=nil{if debugLog && err != io.EOF {                                      
                log.Println("rpc:", err)}if!keepReading {break}// send a response if we actually managed to read a header. // 如果當前請求錯誤了,我們應該返回信息,然後繼續處理        if req !=nil{                                                     
                server.sendResponse(sending, req, invalidRequest, codec, err.Error())
                server.freeRequest(req)}continue}// 因為需要繼續處理後續請求,所以開一個gorutine處理rpc方法                                                                      
        go service.call(server, sending, mtype, req, argv, replyv, codec)}// 如果連接關閉了需要釋放資源                                                                           
    codec.Close()} 

func (server *Server) readRequestHeader(codec ServerCodec)(service *service, mtype *methodType, req *Request, keepReading bool, err error){// 解析頭部,如果失敗,直接返回了                                                 
    req = server.getRequest()                                                   
    err = codec.ReadRequestHeader(req)if err !=nil{                                                             
        req =nilif err == io.EOF || err == io.ErrUnexpectedEOF{return}                                                                       
        err = errors.New("rpc: server cannot decode request: "+ err.Error())return}if debugLog {                                                               
        log.Printf("rpc: [trace:%v]\n", req.Tracer)}// We read the header successfully.  If we see an error now,                // we can still recover and move on to the next request.                    
    keepReading =true// 獲取請求中xxx.xxx中.的位置                                                                        
    dot := strings.LastIndex(req.ServiceMethod,".")if dot <0{                                                                
        err = errors.New("rpc: service/method request ill-formed: "+ req.ServiceMethod)return}// 拿到struct名字和方法名字                                                                 
    serviceName := req.ServiceMethod[:dot]                                      
    methodName := req.ServiceMethod[dot+1:]// Look up the request.// 加讀鎖,獲取對象                                                     
    server.mu.RLock()                                                           
    service = server.serviceMap[serviceName]                                    
    server.mu.RUnlock()if service ==nil{                                                         
        err = errors.New("rpc: can't find service "+ req.ServiceMethod)return}// 獲取反射類型,看見rpc中的發射其實是預先放入map中的                                                                         
    mtype = service.method[methodName]if mtype ==nil{                                                           
        err = errors.New("rpc: can't find method "+ req.ServiceMethod)}return}

func (server *Server) readRequest(codec ServerCodec)(service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error){
    service, mtype, req, keepReading, err = server.readRequestHeader(codec)if err !=nil{if!keepReading {return}// discard body                                                         
        codec.ReadRequestBody(nil)return}// 解析請求中的args                                               
    argIsValue :=false// if true, need to indirect before calling.            if mtype.ArgType.Kind()== reflect.Ptr{                                    
        argv = reflect.New(mtype.ArgType.Elem())}else{                                                                    
        argv = reflect.New(mtype.ArgType)                                       
        argIsValue =true}// argv guaranteed to be a pointer now.                                     if err = codec.ReadRequestBody(argv.Interface()); err !=nil{return}if argIsValue {                                                             
        argv = argv.Elem()}// 初始化reply類型                                                                            
    replyv = reflect.New(mtype.ReplyType.Elem())return}

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec){
    mtype.Lock()                                                                
    mtype.numCalls++                                                            
    mtype.Unlock()function:= mtype.method.Func// Invoke the method, providing a new value for the reply.  // 這裡是真正調用rpc方法的地方                
    returnValues :=function.Call([]reflect.Value{s.rcvr, argv, replyv})// The return value for the method is an error.                             
    errInter := returnValues[0].Interface()                                     
    errmsg :=""if errInter !=nil{                                                        
        errmsg = errInter.(error).Error()}// 處理返回請求了                                                                 
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)        
    server.freeRequest(req)} 

func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string){
    resp := server.getResponse()// Encode the response header                                               
    resp.ServiceMethod= req.ServiceMethodif errmsg !=""{                                                           
        resp.Error= errmsg                                                     
        reply = invalidRequest                                                  
    }// 上一文提到,客戶端是根據序號來定位請求的,所以需要原樣返回                                                                     
    resp.Seq= req.Seq                                                          
    sending.Lock()                                                              
    err := codec.WriteResponse(resp, reply)if debugLog && err !=nil{                                                 
        log.Println("rpc: writing response:", err)}                                                                           
    sending.Unlock()                                                            
    server.freeResponse(resp)}

資源重用:

上面把大致的rpc請求都說明了,server有一個技巧是重用對象,這裡使用的是鏈表方式處理的:

// 可以看出使用一個free list鏈表,來避免Request以及Response對象頻繁創建,導致GC壓力
func (server *Server) getRequest()*Request{                                   
    server.reqLock.Lock()                                                       
    req := server.freeReq                                                       
    if req ==nil{                                                             
        req =new(Request)}else{                                                                    
        server.freeReq = req.next*req =Request{}}                                                                           
    server.reqLock.Unlock()return req                                                                  
}                                                                               

func (server *Server) freeRequest(req *Request){                               
    server.reqLock.Lock()                                                       
    req.next= server.freeReq                                                   
    server.freeReq = req                                                        
    server.reqLock.Unlock()}                                                                               

func (server *Server) getResponse()*Response{                                 
    server.respLock.Lock()                                                      
    resp := server.freeResp                                                     
    if resp ==nil{                                                            
        resp =new(Response)}else{                                                                    
        server.freeResp = resp.next*resp =Response{}}                                                                           
    server.respLock.Unlock()return resp                                                                 
}                                                                               

func (server *Server) freeResponse(resp *Response){                            
    server.respLock.Lock()                                                      
    resp.next= server.freeResp                                                 
    server.freeResp = resp                                                      
    server.respLock.Unlock()}

最後,sending這把鎖的目的是避免同一個套接字快速請求中避免返回包寫入亂序,因此避免一個包完整寫入完畢才允許下一個返回寫入套接字。通過rpc包源碼解析,可以看到標准庫中的核心思想還是channel+mutex實現復用對象,以及各種方式的復用,避免GC壓力,在我們以後寫高性能服務端可以借鑒的地方。

Ubuntu 14.04 上搭建 Golang 開發環境配置  http://www.linuxidc.com/Linux/2015-02/113977.htm

Linux系統入門學習-在Linux中安裝Go語言  http://www.linuxidc.com/Linux/2015-02/113159.htm

Ubuntu 安裝Go語言包 http://www.linuxidc.com/Linux/2013-05/85171.htm

《Go語言編程》高清完整版電子書 http://www.linuxidc.com/Linux/2013-05/84709.htm

Go語言並行之美 -- 超越 “Hello World” http://www.linuxidc.com/Linux/2013-05/83697.htm

我為什麼喜歡Go語言 http://www.linuxidc.com/Linux/2013-05/84060.htm

Go語言內存分配器的實現 http://www.linuxidc.com/Linux/2014-01/94766.htm

Copyright © Linux教程網 All Rights Reserved