一. RPC简介
- 远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。
- 该协议运行运行于一台计算机的程序调用另外一台计算机的子程序,而程序员无需额外的为这个交互作用编程。
- 如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。
二. golang中如何实现RPC
- golang中实现RPC非常简单,官方提供了封装好的库,还有一些第三方的库。
- golang官方的net/rpc库使用encoding/gob进行编解码,支持tcp和http数据传输方式。由于其它语言不支持gob编解码,所以golang的RPC只支持golang开发的服务器与客户端之间的交互。
- 官方还提供了net/rpc/jsonrpc库实现RPC方法,jsonrpc采用json进行数据编解码,因而支持跨语言调用,目前jsonrpc库是基于tcp协议实现的,暂不支持http传输方式。
例题:golang实现RPC程序,实现求矩形面积和周长。
服务器:
注册服务,监听请求,处理请求。
package mainimport ("fmt""log""net/http""net/rpc"
)// 接收参数
type Params struct {Length intHeigth int
}// 用于注册
type Rect struct{}func (r *Rect) Area(p Params, ret *int) error {fmt.Println("参数:", p.Heigth, p.Length)*ret = p.Heigth * p.Lengthreturn nil
}func (r *Rect) Perimeter(p Params, ret *int) error {fmt.Println("参数:", p.Heigth, p.Length)*ret = 2 * (p.Heigth + p.Length)return nil
}func main() {//1. 注册服务rect := new(Rect)rpc.Register(rect)//2. 服务器处理绑定到http协议上rpc.HandleHTTP()//3. 监听服务err := http.ListenAndServe(":8080", nil)if err != nil {log.Panicln(err)}
}
客户端:
连接rpc服务,发送请求调用方法。
package mainimport ("fmt""log""net/rpc"
)// 参数字段要和服务器的一样
type Params struct {Length intHeigth int
}func main() {conn, err := rpc.DialHTTP("tcp", ":8080")if err != nil {log.Fatal(err)}ret := 0p := Params{20,30,}//调用服务器的方法err = conn.Call("Rect.Area", p, &ret)if err != nil {log.Fatal(err)}fmt.Println("面积", ret)err = conn.Call("Rect.Perimeter", p, &ret)if err != nil {log.Fatal(err)}fmt.Println("周长", ret)
}
- golang写RPC程序必须符合4个基本条件,不然RPC用不了
- 结构体字段首字母必须大写,可以别人调用
- 函数名必须首字母大写
- 函数(服务方法)第一参数是接受参数,第二个参数是返回客户端的参数,必须是指针类型
- 函数(服务方法)还必须有一个返回值error
另外,net/rpc/jsonrpc库通过json格式编解码,支持跨语言调用。
服务端:
package mainimport ("fmt""log""net""net/rpc""net/rpc/jsonrpc"
)// 接收参数
type Params struct {A intB int
}type Rect struct{}func (r *Rect) Area(p Params, ret *int) error {*ret = p.A * p.Breturn nil
}func (r *Rect) Perimeter(p Params, ret *int) error {*ret = 2 * (p.A + p.B)return nil
}func main() {//注册服务rpc.Register(new(Rect))//监听连接l, err := net.Listen("tcp", ":8080")if err != nil {log.Fatal(err)}for {//接收客户端连接conn, err := l.Accept()if err != nil {continue}//创建协程处理请求go func(conn net.Conn) {fmt.Println("get a new client")//只是处理服务,没有监听和接收请求jsonrpc.ServeConn(conn)}(conn)}
}
客户端:
package mainimport ("fmt""log""net/rpc/jsonrpc"
)// 参数字段要和服务器的一样
type Params struct {A intB int
}func main() {//远程连接rpc服务conn, err := jsonrpc.Dial("tcp", ":8080")if err != nil {log.Fatal(err)}//调用方法ret := 0err = conn.Call("Rect.Area", Params{10, 20}, &ret)if err != nil {log.Fatal(err)}fmt.Println("ret=", ret)err = conn.Call("Rect.Perimeter", Params{10, 20}, &ret)if err != nil {log.Fatal(err)}fmt.Println("ret=", ret)
}
三. RPC调用流程
- 微服务架构下数据交互一般是对内RPC,对外REST
- 将业务按功能模块拆分到各个微服务,具有提高项目协作效率,降低模块耦合度,提高系统可用性等优点
- 一般情况下,我们将功能代码在本地直接调用,微服务框架下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用。
我理解的微服务(RPC),将项目中必要的功能注册成一个服务,客户端可以直接调用。
四. 网络传输数据格式
- 两端要约定好数据包的格式
- 成熟的RPC框架会有自定义传输协议,这里网络传输格式定义如下,前面是固定长度的消息头,后面是变长消息体
- 自己定义数据格式的读写
package rpcimport ("encoding/binary""fmt""io""net"
)type Session struct {Conn net.Conn
}func (s *Session) Write(data []byte) error {//定义写数据格式//4字节包头 + 变长数据buf := make([]byte, 4+len(data))//写入头部binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))//将有效数据写到头部后面copy(buf[4:], data)//发送_, err := s.Conn.Write(buf)if err != nil {return err}return nil
}func (s *Session) Read() ([]byte, error) {//读取头部header := make([]byte, 4)//按长度读取数据_, err := io.ReadFull(s.Conn, header)if err != nil {return nil, err}//读取数据,报头保存的是有效数据长度dataLen := binary.BigEndian.Uint32(header)fmt.Println(dataLen)data := make([]byte, dataLen)_, err = io.ReadFull(s.Conn, data)if err != nil {return nil, err}return data, nil
}
测试代码:
package rpcimport ("errors""fmt""net""sync""testing"
)func TestSession_ReadWrite(t *testing.T) {//地址和数据addr := "127.0.0.1:8080"my_data := "hello"//等待组wg := sync.WaitGroup{}wg.Add(2)go func() {//监听defer wg.Done()l, err := net.Listen("tcp", addr)if err != nil {t.Fatal(err)}//接收连接conn, err := l.Accept()if err != nil {t.Fatal(err)}//写数据s := Session{Conn: conn}err = s.Write([]byte(my_data))if err != nil {t.Fatal(err)}}()go func() {defer wg.Done()conn, err := net.Dial("tcp", addr)if err != nil {t.Fatal(err)}s := Session{Conn: conn}data, err := s.Read()if err != nil {t.Fatal(err)}//校验if string(data) != my_data {t.Fatal(errors.New("数据错误"))}fmt.Println(string(data))}()wg.Wait()
}
编码和解码:
func Encode(data RPCData) ([]byte, error) {//得到字节数组编码器var buf bytes.BufferbuffEnc := gob.NewEncoder(&buf)//编码器对数据编码if err := buffEnc.Encode(data); err != nil {return nil, err}return buf.Bytes(), nil
}func Decode(data []byte) (RPCData, error) {buff := bytes.NewBuffer(data)//得到字节数组解码器buffDec := gob.NewDecoder(buff)//解码器对数据解码var res RPCDataif err := buffDec.Decode(&res); err != nil {return res, err}return res, nil
}
五. 实现RPC服务端
要实现的一个功能为,接收到客户端发过来需要调用的函数和参数,实现对应函数的调用,将结果返回。
- 服务端接收的数据包括
- 调用的函数名,参数,返回值
- 服务器端需要解决的问题
- 需要维护一个map,来保存调用的函数
- 服务器的核心功能
- 维护函数map
- 接收到客户端的数据,并解析
- 找到调用的函数,将返回值打包穿给客户端
package mainimport ("bytes""encoding/binary""encoding/gob""errors""fmt""io""log""net""reflect"
)// 定义交互的数据结构
type RPCData struct {//访问的函数Name string//参数Args []interface{}
}func encode(data RPCData) ([]byte, error) {//得到字节数组编码器var buf bytes.Buffergob.Register(User{})buffEnc := gob.NewEncoder(&buf)//编码器对数据编码if err := buffEnc.Encode(data); err != nil {return nil, err}return buf.Bytes(), nil
}func decode(data []byte) (RPCData, error) {buff := bytes.NewBuffer(data)//得到字节数组解码器gob.Register(User{})buffDec := gob.NewDecoder(buff)//解码器对数据解码var res RPCDataif err := buffDec.Decode(&res); err != nil {return res, err}return res, nil
}// 发送和接收数据结构
type Session struct {Conn net.Conn
}func NewSession(conn net.Conn) *Session {return &Session{Conn: conn}
}func (s *Session) Write(data []byte) error {//定义写数据格式//4字节包头 + 变长数据buf := make([]byte, 4+len(data))//写入头部binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))//将有效数据写到头部后面copy(buf[4:], data)//发送_, err := s.Conn.Write(buf)if err != nil {return err}return nil
}func (s *Session) Read() ([]byte, error) {//读取头部header := make([]byte, 4)//按长度读取数据_, err := io.ReadFull(s.Conn, header)if err != nil {return nil, err}//读取数据,报头保存的是有效数据长度dataLen := binary.BigEndian.Uint32(header)fmt.Println(dataLen)data := make([]byte, dataLen)_, err = io.ReadFull(s.Conn, data)if err != nil {return nil, err}return data, nil
}// 服务器
type Server struct {//访问地址addr string//维护函数mapfuncs map[string]reflect.Value
}func NewServer(addr string) *Server {return &Server{addr: addr, funcs: make(map[string]reflect.Value)}
}func (s *Server) Register(key string, value interface{}) error {if _, ok := s.funcs[key]; ok {return errors.New(key + "已存在")}s.funcs[key] = reflect.ValueOf(value)return nil
}func (s *Server) Run() {//监听ls, err := net.Listen("tcp", s.addr)if err != nil {log.Printf("监听addr:%s, 失败", s.addr)return}for {conn, err := ls.Accept()if err != nil {return}//读数据session := NewSession(conn)data, err := session.Read()if err != nil {return}//解码rpc_data, err := decode(data)if err != nil {return}//查找访问的函数f, ok := s.funcs[rpc_data.Name]if !ok {log.Printf("%s不存在\n", rpc_data.Name)continue}//参数args := make([]reflect.Value, 0, len(rpc_data.Args))for _, v := range rpc_data.Args {if v != nil {args = append(args, reflect.ValueOf(v))}}//反射调用方法//返回value类型,用于客户端传递返回结果,out是所有返回结果out := f.Call(args)outInters := make([]interface{}, 0, len(out))for _, v := range out {outInters = append(outInters, v.Interface())}fmt.Println("args: ", args, "-", len(args), "-", outInters)//发送给客户端//编码respRpcData := RPCData{Name: rpc_data.Name, Args: outInters}respdata, err := encode(respRpcData)if err != nil {log.Printf("encode fail %v\n", err)continue}err = session.Write(respdata)if err != nil {log.Printf("Write fail %v\n", err)continue}}}type User struct {Name stringAge int
}func queryUser(uid int) (User, error) {users := make(map[int]User)users[0] = User{"zs", 10}users[1] = User{"ls", 20}users[2] = User{"ww", 25}users[3] = User{"lc", 18}fmt.Println(uid)if v, ok := users[uid]; ok {fmt.Println(v)return v, nil}return User{}, errors.New("Not Found")
}func main() {s := NewServer("127.0.0.1:8080")//给服务器注册函数s.Register("queryUser", queryUser)s.Run()
}
六. 实现RPC客户端
- 客户端只有函数原型,使用reflect.MakeFunc()可以完成原型到函数的调用。
- 即reflect.MakeFunc()函数可以将一个函数类型和函数实现结合起来,通过函数原型调用对应函数实现。
package mainimport ("bytes""encoding/binary""encoding/gob""fmt""io""log""net""reflect"
)type RPCData struct {//访问的函数Name string//参数Args []interface{}
}func encode(data RPCData) ([]byte, error) {//得到字节数组编码器var buf bytes.Buffer//注册接口类型gob.Register(User{})buffEnc := gob.NewEncoder(&buf)//编码器对数据编码if err := buffEnc.Encode(data); err != nil {return nil, err}return buf.Bytes(), nil
}func decode(data []byte) (RPCData, error) {buff := bytes.NewBuffer(data)//得到字节数组解码器gob.Register(User{})buffDec := gob.NewDecoder(buff)//解码器对数据解码var res RPCDataif err := buffDec.Decode(&res); err != nil {return res, err}return res, nil
}type Session struct {Conn net.Conn
}func NewSession(conn net.Conn) *Session {return &Session{Conn: conn}
}func (s *Session) Write(data []byte) error {//定义写数据格式//4字节包头 + 变长数据buf := make([]byte, 4+len(data))//写入头部binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))//将有效数据写到头部后面copy(buf[4:], data)//发送_, err := s.Conn.Write(buf)if err != nil {return err}return nil
}func (s *Session) Read() ([]byte, error) {//读取头部header := make([]byte, 4)//按长度读取数据_, err := io.ReadFull(s.Conn, header)if err != nil {return nil, err}//读取数据,报头保存的是有效数据长度dataLen := binary.BigEndian.Uint32(header)fmt.Println(dataLen)data := make([]byte, dataLen)_, err = io.ReadFull(s.Conn, data)if err != nil {return nil, err}return data, nil
}// 客户端
type Client struct {Conn net.Conn
}func NewClient(conn net.Conn) *Client {return &Client{Conn: conn}
}// fname为访问服务器函数名
// fptr为函数原型
func (c *Client) callRpc(fname string, fptr interface{}) {//获取函数原型fnptr := reflect.ValueOf(fptr).Elem()//函数实现,发送数据给服务器,收到服务端数据//args调用时传进来的参数//返回值为得到的结果f := func(args []reflect.Value) []reflect.Value {//获得参数argSlice := make([]interface{}, len(args))for _, v := range args {argSlice = append(argSlice, v.Interface())}session := NewSession(c.Conn)//发送到服务器rpcData := RPCData{Name: fname, Args: argSlice}//编码data, err := encode(rpcData)if err != nil {log.Println("encode fail", err)return nil}err = session.Write(data)if err != nil {log.Println("Write fail", err)return nil}//接收服务器响应respData, err := session.Read()if err != nil {log.Println("Read fail", err)return nil}//结果在Args里respRpcData, err := decode(respData)if err != nil {log.Println("decode fail", err)return nil}res := make([]reflect.Value, 0, len(respRpcData.Args))for i, v := range respRpcData.Args {if v == nil {//因为返回err可能为nil//进行nil转换//reflect.Zero会返回类型的零值的value//.out会返回函数输出的参数类型res = append(res, reflect.Zero(fnptr.Type().Out(i)))} else {res = append(res, reflect.ValueOf(v))}}return res}real_fn := reflect.MakeFunc(fnptr.Type(), f)//为函数原型赋值函数实现fnptr.Set(real_fn)
}type User struct {Name stringAge int
}func main() {conn, err := net.Dial("tcp", ":8080")if err != nil {log.Fatal(err)return}c := NewClient(conn)var query func(int) (User, error)c.callRpc("queryUser", &query)//进行查询u, err := query(2)if err != nil {log.Fatal(err)return}fmt.Println(u)
}