14-RPC-自研微服务框架

RPC

RPC 框架是分布式领域核心组件,也是微服务的基础。

RPC (Remote Procedure Call)全称是远程过程调用,相对于本地方法调用,在同一内存空间可以直接通过方法栈实现调用,远程调用则跨了不同的服务终端,并不能直接调用。

RPC框架 要解决的就是远程方法调用的问题,并且实现调用远程服务像调用本地服务一样简单,框架内部封装实现了网络调用的细节。

在这里插入图片描述

1. 通信协议选择

根据不同的需求来选择通信协议,UDP是不可靠传输,一般来说很少做为RPC框架的选择。

TCP和HTTP是最佳选择。

HTTP虽然有很多无用的头部信息,传输效率上会比较低,但是HTTP通用性更强,跨语言,跨平台,更易移植。

TCP可靠传输,需要自定义协议,传输效率更高,但是通用性不强。

1.1 HTTP/1.0和HTTP/1.1的区别

HTTP1.0最早在网页中使用是在1996年,那个时候只是使用一些较为简单的网页上和网络请求上,而HTTP1.1则在1999年才开始广泛应用于现在的各大浏览器网络请求中,同时HTTP1.1也是当前使用最为广泛的HTTP协议。 主要区别主要体现在:

  1. 缓存处理,在HTTP1.0中主要使用header里的If-Modified-Since,Expires来做为缓存判断的标准,HTTP1.1则引入了更多的缓存控制策略例如Entity tag,If-Unmodified-Since, If-Match, If-None-Match等更多可供选择的缓存头来控制缓存策略。
  2. 带宽优化及网络连接的使用,HTTP1.0中,存在一些浪费带宽的现象,例如客户端只是需要某个对象的一部分,而服务器却将整个对象送过来了,并且不支持断点续传功能,HTTP1.1则在请求头引入了range头域,它允许只请求资源的某个部分,即返回码是206(Partial Content),这样就方便了开发者自由的选择以便于充分利用带宽和连接。
  3. 错误通知的管理,在HTTP1.1中新增了24个错误状态响应码,如409(Conflict)表示请求的资源与资源的当前状态发生冲突;410(Gone)表示服务器上的某个资源被永久性的删除。
  4. Host头处理,在HTTP1.0中认为每台服务器都绑定一个唯一的IP地址,因此,请求消息中的URL并没有传递主机名(hostname)。但随着虚拟主机技术的发展,在一台物理服务器上可以存在多个虚拟主机(Multi-homed Web Servers),并且它们共享一个IP地址。HTTP1.1的请求消息和响应消息都应支持Host头域,且请求消息中如果没有Host头域会报告一个错误(400 Bad Request)。
  5. 长连接,HTTP 1.1支持长连接(PersistentConnection)和请求的流水线(Pipelining)处理,在一个TCP连接上可以传送多个HTTP请求和响应,减少了建立和关闭连接的消耗和延迟,在HTTP1.1中默认开启Connection: keep-alive,一定程度上弥补了HTTP1.0每次请求都要创建连接的缺点。

1.2 HTTP/1.1和HTTP/2的区别

  • 新的二进制格式(Binary Format),HTTP1.x的解析是基于文本。基于文本协议的格式解析存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合。基于这种考虑HTTP2.0的协议解析决定采用二进制格式,实现方便且健壮。
  • 多路复用(MultiPlexing),即连接共享,即每一个request都是是用作连接共享机制的。一个request对应一个id,这样一个连接上可以有多个request,每个连接的request可以随机的混杂在一起,接收方可以根据request的 id将request再归属到各自不同的服务端请求里面。
  • header压缩,如上文中所言,对前面提到过HTTP1.x的header带有大量信息,而且每次都要重复发送,HTTP2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小。
  • 服务端推送(server push)HTTP2.0也具有server push功能。

grpc采用了http2协议,由于http的通用性,所以现在的几乎所有的rpc框架都支持grpc

2. 序列化协议

数据在网络中传输,必须是二进制的,所以我们需要先将传输的对象进行序列化之后,才能传输。

接收方通过反序列化将数据解析出来。

序列化协议有XML、 JSON、Protobuf、Thrift 等,Golang 原生支持的 Gob 协议。

3. 编解码

如果使用TCP,我们需要定义数据传输的格式,防止在传输过程中出现的粘包,拆包等问题。

在这里插入图片描述

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
  2. 服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包
  3. 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包
  4. 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。

特别要注意的是,如果TCP的接受滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种情况,即服务端分多次才能将D1和D2包完全接收,期间发生多次拆包

自定义格式可以使用定长的头和不定长的体,标识数据长度即可

1字节1字节4字节1字节1字节1字节8字节不定
魔法数(Magic Number)版本(Version)消息长度(full length)消息类型(messageType)压缩类型(compress)序列化类型(serialize)请求id(requestId)请求体(body)
  • magic number : 通信双方协商的一个暗号 魔数的作用是用于服务端在接收数据时先解析出魔数做正确性对比。如果和协议中的魔数不匹配,则认为是非法数据
  • version : 不同版本的协议对应的解析方法可能是不同的,应对业务变化需求
  • full length: 记录了整个消息的长度
  • messageType:普通请求、普通响应、心跳等,根据消息类型做出不同的解析
  • compress: 序列化的字节流,还可以进行压缩,使得体积更小,在网络传输更快,不一定要使用
  • serialize:序列化方式,比如json,protostuff,glob等
  • request id:每个请求分配好请求Id,这样响应数据的时候,才能对的上
  • body:具体的数据

4. 实现

4.1 http方式

package rpcimport ("bufio""bytes""encoding/json""errors""fmt""io""net/http""net/url""strings""time"
)type MsHttpClient struct {client http.Client
}// NewHttpClient Transport请求分发,协程安全,支持连接池s
func NewHttpClient() *MsHttpClient {client := http.Client{Timeout: time.Duration(3) * time.Second,Transport: &http.Transport{MaxIdleConnsPerHost:   5,MaxConnsPerHost:       100,IdleConnTimeout:       90 * time.Second,TLSHandshakeTimeout:   10 * time.Second,ExpectContinueTimeout: 1 * time.Second,},}return &MsHttpClient{client: client}
}func (c *MsHttpClient) GetRequest(method string, url string, args map[string]any) (*http.Request, error) {if args != nil && len(args) > 0 {url = url + "?" + c.toValues(args)}req, err := http.NewRequest(method, url, nil)if err != nil {return nil, err}return req, nil
}func (c *MsHttpClient) FormRequest(method string, url string, args map[string]any) (*http.Request, error) {req, err := http.NewRequest(method, url, strings.NewReader(c.toValues(args)))if err != nil {return nil, err}return req, nil
}func (c *MsHttpClient) JsonRequest(method string, url string, args map[string]any) (*http.Request, error) {jsonStr, _ := json.Marshal(args)req, err := http.NewRequest(method, url, bytes.NewReader(jsonStr))if err != nil {return nil, err}return req, nil
}func (c *MsHttpClient) Get(url string, args map[string]any) ([]byte, error) {if args != nil && len(args) > 0 {url = url + "?" + c.toValues(args)}req, err := http.NewRequest("GET", url, nil)if err != nil {return nil, err}return c.handleResponse(req)
}func (c *MsHttpClient) PostForm(url string, args map[string]any) ([]byte, error) {req, err := http.NewRequest("POST", url, strings.NewReader(c.toValues(args)))if err != nil {return nil, err}return c.handleResponse(req)
}func (c *MsHttpClient) PostJson(url string, args map[string]any) ([]byte, error) {jsonStr, _ := json.Marshal(args)req, err := http.NewRequest("POST", url, bytes.NewReader(jsonStr))if err != nil {return nil, err}return c.handleResponse(req)
}func (c *MsHttpClient) Response(req *http.Request) ([]byte, error) {return c.handleResponse(req)
}
func (c *MsHttpClient) handleResponse(req *http.Request) ([]byte, error) {var err errorresponse, err := c.client.Do(req)if err != nil {return nil, err}if response.StatusCode != 200 {return nil, errors.New(response.Status)}buffLen := 79buff := make([]byte, buffLen)body := make([]byte, 0)reader := bufio.NewReader(response.Body)for {n, err := reader.Read(buff)if err == io.EOF || n == 0 {break}body = append(body, buff[:n]...)if n < buffLen {break}}defer response.Body.Close()if err != nil {return nil, err}return body, nil
}func (c *MsHttpClient) toValues(args map[string]any) string {if args != nil && len(args) > 0 {params := url.Values{}for k, v := range args {params.Set(k, fmt.Sprintf("%v", v))}return params.Encode()}return ""
}

ordercenter:

package mainimport ("encoding/json""fmt""github.com/mszlu521/msgo""github.com/mszlu521/msgo/rpc""net/http"
)type Result struct {Code int    `json:"code"`Msg  string `json:"msg"`Data any    `json:"data"`
}type Goods struct {Id   int64  `json:"id"`Name string `json:"name"`
}func main() {engine := msgo.Default()client := rpc.NewHttpClient()g := engine.Group("order")g.Get("/find", func(ctx *msgo.Context) {//查询商品bytes, err := client.Get("http://localhost:9002/goods/find", nil)if err != nil {ctx.Logger.Error(err)}fmt.Println(string(bytes))v := &Result{}json.Unmarshal(bytes, v)ctx.JSON(http.StatusOK, v)})engine.Run(":9003")
}

goodsCenter:

package mainimport ("github.com/mszlu521/msgo""net/http"
)type Result struct {Code int    `json:"code"`Msg  string `json:"msg"`Data any    `json:"data"`
}type Goods struct {Id   int64  `json:"id"`Name string `json:"name"`
}func main() {engine := msgo.Default()g := engine.Group("goods")g.Get("/find", func(ctx *msgo.Context) {//查询商品goods := Goods{Id: 1000, Name: "商品中心9001商品"}ctx.JSON(http.StatusOK, &Result{Code: 200, Msg: "success", Data: goods})})engine.Run(":9002")
}

4.2 改造http方式

config:

package rpcimport "strconv"type Config struct {Protocol stringHost     stringPort     intSsl      bool
}func (c Config) Url() string {switch c.Protocol {case HTTP, HTTP2:prefix := "http://"if c.Ssl {prefix = "https://"}return prefix + c.Host + ":" + strconv.FormatInt(int64(c.Port), 10)}return ""
}const (HTTP  = "HTTP"HTTP2 = "HTTP2"TCP   = "TCP"
)const (GET      = "GET"POSTForm = "POST_FORM"POSTJson = "POST_JSON"
)

rpc.go:

package rpctype MsService interface {Env() Config
}

func (c *MsHttpClient) Use(name string, s MsService) {if c.serviceMap == nil {c.serviceMap = make(map[string]MsService)}c.serviceMap[name] = s
}func (c *MsHttpClient) Do(name string, method string) MsService {s, ok := c.serviceMap[name]if !ok {panic(errors.New(name + " not exist, please action"))}t := reflect.TypeOf(s)v := reflect.ValueOf(s)if t.Kind() != reflect.Pointer {panic(errors.New("service must be pointer"))}tVar := t.Elem()vVar := v.Elem()findIndex := -1for i := 0; i < tVar.NumField(); i++ {field := tVar.Field(i)name := field.Nameif method == name {findIndex = i}}if findIndex == -1 {panic(errors.New(method + " not exist"))}requestPath := tVar.Field(findIndex).Tag.Get("msrpc")if requestPath == "" {panic(errors.New("msrpc tag not exist"))}split := strings.Split(requestPath, ",")mt := split[0]path := split[1]co := s.Env()prefix := co.Url()f := func(args map[string]any) ([]byte, error) {if mt == GET {return c.Get(prefix+path, args)}if mt == POSTForm {return c.PostForm(prefix+path, args)}if mt == POSTJson {return c.PostJson(prefix+path, args)}return nil, nil}value := reflect.ValueOf(f)vVar.Field(findIndex).Set(value)return s
}

goods:

package serviceimport ("github.com/mszlu521/msgo/rpc"
)type Goods struct {Id   int64  `json:"id"`Name string `json:"name"`
}type GoodsService struct {Find func(args map[string]any) ([]byte, error) `msrpc:"GET,/goods/find"`
}func (r *GoodsService) Env() rpc.Config {c := rpc.Config{Host:     "localhost",Port:     9002,Protocol: rpc.HTTP,}return c
}
package mainimport ("encoding/json""fmt""github.com/mszlu521/msgo""github.com/mszlu521/msgo/rpc""github.com/mszlu521/ordercenter/model""github.com/mszlu521/ordercenter/service""net/http"
)func main() {engine := msgo.Default()client := rpc.NewHttpClient()g := engine.Group("order")goodsService := &service.GoodsService{}client.Use("goodsService", goodsService)g.Get("/find", func(ctx *msgo.Context) {//查询商品v := &model.Result{}bytes, err := client.Do("goodsService", "Find").(*service.GoodsService).Find(nil)if err != nil {ctx.Logger.Error(err)}fmt.Println(string(bytes))json.Unmarshal(bytes, v)ctx.JSON(http.StatusOK, v)})engine.Run(":9003")
}

通过上述改造,我们可以比较轻易的使用框架,来实现http方式的rpc调用

记住:框架的目的是易用,但同时需要遵守规则,所以定义规则也是框架的一部分

4.3 http2(grpc)方式

有关grpc的使用可以先去看教程,教程地址

go get google.golang.org/grpc
protoc  --go_out=./ --go-grpc_out=./  .\api\goods.proto

goodscenter服务端:

syntax = "proto3";//import "google/protobuf/any.proto";option go_package="/api";package api;service GoodsApi {rpc Find(GoodsRequest) returns (GoodsResponse);
}message GoodsRequest {}message GoodsResponse {int64 Code = 1;string Msg = 2;Goods Data = 3;
}message Goods {int64 Id = 1;string Name = 2;
}
package serviceimport ("context""github.com/mszlu521/goodscenter/api"
)type GoodsApiService struct {
}func (GoodsApiService) Find(context.Context, *api.GoodsRequest) (*api.GoodsResponse, error) {goods := &api.Goods{Id: 1000, Name: "商品中心9002商品,grpc提供"}res := &api.GoodsResponse{Code: 200,Msg:  "success",Data: goods,}return res, nil
}
func (GoodsApiService) mustEmbedUnimplementedGoodsApiServer() {}

grpc服务端:

listen, _ := net.Listen("tcp", ":9111")server := grpc.NewServer()api.RegisterGoodsApiServer(server, &api.GoodsApiService{})err := server.Serve(listen)log.Println(err)

grpc客户端:

g.Get("/findGrpc", func(ctx *msgo.Context) {//查询商品var serviceHost = "127.0.0.1:9111"conn, err := grpc.Dial(serviceHost, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {fmt.Println(err)}defer conn.Close()client := api.NewGoodsApiClient(conn)rsp, err := client.Find(context.TODO(), &api.GoodsRequest{})if err != nil {fmt.Println(err)}ctx.JSON(http.StatusOK, rsp)})
4.3.1 形成框架工具

服务端:

package rpcimport ("google.golang.org/grpc""net"
)type MsGrpcServer struct {listen     net.ListenergrpcServer *grpc.Serverregisters  []func(grpcServer *grpc.Server)ops        []grpc.ServerOption
}func NewGrpcServer(address string, ops ...MsGrpcOption) (*MsGrpcServer, error) {listen, err := net.Listen("tcp", address)if err != nil {return nil, err}ms := &MsGrpcServer{listen: listen,}for _, op := range ops {op.Apply(ms)}s := grpc.NewServer(ms.ops...)ms.grpcServer = sreturn ms, nil
}func (s *MsGrpcServer) Run() error {for _, register := range s.registers {register(s.grpcServer)}return s.grpcServer.Serve(s.listen)
}func (s *MsGrpcServer) Register(register func(grpServer *grpc.Server)) {s.registers = append(s.registers, register)
}type MsGrpcOption interface {Apply(s *MsGrpcServer)
}type DefaultGrpcOption struct {f func(s *MsGrpcServer)
}func (d DefaultGrpcOption) Apply(s *MsGrpcServer) {d.f(s)
}func WithGrpcOptions(options ...grpc.ServerOption) MsGrpcOption {return DefaultGrpcOption{f: func(s *MsGrpcServer) {s.ops = append(s.ops, options...)}}
}
   grpcServer, _ := rpc.NewGrpcServer(":9111")grpcServer.Register(func(grpServer *grpc.Server) {api.RegisterGoodsApiServer(grpServer, &api.GoodsApiService{})})err := grpcServer.Run()

type MsGrpcClient struct {Conn *grpc.ClientConn
}func NewGrpcClient(config *MsGrpcClientConfig) (*MsGrpcClient, error) {var ctx = context.Background()var dialOptions = config.dialOptionsif config.Block {//阻塞if config.DialTimeout > time.Duration(0) {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, config.DialTimeout)defer cancel()}dialOptions = append(dialOptions, grpc.WithBlock())}if config.KeepAlive != nil {dialOptions = append(dialOptions, grpc.WithKeepaliveParams(*config.KeepAlive))}conn, err := grpc.DialContext(ctx, config.Address, dialOptions...)if err != nil {return nil, err}return &MsGrpcClient{Conn: conn,}, nil
}type MsGrpcClientConfig struct {Address     stringBlock       boolDialTimeout time.DurationReadTimeout time.DurationDirect      boolKeepAlive   *keepalive.ClientParametersdialOptions []grpc.DialOption
}func DefaultGrpcClientConfig() *MsGrpcClientConfig {return &MsGrpcClientConfig{dialOptions: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()),},DialTimeout: time.Second * 3,ReadTimeout: time.Second * 2,Block:       true,}
}

4.4 TCP方式

tcp方式就需要实现序列化,编解码等操作了

序列化协议支持两种:

Protobuf 和 go的Gob协议。

4.4.1 server端

type Serializer interface {Serialize(i interface{}) ([]byte, error)Deserialize(data []byte, i interface{}) error
}
type GobSerializer struct{}func (c GobSerializer) Serialize(data any) ([]byte, error) {var buffer bytes.Bufferencoder := gob.NewEncoder(&buffer)if err := encoder.Encode(data); err != nil {return nil, err}return buffer.Bytes(), nil
}func (c GobSerializer) Deserialize(data []byte, target any) error {buffer := bytes.NewBuffer(data)decoder := gob.NewDecoder(buffer)return decoder.Decode(target)
}type MsRpcMessage struct {//头Header *Header//消息体Data any
}const mn byte = 0x1d
const version = 0x01type CompressType byteconst (Gzip CompressType = iota
)type SerializeType byteconst (Gob SerializeType = iotaProtoBuff
)type MessageType byteconst (msgRequest MessageType = iotamsgResponsemsgPingmsgPong
)type Header struct {MagicNumber   byteVersion       byteFullLength    int32MessageType   MessageTypeCompressType  CompressTypeSerializeType SerializeTypeRequestId     int64
}type MsRpcRequest struct {RequestId   int64ServiceName stringMethodName  stringArgs        []any
}type MsRpcResponse struct {RequestId     int64Code          int16Msg           stringCompressType  CompressTypeSerializeType SerializeTypeData          any
}type MsRpcServer interface {Register(name string, service interface{})Run()Stop()
}type MsTcpServer struct {listener   net.ListenerHost       stringPort       intNetwork    stringserviceMap map[string]interface{}
}type MsTcpConn struct {s       *MsTcpServerconn    net.ConnrspChan chan *MsRpcResponse
}func (c *MsTcpConn) writeHandle() {ctx := context.Background()_, cancel := context.WithTimeout(ctx, time.Duration(3)*time.Second)defer cancel()select {case rsp := <-c.rspChan://编码数据err := c.Send(c.conn, rsp)if err != nil {log.Println(err)}returncase <-ctx.Done():log.Println("超时了")return}
}func (c *MsTcpConn) Send(conn net.Conn, rsp *MsRpcResponse) error {headers := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgResponse)//压缩类型headers[7] = byte(rsp.CompressType)//序列化headers[8] = byte(rsp.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(rsp.RequestId))serializer, err := loadSerialize(rsp.SerializeType)if err != nil {return err}body, err := serializer.Serialize(rsp)if err != nil {return err}body, err = compress(body, rsp.CompressType)if err != nil {return err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = conn.Write(headers[:])if err != nil {return err}err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return err}log.Println("发送数据成功")return nil
}func NewTcpServer(host string, port int) *MsTcpServer {return &MsTcpServer{Host:    host,Port:    port,Network: "tcp",}
}
func (s *MsTcpServer) Register(name string, service interface{}) {if s.serviceMap == nil {s.serviceMap = make(map[string]interface{})}v := reflect.ValueOf(service)if v.Kind() != reflect.Pointer {panic(errors.New("service not pointer"))}s.serviceMap[name] = service
}
func (s *MsTcpServer) Run() {addr := fmt.Sprintf("%s:%d", s.Host, s.Port)listen, err := net.Listen(s.Network, addr)if err != nil {panic(err)}s.listener = listenfor {conn, err := s.listener.Accept()if err != nil {log.Println(err)continue}msConn := &MsTcpConn{conn: conn, rspChan: make(chan *MsRpcResponse, 1), s: s}go s.readHandle(msConn)go msConn.writeHandle()}
}func (s *MsTcpServer) readHandle(msConn *MsTcpConn) {defer func() {if err := recover(); err != nil {log.Println(err)msConn.conn.Close()}}()msg := s.decodeFrame(msConn.conn)if msg == nil {msConn.rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgRequest {req := msg.Data.(*MsRpcRequest)//查找注册的服务匹配后进行调用,调用完发送到一个channel当中service, ok := s.serviceMap[req.ServiceName]rsp := &MsRpcResponse{RequestId: req.RequestId, CompressType: msg.Header.CompressType, SerializeType: msg.Header.SerializeType}if !ok {rsp.Code = 500rsp.Msg = "no service found"msConn.rspChan <- rspreturn}v := reflect.ValueOf(service)reflectMethod := v.MethodByName(req.MethodName)args := make([]reflect.Value, len(req.Args))for i := range req.Args {args[i] = reflect.ValueOf(req.Args[i])}result := reflectMethod.Call(args)if len(result) == 0 {//无返回结果rsp.Code = 200msConn.rspChan <- rspreturn}resArgs := make([]interface{}, len(result))for i := 0; i < len(result); i++ {resArgs[i] = result[i].Interface()}var err errorif _, ok := result[len(result)-1].Interface().(error); ok {err = result[len(result)-1].Interface().(error)}if err != nil {rsp.Code = 500rsp.Msg = err.Error()}rsp.Code = 200rsp.Data = resArgs[0]msConn.rspChan <- rsplog.Println("接收数据成功")return}
}func (s *MsTcpServer) Close() {if s.listener != nil {s.listener.Close()}
}func (*MsTcpServer) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerreq := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = reqreturn msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerrsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rspreturn msg}return nil
}func loadSerialize(serializeType SerializeType) (Serializer, error) {switch serializeType {case Gob://gobs := &GobSerializer{}return s, nil}return nil, errors.New("no serializeType")
}func compress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzip//创建一个新的 byte 输出流var buf bytes.Bufferw := gzip.NewWriter(&buf)_, err := w.Write(body)if err != nil {return nil, err}if err := w.Close(); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}func unCompress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzipreader, err := gzip.NewReader(bytes.NewReader(body))defer reader.Close()if err != nil {return nil, err}buf := new(bytes.Buffer)// 从 Reader 中读取出数据if _, err := buf.ReadFrom(reader); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}
tcpServer := rpc.NewTcpServer("localhost", 9112)gob.Register(&model.Result{})gob.Register(&model.Goods{})tcpServer.Register("goods", &service.GoodsRpcService{})go tcpServer.Run()go engine.Run(":9002")quit := make(chan os.Signal)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)<-quittcpServer.Close()
package serviceimport ("github.com/mszlu521/goodscenter/model"
)type GoodsRpcService struct {
}func (*GoodsRpcService) Find(id int64) *model.Result {goods := model.Goods{Id: 1000, Name: "商品中心9002商品"}return &model.Result{Code: 200, Msg: "success", Data: goods}
}
4.4.2 client端

type MsRpcClient interface {Connect() errorInvoke(context context.Context, serviceName string, methodName string, args []any) (any, error)Close() error
}type MsTcpClient struct {conn   net.Connoption TcpClientOption
}type TcpClientOption struct {Retries           intConnectionTimeout time.DurationSerializeType     SerializeTypeCompressType      CompressTypeHost              stringPort              int
}var DefaultOption = TcpClientOption{Host:              "127.0.0.1",Port:              9112,Retries:           3,ConnectionTimeout: 5 * time.Second,SerializeType:     Gob,CompressType:      Gzip,
}func NewTcpClient(option TcpClientOption) *MsTcpClient {return &MsTcpClient{option: option}
}func (c *MsTcpClient) Connect() error {addr := fmt.Sprintf("%s:%d", c.option.Host, c.option.Port)conn, err := net.DialTimeout("tcp", addr, c.option.ConnectionTimeout)if err != nil {return err}c.conn = connreturn nil
}var reqId int64func (c *MsTcpClient) Invoke(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, c.option.ConnectionTimeout)defer cancel()req := &MsRpcRequest{}req.RequestId = atomic.AddInt64(&reqId, 1)req.ServiceName = serviceNamereq.MethodName = methodNamereq.args = argsheaders := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgRequest)//压缩类型headers[7] = byte(c.option.CompressType)//序列化headers[8] = byte(c.option.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(req.RequestId))serializer, err := loadSerialize(c.option.SerializeType)if err != nil {return nil, err}body, err := serializer.Serialize(req)if err != nil {return nil, err}body, err = compress(body, c.option.CompressType)if err != nil {return nil, err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = c.conn.Write(headers[:])if err != nil {return nil, err}err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return nil, err}rspChan := make(chan *MsRpcResponse)go c.readHandle(rspChan)rsp := <-rspChanreturn rsp, nil
}func (c *MsTcpClient) Close() error {if c.conn != nil {return c.conn.Close()}return nil
}func (c *MsTcpClient) readHandle(rspChan chan *MsRpcResponse) {defer func() {if err := recover(); err != nil {log.Println(err)c.conn.Close()}}()for {msg := c.decodeFrame(c.conn)if msg == nil {log.Println("未解析出任何数据")rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgResponse {rsp := msg.Data.(*MsRpcResponse)rspChan <- rspreturn}}
}func (*MsTcpClient) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerreq := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = reqreturn msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerrsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rspreturn msg}return nil
}type MsTcpClientProxy struct {client *MsTcpClientoption TcpClientOption
}func NewMsTcpClientProxy(option TcpClientOption) *MsTcpClientProxy {return &MsTcpClientProxy{option: option}
}func (p *MsTcpClientProxy) Call(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {client := NewTcpClient(p.option)p.client = clienterr := client.Connect()if err != nil {return nil, err}for i := 0; i < p.option.Retries; i++ {result, err := client.Invoke(ctx, serviceName, methodName, args)if err != nil {if i >= p.option.Retries-1 {log.Println(errors.New("already retry all time"))client.Close()return nil, err}continue}client.Close()return result, nil}return nil, errors.New("retry time is 0")
}
g.Get("/findTcp", func(ctx *msgo.Context) {//查询商品gob.Register(&model.Result{})gob.Register(&model.Goods{})args := make([]any, 1)args[0] = 1result, err := proxy.Call(context.Background(), "goods", "Find", args)if err != nil {panic(err)}ctx.JSON(http.StatusOK, result)})
4.4.3 protobuf序列化支持

type ProtobufSerializer struct{}func (c ProtobufSerializer) Serialize(data any) ([]byte, error) {marshal, err := proto.Marshal(data.(proto.Message))if err != nil {return nil, err}return marshal, nil
}func (c ProtobufSerializer) Deserialize(data []byte, target any) error {message := target.(proto.Message)return proto.Unmarshal(data, message)
}
protoc  --go_out=./ --go-grpc_out=./  .\rpc\tcp.proto 
syntax = "proto3";import "google/protobuf/struct.proto";option go_package="/rpc";package rpc;message Request {int64 RequestId = 1;string ServiceName = 2;string MethodName = 3;repeated google.protobuf.Value Args = 4;
}message Response {int64 RequestId = 1;int32 Code = 2;string Msg = 3;int32 CompressType = 4;int32 SerializeType = 5;google.protobuf.Value Data = 6;
}
package rpcimport ("bytes""compress/gzip""context""encoding/binary""encoding/gob""encoding/json""errors""fmt""google.golang.org/protobuf/proto""google.golang.org/protobuf/types/known/structpb""io""log""net""reflect""sync/atomic""time"
)type Serializer interface {Serialize(i interface{}) ([]byte, error)Deserialize(data []byte, i interface{}) error
}
type GobSerializer struct{}func (c GobSerializer) Serialize(data any) ([]byte, error) {var buffer bytes.Bufferencoder := gob.NewEncoder(&buffer)if err := encoder.Encode(data); err != nil {return nil, err}return buffer.Bytes(), nil
}func (c GobSerializer) Deserialize(data []byte, target any) error {buffer := bytes.NewBuffer(data)decoder := gob.NewDecoder(buffer)return decoder.Decode(target)
}type ProtobufSerializer struct{}func (c ProtobufSerializer) Serialize(data any) ([]byte, error) {marshal, err := proto.Marshal(data.(proto.Message))if err != nil {return nil, err}return marshal, nil
}func (c ProtobufSerializer) Deserialize(data []byte, target any) error {message := target.(proto.Message)return proto.Unmarshal(data, message)
}type MsRpcMessage struct {//头Header *Header//消息体Data any
}const mn byte = 0x1d
const version = 0x01type CompressType byteconst (Gzip CompressType = iota
)type SerializeType byteconst (Gob SerializeType = iotaProtoBuff
)type MessageType byteconst (msgRequest MessageType = iotamsgResponsemsgPingmsgPong
)type Header struct {MagicNumber   byteVersion       byteFullLength    int32MessageType   MessageTypeCompressType  CompressTypeSerializeType SerializeTypeRequestId     int64
}type MsRpcRequest struct {RequestId   int64ServiceName stringMethodName  stringArgs        []any
}type MsRpcResponse struct {RequestId     int64Code          int16Msg           stringCompressType  CompressTypeSerializeType SerializeTypeData          any
}type MsRpcServer interface {Register(name string, service interface{})Run()Stop()
}type MsTcpServer struct {listener   net.ListenerHost       stringPort       intNetwork    stringserviceMap map[string]interface{}
}type MsTcpConn struct {s       *MsTcpServerconn    net.ConnrspChan chan *MsRpcResponse
}func (c *MsTcpConn) writeHandle() {ctx := context.Background()_, cancel := context.WithTimeout(ctx, time.Duration(3)*time.Second)defer cancel()select {case rsp := <-c.rspChan://编码数据err := c.Send(c.conn, rsp)if err != nil {log.Println(err)}returncase <-ctx.Done():log.Println("超时了")return}
}func (c *MsTcpConn) Send(conn net.Conn, rsp *MsRpcResponse) error {headers := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgResponse)//压缩类型headers[7] = byte(rsp.CompressType)//序列化headers[8] = byte(rsp.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(rsp.RequestId))serializer, err := loadSerialize(SerializeType(rsp.SerializeType))if err != nil {return err}var body []byteif ProtoBuff == rsp.SerializeType {pRsp := &Response{}pRsp.SerializeType = int32(rsp.SerializeType)pRsp.CompressType = int32(rsp.CompressType)pRsp.Code = int32(rsp.Code)pRsp.Msg = rsp.MsgpRsp.RequestId = rsp.RequestId//value, err := structpb.//	log.Println(err)m := make(map[string]any)marshal, _ := json.Marshal(rsp.Data)_ = json.Unmarshal(marshal, &m)value, err := structpb.NewStruct(m)log.Println(err)pRsp.Data = structpb.NewStructValue(value)body, err = serializer.Serialize(pRsp)} else {body, err = serializer.Serialize(rsp)}if err != nil {return err}body, err = compress(body, CompressType(rsp.CompressType))if err != nil {return err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = conn.Write(headers[:])if err != nil {return err}err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return err}log.Println("发送数据成功")return nil
}func NewTcpServer(host string, port int) *MsTcpServer {return &MsTcpServer{Host:    host,Port:    port,Network: "tcp",}
}
func (s *MsTcpServer) Register(name string, service interface{}) {if s.serviceMap == nil {s.serviceMap = make(map[string]interface{})}v := reflect.ValueOf(service)if v.Kind() != reflect.Pointer {panic(errors.New("service not pointer"))}s.serviceMap[name] = service
}
func (s *MsTcpServer) Run() {addr := fmt.Sprintf("%s:%d", s.Host, s.Port)listen, err := net.Listen(s.Network, addr)if err != nil {panic(err)}s.listener = listenfor {conn, err := s.listener.Accept()if err != nil {log.Println(err)continue}msConn := &MsTcpConn{conn: conn, rspChan: make(chan *MsRpcResponse, 1), s: s}go s.readHandle(msConn)go msConn.writeHandle()}
}func (s *MsTcpServer) readHandle(msConn *MsTcpConn) {defer func() {if err := recover(); err != nil {log.Println(err)msConn.conn.Close()}}()msg := s.decodeFrame(msConn.conn)if msg == nil {msConn.rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgRequest {req := msg.Data.(*Request)//查找注册的服务匹配后进行调用,调用完发送到一个channel当中service, ok := s.serviceMap[req.ServiceName]rsp := &MsRpcResponse{RequestId: req.RequestId, CompressType: msg.Header.CompressType, SerializeType: msg.Header.SerializeType}if !ok {rsp.Code = 500rsp.Msg = "no service found"msConn.rspChan <- rspreturn}v := reflect.ValueOf(service)reflectMethod := v.MethodByName(req.MethodName)args := make([]reflect.Value, len(req.Args))for i := range req.Args {of := reflect.ValueOf(req.Args[i].AsInterface())of = of.Convert(reflectMethod.Type().In(i))args[i] = of}result := reflectMethod.Call(args)if len(result) == 0 {//无返回结果rsp.Code = 200msConn.rspChan <- rspreturn}resArgs := make([]interface{}, len(result))for i := 0; i < len(result); i++ {resArgs[i] = result[i].Interface()}var err errorif _, ok := result[len(result)-1].Interface().(error); ok {err = result[len(result)-1].Interface().(error)}if err != nil {rsp.Code = 500rsp.Msg = err.Error()}rsp.Code = 200rsp.Data = resArgs[0]msConn.rspChan <- rsplog.Println("接收数据成功")return}
}func (s *MsTcpServer) Close() {if s.listener != nil {s.listener.Close()}
}func (*MsTcpServer) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {req := &Request{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req} else {req := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req}return msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {rsp := &Response{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp} else {rsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp}return msg}return nil
}func loadSerialize(serializeType SerializeType) (Serializer, error) {switch serializeType {case Gob://gobs := &GobSerializer{}return s, nilcase ProtoBuff:s := &ProtobufSerializer{}return s, nil}return nil, errors.New("no serializeType")
}func compress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzip//创建一个新的 byte 输出流var buf bytes.Bufferw := gzip.NewWriter(&buf)_, err := w.Write(body)if err != nil {return nil, err}if err := w.Close(); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}func unCompress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzipreader, err := gzip.NewReader(bytes.NewReader(body))defer reader.Close()if err != nil {return nil, err}buf := new(bytes.Buffer)// 从 Reader 中读取出数据if _, err := buf.ReadFrom(reader); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}type MsRpcClient interface {Connect() errorInvoke(context context.Context, serviceName string, methodName string, args []any) (any, error)Close() error
}type MsTcpClient struct {conn   net.Connoption TcpClientOption
}type TcpClientOption struct {Retries           intConnectionTimeout time.DurationSerializeType     SerializeTypeCompressType      CompressTypeHost              stringPort              int
}var DefaultOption = TcpClientOption{Host:              "127.0.0.1",Port:              9112,Retries:           3,ConnectionTimeout: 5 * time.Second,SerializeType:     Gob,CompressType:      Gzip,
}func NewTcpClient(option TcpClientOption) *MsTcpClient {return &MsTcpClient{option: option}
}func (c *MsTcpClient) Connect() error {addr := fmt.Sprintf("%s:%d", c.option.Host, c.option.Port)conn, err := net.DialTimeout("tcp", addr, c.option.ConnectionTimeout)if err != nil {return err}c.conn = connreturn nil
}var reqId int64func (c *MsTcpClient) Invoke(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, c.option.ConnectionTimeout)defer cancel()req := &MsRpcRequest{}req.RequestId = atomic.AddInt64(&reqId, 1)req.ServiceName = serviceNamereq.MethodName = methodNamereq.Args = argsheaders := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgRequest)//压缩类型headers[7] = byte(c.option.CompressType)//序列化headers[8] = byte(c.option.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(req.RequestId))serializer, err := loadSerialize(c.option.SerializeType)if err != nil {return nil, err}var body []byteif ProtoBuff == c.option.SerializeType {pReq := &Request{}pReq.RequestId = atomic.AddInt64(&reqId, 1)pReq.ServiceName = serviceNamepReq.MethodName = methodNamelist, err := structpb.NewList(args)log.Println(err)pReq.Args = list.Valuesbody, err = serializer.Serialize(pReq)} else {body, err = serializer.Serialize(req)}fmt.Println(body)if err != nil {return nil, err}log.Println(body)body, err = compress(body, c.option.CompressType)if err != nil {return nil, err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = c.conn.Write(headers[:])if err != nil {return nil, err}log.Println(body)log.Println("len:", len(body))err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return nil, err}rspChan := make(chan *MsRpcResponse)go c.readHandle(rspChan)rsp := <-rspChanreturn rsp, nil
}func (c *MsTcpClient) Close() error {if c.conn != nil {return c.conn.Close()}return nil
}func (c *MsTcpClient) readHandle(rspChan chan *MsRpcResponse) {defer func() {if err := recover(); err != nil {log.Println(err)c.conn.Close()}}()for {msg := c.decodeFrame(c.conn)if msg == nil {log.Println("未解析出任何数据")rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgResponse {if msg.Header.SerializeType == ProtoBuff {rsp := msg.Data.(*Response)asInterface := rsp.Data.AsInterface()marshal, _ := json.Marshal(asInterface)rsp1 := &MsRpcResponse{}json.Unmarshal(marshal, rsp1)rspChan <- rsp1} else {rsp := msg.Data.(*MsRpcResponse)rspChan <- rsp}return}}
}func (*MsTcpClient) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {req := &Request{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req} else {req := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req}return msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {rsp := &Response{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp} else {rsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp}return msg}return nil
}type MsTcpClientProxy struct {client *MsTcpClientoption TcpClientOption
}func NewMsTcpClientProxy(option TcpClientOption) *MsTcpClientProxy {return &MsTcpClientProxy{option: option}
}func (p *MsTcpClientProxy) Call(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {client := NewTcpClient(p.option)p.client = clienterr := client.Connect()if err != nil {return nil, err}for i := 0; i < p.option.Retries; i++ {result, err := client.Invoke(ctx, serviceName, methodName, args)if err != nil {if i >= p.option.Retries-1 {log.Println(errors.New("already retry all time"))client.Close()return nil, err}continue}client.Close()return result, nil}return nil, errors.New("retry time is 0")
}

对rpc做了初步实现,属于简单实现,并没有处理更为复杂的心跳,超时,连接管理等,需要大家自行去完善

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/720615.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

汽车零部件制造中的信息抽取技术:提升效率与质量的关键

一、引言 在汽车制造业中&#xff0c;零部件的生产是整个制造流程的关键一环。这些零部件&#xff0c;包括但不限于制动系统、转向系统和传动系统&#xff0c;是确保汽车安全、可靠运行的基础。为了满足现代汽车工业对效率和质量的严格要求&#xff0c;制造商们纷纷投入到高度…

Jetpack Compose: Hello Android

Jetpack Compose 是一个现代化的工具包&#xff0c;用于使用声明式方法构建原生 Android UI。在本博文中&#xff0c;我们将深入了解一个基本的 “Hello Android” 示例&#xff0c;以帮助您开始使用 Jetpack Compose。我们将探讨所提供代码片段中使用的函数和注解。 入门 在…

软件测试--性能测试工具JMeter

软件测试--性能测试工具JMeter 主流性能测试工具1.主流性能测试工具Loadrunner和Jmeter对比 —— 相同点2.主流性能测试工具Loadrunner和Jmeter对比 —— 不同点JMeter基本使用JMeter环境搭建1.安装JDK:2.安装Jmeter:3.注意点:JMeter功能概要1. JMeter文件目录介绍1.1 bin目…

瑞_23种设计模式_享元模式

文章目录 1 享元模式&#xff08;Flyweight Pattern&#xff09;1.1 介绍1.2 概述1.3 享元模式的结构1.4 享元模式的优缺点1.5 享元模式的使用场景 2 案例一2.1 需求2.2 代码实现 3 案例二3.1 需求3.2 代码实现 4 JDK源码解析&#xff08;Integer类&#xff09; &#x1f64a; …

13-Java代理模式 ( Proxy Pattern )

Java代理模式 摘要实现范例 代理模式&#xff08;Proxy Pattern&#xff09;使用一个类代表另一个类的功能 代理模式创建具有现有对象的对象&#xff0c;以便向外界提供功能接口 代理模式属于结构型模式 摘要 1. 意图 为其他对象提供一种代理以控制对这个对象的访问2. 主…

力扣206反转链表

206.反转链表 力扣题目链接(opens new window) 题意&#xff1a;反转一个单链表。 示例: 输入: 1->2->3->4->5->NULL 输出: 5->4->3->2->1->NULL 1&#xff0c;双指针 2&#xff0c;递归。递归参考双指针更容易写&#xff0c; 为什么不用头插…

3.1_2024ctf青少年比赛部分web题

php后门 根据x-powered-by知道php的版本 该版本存在漏洞&#xff1a; PHP 8.1.0-dev 开发版本后门 根据报错信息&#xff0c;进行提示&#xff0c;前 GET / HTTP/1.1 Host: challenge.qsnctf.com:31639 User-Agentt:12345678system(cat /flag);var_dump(2*3);zerodium12345678…

【小白学机器学习6】真实值,观测值,拟合值,以及数据的误差的评价:集中趋势,离散度,形状等

目录 1 世界上有哪几种值&#xff1f;只有3种值 1.1 真值/真实值/理想值/主观值&#xff08;形而上学世界里&#xff09; 1.2 实际值/现实值/观测值/样本值&#xff08;看到的/记录下来的&#xff09; 1.3 拟合值/预测值&#xff08;算出来的&#xff09; 2 对数据的各种…

springboot项目单纯使用nacos注册中心功能

Spring Boot 项目完全可以单独使用 Nacos 作为注册中心。Nacos 是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。它支持服务的注册与发现&#xff0c;能够与 Spring Boot 应用无缝集成&#xff0c;为微服务架构提供了强大的支持。 在使用 Nacos 作为注册中…

Python实现DMI工具判断信号:股票技术分析的工具系列(3)

Python实现DMI工具判断信号&#xff1a;股票技术分析的工具系列&#xff08;3&#xff09; 介绍算法解释 代码rolling函数介绍完整代码 介绍 先看看官方介绍&#xff1a; DMI (趋向指标&#xff09; 用法 1.PDI线从下向上突破MDI线&#xff0c;显示有新多头进场&#xff0c;为…

BUUCTF---[BJDCTF2020]藏藏藏1

1.题目描述 2.下载附件&#xff0c;解压之后是一张图片和一个文本 3.把图片放在winhex,发现图片里面包含压缩包 4.在kali中使用binwalk查看&#xff0c;然后使用foremost分离&#xff0c;在使用tree查看分离出来的文件&#xff0c;最后将zip文件使用unzip进行解压。步骤如下 5.…

pdf编辑软件哪个好用?5款PDF编辑器分享

pdf编辑软件哪个好用&#xff1f;PDF编辑软件在现代办公和学术研究中发挥着举足轻重的作用&#xff0c;它们不仅具备基础的编辑和修改功能&#xff0c;还能够支持多种注释工具&#xff0c;帮助我们高效地管理和整理PDF文件。无论是需要调整文档布局、添加文本或图像&#xff0c…

C++ 前缀和

目录 例1 例2 例3 例4 例5 例6 例7 例8 例1 DP34 【模板】前缀和 分析&#xff1a;dp和arr的大小并不是固定的&#xff0c;就是有没有偏移量&#xff0c;这里的n是从1开始&#xff0c;不如直接放到下标1处&#xff0c;在最后的减法时&#xff0c;如果用第一个参考代码会…

rtt的io设备框架面向对象学习-touch设备

目录 1.触摸设备基类2.触摸设备基类的子类3.初始化/构造流程3.1设备驱动层3.2 设备驱动框架层3.3 io设备管理层 4.总结5.使用5.1实例 1.触摸设备基类 此层处于设备驱动框架层。此层的类是抽象类。 在/ components / drivers / include / drivers /touch.h定义了如下touch设备…

Ai学社致力于Ai视觉设计和AI绘画

Ai学社来啦&#xff01;致力于短时间搞定Ai视觉设计、AI绘画。 遍知首席Ai讲师&#xff0c;教大家如何利用Ai迅速提升工作效率&#xff0c;升职加xin&#xff01;目前申请对ai感兴趣的均可以参加&#xff01;免费报名。 招生人数&#xff1a;本批次至少招募100名。招生时间&…

LeetCode-02

225. 用队列实现栈 用两个队列实现栈的功能&#xff0c;思路如下&#xff1a; 往空队列中放新元素把非空队列中的元素依次放入刚才添加了新元素的队列&#xff0c;直到非空队列变为空队列 class MyStack(object):def __init__(self):self.queue1 []self.queue2 []def push(…

【教程】Kotlin语言学习笔记(四)——方法(持续更新)

写在前面&#xff1a; 如果文章对你有帮助&#xff0c;记得点赞关注加收藏一波&#xff0c;利于以后需要的时候复习&#xff0c;多谢支持&#xff01; 【Kotlin语言学习】系列文章 第一章 《认识Kotlin》 第二章 《数据类型》 第三章 《数据容器》 第四章 《方法》 文章目录 【…

突发,Anthropic推出突破性Claude 3系列模型,性能超越GPT-4

&#x1f989; AI新闻 &#x1f680; 突发&#xff0c;Anthropic推出突破性Claude 3系列模型 摘要&#xff1a;人工智能创业公司Anthropic宣布推出其Claude 3系列大型语言模型&#xff0c;该系列包括Claude 3 Haiku、Claude 3 Sonnet和Claude 3 Opus三个子模型&#xff0c;旨…

Cesium 自定义Primitive-绘制圆

一、创作来源 1、cesium的entity绘制圆 2、不使用entity的情况下&#xff0c;使用自定义的primitive来动态绘制圆 3、结合上一篇文章的圆&#xff0c;执行动态圆的更新 二、编写步骤 1、创建绘制线的类 包括构造函数、绘图函数以及销毁函数 import { Viewer, ScreenSpaceEven…

docker-compose启动postgres数据库,实现主从备份

文章目录 1. 主库2. 从库3. 测试 1. 主库 创建pg-m 目录&#xff0c;并进入该目录创建docker-compose.yml文件&#xff0c;内容如下&#xff1a; version: "3.1" services:pg_master:image: postgres:15.3container_name: pg_masterenvironment:POSTGRES_PASSWORD:…