从 rpc 包的 Server 端 和 Client 端入手,学习 Server/Client 的源码实现。并以一个例子作为总结。最后总结了rpc实现的几个学习的要点。
Golang net/rpc 包学习 golang 提供了一个开箱即用的RPC服务,实现方式简约而不简单。
RPC 简单介绍 远程过程调用 (Remote Procedure Call,RPC) 是一种计算机通信协议。允许运行再一台计算机的程序调用另一个地址空间的子程序(一般是开放网络种的一台计算机),而程序员就像调用调用本地程序一样,无需额外的做交互编程。RPC 是一种 CS (Client-Server) 架构的模式,通过发送请求-接收响应的方式进行信息的交互。
有很多广泛使用的RPC框架,例如 gRPC, Thrift, Dubbo, brpc 等。这里的RPC 框架有的实现了跨语言调用,有的实现了服务注册发现等。比我们今天介绍的官方提供的 rpc 包要使用广泛的多。但是,通过对net/rpc
的学习,可以使我们对一个rpc框架做一个最基本的了解。
Golang 的实现 rpc 是 cs 架构,所以既有客户端,又有服务端。下面,我们先分析通信的编码,之后从服务端、客户端角度分析RPC的实现。
通信编码 golang 在rpc 实现中,抽象了协议层,我们可以自定义协议实现我们自己的接口。如下是协议的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(interface {}) error WriteResponse(*Response, interface {}) error Close() error } type ClientCodec interface { WriteRequest(*Request, interface {}) error ReadResponseHeader(*Response) error ReadResponseBody(interface {}) error Close() error }
而包中提供了基于gob 二进制编码的编解码实现。当然我们也可以实现自己想要的编解码方式。
Server 端实现 结构定义
1 2 3 4 5 6 7 type Server struct { serviceMap sync.Map reqLock sync.Mutex freeReq *Request respLock sync.Mutex freeResp *Response }
server端通过互斥锁的方式支持了并发执行。由于每个请求和响应都需要定义Request/Response 对象,为了减少内存的分配,这里使用了一个freeReq/freeResp 链表实现了两个对象池。 当需要Request 对象时,从 freeReq 链表中获取,当使用完毕后,再放回链表中。
服务的注册 service保存在 Server 的 serviceMap 中,每个Service 的信息如下:
1 2 3 4 5 6 type service struct { name string rcvr reflect.Value typ reflect.Type method map [string ]*methodType }
从上面可以看到,一个类型以及该类型的多个方法可以被注册为一个Service。在注册服务时,通过下面的方法将服务保存在serviceMap 中。
1 2 3 4 func (server *Server) Register (rcvr interface {}) error {}func (server *Server) RegisterName (name string , rcvr interface {}) error {}
服务的调用 首先,是rpc 服务的启动。和大部分的网络应用一致,在accept一个连接后,会启动一个协程做消息处理,代码如下:
1 2 3 4 5 6 7 8 for { conn, err := lis.Accept() if err != nil { log.Print("rpc.Serve: accept:" , err.Error()) return } go server.ServeConn(conn) }
其次,对于每一个连接,服务端会不断获取请求,并异步发送响应。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 for { service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) if err != nil { if debugLog && err != io.EOF { log.Println("rpc:" , err) } if !keepReading { break } if req != nil { server.sendResponse(sending, req, invalidRequest, codec, err.Error()) server.freeRequest(req) } continue } wg.Add(1 ) go service.call(server, sending, wg, mtype, req, argv, replyv, codec) }
最后,由于异步发送请求,所以请求的顺序和响应顺序不一定一致。所以,在响应报文中,会携带请求报文的seq (序列号),保证消息的一致性。 除此之外,为了兼容http 服务,net/rpc
包还通过http包实现的 Hijack 方式,将 http 协议转换为 rpc 协议。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (server *Server) ServeHTTP (w http.ResponseWriter, req *http.Request) { conn, _, err := w.(http.Hijacker).Hijack() if err != nil { log.Print("rpc hijacking " , req.RemoteAddr, ": " , err.Error()) return } io.WriteString(conn, "HTTP/1.0 " +connected+"\n\n" ) server.ServeConn(conn) }
Client 端实现 客户端的连接相较于服务端是比较简单的。我们从发起连接、发送请求、读取响应三个角度学习。
RPC 的连接 由于该RPC支持HTTP协议做连接升级,因此,有几种连接方式。
直接使用 tcp 协议。
1 func Dial (network, address string ) (*Client, error) {}
使用 http 协议。 http 协议可以指定路径,或者使用默认的rpc 路径。
1 2 3 4 func DialHTTP (network, address string ) (*Client, error) {}func DialHTTPPath (network, address, path string ) (*Client, error) {}
请求的发送 RPC 请求的发送,提供了同步和异步的接口调用,方式如下:
1 2 3 4 func (client *Client) Go (serviceMethod string , args interface {}, reply interface {}, done chan *Call) *Call {}func (client *Client) Call (serviceMethod string , args interface {}, reply interface {}) error {}
从内部实现可以知道,都是通过Go 异步的方式拿到返回数据。
下面,我们看内部如何实现请求的发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (client *Client) send (call *Call) { seq := client.seq client.seq++ client.pending[seq] = call client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod err := client.codec.WriteRequest(&client.request, call.Args) if err != nil { client.mutex.Lock() call = client.pending[seq] delete (client.pending, seq) client.mutex.Unlock() if call != nil { call.Error = err call.done() } } }
从上面可以看出,对于一个客户端,可以同时发送多条请求,然后异步等待响应。
读取响应 在rpc 连接成功后,会建立一个连接,专门用于做响应的读取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 for err == nil { response = Response{} err = client.codec.ReadResponseHeader(&response) if err != nil { break } seq := response.Seq client.mutex.Lock() call := client.pending[seq] delete (client.pending, seq) client.mutex.Unlock() client.codec.ReadResponseBody(nil ) call.done() }
通过循环读取响应头,响应body,并将读取结果通知调用rpc 的异步请求,完成一次响应的读取。
简单例子 下面我们官方提供的一个简单例子,对rpc包学习做个总结。
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 type Args struct { A, B int } type Quotient struct { Quo, Rem int } type Arith int func (t *Arith) Multiply (args *Args, reply *int ) error { *reply = args.A * args.B return nil } func (t *Arith) Divide (args *Args, quo *Quotient) error { if args.B == 0 { return errors.New("divide by zero" ) } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main () { serv := rpc.NewServer() arith := new (Arith) serv.Register(arith) http.ListenAndServe("0.0.0.0:3000" , serv) }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func main () { client, err := rpc.DialHTTP("tcp" , "127.0.0.1:3000" ) if err != nil { log.Fatal("dialing:" , err) } dones := make ([]chan *rpc.Call, 0 , 10 ) for i := 0 ; i < 10 ; i++ { quotient := new (Quotient) args := &Args{i + 10 , i} divCall := client.Go("Arith.Divide" , args, quotient, nil ) dones = append (dones, divCall.Done) log.Print("send" , i) } log.Print("---------------" ) for idx, done := range dones { replyCall := <-done args := replyCall.Args.(*Args) reply := replyCall.Reply.(*Quotient) log.Printf("%d / %d = %d, %d %% %d = %d\n" , args.A, args.B, reply.Quo, args.A, args.B, reply.Rem) log.Print("recv" , idx) } }
我们可以学到什么 最后,做一个学习的总结。
对统一连接上的不同请求实现异步操作,通过请求、响应需要保证数据的一致性。
链表方式实现一个对象池
对 http 包中实现的Hijack 方式的一次简单实践,通过http协议升级为rpc协议。劫持了原有http协议的tcp连接,转为rpc使用。
rpc 的实现,通过gob编码,应该是不支持与其他语言通信的。需要自己实现编解码方式。
rpc 的实现,也不支持服务的注册和发现,需要我们自己去维护服务方。