RPC教程 4.超时处理机制

0.前言

对比原教程,这里使用context来处理子协程的泄露问题。

1.为什么需要超时处理机制

超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力。

纵观整个远程调用的过程,需要客户端处理超时的地方有:

与服务端建立连接,导致的超时
发送请求到服务端,写报文导致的超时
等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
从服务端接收响应时,读报文导致的超时
需要服务端处理超时的地方有:

读取客户端请求报文时,读报文导致的超时
发送响应报文时,写报文导致的超时
调用映射服务的方法时,处理报文导致的超时

其RPC 在 3 个地方添加超时处理机制。分别是:

  1. 客户端创建连接时
  2. 客户端 Client.Call() 整个过程导致的超时(不仅包含发送报文,还包括等待处理,接收报文所有阶段
  3. 服务端处理报文,即 Server.handleRequest 超时。

 2.客户端创建连接超时

为了实现简单,把一些超时时间设定放在Option结构体中。有两个超时时间,连接超时ConnectTimeout,服务端处理超时HandleTimeout。

type Option struct {MagicNumber    int            // MagicNumber marks this's a geerpc requestCodecType      codec.CodeType // client may choose different Codec to encode bodyConnectTimeout time.Duration    //0 表示没有限制HandleTimeout  time.Duration
}var DefaultOption = &Option{MagicNumber:    MagicNumber,CodecType:      codec.GobType,ConnectTimeout: time.Second * 10,    //默认连接超时是10s
}

客户端连接时候是使用Dail方法,那我们就为 Dial 添加一层超时处理的外壳即可。

重点在dialTimeout函数。

  1. 将 net.Dial 替换为 net.DialTimeout,如果连接创建超时,将返回错误。
  2. 开启子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果 time.After() 信道先接收到消息,则说明 NewClient 执行超时,返回错误。
  3. 这里使用了context来通知子协程进行退出。要是没有这个通知的话,假如NewClient要处理很久,而这时case<-time.After(opt.ConnectTimeout)已经到时间了,那执行dialTimeout的协程也就退出了,那子协程中的通道ch就会一直被阻塞(因为没有接收方),那该子协程就不能退出,会泄露。用contex的cancel()就可以通知子协程退出。

 需要讲下newClientFunc,为什么需要这个类型呢,大家应该明白就是直接是用NewClient函数就行的,为什么还要多此一举,要从函数参数中把该函数入参呢,为什么不直接在代码里把f(conn,opt)就写成NewClient(conn,opt)呢。

这是为了后面方便的,下一节会支持HTTP协议的,那就会有HTTP连接,而现在的是TCP连接,而HTTP连接对比TCP连接还有些操作的。这样我们把这个做成参数可以入参,这样我们就可以复用dialTimeout,我们把建立HTTP连接的函数传递给dialTimeout,不再需要为HTTP连接的再重新写dialTimeout。而这也方便了后面对该函数的测试。

type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)type clientResult struct {client *Clienterr    error
}func dialTimeout(f newClientFunc, network, address string, opt *Option) (client *Client, err error) {//超时连接检测conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)if err != nil {return nil, err}//设置超时时间的情况ch := make(chan clientResult)ctx, cancel := context.WithCancel(context.Background())defer cancel()go func(ctx context.Context) {client, err = f(conn, opt) //在这一节,f(conn, opt)就是NewClient(conn, opt)select {case <-ctx.Done():returndefault:ch <- clientResult{client: client, err: err}}}(ctx)if opt.ConnectTimeout == 0 {result := <-chreturn result.client, result.err}select {case <-time.After(opt.ConnectTimeout):cancel() //超时通知子协程结束退出return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)case result := <-ch:return result.client, result.err}
}func Dail(network, address string, opts ...*Option) (client *Client, err error) {opt, err := parseOptions(opts...)if err != nil {return nil, err}return dialTimeout(NewClient, network, address, opt)
}

3.Client.Call 超时

Client.Call 的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活。

这里的超时处理,不止是客户端发送给服务端所需的时间,还包括了客户端等待服务端发送回复的这段时间。即是这个超时时间是要等接收完服务端的回复信息

代码case call := <-call.Done表示要等待收到服务端的回复信息,要是这时候先执行case <-ctx.Done(),那就表示超时了。

所以后面的测试Client.Call 超时要留意其超时。

func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply any) error {call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))select {case <-ctx.Done():client.removeCall(call.Seq)return errors.New("rpc client: call failed: " + ctx.Err().Error())case call := <-call.Done:return call.Error}//之前的写法// 	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done// 	return call.Error
}

用户可以使用 context.WithTimeout 创建具备超时检测能力的 context 对象来控制。

	var reply intctx, _ := context.WithTimeout(context.Background(), time.Second*5)client.Call(ctx, "My.Sum", args, &reply);

4.服务端处理超时

和客户端连接超时处理相似,也是使用context来控制子协程的退出。

开启一个新协程去执行call方法。通道called用来表示消息处理发送是否完毕。超时没有限制时,主协程就会阻塞在timeout==0的<-called中,等到发送完毕后,通道called有数据了,子协程结束,主协程也解除阻塞,退出。

超时有限制情况,假如超时了,就会执行在 case <-time.After(timeout)处调用cancel(),那子协程中的case <-ctx.Done()就会处理,发送超时处理的信息给客户端并退出子协程。

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {defer wg.Done()ctx, cancel := context.WithCancel(context.Background())defer cancel()called := make(chan struct{})go func(ctx context.Context) {err := req.svc.call(req.mtype, req.argv, req.replyv)select {case <-ctx.Done():req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)server.sendResponse(cc, req.h, invalidRequest, sending)default:if err != nil {fmt.Println("call err:", err)req.h.Error = err.Error()server.sendResponse(cc, req.h, invalidRequest, sending)} else {server.sendResponse(cc, req.h, req.replyv.Interface(), sending)}called <- struct{}{}}}(ctx)if timeout == 0 {<-calledreturn}select {case <-time.After(timeout):cancel()case <-called:return}
}

上一节的handleRequest中是没有timeout这个参数的,所以在使用该方法时候,需要加上timeout。需要修改下(Server).Serveconn方法。

func (server *Server) ServeConn(conn io.ReadWriteCloser) {//其余的没有改变server.servCode(f(conn), &opt)  //之前是server.servCode(f(conn))
}
//使用handleRequest的地方
func (server *Server) servCode(cc codec.Codec, opt *Option) {//...................for {go server.handleRequest(cc, req, sending, &wg, opt.HandleTimeout)}
}

5.测试

第一个测试用例,用于测试连接超时。NewClient 函数耗时 3s,ConnectionTimeout 分别设置为 1s 和 0 两种场景。

这里newClientFunc类型就派上用场了,这里就可以设置该函数耗时,方便测试。

func TestClient_dialTimeout(t *testing.T) {t.Parallel() //表示该测试将与(并且仅与)其他并行测试并行运行。l, _ := net.Listen("tcp", "localhost:10000")f := func(conn net.Conn, opt *Option) (*Client, error) {conn.Close()time.Sleep(time.Second * 2)return nil, nil}//命令行执行 go test -run TestClient_dialTimeout/timeout 测试t.Run("timeout", func(t *testing.T) {_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: time.Second})_assert(err != nil && strings.Contains(err.Error(), "connect timeout"), "expect a timeout error")})
//命令行执行 go test -run TestClient_dialTimeout/0 测试t.Run("0", func(t *testing.T) {_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: 0})_assert(err == nil, "0 means no limit")})
}func _assert(condition bool, msg string, v ...interface{}) {if !condition {panic(fmt.Sprintf("assertion failed: "+msg, v...))}
}

第二个测试用例,用于测试处理超时。Bar.Timeout 耗时 2s。

场景一:客户端设置超时时间为 1s,服务端无限制(这个就是Client.Call超时的情况,需要注意)

场景二:服务端设置超时时间为1s,客户端无限制。

type Bar intfunc (b *Bar) Timeout(argv int, reply *int) error {time.Sleep(time.Second * 3)    // 模拟3s的工作return nil
}func startServer(addr chan string) {var b Bar_ = Register(&b)l, _ := net.Listen("tcp", "localhost:10000")addr <- l.Addr().String()Accept(l)
}func TestClient_Call(t *testing.T) {t.Parallel()addrCh := make(chan string)go startServer(addrCh)addr := <-addrChtime.Sleep(time.Second)t.Run("client_timeout", func(t *testing.T) {client, _ := Dail("tcp", addr)ctx, _ := context.WithTimeout(context.Background(), time.Second*1)var reply interr := client.Call(ctx, "Bar.Timeout", 1, &reply)_assert(err != nil && strings.Contains(err.Error(), ctx.Err().Error()), "expect a timeout error")})t.Run("server_hander_timeout", func(t *testing.T) {client, _ := Dail("tcp", addr, &Option{HandleTimeout: time.Second,})var reply interr := client.Call(context.Background(), "Bar.Timeout", 1, &reply)_assert(err != nil && strings.Contains(err.Error(), "handle timeout"), "expect a timeout error")})
}

完整代码:https://githubfast.com/liwook/Go-projects/tree/main/geerpc/4-timeout

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/641336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【江科大】STM32:串口HEX/文本数据接收和发送(代码部分)(下)

串口发送 #include "stm32f10x.h" // Device header#include<stdio.h> #include<stdarg.h> void Serial_Init(void) {RCC_APB2PeriphClockCmd(RCC_APB2Periph_USART1,ENABLE);RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA,ENABLE);GPI…

latex画边框框加粗的表格

示例代码 \begin{table}[H]% 这四个距离分别控制toprule和buttomrule的上、下空白间距&#xff0c;如果不是0的话以后竖线和横线连不起来\abovetopsep0pt\aboverulesep0pt\belowrulesep0pt\belowbottomsep0pt\centering% \heavyrulewidth是\toprule和\bottomrule的默认宽度&am…

Vector容器的详细介绍

一、vector基本概念 1.1 功能&#xff1a; -vector 数据结构和数组非常相似&#xff0c;也称单端数组 1.2 vector与普通数组区别&#xff1a; - 不同之处在于数组是静态空间&#xff0c;而vector可以动态拓展 1.3 动态拓展&#xff1a; 并不是在原空间之后续接新空间&#xff…

Java面试题03——CAS

1.什么是CAS CAS英文全(Compare And Swap)指比较并交换。 CAS算法包含3个参数&#xff08;V,E,N&#xff09; V表示要更新的变量E表示预期的值N表示新值 在且仅在V值等于E值时&#xff0c;才会将V值设为N&#xff0c;如果V值和E值不同&#xff0c;则说明已经有其他线程做了更新…

如何做好培训管理?推荐使用这款培训管理系统(内附详细步骤+免费模板)

本文将为大家讲解&#xff1a;1、如何做好培训管理&#xff1f;2、如何使用零代码平台搭建培训管理系统&#xff1f; 培训管理&#xff0c;作为企业人力资源管理的核心环节&#xff0c;对于确保员工具备完成任务所需的专业知识和技能发挥着至关重要的作用。它不仅是提升员工绩…

北斗卫星为野外科考人员提供安全保障

北斗卫星为野外科考人员提供安全保障 自第二次青藏高原综合科学考察研究启动以来&#xff0c;青海不断提升科考服务保障能力&#xff0c;推动科考全程信息化&#xff0c;有效促进科考成果转化。 为保障科考人员的人身安全&#xff0c;青海省青藏科学考察服务中心开发了基于北…

CF1630A And Matching 题解

And Matching 传送门 You are given a set of n n n ( n n n is always a power of 2 2 2) elements containing all integers 0 , 1 , 2 , … , n − 1 0, 1, 2, \ldots, n-1 0,1,2,…,n−1 exactly once. Find n 2 \frac{n}{2} 2n​ pairs of elements such that: Ea…

宝藏api推荐,热门、免费、好用

台风信息查询&#xff1a;提供西北太平洋及南海地区过去两年及当前年份所有编号台风的信息查询&#xff0c;包括台风实时位置、过去路径、预报路径及登陆信息等要素。短信验证码&#xff1a;可用于登录、注册、找回密码、支付认证等等应用场景。支持三大运营商&#xff0c;3秒可…

第08章_面向对象编程(高级)(static,单例设计模式,理解mian方法,代码块,final,抽象类与抽象方法,接口,内部类,枚举类,注解,包装类)

文章目录 第08章_面向对象编程(高级)本章专题与脉络1. 关键字&#xff1a;static1.1 类属性、类方法的设计思想1.2 static关键字1.3 静态变量1.3.1 语法格式1.3.2 静态变量的特点1.3.3 举例1.3.4 内存解析 1.4 静态方法1.4.1 语法格式1.4.2 静态方法的特点1.4.3 举例 1.5 练习 …

UI设计中的插画运用优势(下)

6. 插画赋予设计以美学价值&#xff0c;更容易被接受 即使所有人都在分析和争论产品的可用性和易用性&#xff0c;大家在对美的追求上&#xff0c;始终保持着一致的态度。一个设计是否具备可取性&#xff0c;是否能够通过甲方、客户和实际用户&#xff0c;是每个设计人都需要面…

高频一体式读写器的应用及其原理

高频一体式读写器作为一款读写设备&#xff0c;将RFID读写模块和天线集于一体&#xff0c;通过天线与RFID标签进行无线通信&#xff0c;实现对标签的识别和内存数据的读出或写入操作。具备安全、准确、快速、扩展、兼容性强等特点&#xff0c;具备非接触识别、远距离识别、环境…

Laravel 10.x 里如何使用ffmpeg

原理上很简单&#xff0c;就是使用命令行去调用ffmpeg&#xff0c;然后分析一下输出是不是有错误。 安装 首先安装 symfony/process&#xff0c;主要用于包装一下&#xff0c;用来代替 exec, passthru, shell_exec and system 。 composer require symfony/process composer…

PowerShell install 一键部署grafana

grafana 前言 Grafana 是一款开源的数据可视化和监控仪表盘工具。它提供了丰富的数据查询、可视化和报警功能,可用于实时监控、数据分析和故障排除等领域。 通过 Grafana,您可以连接到各种不同的数据源,包括时序数据库(如 Prometheus、InfluxDB)和关系型数据库(如 MySQ…

linux性能优化-磁盘I_O优化

1.文件系统 1.1.文件系统的工作原理 文件系统是在磁盘的基础上&#xff0c;提供了一个用来管理文件的树状结构。 接下来我们就看看Linux 文件系统的工作原理。 1.1.1索引节点和目录项 在 Linux 中一切皆文件 ,文件系统,本身是对存储设备上的文件&#xff0c;进行组织管理的…

【Linux】—— 共享内存

本期我将要带大家学习的是有关进程间通信的另一种方式——共享内存。共享内存是一种用于进程间通信的高效机制&#xff0c;允许多个进程访问和操作同一块内存区域。 目录 &#xff08;一&#xff09;深刻理解共享内存 1.1 概念解释 1.2 共享内存原理 1.3 共享内存数据结构 …

基于SpringBoot的药品管理系统

文章目录 项目介绍主要功能截图&#xff1a;部分代码展示设计总结项目获取方式 &#x1f345; 作者主页&#xff1a;超级无敌暴龙战士塔塔开 &#x1f345; 简介&#xff1a;Java领域优质创作者&#x1f3c6;、 简历模板、学习资料、面试题库【关注我&#xff0c;都给你】 &…

2024年华为OD机试真题-最大坐标值-Python-OD统一考试(C卷)

题目描述: 小明在玩一个游戏,游戏规则如下: 在游戏开始前,小明站在坐标轴原点处(坐标值为0)。 给定一组指令和一个幸运数,每个指令都是一个整数,小明按照指定的要求前进或者后退指定的步数。前进代表朝坐标轴的正方向走,后退代表朝坐标轴的负方向走。 幸运数为一个整数…

【css技巧】css实现边框渐变

海鲸AI-ChatGPT4.0国内站点&#xff0c;支持设计稿转代码&#xff1a;https://www.atalk-ai.com 在CSS中实现边框渐变效果&#xff0c;你不能直接应用渐变到border属性上&#xff0c;因为CSS标准不支持这样的操作。但是&#xff0c;你可以使用一些技巧来模拟边框渐变的效果。以…

数据治理能解决AI疲劳问题吗?

这篇文章强调了AI疲劳开始的两个阶段&#xff0c;并介绍了数据质量报告等数据治理措施如何能够推动构建值得信赖和健壮的模型。 数据治理和AI疲劳听起来像是两个不同的概念&#xff0c;但两者之间有着内在的联系。为了更好地理解它&#xff0c;让我们从它们的定义开始。 数据治…

JS 将字符串‘10.3%‘ 经过运算加2转换为 ‘12.3%‘

文章目录 需求分析 需求 已知 字符串 a ‘10.3%’&#xff0c;现需将转换为 字符串’12.3%’ 分析 去掉百分号&#xff0c;将字符串转换为数字 const aNumber parseFloat(10.3%); const resultNumber aNumber 2;将结果转换为带百分号的字符串 const resultString re…