RPC教程 2.支持并发与异步的客户端

1.客户端的使用例子

func main(){//1. 建立连接client, err := rpc.Dial("tcp", "localhost:1234")//2.调用调用指定的RPC方法var reply string //string有默认值err = client.Call("HelloService.Hello", "hi", &reply)    //即是一次请求
}

对 net/rpc 而言,一个函数需要能够被远程调用,它必须满足一定的条件,否则其会被忽略。

这些条件是:

  • 方法的类型是可输出的 (the method’s type is exported)
  • 方法本身也是可输出的 (the method is exported)
  • 方法必须由两个参数,必须是输出类型或者是内建类型 (the method has two arguments, both exported or builtin types)
  • 方法的第二个参数必须是指针类型 (the method’s second argument is a pointer)
  • 方法返回类型为 error (the method has return type error)

一个输出方法的格式如下: 

func (t *T) MethodName(argType T1, replyType *T2) error

这个方法的第一个参数代表调用者(client)提供的参数,第二个参数代表要返回给调用者的计算结果。

2.定义一个请求

封装结构体 Call 来承载一次 RPC 调用所需要的信息。

type Call struct {ServiceMethod string      // The name of the service and method to call.Args          interface{} // The argument to the function (*struct).Reply         interface{} // The reply from the function (*struct).Error         error       // After completion, the error status.Done          chan *Call  // Receives *Call when Go is complete.Seq           uint64
}func (call *Call) done() {call.Done <- call
}

请求内容至少需要包括:

  • 请求的服务以及方法名
  • 请求参数和请求的回复
  • 请求出错时返回的错误信息

为了支持异步调用,Call 结构体中添加了一个字段 Done,Done 的类型是 chan *Call,当调用结束时,会调用 call.done() 通知调用方。 

3.实现 Client

type Client struct {code     codec.Codecopt      *Optionsending  sync.Mutexheader   codec.Headermutex    sync.Mutex    //保护下面的变量seq      uint64pending  map[uint64]*Callclosing  bool //user has called Closeshutdown bool // server has told us to stop
}var ErrShutdown = errors.New("connection is shut down")func (client *Client) Close() error {client.mutex.Lock()defer client.mutex.Unlock()if client.closing {return ErrShutdown}client.closing = truereturn client.code.Close()
}func (client *Client) IsAvailable() bool {client.mutex.Lock()defer client.mutex.Unlock()return !client.closing && !client.shutdown
}
  • code是消息的编解码器,和服务端类似的。
  • sending是互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。
  • header 是每个请求的消息头,header 只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在 Client 结构体中可以复用。
  • seq是用于给请求进行编号,从1开始编号自增,每个请求有唯一的编号。
  • pending是存储未完成的请求,键是编号,值是 Call 实例。
  • closing 和 shutdown 任意一个值置为 true,则表示 Client 处于不可用的状态,但有些许的差别,closing 是用户主动关闭的,即调用 Close 方法,而 shutdown 置为 true 一般是有错误发生。

 需要存储未完成的请求,可以想象,一个用户发出10个不同的请求,要是客户端不存储这些请求,那收到回复的时候,就难知道如何处理了。

所以在发起请求的时候,需要注册这个请求(往pending中添加),得到回复后需要删除(从pending中delete)

由此,需要实现和Call相关的注册和删除方法。

而terminateCalls方法是服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call。该方法需要都获得sending锁和mutex锁。该方法的使用地方后面会讲到的。

func (client *Client) RegisterCall(call *Call) (uint64, error) {client.mutex.Lock()defer client.mutex.Unlock()if client.closing || client.shutdown {return 0, ErrShutdown}call.Seq = client.seq //设置Call的序号client.pending[call.Seq] = callclient.seq++return call.Seq, nil
}func (client *Client) removeCall(seq uint64) *Call {client.mutex.Lock()defer client.mutex.Unlock()call := client.pending[seq]delete(client.pending, seq)return call
}func (client *Client) terminateCalls(err error) {client.sending.Lock()defer client.sending.Unlock()client.mutex.Lock()defer client.mutex.Unlock()client.shutdown = truefor _, call := range client.pending {call.Error = errcall.done()}
}

创建客户端

按照前面的例子,创建客户端就

client, err := rpc.Dial("tcp", "localhost:1234")

那我们也按照这样来。

Dail函数通过 ...*Option 将 Option 实现为可选参数(...表示可以0个参数或多个参数),可以不填写opts参数,使用默认的option(即是gob编解码)

//使用例子 client, err := rpc.Dial("tcp", "localhost:1234")
func Dail(network, address string, opts ...*Option) (client *Client, err error) {opt, err := parseOptions(opts...)if err != nil {return nil, err}conn, err := net.Dial(network, address)if err != nil {return nil, err}return NewClient(conn, opt)
}

parseOption函数就是解析Option,判断其Option是否符合要求等。

NewClient函数,创建 Client 实例,首先需要完成一开始的协议交换,即发送 Option 信息给服务端,协商好消息的编解码方式。

func NewClient(conn net.Conn, opt *Option) (*Client, error) {// send options with serverif err := json.NewEncoder(conn).Encode(opt); err != nil {log.Println("rpc client: options error: ", err)conn.Close()return nil, err}f := codec.NewCodeFuncMap[opt.CodecType]if f == nil { //没有符合条件的编解码器err := fmt.Errorf("invalid codec type %s", opt.CodecType)log.Println("rpc client: codec error:", err)return nil, err}return &Client{seq:     1,    //序号从1开始,序号0表示可以表示错误code:    f(conn),opt:     opt,pending: make(map[uint64]*Call),}, nil
}func parseOptions(opts ...*Option) (*Option, error) {if len(opts) == 0 || opts[0] == nil {return DefaultOption, nil}if len(opts) != 1 {return nil, errors.New("number of options is more than 1")}opt := opts[0]opt.MagicNumber = DefaultOption.MagicNumberif opt.CodecType == "" {opt.CodecType = DefaultOption.CodecType}if _, ok := codec.NewCodeFuncMap[opt.CodecType]; !ok {return nil, fmt.Errorf("invalid codec type %s", opt.CodecType)}return opt, nil
}

请求和创建客户端完成后,那就是到关键的接收和发送请求了。

实现接收回复和发送请求

那先来看看发送请求。

    var reply string //string有默认值err = client.Call("HelloService.Hello", "hi", &reply) 

 先实现个send方法,其参数是*Call。内容是注册该Call,进行编码并发送给服务端。

func (client *Client) send(call *Call) {// make sure that the client will send a complete requestclient.sending.Lock()defer client.sending.Unlock()//注册,添加到pending中seq, err := client.RegisterCall(call)if err != nil {call.Error = errcall.done()return}//复用同一个headerclient.header.ServiceMethod = call.ServiceMethodclient.header.Seq = seqclient.header.Error = ""// encode and send the requestif err := client.code.WriteResponse(&client.header, call.Args); err != nil {call := client.removeCall(seq)if call != nil {call.Error = errcall.done()}}
}

代码中经常出现call.done(),done方法是为了支持异步调用的,当调用结束时,会调用 call.done() 通知调用方。 那就会有个异步调用的Go方法。

异步调用的Go方法中,会先判断chan是否符合条件,之后根据函数参数来创建Call,之后调用send方法。

func (client *Client) Go(serviceMethod string, args, reply any, done chan *Call) *Call {if done == nil {done = make(chan *Call, 10) //10或1或其他的也可以的,大于0即可} else if cap(done) == 0 {log.Panic("rpc client: done channel is unbuffered")}call := &Call{ServiceMethod: serviceMethod,Args:          args,Reply:         reply,Done:          done,}client.send(call)return call
}func (client *Client) Call(serviceMethod string, args, reply any) error {call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error
}

而Call方法中,其是对 Go 的封装,阻塞 call.Done,等待响应返回,是一个同步接口。

发送解决后,如何进行接收信息呢?

调用Call方法,这是个同步接口,会一直阻塞在call := <-client.Go(...).Done这里,之后当使用call.done()时候,才会解除阻塞。但是按照目前的正常情况,是不会调用call.done()的。这时我们可以新启一个协程去接收信息,处理完信息后就调用call.done()即可。

接收功能,接收到的响应有三种情况:

  • call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。
  • call 存在,但服务端处理出错,即 h.Error 不为空。
  • call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。
func (client *Client) receive() {var err errorfor err == nil {var h codec.Headerif err = client.code.ReadHeader(&h); err != nil {break}call := client.removeCall(h.Seq)switch {case call == nil:err = client.code.ReadBody(nil)case h.Error != "":call.Error = fmt.Errorf(h.Error)err = client.code.ReadBody(nil)call.done()default:err = client.code.ReadBody(call.Reply)if err != nil {call.Error = errors.New("reading body " + err.Error())}call.done()}}client.terminateCalls(err)
}

在recieve中就使用了terminateCalls方法。在读取Header失败break,就执行该方法。

那么这个新的协程在哪里开启好呢?那可以在创建客户端的时候就开启这个协程。

func NewClient(conn net.Conn, opt *Option) (*Client, error) {//......f := codec.NewCodeFuncMap[opt.CodecType]//前面代码没有变化,就下面封装成一个函数,其内部就使用go client.receive()return newClientCodec(f(conn), opt), nil
}func newClientCodec(code codec.Codec, opt *Option) *Client {client := &Client{seq:     1,code:    code,opt:     opt,pending: make(map[uint64]*Call),}go client.receive()return client
}

这样,接收和发送也都处理好了。至此,一个支持异步和并发的 GeeRPC 客户端已经完成。

4.测试

上一章节只实现了服务端,我们在 main 函数中手动模拟了整个通信过程。因此,这一章节我们就将 main 函数中通信部分替换为今天的客户端。

startServer 没有发生变化。

func main() {addr := make(chan string)go startServer(addr)// in fact, following code is like a simple geerpc clientclient, _ := geerpc.Dail("tcp", <-addr) //上一节是使用net.Daildefer client.Close()time.Sleep(time.Second * 1)num := 3var wg sync.WaitGroupwg.Add(num)for i := 0; i < num; i++ {go func(i int) {defer wg.Done()args := uint64(i)var reply stringif err := client.Call("foo.sum", args, &reply); err != nil {log.Fatal("call Foo.Sum error:", err)}log.Println("reply: ", reply)}(i)}wg.Wait()
}func startServer(addr chan string) {l, err := net.Listen("tcp", "localhost:10000")if err != nil {log.Fatal("network error:", err)}log.Println("start rpc server on", l.Addr())addr <- l.Addr().String()geerpc.Accept(l)
}

完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc/2-client

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/637114.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用docker配置semantic slam

一.Docker环境配置 1.拉取Docker镜像 sudo docker pull ubuntu:16.04拉取的为ununtu16版本镜像&#xff0c;环境十分干净&#xff0c;可以通过以下命令查看容器列表 sudo docker images 如果想删除多余的docker image&#xff0c;可以使用指令 sudo docker rmi -f <id&g…

linux 使用笔记

1.查看运行内存 a.Free 快速查看内存的方法&#xff0c;也是经常使用的命令&#xff0c; -h 更人性化的显示内存的单元 -m 以M的形式显示 b.Top Top命令提供了实时性的运行中的程序的资源使用统计。可以根据内存的使用和大小来进行排序。 如上所示&#xff0c;top命令可以看…

伊恩·斯图尔特《改变世界的17个方程》波动方程笔记

主要是课堂的补充&#xff08;yysy&#xff0c;我觉得课堂的教育模式真有够无聊的&#xff0c;PPT、写作业、考试&#xff0c;感受不到知识的魅力。 它告诉我们什么&#xff1f; 小提琴琴弦上某个小段的加速度&#xff0c;与相邻段相对于该段的平均位移成正比。 为什么重要&…

算子开发参考

神经网络框架自定义算子 https://www.tensorflow.org/lite/guide/ops_custom?hlzh-cn https://www.megengine.org.cn/doc/stable/zh/user-guide/tools/customop.html https://www.paddlepaddle.org.cn/documentation/docs/zh/guides/custom_op/index_cn.html https://whu.obs.…

“GPC爬虫池有用吗?

作为光算科技的独有技术&#xff0c;在深入研究谷歌爬虫推出的一种吸引谷歌爬虫的手段 要知道GPC爬虫池是否有用&#xff0c;就要知道谷歌爬虫这一概念&#xff0c;谷歌作为一个搜索引擎&#xff0c;里面有成百上千亿个网站&#xff0c;对于里面的网站内容&#xff0c;自然不可…

TCP发送优化(ZeroWindow分析)

通信模型一&#xff1a; Client 创建一个 TCP 的 socket&#xff0c;并通过 SO_SNDBUF 选项设置它的发送缓冲区大小为 2048Byte&#xff0c;连接到 Server 后&#xff0c;每 1 秒发送一个 TCP报文(1024Byte)。Server 端不调用 recv()。预期的结果则分为以下几个阶段&#xff1…

Hive数学函数讲解

Hive 是一个基于 Hadoop 的数据仓库工具&#xff0c;它支持类似于 SQL 的查询语言 HiveQL&#xff0c;并且提供了许多内建的数学函数来处理数值数据。下面我将逐一讲解您提到的这些数学函数&#xff0c;并提供一些使用案例和注意事项。 ROUND() 功能&#xff1a;四舍五入到指定…

【小白向】MMDeploy安装部署|暗坑标注版

文章目录 序言正文1 安装PPLCV2 TensorRT环境相关3 编译MMDeploy4 编译SDK结束 序言 本文主要针对在编译安装OpenMMLab团队的MMDeploy模型部署工具时遇到的“难以下手”的问题。 由于OpenMMLab的用户中很大一部分都是具有快速开发需求的人&#xff0c;或者说其实相当部分OpenM…

c语言算法——大数相加

C数据类型 类型与描述1基本数据类型 它们是算术类型&#xff0c;包括整型&#xff08;int&#xff09;、字符型&#xff08;char&#xff09;、浮点型&#xff08;float&#xff09;和双精度浮点型&#xff08;double&#xff09;。2枚举类型&#xff1a; 它们也是算术类型&am…

【docker】之基础篇二

目录 一、docker的数据管理1、数据卷2、数据卷容器&#xff1a; 二、端口映射与容器互联容器之间的通信 三、Docker查看日志四、Dockerfile定制镜像1、DockerFile常用的命令2、DockerFile实操 一、docker的数据管理 在生产环境中使用docker&#xff0c;需要对数据进行持久化&a…

R语言的ggplot2绘制分组折线图?

R绘制分组折线图.R 首先看数据情况&#xff1a;group有3组。Time有3组&#xff0c;数据意思是在3组3个时间点测量了某指标&#xff0c;现在要绘制组1、组2、组3某指标y按时间的变化趋势 数据情况&#xff1a; 看看最终的效果图如下&#xff1a; 下面是本次使用的代码 .libPat…

OpenHarmony 应用开发入门 (二、应用程序包结构理解及Ability的跳转,与Android的对比)

在进行应用开发前&#xff0c;对程序的目录及包结构的理解是有必要的。如果之前有过android开发经验的&#xff0c;会发现OpenHarmony的应用开发也很简单&#xff0c;有很多概念是相似的。下面对比android分析总结下鸿蒙的应用程序包结构&#xff0c;以及鸿蒙对比android的诸多…

Spring第七天(AOP)

简介 AOP(Aspect Oriented Programing)面向切面编程&#xff0c;一种编程范式&#xff0c;指导开发者如何组织程序结构 作用 在不惊动原始设计的基础上为其进行功能增强 Spring理念&#xff1a;无入侵式/无侵入式 基本概念 连接点(JoinPoint) : 程序执行过程中的任意位置&a…

ros2学习笔记-CLI工具,记录命令对应操作。

目录 环境变量turtlesim和rqt以初始状态打开rqt node启动节点查看节点列表查看节点更多信息命令行参数 --ros-args topic话题列表话题类型话题列表&#xff0c;附加话题类型根据类型查找话题名查看话题发布的数据查看话题的详细信息查看类型的详细信息给话题发布消息&#xff0…

html Canvas粒子文字特效

代码有点长&#xff0c;下面是代码&#xff1a; <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>HTML5 Canvas粒子效果文字动画特效DEMO演示</title><link rel"stylesheet" href"css/normalize.c…

Bit.Store 加密卡集成主流 BRC20通证,助力 BTC 生态流动性

“Bit.Store 首创性的将包括 ORDI、SATS、以及 RATS 在内的主流 BRC20 资产集成到其加密卡支付中&#xff0c;通过以其推出的加密银行卡为媒介&#xff0c;助力 BTC 生态 Token 的流动性与消费。” 比特币网络在被设计之初&#xff0c;就是以一种去中心化、点对点的现金系统为定…

shardingsphere 出现 Cannot support database type ‘MySQL‘

场景 近日一个项目使用了shardingsphere后出现 java.lang.UnsupportedOperationException: Cannot support database type MySQL , 重点是在dev-pre环境中无法出现这个问题&#xff0c;而是在prod环境中会发生&#xff0c;且prod也不是100%发生&#xff0c; 当流量过大时会发…

js-cookie的使用--token的数据实现持久化

1.下载 npm install js-cookie 2.引入 import Cookies from "js-cookie"; 3.使用 // 写入cookie Cookies.set(name, value) // 读取 Cookies.get(name) // > value Cookies.get(nothing) // > undefined // 读取所有可见的cookie Cookies.get() // 删除某项co…

Vue2:全局事件总线

一、场景描述 之前我们学习了&#xff0c;通过props实现父子组件之间的通信。通过自定义组件&#xff0c;实现了子给父传递数据。 那么&#xff0c;兄弟关系的组件&#xff0c;如何通信了&#xff1f;任意组件间如何通信了&#xff1f; 这个时候&#xff0c;就要学习全局事件总…

JavaScript快速入门一

概述 JavaScript ECMAScript JavaScript特有的东西(BOM DOM) ECMAScript&#xff1a;客户端脚本语言&#xff0c;是欧洲计算机制造商协会ECMA&#xff0c;制定的标准&#xff0c;统一了所有客户端脚本语言的编码方式BOM&#xff1a;Browser Object Model&#xff0c;浏览器…