优雅停机
- 优雅停机
- a.优雅停机概述
- b.服务端实现优雅停机
- c.客户端实现优雅停机
- d.优雅启动
优雅停机
a.优雅停机概述
当我们快速关闭服务提供方时,注册中心感知、以及通过watcher机制通知调用方一定不能做到实时,一定会有延时,同时我们的心跳检测也会有一定的时间间隔。也就意味着当一个提供方实际上已经下线了,但是他依然在调用方的健康列表中,调用方依然认为他健康依然会给他发送消息,最后的结果就是超时等待,不断重试而已。所以如何在服务下线时快速的让调用方感知,很重要。
大概可以有以下几种解决方案:
1.通过控制台人工通知调用方,让他们手动摘除要下线的机器,这种方式很原始也很直接。但这样对于提供方上线的过程来说太繁琐了,每次上线都要通知到所有调用我接口的团队,整个过程既浪费时间又没有意义,显然不能被正常接受。
2.通过服务发现机制感知,这种方式我们探讨过,因为存在一定的时间差,所以会出现一定的问题。
3.不强依赖“服务发现”来通知调用方要下线的机器,由服务提供方自己来通知行不行。在rpc里面调用方跟服务提供方之间是长连接,我们可以在提供方应用内存里面维护一份调用方连接集合,当服务要关闭的时候,挨个去通知调用方去下线这台机器。
实时上第三种方式已经很好了,但是依旧会出现一些问题 ,如请求的时间点跟收到服务提供方关闭通知的时间点很接近,只比关闭通知的时间早不到1ms,如果再加上网络传输时间的话,那服务提供方收到请求的时候,它应该正在处理关闭逻辑。这就说明服务提供方关闭的时候,并没有正确处理关闭后接收到的新请求。
优雅停机方案:
因为服务提供方已经开始进入关闭流程,那么很多对象就可能已经被销毁了,关闭后再收到的请求按照正常业务请求来处理,肯定没法保证能处理的。所以我们可以在关闭的时候,设置一个请求”挡板”,挡板的作用就是告诉调用方,我已经开始进入关闭流程了,我不能再处理你这个请求了。
这种场景在生活中其实很常见,举一个例子:
银行办理业务,在交接班或者有其他要事情处理的时候,银行柜台工作人员会拿出一个纸板,放在窗口前,上面写到“该窗口已关闭”。在该窗口排队的人虽然有一万个不愿,也只能换到其它窗口办理业务,因为柜台工作人员会把当前正在办理的业务处理完后正式关闭窗口。
基于这个思路,我们可以这么处理:
1.调用方发起请求,给调用方一个特殊的响应,使用响应码标记即可,就是告诉调用方我已经收到这个请求了,但是我正在关闭,并没有处理这个请求。
2.调用方收到这个异常响应后,rpc框架把这个节点从健康列表挪出,并把请求自动重试到其他节点,因为这个请求是没有被服务提供方处理过,所以可以安全地重试至其他节点,这样就可以实现对业务无损
**问题一:**那要怎么捕获到关闭事件呢?
以通过捕获操作系统的进程信号来获取,在java 语言里面,可以使用Runtime的addShutdownHook方法,可以注册关闭的钩子。在yrpc启动的时候,我们提前注册关闭钩子,并在里面添加处理程序,负责开启关闭标识和安全关闭服务,服务在关闭的时候会通知调用方下线节点。同时需要在我们调用链里面加上挡板处理器,当新的请求来的时候,会判断关闭标识,如果正在关闭,则抛出特定异常,返回特定结果。
以下是测试用例:
public static void main(String[] args) {Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("程序正在关闭");try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("合法请求已经被处理完成");}));while (true) {try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("正在处理请求");}}
b.服务端实现优雅停机
修改core模块下的DcyRpcBootstrap
启动类的start()
类:
- 优先注册一个应用程序的钩子
public void start() {// 优先注册应用程序钩子Runtime.getRuntime().addShutdownHook(new DcyRpcShutdownHook());// 略...
}
在core模块下创建shutdownHook
包下
在该包下创建ShutdownHolder
类:
- 标记请求挡板
- 请求的计数器 可以用 LongAdder 或 AtomicInteger
public class ShutdownHolder {// 标记请求挡板public static AtomicBoolean BAFFLE = new AtomicBoolean(false);// 请求的计数器public static LongAdder REQUEST_COUNTER = new LongAdder(0);
}
在该包下创建DcyRpcShutdownHook
类:
- 继承 Thread
public class DcyRpcShutdownHook extends Thread{@Overridepublic void run() {// 1.打开挡板(boolean需要线程安全)ShutdownHolder.BAFFLE.set(true);// 2.等待计数器归零(正常的请求处理结束)// - 等待归零,继续执行 最多等十秒long start = System.currentTimeMillis();while (true) {try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}if (ShutdownHolder.REQUEST_COUNTER.sum() == 0L && System.currentTimeMillis() - start > 10000) {break;}}// 3.阻塞结束后,放行。执行其他猜中,如释放资源}
}
修改core模块下的channelHandler.handler
包下的MethodCallHandler
类的channelRead0()
方法
-
1.先封装响应
-
2.获得通道
-
3.查看挡板是否打开: 如果已打开,返回一个错误的响应
-
4.计数器加一
-
5.限流操作
-
6.处理限流
-
7.写出响应
-
8.计数器减一
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DcyRpcRequest dcyRpcRequest) throws Exception {// 1.封装响应DcyRpcResponse dcyRpcResponse = DcyRpcResponse.builder().requestId(dcyRpcRequest.getRequestId()).compressType(dcyRpcRequest.getCompressType()).serializeType(dcyRpcRequest.getSerializeType()).build();// 2.获得通道Channel channel = channelHandlerContext.channel();// 3.查看挡板是否打开: 如果已打开,返回一个错误的响应if (ShutdownHolder.BAFFLE.get()) {dcyRpcResponse.setCode(ResponseCode.CLOSING.getCode());channel.writeAndFlush(dcyRpcResponse);return;}// 4.计数器加一ShutdownHolder.REQUEST_COUNTER.increment();// 略...// 8.计数器减一ShutdownHolder.REQUEST_COUNTER.decrement();}
c.客户端实现优雅停机
修改core模块下的channelHandler.handler
包下的MySimpleChannelInboundHandler
类:channelRead0
方法
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DcyRpcResponse dcyRpcResponse) throws Exception {// 略.....else if (code == ResponseCode.CLOSING.getCode()) {completableFuture.complete(null);log.info("当前请求id【{}】,访问被拒绝,目标服务器正处于关闭状态,响应码【{}】", dcyRpcResponse.getRequestId(), code);// 修正负载均衡器// 从健康列表中移除DcyRpcBootstrap.CHANNEL_CACHE.remove(socketAddress);DcyRpcRequest dcyRpcRequest = DcyRpcBootstrap.REQUEST_THREAD_LOCAL.get();DcyRpcBootstrap.getInstance().getConfiguration().getLoadBalancer().reLoadBalance(dcyRpcRequest.getRequestPayload().getInterfaceName(), DcyRpcBootstrap.CHANNEL_CACHE.keySet().stream().toList());}
}
d.优雅启动
这就好比我们日常生活中的热车,行驶之前让发动机空跑一会,
可以让汽车的各个部件都“热”起来,减小磨损。换到应用上来看,原理也是一样的。运行了一段时间后的应用,执行速度会比刚启动的应用更快。这是因为在 Java里面,在运行过程中,JVM 虚拟机会把高频的代码编译成机器码,被加载过的类也会被缓存到 JVM 缓存中,再次使用的时候不会触发临时加载,这样就使得“热点”代码的执行不用每次都通过解释,从而提升执行速度。
但是这些“临时数据”,都在我们应用重启后就消失了。重启后的这些“红利”没有了之后,如果让我们刚启动的应用就承担像停机前一样的流量,这会使应用在启动之初就处于高负载状态,从而导致调用方过来的请求可能出现大面积超时,进而对线上业务产生损害行为。