0.前言
对比原教程,这里使用context来处理子协程的泄露问题。
1.为什么需要超时处理机制
超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力。
纵观整个远程调用的过程,需要客户端处理超时的地方有:
与服务端建立连接,导致的超时
发送请求到服务端,写报文导致的超时
等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
从服务端接收响应时,读报文导致的超时
需要服务端处理超时的地方有:
读取客户端请求报文时,读报文导致的超时
发送响应报文时,写报文导致的超时
调用映射服务的方法时,处理报文导致的超时
其RPC 在 3 个地方添加超时处理机制。分别是:
- 客户端创建连接时
- 客户端
Client.Call()
整个过程导致的超时(不仅包含发送报文,还包括等待处理,接收报文所有阶段) - 服务端处理报文,即
Server.handleRequest
超时。
2.客户端创建连接超时
为了实现简单,把一些超时时间设定放在Option结构体中。有两个超时时间,连接超时ConnectTimeout,服务端处理超时HandleTimeout。
type Option struct {MagicNumber int // MagicNumber marks this's a geerpc requestCodecType codec.CodeType // client may choose different Codec to encode bodyConnectTimeout time.Duration //0 表示没有限制HandleTimeout time.Duration
}var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType: codec.GobType,ConnectTimeout: time.Second * 10, //默认连接超时是10s
}
客户端连接时候是使用Dail方法,那我们就为 Dial 添加一层超时处理的外壳即可。
重点在dialTimeout函数。
- 将
net.Dial
替换为net.DialTimeout
,如果连接创建超时,将返回错误。 - 开启子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果
time.After()
信道先接收到消息,则说明 NewClient 执行超时,返回错误。 - 这里使用了context来通知子协程进行退出。要是没有这个通知的话,假如NewClient要处理很久,而这时case<-time.After(opt.ConnectTimeout)已经到时间了,那执行dialTimeout的协程也就退出了,那子协程中的通道ch就会一直被阻塞(因为没有接收方),那该子协程就不能退出,会泄露。用contex的cancel()就可以通知子协程退出。
需要讲下newClientFunc,为什么需要这个类型呢,大家应该明白就是直接是用NewClient函数就行的,为什么还要多此一举,要从函数参数中把该函数入参呢,为什么不直接在代码里把f(conn,opt)就写成NewClient(conn,opt)呢。
这是为了后面方便的,下一节会支持HTTP协议的,那就会有HTTP连接,而现在的是TCP连接,而HTTP连接对比TCP连接还有些操作的。这样我们把这个做成参数可以入参,这样我们就可以复用dialTimeout,我们把建立HTTP连接的函数传递给dialTimeout,不再需要为HTTP连接的再重新写dialTimeout。而这也方便了后面对该函数的测试。
type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)type clientResult struct {client *Clienterr error
}func dialTimeout(f newClientFunc, network, address string, opt *Option) (client *Client, err error) {//超时连接检测conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)if err != nil {return nil, err}//设置超时时间的情况ch := make(chan clientResult)ctx, cancel := context.WithCancel(context.Background())defer cancel()go func(ctx context.Context) {client, err = f(conn, opt) //在这一节,f(conn, opt)就是NewClient(conn, opt)select {case <-ctx.Done():returndefault:ch <- clientResult{client: client, err: err}}}(ctx)if opt.ConnectTimeout == 0 {result := <-chreturn result.client, result.err}select {case <-time.After(opt.ConnectTimeout):cancel() //超时通知子协程结束退出return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)case result := <-ch:return result.client, result.err}
}func Dail(network, address string, opts ...*Option) (client *Client, err error) {opt, err := parseOptions(opts...)if err != nil {return nil, err}return dialTimeout(NewClient, network, address, opt)
}
3.Client.Call 超时
Client.Call 的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活。
这里的超时处理,不止是客户端发送给服务端所需的时间,还包括了客户端等待服务端发送回复的这段时间。即是这个超时时间是要等接收完服务端的回复信息。
代码case call := <-call.Done表示要等待收到服务端的回复信息,要是这时候先执行case <-ctx.Done(),那就表示超时了。
所以后面的测试Client.Call 超时要留意其超时。
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply any) error {call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))select {case <-ctx.Done():client.removeCall(call.Seq)return errors.New("rpc client: call failed: " + ctx.Err().Error())case call := <-call.Done:return call.Error}//之前的写法// call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done// return call.Error
}
用户可以使用 context.WithTimeout
创建具备超时检测能力的 context 对象来控制。
var reply intctx, _ := context.WithTimeout(context.Background(), time.Second*5)client.Call(ctx, "My.Sum", args, &reply);
4.服务端处理超时
和客户端连接超时处理相似,也是使用context来控制子协程的退出。
开启一个新协程去执行call方法。通道called用来表示消息处理发送是否完毕。超时没有限制时,主协程就会阻塞在timeout==0的<-called中,等到发送完毕后,通道called有数据了,子协程结束,主协程也解除阻塞,退出。
超时有限制情况,假如超时了,就会执行在 case <-time.After(timeout)处调用cancel(),那子协程中的case <-ctx.Done()就会处理,发送超时处理的信息给客户端并退出子协程。
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {defer wg.Done()ctx, cancel := context.WithCancel(context.Background())defer cancel()called := make(chan struct{})go func(ctx context.Context) {err := req.svc.call(req.mtype, req.argv, req.replyv)select {case <-ctx.Done():req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)server.sendResponse(cc, req.h, invalidRequest, sending)default:if err != nil {fmt.Println("call err:", err)req.h.Error = err.Error()server.sendResponse(cc, req.h, invalidRequest, sending)} else {server.sendResponse(cc, req.h, req.replyv.Interface(), sending)}called <- struct{}{}}}(ctx)if timeout == 0 {<-calledreturn}select {case <-time.After(timeout):cancel()case <-called:return}
}
上一节的handleRequest中是没有timeout这个参数的,所以在使用该方法时候,需要加上timeout。需要修改下(Server).Serveconn方法。
func (server *Server) ServeConn(conn io.ReadWriteCloser) {//其余的没有改变server.servCode(f(conn), &opt) //之前是server.servCode(f(conn))
}
//使用handleRequest的地方
func (server *Server) servCode(cc codec.Codec, opt *Option) {//...................for {go server.handleRequest(cc, req, sending, &wg, opt.HandleTimeout)}
}
5.测试
第一个测试用例,用于测试连接超时。NewClient 函数耗时 3s,ConnectionTimeout 分别设置为 1s 和 0 两种场景。
这里newClientFunc类型就派上用场了,这里就可以设置该函数耗时,方便测试。
func TestClient_dialTimeout(t *testing.T) {t.Parallel() //表示该测试将与(并且仅与)其他并行测试并行运行。l, _ := net.Listen("tcp", "localhost:10000")f := func(conn net.Conn, opt *Option) (*Client, error) {conn.Close()time.Sleep(time.Second * 2)return nil, nil}//命令行执行 go test -run TestClient_dialTimeout/timeout 测试t.Run("timeout", func(t *testing.T) {_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: time.Second})_assert(err != nil && strings.Contains(err.Error(), "connect timeout"), "expect a timeout error")})
//命令行执行 go test -run TestClient_dialTimeout/0 测试t.Run("0", func(t *testing.T) {_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: 0})_assert(err == nil, "0 means no limit")})
}func _assert(condition bool, msg string, v ...interface{}) {if !condition {panic(fmt.Sprintf("assertion failed: "+msg, v...))}
}
第二个测试用例,用于测试处理超时。Bar.Timeout
耗时 2s。
场景一:客户端设置超时时间为 1s,服务端无限制(这个就是Client.Call超时的情况,需要注意)
场景二:服务端设置超时时间为1s,客户端无限制。
type Bar intfunc (b *Bar) Timeout(argv int, reply *int) error {time.Sleep(time.Second * 3) // 模拟3s的工作return nil
}func startServer(addr chan string) {var b Bar_ = Register(&b)l, _ := net.Listen("tcp", "localhost:10000")addr <- l.Addr().String()Accept(l)
}func TestClient_Call(t *testing.T) {t.Parallel()addrCh := make(chan string)go startServer(addrCh)addr := <-addrChtime.Sleep(time.Second)t.Run("client_timeout", func(t *testing.T) {client, _ := Dail("tcp", addr)ctx, _ := context.WithTimeout(context.Background(), time.Second*1)var reply interr := client.Call(ctx, "Bar.Timeout", 1, &reply)_assert(err != nil && strings.Contains(err.Error(), ctx.Err().Error()), "expect a timeout error")})t.Run("server_hander_timeout", func(t *testing.T) {client, _ := Dail("tcp", addr, &Option{HandleTimeout: time.Second,})var reply interr := client.Call(context.Background(), "Bar.Timeout", 1, &reply)_assert(err != nil && strings.Contains(err.Error(), "handle timeout"), "expect a timeout error")})
}
完整代码:https://githubfast.com/liwook/Go-projects/tree/main/geerpc/4-timeout