导航
- Golang RPC实现
- 一、主体逻辑设计
- 二、服务设计
- 1、监听和接收请求
- 2、处理请求
- (1)服务结构体定义
- (2)确认请求方和服务方编解码格式
- (3)循环读取请求
- (4)解析请求的内容
- (5)响应请求
- 三、读取和发送数据到连接中代码
Golang RPC实现
- 先来一个最简单的版本,后续更新。
RPC
也可以说是一种自定义的应用层协议- 所以我们需要自定义消息格式,消息包括 请求头和请求体,所以我们定义一个消息结构体
type request struct {h *codec.Header // header of requestargv, replyv reflect.Value // argv and replyv of request
}
- 请求头结构体
type Header struct {ServiceMethod string // format "Service.Method" 调用的服务和方法Seq uint64 // sequence number chosen by client 连接IdError string
}
- 请求的第一条消息用来确定后续消息的格式和编码,这里规定第一条消息是以
Json
格式编码,对应的结构体如下
type Option struct {MagicNumber int // MagicNumber marks this's a geerpc requestCodecType codec.Type // client may choose different Codec to encode body
}
- 消息格式和编码结构体
var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType: codec.GobType,
}
一、主体逻辑设计
func main() {log.SetFlags(0)addr := make(chan string)go startServer(addr)//这是RPC的服务端逻辑// in fact, following code is like a simple geerpc clientconn, _ := net.Dial("tcp", <-addr) //客户端拨号建立链接,每次服务逻辑,都通过conn来确定当前客户端/服务端是在哪一个连接上。defer func() { _ = conn.Close() }()time.Sleep(time.Second)// send options,先发了一个,定义后续数据的编码方式_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)//Encode方法就是发送一个Option结构体内容到conn连接中cc := codec.NewGobCodec(conn)// send request & receive responsefor i := 0; i < 5; i++ {//发五次请求h := &codec.Header{//定义每次请求的请求头ServiceMethod: "Foo.Sum",Seq: uint64(i),}_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))//发送请求体和body内容_ = cc.ReadHeader(h)//接收服务端的响应的请求头内容,处理相应得按顺序,不能并发接收响应数据log.Println("clinet receive response:", h.ServiceMethod)var reply string_ = cc.ReadBody(&reply)//接收服务端的响应的请求体内容log.Println("reply:", reply)//打印}
}
二、服务设计
逻辑就是
- 监听请求
- 循环获取请求,异步处理请求
- 获取当前连接的第一条消息,确认后续消息的格式和编码
- 获取请求内容
- 响应请求
1、监听和接收请求
func startServer(addr chan string) {// pick a free portl, err := net.Listen("tcp", ":0")if err != nil {log.Fatal("network error:", err)}log.Println("start rpc server on", l.Addr())addr <- l.Addr().String()geerpc.Accept(l)//服务端接收请求
}
2、处理请求
(1)服务结构体定义
type Server struct{}
// NewServer returns a new Server.
func NewServer() *Server {return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
(2)确认请求方和服务方编解码格式
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {for {//不断接收请求conn, err := lis.Accept()if err != nil {log.Println("rpc server: accept error:", err)return}go server.ServeConn(conn)//异步处理请求}
}
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {log.Println("服务端处理连接中..... ")defer func() { _ = conn.Close() }()var opt Optionif err := json.NewDecoder(conn).Decode(&opt); err != nil {//解析第一个Option,确定后续协议消息的格式log.Println("rpc server: options error: ", err)return}if opt.MagicNumber != MagicNumber {//服务方编码方式是否与客户端相同log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)return}f := codec.NewCodecFuncMap[opt.CodecType]//服务方编码方式是否与客户端相同if f == nil {//服务端是否存在客户端对应编码方式log.Printf("rpc server: invalid codec type %s", opt.CodecType)return}server.serveCodec(f(conn))//第一个确认包通过后,再发后续消息,通过conn拿到连接信息,保证服务端后续能向conn发送信息
}
(3)循环读取请求
func (server *Server) serveCodec(cc codec.Codec) {sending := new(sync.Mutex) // make sure to send a complete responsewg := new(sync.WaitGroup) // wait until all request are handledfor {req, err := server.readRequest(cc)//反复从请求方接收请求,这里会把请求头和请求体内容获取到req的结构体中if err != nil {if req == nil {//直到没有请求过来break // it's not possible to recover, so close the connection}req.h.Error = err.Error()server.sendResponse(cc, req.h, invalidRequest, sending)continue}wg.Add(1)go server.handleRequest(cc, req, sending, wg)//异步处理数据}wg.Wait()_ = cc.Close()
}
(4)解析请求的内容
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {// TODO, should call registered rpc methods to get the right replyv// day 1, just print argv and send a hello messagedefer wg.Done()log.Println("handleRequest :", req.h, req.argv.Elem())req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
(5)响应请求
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {sending.Lock()defer sending.Unlock()//time.Sleep(time.Second)if err := cc.Write(h, body); err != nil {log.Println("rpc server: write response error:", err)}
}
三、读取和发送数据到连接中代码
package codecimport ("bufio""encoding/gob""io""log"
)type GobCodec struct {conn io.ReadWriteCloserbuf *bufio.Writerdec *gob.Decoderenc *gob.Encoder
}var _ Codec = (*GobCodec)(nil)func NewGobCodec(conn io.ReadWriteCloser) Codec {buf := bufio.NewWriter(conn)return &GobCodec{conn: conn,buf: buf,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),}
}func (c *GobCodec) ReadHeader(h *Header) error {return c.dec.Decode(h)
}func (c *GobCodec) ReadBody(body interface{}) error {return c.dec.Decode(body)
}func (c *GobCodec) Write(h *Header, body interface{}) (err error) {defer func() {_ = c.buf.Flush()if err != nil {_ = c.Close()}}()if err = c.enc.Encode(h); err != nil {log.Println("rpc: gob error encoding header:", err)return}if err = c.enc.Encode(body); err != nil {log.Println("rpc: gob error encoding body:", err)return}return
}func (c *GobCodec) Close() error {return c.conn.Close()
}
欢迎大家关注我的博客