Golang RPC实现-day01

导航

  • Golang RPC实现
    • 一、主体逻辑设计
    • 二、服务设计
      • 1、监听和接收请求
      • 2、处理请求
        • (1)服务结构体定义
        • (2)确认请求方和服务方编解码格式
        • (3)循环读取请求
        • (4)解析请求的内容
        • (5)响应请求
    • 三、读取和发送数据到连接中代码

Golang RPC实现

  • 先来一个最简单的版本,后续更新。
  • RPC也可以说是一种自定义的应用层协议
  • 所以我们需要自定义消息格式,消息包括 请求头和请求体,所以我们定义一个消息结构体
type request struct {h            *codec.Header // header of requestargv, replyv reflect.Value // argv and replyv of request
}
  • 请求头结构体
type Header struct {ServiceMethod string // format "Service.Method" 调用的服务和方法Seq           uint64 // sequence number chosen by client 连接IdError         string
}
  • 请求的第一条消息用来确定后续消息的格式和编码,这里规定第一条消息是以Json格式编码,对应的结构体如下
type Option struct {MagicNumber int        // MagicNumber marks this's a geerpc requestCodecType   codec.Type // client may choose different Codec to encode body
}
  • 消息格式和编码结构体
var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType:   codec.GobType,
}

一、主体逻辑设计

func main() {log.SetFlags(0)addr := make(chan string)go startServer(addr)//这是RPC的服务端逻辑// in fact, following code is like a simple geerpc clientconn, _ := net.Dial("tcp", <-addr) //客户端拨号建立链接,每次服务逻辑,都通过conn来确定当前客户端/服务端是在哪一个连接上。defer func() { _ = conn.Close() }()time.Sleep(time.Second)// send options,先发了一个,定义后续数据的编码方式_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)//Encode方法就是发送一个Option结构体内容到conn连接中cc := codec.NewGobCodec(conn)// send request & receive responsefor i := 0; i < 5; i++ {//发五次请求h := &codec.Header{//定义每次请求的请求头ServiceMethod: "Foo.Sum",Seq:           uint64(i),}_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))//发送请求体和body内容_ = cc.ReadHeader(h)//接收服务端的响应的请求头内容,处理相应得按顺序,不能并发接收响应数据log.Println("clinet receive response:", h.ServiceMethod)var reply string_ = cc.ReadBody(&reply)//接收服务端的响应的请求体内容log.Println("reply:", reply)//打印}
}

二、服务设计

逻辑就是

  1. 监听请求
  2. 循环获取请求,异步处理请求
  3. 获取当前连接的第一条消息,确认后续消息的格式和编码
  4. 获取请求内容
  5. 响应请求

1、监听和接收请求

func startServer(addr chan string) {// pick a free portl, err := net.Listen("tcp", ":0")if err != nil {log.Fatal("network error:", err)}log.Println("start rpc server on", l.Addr())addr <- l.Addr().String()geerpc.Accept(l)//服务端接收请求
}

2、处理请求

(1)服务结构体定义
type Server struct{}
// NewServer returns a new Server.
func NewServer() *Server {return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
(2)确认请求方和服务方编解码格式
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {for {//不断接收请求conn, err := lis.Accept()if err != nil {log.Println("rpc server: accept error:", err)return}go server.ServeConn(conn)//异步处理请求}
}
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {log.Println("服务端处理连接中..... ")defer func() { _ = conn.Close() }()var opt Optionif err := json.NewDecoder(conn).Decode(&opt); err != nil {//解析第一个Option,确定后续协议消息的格式log.Println("rpc server: options error: ", err)return}if opt.MagicNumber != MagicNumber {//服务方编码方式是否与客户端相同log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)return}f := codec.NewCodecFuncMap[opt.CodecType]//服务方编码方式是否与客户端相同if f == nil {//服务端是否存在客户端对应编码方式log.Printf("rpc server: invalid codec type %s", opt.CodecType)return}server.serveCodec(f(conn))//第一个确认包通过后,再发后续消息,通过conn拿到连接信息,保证服务端后续能向conn发送信息
}
(3)循环读取请求
func (server *Server) serveCodec(cc codec.Codec) {sending := new(sync.Mutex) // make sure to send a complete responsewg := new(sync.WaitGroup)  // wait until all request are handledfor {req, err := server.readRequest(cc)//反复从请求方接收请求,这里会把请求头和请求体内容获取到req的结构体中if err != nil {if req == nil {//直到没有请求过来break // it's not possible to recover, so close the connection}req.h.Error = err.Error()server.sendResponse(cc, req.h, invalidRequest, sending)continue}wg.Add(1)go server.handleRequest(cc, req, sending, wg)//异步处理数据}wg.Wait()_ = cc.Close()
}
(4)解析请求的内容
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {// TODO, should call registered rpc methods to get the right replyv// day 1, just print argv and send a hello messagedefer wg.Done()log.Println("handleRequest :", req.h, req.argv.Elem())req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
(5)响应请求
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {sending.Lock()defer sending.Unlock()//time.Sleep(time.Second)if err := cc.Write(h, body); err != nil {log.Println("rpc server: write response error:", err)}
}

三、读取和发送数据到连接中代码

package codecimport ("bufio""encoding/gob""io""log"
)type GobCodec struct {conn io.ReadWriteCloserbuf  *bufio.Writerdec  *gob.Decoderenc  *gob.Encoder
}var _ Codec = (*GobCodec)(nil)func NewGobCodec(conn io.ReadWriteCloser) Codec {buf := bufio.NewWriter(conn)return &GobCodec{conn: conn,buf:  buf,dec:  gob.NewDecoder(conn),enc:  gob.NewEncoder(buf),}
}func (c *GobCodec) ReadHeader(h *Header) error {return c.dec.Decode(h)
}func (c *GobCodec) ReadBody(body interface{}) error {return c.dec.Decode(body)
}func (c *GobCodec) Write(h *Header, body interface{}) (err error) {defer func() {_ = c.buf.Flush()if err != nil {_ = c.Close()}}()if err = c.enc.Encode(h); err != nil {log.Println("rpc: gob error encoding header:", err)return}if err = c.enc.Encode(body); err != nil {log.Println("rpc: gob error encoding body:", err)return}return
}func (c *GobCodec) Close() error {return c.conn.Close()
}

欢迎大家关注我的博客在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/12492.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt学习笔记1.3.4 QtCore-Qt资源系统

文章目录 资源收集文件(.qrc)外部二进制资源内编译(compiled-in)资源压缩使用应用程序中的资源使用库中的资源 Qt资源系统是一种 独立于平台的机制&#xff0c;用于在应用程序的可执行文件中存储二进制文件。如果您的应用程序总是需要一组特定的文件(图标、翻译文件等)&#x…

QT状态机8-使用恢复策略自动恢复属性

当状态分配的属性不再活动时,可能希望将其恢复到初始值,通过设置全局的恢复策略可以使状态机进入一个状态而不用明确制定属性的值。 QStateMachine machine; machine.setGlobalRestorePolicy(QStateMachine::RestoreProperties);当设置了恢复策略以后,状态机将自动恢复所有…

sklearn中多分类和多标签分类评估方法总结

一、任务区分 多分类分类任务&#xff1a;在多分类任务中&#xff0c;每个样本只能被分配到一个类别中。换句话说&#xff0c;每个样本只有一个正确的标签。例如&#xff0c;将图像分为不同的物体类别&#xff0c;如猫、狗、汽车等。 多标签分类任务&#xff1a;在多标签分类任…

助力数字农林业发展服务香榧智慧种植,基于YOLOv5全系列【n/s/m/l/x】参数模型开发构建香榧种植场景下香榧果实检测识别系统

作为一个生在北方但在南方居住多年的人&#xff0c;居然头一次听过香榧&#xff08;fei&#xff09;这种作物&#xff0c;而且这个字还不会念&#xff0c;查了以后才知道读音&#xff08;fei&#xff09;&#xff0c;三声&#xff0c;这着实引起了我的好奇心&#xff0c;我相信…

STM32使用ADC单/多通道检测数据

文章目录 1. STM32单片机ADC功能详解 2. AD单通道 2.1 初始化 2.2 ADC.c 2.3 ADC.h 2.4 main.c 3. AD多通道 3.1 ADC.c 3.2 ADC.h 3.3 main.c 3.4 完整工程文件 1. STM32单片机ADC功能详解 STM32单片机ADC功能详解 2. AD单通道 这个代码实现通过ADC功能采集三脚电…

【Vue2】关于response返回数据的错误小记

关于Vue2中response返回数据的一个错误小记 如图&#xff0c;在这里返回的时候&#xff0c;后端是通过List< String >返回的&#xff0c;response接收到的实际上是一个Array数组&#xff0c;但是赋值给searchedTaskList的时候&#xff0c;需要在.then包括的范围里面赋值给…

【SpringBoot】 什么是springboot(二)?springboot操作mybatisPlus、swagger、thymeleaf模板

文章目录 SpringBoot第三章1、整合mybatsPlus1-234-67-10问题 2、整合pageHelper分页3、MP代码生成器1、编写yml文件2、导入依赖3、创建mp代码生成器4、生成代码5、编写配置类扫描mapper类6、编写控制器类 4、swagger1、什么是swagger2、作用3、发展历程4、一个简单的swagger项…

ElastiCache Serverless for Redis应用场景和性能成本分析

一. 前言 传统基于实例节点的 Redis 缓存架构中&#xff0c;扩展性是一个重要影响因素。在很多场景中&#xff0c;例如广告投放、电商交易、游戏对战&#xff0c;流量是经常变化的。无论是主从还是集群模式&#xff0c;当大流量进入时&#xff0c;Redis 处理能力达到上限&…

“打工搬砖记”中吃什么的轮盘功能实现(二)

文章目录 打工搬砖记转盘主要的逻辑实现转盘的素材小结 打工搬砖记 先来一个吃什么轮盘的预览图&#xff0c;这轮盘文案加字呈圆形铺出来&#xff0c;开始后旋转到指定的选项处停下来。 已上线小程序“打工人搬砖记”&#xff0c;可以扫码进行预览观看。 转盘主要的逻辑实现…

如何使用Docker安装并运行Nexus容器结合内网穿透实现远程管理本地仓库

前言 作者简介&#xff1a; 懒大王敲代码&#xff0c;计算机专业应届生 今天给大家聊聊如何使用Docker安装并运行Nexus容器结合内网穿透实现远程管理本地仓库&#xff0c;希望大家能觉得实用&#xff01; 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01;&#x1f496…

openlayer实现ImageStatic扩展支持平铺Wrapx

地图平铺&#xff08;Tiling&#xff09;是地图服务中常见的技术&#xff0c;用于将大尺寸的地图数据分割成许多小块&#xff08;瓦片&#xff09;&#xff0c;便于高效加载和展示。这种技术特别适用于网络环境&#xff0c;因为它允许浏览器只加载当前视图窗口内所需的地图瓦片…

IT行业现状与未来趋势分析

IT行业现状与未来趋势显示出持续的活力和变革&#xff0c;以下是上大学网&#xff08;www.sdaxue.com&#xff09;关于IT行业现状与未来趋势分析&#xff0c;供大家参考。 当前现状&#xff1a; 市场需求持续增长&#xff1a;随着信息时代的深入发展&#xff0c;各行各业对信息…

LLM Agent智能体综述(超详细)

前言 &#x1f3c6;&#x1f3c6;&#x1f3c6;在上一篇文章中&#xff0c;我们介绍了如何部署MetaGPT到本地&#xff0c;获取OpenAI API Key并配置其开发环境&#xff0c;并通过一个开发小组的多Agent案例感受了智能体的强大&#xff0c;在本文中&#xff0c;我们将对AI Agent…

5G消息和5G阅信的释义与区别 | 赛邮科普

5G消息和5G阅信的释义与区别 | 赛邮科普 在 5G 技术全面普及的当下&#xff0c;历史悠久的短信服务也迎来了前所未有的变革。5G 阅信和 5G 消息就是应运而生的两种短信形态&#xff0c;为企业和消费者带来更加丰富的功能和更加优质的体验。 这两个产品名字和形态都比较接近&am…

618速递丨各平台内卷严重,这些行业能否率先炸场?

根据最新发布的《中国网络视听发展研究报告&#xff08;2024&#xff09;》显示&#xff0c;71.2%的受访用户因为看短视频和直播进行网上购物&#xff0c;超40%的用户认为短视频和直播是他们的主要消费渠道&#xff0c;内容消费正成为各大电商争夺的关键赛道。 今年618&#x…

信创厂商选择要点

信创厂商选择要点 信创项目推进&#xff0c;不可避免的要与众多信创厂商打交道。选择靠谱的供应商&#xff0c;合理避坑&#xff0c;是信创项目成败的关键因素。个人认为技术突破能力、产品服务能力、生态建设能力、平滑迁移能力是评估一个信创厂商是否合格的重要标准。 技术…

【iOS】——RunLoop学习

文章目录 一、RunLoop简介1.RunLoop介绍2.RunLoop功能3.RunLoop使用场景4.Run Loop 与线程5.RunLoop源代码和模型图 二、RunLoop Mode1.CFRunLoopModeRef2.RunLoop Mode的五种模式3.RunLoop Mode使用 三、RunLoop Source1.CFRunLoopSourceRefsourc0&#xff1a;source1: 2.CFRu…

Vue中使用$t(‘xxx‘)实现中英文切换;

&#xff08;原文链接&#xff09; 介绍 {{$t(key)}} &#xff1a;是VueI18n插件提供的函数&#xff0c;主要用于根据当前语言环境返回对应的翻译文本&#xff0c;以便在页面上显示多语言内容。 key&#xff1a;作为参数传递给函数$t()的字符串&#xff0c;用于指定需要翻译的…

基于springboot+vue+Mysql的在线BLOG网

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

虾皮选品:Shopee首季盈利2.4亿;TikTok美区电商权限要求降低

2024年5月14号&#xff0c;跨境电商日报&#xff1a; 1.Ozon已成功回款 2.TikTok降低美区达人开通电商权限要求 3.Shopee首季盈利2.4亿 4.6月1日起&#xff0c;亚马逊退货处理费收取标准更新 5.欧盟委员会对从中国台湾地区和越南进口的不锈钢冷轧产品征收反补贴和反倾销税…