PRC教程 1.服务端与消息编码

1.从实现服务端开始

服务端中肯定会有进行监听的。这里先创建一个空的结构体Server。

其Accept方法是进行监听,并与客户端进行连接后, 开启新协程异步去处理ServeConn。

//server.go文件
type Server struct{}func NewServer() *Server {return &Server{}
}var DefaultServer = NewServer()func (server *Server) Accept(lis net.Listener) {for {conn, err := lis.Accept()if err != nil {log.Println("rpc server: accept error:", err)return}// 拿到客户端的连接, 开启新协程异步去处理.go server.ServeConn(conn)}
}func Accept(lis net.Listener) { DefaultServer.Accept(lis) }

如果想启动服务,过程是非常简单的,传入 listener 即可,tcp 协议和 unix 协议都支持。

lis, _ := net.Listen("tcp", "localhost:10000")
geerpc.Accept(lis)

监听请求之后就是处理信息。那处理消息中要先了解信息的格式以及信息的编解码方式。

2.消息的格式和编解码方式

消息格式

对于消息(request和response)的定义, 我们可以简单定为消息头(header)+内容体(body)。

我们定义头部结构体。

//codec.go文件
type Header struct {ServiceMethod string // format "Service.Method"Seq           uint64 // sequence number chosen by clientError         string
}
  • ServiceMethod 是服务名和方法名,通常与 Go 语言中的结构体和方法相映射。
  • Seq 是请求的序号,也可以认为是某个请求的 ID,用来区分不同的请求。
  • Error 是错误信息,客户端置为空,服务端如果如果发生错误,将错误信息置于 Error 中。

消息的编解码方式

编解码方式就有很多种,Go中默认使用的是gob。(gob编码方式为go语言专用的编码方式, 无法跨语言使用)。而要是使用json来进行编解码呢,所以我们抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例来编解码。

编解码Codec实例中就应该有读取头部,读取body,写回复这三个方法。

//codec.go文件
type Code interface {ReadHeader(*Header) errorReadBody(any) errorWriteResponse(*Header, any) errorClose() error        //关闭连接
}

实现gob编码方式

定义GobCodec 结构体,一个编解码器需要有的结构

  • 读取/写入的io流(此处为socket连接)
  • 编码器:将要编码的数据写入缓冲区,等待推送给要写入的socket连接
  • 解码器:将socket连接中的数据读取到指定的对象中
  • 缓冲区(编码器需要)
//gob.go文件
type GobCodec struct {conn io.ReadWriteCloserbuf  *bufio.Writerdec  *gob.Decoderenc  *gob.Encoder
}//使用与客户端的socket连接初始化编解码器。
//dec: gob.NewDecoder(conn)使得解码时从连接中获取数据;
//enc: gob.NewEncoder(buf)编码器需要缓冲区,且该缓冲区的底层io流应该为与客户端的连接。
func NewGobCodec(conn io.ReadWriteCloser) Codec {buf := bufio.NewWriter(conn)return &GobCodec{conn: conn,buf:  buf,dec:  gob.NewDecoder(conn),enc:  gob.NewEncoder(buf),}
}

 接着实现 Close方法和接口的ReadHeaderReadBodyWriteResponse方法。

ReadHeader和WriteResponse分别是用该实例的解码器进行解码,编码器编码回复。

//gob.go文件
func (c *GobCodec) ReadHeader(h *Header) error {return c.dec.Decode(h)
}func (c *GobCodec) ReadBody(body any) error {return c.dec.Decode(body)
}func (c *GobCodec) WriteResponse(h *Header, body any) (err error) {defer func() {c.buf.Flush()if err != nil {c.Close()}}()if err := c.enc.Encode(h); err != nil {log.Println("rpc codec: gob error encoding header:", err)return err}if err := c.enc.Encode(body); err != nil {log.Println("rpc codec: gob error encoding body:", err)return err}return nil
}func (c *GobCodec) Close() error {return c.conn.Close()
}

那接着我们定义字符串来表示哪个是使用gob编解码的。

//codec.go文件
type CodeType stringconst (GobType  CodeType = "application/gob"JsonType CodeType = "application/json" // not implemented
)type NewCodecFunc func(io.ReadWriteCloser) Codecvar NewCodeFuncMap map[CodeType]NewCodecFuncfunc init() {NewCodeFuncMap = make(map[CodeType]NewCodecFunc)NewCodeFuncMap[GobType] = NewGobCodec
}

 对外提供NewCodeFuncMap ,客户端和服务端可以通过 Codec 的 CodeType得到构造函数,从而创建 Codec 实例

判断客户端使用的编解码方式

实现了GobCodec后,那目前对于该rpc,需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息,放到结构体 Option 中承载。

//server.go文件
const MagicNumber = 0x3b3f5ctype Option struct {MagicNumber int        // MagicNumber marks this's a geerpc requestCodecType   codec.Type // client may choose different Codec to encode body
}var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType:   codec.GobType,
}

一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上简单点,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:

| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------      固定 JSON 编码      ------>  | <-------   编码方式由 CodeType 决定   ------->|

 但要注意的是:在一次连接中,Option 固定在报文的最开始位置,Header 和 Body 可以有多个,即报文可能是这样的。

| Option | Header1 | Body1 | Header2 | Body2 | ...

3.服务端Accpet后的处理

定义好消息格式和编解码方式后,就回到Server的Accpet方法中。其开启新协程去处理客户的请求(ServeConn方法)。

那该方法中肯定就需要先通过客户发送的信息获取到编解码方式,即是使用 json.NewDecoder 反序列化得到 Option 实例,检查 MagicNumber 和 CodeType 的值是否正确。然后根据 CodeType 得到对应的消息编解码器。

//server.go文件
func (server *Server) ServeConn(conn io.ReadWriteCloser) {defer conn.Close()var opt Optionif err := json.NewDecoder(conn).Decode(&opt); err != nil {log.Println("rpc server: options error: ", err)return}if opt.MagicNumber != MagicNumber {log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)return}//目前只实现了gob编解码f := codec.NewCodeFuncMap[opt.CodecType]if f == nil {log.Printf("rpc server: invalid codec type %s", opt.CodecType)return}server.servCode(f(conn))
}

servCode方法

通过server.ServeCodec方法处理请求,主要分为解析请求信息(readRequest)处理请求(handleRequest)两步。

//server.go文件
func (server *Server) servCode(cc codec.Codec) {sending := new(sync.Mutex)wg := new(sync.WaitGroup)for {req, err := server.readRequest(cc)  //读取请求if err != nil {if req == nil {break}req.h.Error = err.Error()//发送解析请求信息出错的响应信息 invalidRequest = struct{}{}server.sendResponse(cc, req.h, invalidRequest, sending)continue}wg.Add(1)go server.handleRequest(cc, req, sending, wg)//处理请求}wg.Wait()cc.Close()
}

建立的连接是长连接,轮询读取连接中的数据并异步处理。因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要讲解几点:

  • handleRequest 使用了协程并发处理请求。
  • wg := new(sync.WaitGroup)用于不再接收请求时,等待正在执行的请求处理完成后再关闭连接,即是等server.handleRequest都执行完再退出
  • 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。对于同一个连接的不同请求,响应的发送是异步的,所以需要互斥锁来避免对同一个连接的写入冲突(sending := new(sync.Mutex))
  • 尽力而为,只有在 header 解析失败时,才终止循环。

 可能会有疑惑,sending,wg变量为什么都是new出来的,不能用栈空间变量吗?首先sync.Mutex,sync.WaitGroup都是值类型,互斥锁是不允许copy的,值传递的话,就会进行拷贝,会出错。WaitGroup也相似,其也是值类型,要是不使用指针传递的话,那函数内部调用的wg根本就不是同一个对象,会导致死锁的。

sync.Mutex和sync.WaitGroup作为函数参数,要想其是同一个对象,那就都需要传递其指针,才不会进行拷贝。

这里再抽象出一个结构体request,其存储请求的所有信息

 readRequest

其主要是先解码header,再解码body。

readRequest方法中的cc.ReadBody(&req.requestData),其参数是指针,是因为gob解码需要是指针。可以试试使用req.requestData去测试运行时有什么错误。

//server.go文件
type request struct {h *codec.Header// argv, replyv reflect.ValuerequestData uint64    //请求的body数据replyData   string    //返回给用户的data
}func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {var h codec.Headerif err := cc.ReadHeader(&h); err != nil {if err != io.EOF && err != io.ErrUnexpectedEOF {log.Println("rpc server: read header error:", err)}return nil, err}return &h, nil
}
func (server *Server) readRequest(cc codec.Codec) (*request, error) {h, err := server.readRequestHeader(cc)req := &request{h: h}// TODO: now we don't know the type of request argv//这一章节,我们只能处理用户发送过来的uint64类型的数据cc.ReadBody(&req.requestData)return req, nil
}

 handleRequest和sendResponse

sendResponse先加锁,之后调用编解码器的WriteResponse方法。

handleRequest就是处理信息,跟着调用sendResponse方法发送信息给客户端。

//server.go文件
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body any, sending *sync.Mutex) {sending.Lock()defer sending.Unlock()if err := cc.WriteResponse(h, body); err != nil {log.Println("rpc server: write response error:", err)}
}func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {defer wg.Done()log.Println("handleRequest ", req.h, req.requestData)// req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))// server.sendResponse(cc, req.h, req.replyv.Interface(), sending)req.replyData = fmt.Sprintf(" ok my resp %d", req.h.Seq)server.sendResponse(cc, req.h, &req.replyData, sending)
}

目前是还不能判断 body 的类型,后序会实现的。

这要说回结构体request中的一些变量,原教程的是request中是使用了反射类型变量的,但我感觉这一节使用反射,可能读起来会有点疑惑,难理解。所以这里就使用具体的类型来处理,在这一节会好理解点,后序会再添加上反射reflect。

4.简单的客户端测试

在这里我们就实现了一个消息的编解码器 GobCodec,并且客户端与服务端实现了简单的协议交换(protocol exchange),即允许客户端使用不同的编码方式。同时实现了服务端的雏形,建立连接,读取、处理并回复客户端的请求。

接下来,我们就在 main 函数中看看如何使用刚实现的 RPC。

func startServer(addr chan string) {l, err := net.Listen("tcp", "localhost:10000")if err != nil {log.Fatal("network error:", err)}log.Println("start rpc server on", l.Addr())addr <- l.Addr().String()geerpc.Accept(l)
}func main() {addr := make(chan string)go startServer(addr)// in fact, following code is like a simple geerpc clientconn, _ := net.Dial("tcp", <-addr)defer conn.Close()// send options_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)cc := codec.NewGobCodec(conn)// send request & receive responsefor i := 0; i < 3; i++ {h := &codec.Header{ServiceMethod: "Foo.Sum",Seq:           uint64(i),}cc.WriteResponse(h, h.Seq)cc.ReadHeader(h)var reply stringcc.ReadBody(&reply)log.Println("reply:", reply)}
}
  • 在 startServer 中使用了信道 addr,确保服务端端口监听成功,客户端再发起请求。
  • 客户端首先发送 Option 进行协议交换,接下来发送消息头 h := &codec.Header{},和消息体 geerpc req ${h.Seq}
  • 最后解析服务端的响应 reply,并打印出来。

完整代码: https://github.com/liwook/Go-projects/tree/main/geerpc/1-codec

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/634728.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GLM-4多模态重磅更新!摸着OpenAI过河!

智谱CEO张鹏说&#xff1a;OpenAI摸着石头过河&#xff0c;我们摸着OpenAI过河。 摸来摸去摸了一年&#xff0c;以每3-4个月升级一次基座模型的速度&#xff0c;智谱摸着OpenAI过河的最新成绩到底怎么样&#xff1f;真如所说吗&#xff1f; 听到GLM-4发布的当天&#xff0c;我就…

C++深入之虚函数、虚继承与带虚函数的多基派生问题

基础 在讲解带虚函数的多基派生问题时&#xff0c;我们要先弄清楚不带虚函数的多基派生存在什么样的问题&#xff0c;这样才好弄明白带虚函数的多基派生问题。 多基派生的二义性问题 一般来说&#xff0c;在派生类中对基类成员的访问应当具有唯一性&#xff0c;但在多基继承…

数据备份与恢复

备份概述 一、备份方式 按照数据库服务状态分为&#xff1a; 冷备份&#xff1a;在备份时暂停数据库运行和服务&#xff0c;将整个数据库复制到备份设备中 热备份&#xff1a;在备份时不停止数据库的运行和服务 按照备份的数据分为&#xff1a; 物理备份&#xff1a;备份…

C#设计模式教程(6):原型模式(Prototype Pattern)

原型模式的定义 原型模式(Prototype Pattern)是一种创建型设计模式,其核心思想是通过复制现有对象来创建新对象,而不是通过实例化的方式。在C#中,这通常是通过实现ICloneable接口来完成的,该接口要求实现一个Clone方法,用于复制对象。 大白话理解原型模式 想象一下古…

如何应用数据图表了解家里的 Unifi 网络状况?

1. 前言 自从之前写了《【让 IT 更简单】使用 Ubiquiti 全家桶对朋友家进行网络改造》 《【Rethinking IT】如何结合 Unifi 和 MikroTik 设备打造家庭网络》两篇文章后&#xff0c;相信给各位正在用 Unifi 或者打算使用 Unifi 的朋友应该有所帮助。 那么&#xff0c;今天我就…

Spring5深入浅出篇:Spring工厂简单原理以及日志应用

Spring5深入浅出篇:Spring工厂简单原理以及日志应用 Spring⼯⼚的底层实现原理(简易版) 还是通过分析第一个Spring程序来展开说说Spring工厂的简单原理 首先创建实体类 package com.baizhiedu.basic;import java.util.List; import java.util.Map; import java.util.Properti…

Docker(二)安装指南:主要介绍在 Linux 、Windows 10 和 macOS 上的安装

作者主页&#xff1a; 正函数的个人主页 文章收录专栏&#xff1a; Docker 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01; 安装 Docker Docker 分为 stable test 和 nightly 三个更新频道。 官方网站上有各种环境下的 安装指南&#xff0c;这里主要介绍 Docker 在…

Spring 注解 @Transactiona

Transactional 是什么 Transactional 是一个用于声明事务性操作的注解&#xff0c;通常用于 Java 编程语言中的 Spring 框架中。这个注解被用来标记一个方法或类需要被事务管理器事务化的地方。 在 Spring 中&#xff0c;事务是用于管理数据库操作的机制&#xff0c;确保一系…

DAZ to maxon 实时面捕52个blendshapes 表情模板基本形中英文对照表

一、转自&#xff1a; DAZ to maxon 实时面捕52个blendshapes 表情模板基本形中英文对照表 - 哔哩哔哩 很多学员反映实时表情怎么就不同步呢&#xff1f;这个问题其实很常见。 第一&#xff1a;表情模板的顺序弄错&#xff0c;导致表情错乱。 第二&#xff1a;表情模板不标准…

leetcode—和为K的子数组

1 和为K的子数组 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续非空序列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1], k 2 输出&#xff1a;2示例 2&#xff1a; 输入&#xff1…

AbstractHttpMessageConverter + easyexcell优雅下载附件

介绍 AbstractHttpMessageConverter 是 Spring 框架中用于处理 HTTP 消息转换的抽象基类。它用于处理来自 HTTP 请求的消息,并将其转换为特定的 Java 对象,或者将 Java 对象转换为 HTTP 响应消息。 这个抽象类允许开发人员创建自定义的 HTTP 消息转换器,以便在 Spring MVC…

职务岗位的概念澄清及应用

背景 现在的企业数字化平台中&#xff0c;有一些术语组织管理中的术语&#xff0c;理解上很有歧义&#xff0c;并且命名和应用上简直五花八门&#xff0c;洋相百出&#xff0c;比如&#xff0c;我们的大厂&#xff0c;就把角色这次&#xff0c;可以作为分类、分组的标签就能大…

2024-01-15(SpringMVCMybatis)

1.拦截器&#xff1a;如果我们想在多个handler方法(controller中的方法)执行之前或者之后都进行一些处理&#xff0c;甚至某些情况下需要拦截掉&#xff0c;不让handler方法执行&#xff0c;那么就可以使用SpringMVC为我们提供的拦截器。 拦截器和过滤器的区别&#xff1a;过滤…

基于内容的图像web检索系统

题目&#xff1a;基于内容的图像在线检索系统 简介&#xff1a;基于内容的图像在线检索系统&#xff08;Content Based Online Image Retrieval , 以下简称 CBOIR&#xff09;&#xff0c;是计算机视觉领域中关注大规模数字图像内容检索的研究分支。典型的CBOIR系统&#xff…

分布式事务Seata实战-AT模式(注册中心为Eureka)

大致记录Seata的AT模式下创建项目过程中需要注意的点和可能遇到的问题。 本项目是以官网的给的示例&#xff08;即下图&#xff09;进行创建的&#xff0c;以Eureka为注册中心。 官网&#xff1a;Seata AT 模式 | Apache Seata™ 官方代码示例&#xff1a; 快速启动 | Apac…

算法笔记(动态规划入门题)

1.找零钱 int coinChange(int* coins, int coinsSize, int amount) {int dp[amount 1];memset(dp,-1,sizeof(dp));dp[0] 0;for (int i 1; i < amount; i)for (int j 0; j < coinsSize; j)if (coins[j] < i && dp[i - coins[j]] ! -1)if (dp[i] -1 || dp[…

Doris配置外表以及多个Hive外表的配置

1.场景分析 以Clickhouse、Doris、Starrocks等为代表的mpp分析数据库正在快速的兴起&#xff0c;以其高效查询、跨库整合能力收到广大技术人员的喜爱。本文主要浅显介绍下作者在使用Doris时&#xff0c;通过建立catlog进行跨库查询。 废话不多少&#xff0c;直接上代码 2.相关…

力扣211. 添加与搜索单词 - 数据结构设计

字典树 思路&#xff1a; 设计一棵字典树&#xff0c;每个节点存放单词的一个字符&#xff0c;节点放一个标记位&#xff0c;如果是单词结束则标记&#xff1b;字典树插入&#xff1a; 字典树默认有 26 个 slot 槽代表 a - z&#xff1b;遍历单词&#xff0c;如果字符对应槽存…

哥德巴赫猜想不成立

哥德巴赫猜想是德国人哥德巴赫与瑞士人欧拉联手提出的素数与合数关系猜想&#xff1a;≥2的偶数素数素数。后来黎曼崛起&#xff0c;他把1剔除出素数队列&#xff0c;哥猜被整理为&#xff1a;≥6的偶数素数素数&#xff0c;≥9的奇数素数素数素数。 哥猜虽然是欧洲人的课题&am…

Python自动化实战之接口请求的实现

在前文说过&#xff0c;如果想要更好的做接口测试&#xff0c;我们要利用自己的代码基础与代码优势&#xff0c;所以该章节不会再介绍商业化的、通用的接口测试工具&#xff0c;重点介绍如何通过 python 编码来实现我们的接口测试以及通过 Pycharm 的实际应用编写一个简单接口测…