本文主要为了在对golang反射学习后做一个小练习,使用100行代码实现一个通用的RPC服务。
简要说明
golang 的RPC框架还是非常丰富的,比如 gRPC,go-zero, go-dubbo 等都是使用非常普遍的rpc框架。在go语言实现的RPC客户端中,大部分RPC框架采用的是使用生成代码的方式来构建RPC服务。即:定义好相应的接口后,需要通过命令生成相应的代码。采用这种方式的优点在于可以减少不必要的类型转换;而麻烦之处也显而易见,需要在每次结构发生改变时,重新生成对应的代码。那么,如果不采用命令行生成的方式来调用RPC该怎么做呢?经过对golang反射的学习后,让我们用100行代码来小试牛刀,实现一个极简版的RPC。
协议的定义
由于极简,我们采用HTTP协议,数据传输采用最常见的json结构。
服务请求,通过http 请求路径判断调用哪个方法。
-
输入参数定义如下:
["参数1", "参数2"]
其中参数1,参数2 采用Json编码,最终的请求参数在做一次编码。
例如:Do("abc", 123)
,其post请求body为 ["\"abc\"", 123]
-
输出参数定义与输入参数定义格式相同
如何使用
服务端:
服务端需要实现每一个接口,并把接口绑定到对应的路由上。
1
2
3
4
5
6
7
8
9
10
11
12
|
package main
import "github.com/lpflpf/rpc"
import "strconv"
func main() {
serv := rpc.NewRpcServ("127.0.0.1:18080")
serv.Impl("/conv/int2str", strconv.Itoa) // 路由绑定到方法
serv.Impl("/conv/str2int", strconv.Atoi)
serv.Impl("/math/add", func(a, b int) int { return a + b })
serv.Start()
}
|
客户端调用
客户端仅需要定义对应的rpc服务的方法,并通过struct tag的方式指定路由即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package main
import "fmt"
import "github.com/lpflpf/rpc"
type Conv struct {
Int2Str func(int) string `rpc:"conv/int2str"`
Str2Int func(input string) (int, error) `rpc:"conv/str2int"`
}
type Math struct {
Add func(int, int) int `rpc:"math/add"`
}
func main() {
conv := &Conv{}
rpc.Connect("http://127.0.0.1:18080", conv) // 连接RPC 服务
fmt.Println(conv.Int2Str(123), conv.Int2Str(456)) // 123 456
fmt.Println(conv.Str2Int("1234")) // 1234 <nil>
math := &Math{}
rpc.Connect("http://127.0.0.1:18080", math) // 连接 RPC 服务
fmt.Println(math.Add(1, 2)) // 3
}
|
Server 端的实现
服务端主要是将注册路由。在处理请求时,需要将请求的数据转化为注册句柄的参数,并将句柄的处理结果编码,并返回给客户端。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package rpc
import "net/http"
import "reflect"
import "encoding/json"
import "io/ioutil"
type RpcServ struct {
serv *http.Server
mux *http.ServeMux
}
func (rs *RpcServ) Impl(router string, f interface{}) {
rs.mux.HandleFunc(router, func(rw http.ResponseWriter, request *http.Request) {
rt := reflect.TypeOf(f)
requestBody, _ := ioutil.ReadAll(request.Body)
requestData := []string{}
_ = json.Unmarshal(requestBody, &requestData)
params := []reflect.Value{}
num := rt.NumIn()
if rt.IsVariadic() {
num = num - 1
}
for i := 0; i < num; i++ {
val := reflect.New(rt.In(i))
json.Unmarshal([]byte(requestData[i]), val.Interface())
params = append(params, val.Elem())
}
call := reflect.ValueOf(f)
result := []reflect.Value{}
if rt.IsVariadic() {
val := reflect.MakeSlice(rt.In(num), 0, 0).Interface()
json.Unmarshal([]byte(requestData[num]), &val)
params = append(params, reflect.ValueOf(val))
result = call.CallSlice(params)
} else {
result = call.Call(params)
}
response := []string{}
for _, res := range result {
val, _ := json.Marshal(res.Interface())
response = append(response, string(val))
}
data, _ := json.Marshal(response)
rw.Write(data)
})
}
func (rs *RpcServ) Start() {
rs.serv.Handler = rs.mux
rs.serv.ListenAndServe()
}
func NewRpcServ(addr string) *RpcServ {
return &RpcServ{
serv: &http.Server{Addr: addr},
mux: http.NewServeMux(),
}
}
|
客户端实现
客户端需要在Connect时,针对定义的每个句柄(即客户端调用时内部的方法)均需要绑定一个RPC 请求的实现。
RPC 请求的实现,即获取方法调用的各个参数,并编码后发送请求至 server 端,读取请求结果并解码,将解码后的数据填充为函数的返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
package rpc
import "io/ioutil"
import "bytes"
import "strings"
import "reflect"
import "errors"
import "encoding/json"
import "net/http"
type RpcClient struct {
serv *http.Server
mux *http.ServeMux
}
// struct BIND RPC
func Connect(addr string, iface interface{}) error {
rv := reflect.ValueOf(iface).Elem()
rt := reflect.TypeOf(iface).Elem()
if rt.Kind() != reflect.Struct {
return errors.New("")
}
for i := 0; i < rt.NumField(); i++ {
if requestPath := rt.Field(i).Tag.Get("rpc"); requestPath == "" {
continue
} else {
fieldType := rt.Field(i).Type
rv.Field(i).Set(reflect.MakeFunc(fieldType, func(params []reflect.Value) []reflect.Value {
requestBody := []string{}
for _, param := range params {
raw, _ := json.Marshal(param.Interface())
requestBody = append(requestBody, string(raw))
}
body, _ := json.Marshal(requestBody)
// 拼接请求Uri
requestUri := strings.Trim(addr, "/") + "/" + strings.Trim(requestPath, "/")
resp, _ := http.Post(requestUri, "application/json", bytes.NewReader(body))
defer resp.Body.Close()
data, _ := ioutil.ReadAll(resp.Body)
// 组装返回结果
ret := []reflect.Value{}
responseStr := []string{}
_ = json.Unmarshal(data, &responseStr)
for i := 0; i < fieldType.NumOut(); i++ {
val := reflect.New(fieldType.Out(i))
_ = json.Unmarshal([]byte(responseStr[i]), val.Interface())
ret = append(ret, val.Elem())
}
return ret
}))
}
}
return nil
}
|
小记
在看reflect.MakeFunc 时,源码中给出的例子是一个抽象的Swap 方法,联想到可以通过抽象的方法来实现一个RPC的调用。因此有了本文中的代码。
代码中未做异常处理,仅是对reflect.MakeFunc, reflect.Call, reflect.CallSlice 理解的一个实践。