IdleStateHandler 心跳机制源码详解

优质博文:IT-BLOG-CN

一、心跳机制

Netty支持心跳机制,可以检测远程服务端是否存活或者活跃。心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制。在服务器和客户端之间一定时间内没有数据交互时, 即处于idle状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 即一个PING-PONG交互。当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保TCP连接的有效性。

Netty提供了IdleStateHandler可以对三种类型心跳进行检测,是用来监测连接的空闲情况。然后我们就可以根据心跳情况,来实现具体的处理逻辑,比如说断开连接、重新连接等等。同时,还提供了ReadTimeoutHandlerWriteTimeoutHandler检测连接的有效性。

名称作用
IdleStateHandler当连接的空闲时间(读或者写)太长时,将会触发一个IdleStateEvent事件。然后,你可以通过你的ChannelInboundHandler中重写userEventTrigged方法来处理该事件。
ReadTimeoutHandler如果在指定的事件没有发生读事件,就会抛出这个异常,并自动关闭这个连接。你可以在exceptionCaught方法中处理这个异常。
WriteTimeoutHandler当一个写操作不能在一定的时间内完成时,抛出此异常,并关闭连接。你同样可以在exceptionCaught方法中处理这个异常。

二、IdleStateHandler 简介

IdleStateHandler也是一个ChannelHandler,也需要被载入到ChannelPipeline中,加入我们在服务器端的ChannelInitializer中。我们在channel链中加入了IdleSateHandler,第一个参数是5,单位是秒,那么这样做的意思就是:在服务器端会每隔5秒来检查一下channelRead方法被调用的情况,如果在5秒内该链上的channelRead方法都没有被触发,就会调用userEventTriggered方法:

//创建服务类
ServerBootstrap serverBootstrap = new ServerBootstrap();//boss和worker
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();try {//设置线程池serverBootstrap.group(boss,worker);//设置socket工厂,Channel 是对 Java 底层 Socket 连接的抽象serverBootstrap.channel(NioServerSocketChannel.class);//设置管道工厂serverBootstrap.childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {//设置后台转换器(二进制转换字符串)ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ServerSocketHandler());}});

并且还是个ChannelInboundHandler,是用来处理入站事件的。看下它的构造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
名称作用
readerIdleTimeSeconds读超时。即当在指定的时间间隔内没有从Channel读取到数据时,会触发一个READER_IDLEIdleStateEvent事件
writerIdleTimeSeconds写超时。即当在指定的时间间隔内没有数据写入到Channel时,会触发一个WRITER_IDLEIdleStateEvent事件
allIdleTimeSeconds读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个ALL_IDLEIdleStateEvent事件

三、IdleStateHandler 源码

【1】handlerAddedhandlerRemovedIdleStateHandler是在创建IdleStateHandler实例并添加到ChannelPipeline时添加定时任务来进行定时检测的,具体在initialize(ctx)方法实现;同时在从ChannelPipeline移除或Channel关闭时,移除这个定时检测,具体在destroy()实现。IdleStateHandlerchannelActive()方法在socket通道建立时被触发。

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isActive() && ctx.channel().isRegistered()) {this.initialize(ctx);}}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {this.destroy();
}public void channelInactive(ChannelHandlerContext ctx) throws Exception {this.destroy();super.channelInactive(ctx);
}

【2】initialize:根据配置的readerIdleTimeWriteIdleTIme等超时事件参数往任务队列taskQueue中添加定时任务task。我先先查看ReaderIdleTimeoutTask的作用。

private void initialize(ChannelHandlerContext ctx) {switch(this.state) {case 1:case 2:return;default:this.state = 1;this.initOutputChanged(ctx);this.lastReadTime = this.lastWriteTime = this.ticksInNanos();if (this.readerIdleTimeNanos > 0L) {// 这里的 schedule 方法会调用 eventLoop 的 schedule 方法,将定时任务添加进队列中this.readerIdleTimeout = this.schedule(ctx, new IdleStateHandler.ReaderIdleTimeoutTask(ctx), this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (this.writerIdleTimeNanos > 0L) {this.writerIdleTimeout = this.schedule(ctx, new IdleStateHandler.WriterIdleTimeoutTask(ctx), this.writerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (this.allIdleTimeNanos > 0L) {this.allIdleTimeout = this.schedule(ctx, new IdleStateHandler.AllIdleTimeoutTask(ctx), this.allIdleTimeNanos, TimeUnit.NANOSECONDS);}}
}

定时任务添加到对应线程EventLoopExecutor对应的任务队列taskQueue中,在对应线程的run()方法中循环执行。只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将state状态设置为1,防止重复初始化。调用initOutputChanged方法,初始化监控出站数据属性

private void initOutputChanged(ChannelHandlerContext ctx) {if (this.observeOutput) {Channel channel = ctx.channel();Unsafe unsafe = channel.unsafe();ChannelOutboundBuffer buf = unsafe.outboundBuffer();// 记录了出站缓冲区相关的数据,buf 对象的 hash 码,和 buf 的剩余缓冲字节数if (buf != null) {this.lastMessageHashCode = System.identityHashCode(buf.current());this.lastPendingWriteBytes = buf.totalPendingWriteBytes();this.lastFlushProgress = buf.currentProgress();}}
}

这边会触发一个ReaderIdleTimeoutTasknextDelay的初始化值为超时秒数readerIdleTimeNanos。如果检测的时候没有正在读,且计算多久没读了,nextDelay -= 当前时间 - 上次读取时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了。则创建IdleStateEvent事件,IdleState枚举值为READER_IDLE,然后调用channelIdle方法分发给下一个ChannelInboundHandler,通常由用户自定义一个ChannelInboundHandler来捕获并处理。

private final class ReaderIdleTimeoutTask extends IdleStateHandler.AbstractIdleTask {ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {super(ctx);}protected void run(ChannelHandlerContext ctx) {long nextDelay = IdleStateHandler.this.readerIdleTimeNanos;if (!IdleStateHandler.this.reading) {nextDelay -= IdleStateHandler.this.ticksInNanos() - IdleStateHandler.this.lastReadTime;}if (nextDelay <= 0L) {IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, IdleStateHandler.this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);boolean first = IdleStateHandler.this.firstReaderIdleEvent;IdleStateHandler.this.firstReaderIdleEvent = false;try {IdleStateEvent event = IdleStateHandler.this.newIdleStateEvent(IdleState.READER_IDLE, first);IdleStateHandler.this.channelIdle(ctx, event);} catch (Throwable var6) {ctx.fireExceptionCaught(var6);}} else {IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);}}
}

firstxxxxIdleEvent作用:假设当你的客户端应用每次接收数据是30秒,而你的写空闲时间是25秒,那么,当你数据还没有写出的时候,写空闲时间触发了。实际上是不合乎逻辑的。因为你的应用根本不空闲。

Netty的解决方案是:记录最后一次输出消息的相关信息,并使用一个值firstXXXXIdleEvent表示是否再次活动过,每次读写活动都会将对应的first值更新为true,如果是false,说明这段时间没有发生过读写事件。同时如果第一次记录出站的相关数据和第二次得到的出站相关数据不同,则说明数据在缓慢的出站,就不用触发空闲事件。

总的来说,这个字段就是用来对付 “客户端接收数据奇慢无比,慢到比空闲时间还多” 的极端情况。所以,Netty默认是关闭这个字段的。

总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发·UserEventTriggered`方法:

protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {ctx.fireUserEventTriggered(evt);
}

【3】写任务的run方法逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对 出站较慢数据的判断。

if (hasOutputChanged(ctx, first)) {return;
}

如果这个方法返回true,就不执行触发事件操作了,即使时间到了。看看该方法实现:

private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {if (observeOutput) {// 如果最后一次写的时间和上一次记录的时间不一样,说明写操作进行过了,则更新此值if (lastChangeCheckTimeStamp != lastWriteTime) {lastChangeCheckTimeStamp = lastWriteTime;// 但如果,在这个方法的调用间隙修改的,就仍然不触发事件if (!first) { // #firstWriterIdleEvent or #firstAllIdleEventreturn true;}}Channel channel = ctx.channel();Unsafe unsafe = channel.unsafe();ChannelOutboundBuffer buf = unsafe.outboundBuffer();// 如果出站区有数据if (buf != null) {// 拿到出站缓冲区的 对象 hashcodeint messageHashCode = System.identityHashCode(buf.current());// 拿到这个 缓冲区的 所有字节long pendingWriteBytes = buf.totalPendingWriteBytes();// 如果和之前的不相等,或者字节数不同,说明,输出有变化,将 "最后一个缓冲区引用" 和 “剩余字节数” 刷新if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {lastMessageHashCode = messageHashCode;lastPendingWriteBytes = pendingWriteBytes;// 如果写操作没有进行过,则任务写的慢,不触发空闲事件if (!first) {return true;}}}}return false;
}

如果用户没有设置需要观察出站情况。就返回false,继续执行事件。如果设置了观察出站的情况,且最后一次写的时间和上一次记录的时间不一样,说明写操作刚刚做过了,则更新此值,但仍然需要判断这个first的值,如果这个值还是false,说明在这个写事件是在两个方法调用间隙完成的,或者是第一次访问这个方法,就仍然不触发事件。如果不满足上面的条件,就取出缓冲区对象,如果缓冲区没对象了,说明没有发生写的很慢的事件,就触发空闲事件。反之,记录当前缓冲区对象的hashcode和剩余字节数,再和之前的比较,如果任意一个不相等,说明数据在变化,或者说数据在慢慢的写出去。那么就更新这两个值,留在下一次判断。继续判断first,如果是fasle,说明这是第二次调用,就不用触发空闲事件了。

【4】所有事件的run方法:这个类叫做AllIdleTimeoutTask,表示这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致,除了这里:

long nextDelay = allIdleTimeNanos;
if (!reading) {// 当前时间减去 最后一次写或读 的时间 ,若大于0,说明超时了nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}

这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。最后调用ctx.fireUserEventTriggered(evt)方法。

通常这个使用的是最多的。构造方法一般是:

pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));

读写都是0表示禁用,30表示30秒内没有任务读写事件发生,就触发事件。注意,当不是0的时候,这三个任务会重叠。

四、总结

【1】IdleStateHandler心跳检测主要是通过向线程任务队列中添加定时任务,判断channelRead()方法或write()方法是否调用空闲超时,如果超时则触发超时事件执行自定义userEventTrigger()方法;
【2】Netty通过IdleStateHandler实现最常见的心跳机制不是一种双向心跳的PING-PONG模式,而是客户端发送心跳数据包,服务端接收心跳但不回复,因为如果服务端同时有上千个连接,心跳的回复需要消耗大量网络资源;如果服务端一段时间内没有收到客户端的心跳数据包则认为客户端已经下线,将通道关闭避免资源的浪费;在这种心跳模式下服务端可以感知客户端的存活情况,无论是宕机的正常下线还是网络问题的非正常下线,服务端都能感知到,而客户端不能感知到服务端的非正常下线;
【3】要想实现客户端感知服务端的存活情况,需要进行双向的心跳;Netty中的channelInactive()方法是通过Socket连接关闭时挥手数据包触发的,因此可以通过channelInactive()方法感知正常的下线情况,但是因为网络异常等非正常下线则无法感知;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/194133.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

bean依赖属性配置

bean依赖属性配置 文章目录 bean依赖属性配置 Data ConfigurationProperties(prefix "cartoon") public class CartoonProperties {private Cat cat;private Mouse mouse; }cartoon:cat:name: whatage: 5mouse:name: howage: 6这样的话&#xff0c;业务bean无需在读…

FPC和PCB有哪些区别?

现在电子技术越来越先进&#xff0c;CPU可以做到5nm工艺&#xff0c;电路板可以做到几十层&#xff0c;可折叠屏应用多款手机中。 什么是FPC&#xff1f; FPC&#xff1a;Flexible Printed Circuit&#xff0c;柔性电路板&#xff0c;又被称为“软板” FPC 以聚酰亚胺或聚酯薄…

Active Stereo Without Pattern Projector论文精读

1.背景补充 主动立体相机和被动立体相机的主要区别在于它们获取立体视觉信息的方式 主动立体相机12&#xff1a; 主动立体视觉是指寻找最佳的视角去重建目标或者场景1。主动视觉的实现方式通常有&#xff1a;改变环境中的光照条件、改变相机的视角、移动相机自身位置等&…

利用 LD_PRELOAD劫持动态链接库,绕过 disable_function

目录 LD_PRELOAD 简介 程序的链接 动态链接库的搜索路径搜索的先后顺序&#xff1a; 利用LD_PRELOAD 简单的劫持 执行id命令 反弹shell 引申至 PHP 绕过disable_function 方法1&#xff1a;使用蚁剑的扩展工具绕过disable_function 方法2&#xff1a;利用 mail 函数…

电磁兼容EMC理论基础汇总

目录 0. 序言 1. EMC的基础介绍 1.1 EMC电磁兼容的定义 1.2 EMC的重要性 1.3 EMC的三要素 2. 库仑定律 3. 趋肤效应与趋肤深度 4. 电阻抗公式 4.1 电阻 4.2 容抗 4.3 感抗 4.4 电路元件的非理想性 5. 麦克斯韦方程组 5.1 高斯磁定律 5.2 高斯定律 5.3 法拉…

Appwidget开发基本介绍

本篇主要对appwidget开发进行简单介绍&#xff0c;为后续漏洞挖掘相关做前置铺垫 appwidget简介 官方解释如下&#xff1a; 应用微件是可以嵌入其他应用&#xff08;如主屏幕&#xff09;并接收定期更新的微型应用视图。这些视图称为界面中的微件&#xff0c;您可以使用应用微…

软件工程精品课程教学网站的设计与实现

系统功能需求分析 本系统要求采用Browser/Server模式设计开发&#xff0c;可以作为一般高等院校的网络学堂&#xff1b;可以为教师的辅助教学或者网络教学提供一个完善的教学网站&#xff1b;学生可以利用本教学网站来完成一些课程的学习任务。 2.2.1 功能划分 《软件工程》教学…

Sakila数据库和World数据库

Sakila数据库和World数据库 安装MySQL8.2的时候多出两个样例数据库 Sakila数据库和World数据库 Sakila数据库是一个关于DVD租赁的样例数据库&#xff0c;用于展示MySQL的各种功能和特性。Sakila数据库中包含了多个表&#xff0c;包括电影、演员、客户、租赁记录等&#xff0c;可…

Oracle(2-6) Backup and Recovery Overview

文章目录 一、基础知识1、Categories of Failures 故障类别2、Causes of Statement Failures 语句失败的原因故障情况Resolutions 决议 3、User Process Failures 用户进程失败故障情况Resolutions 决议 4、Possible User Errors 用户错误类型故障情况Resolutions 决议 5、Inst…

实验6 二叉树操作

0x01 实验目的 掌握二叉树的基本概念&#xff0c;二叉树的存储结构使用链表。 0x02 实验内容 输入一个完全二叉树的层次遍历字符串&#xff0c;创建这个二叉树&#xff0c;输出这个二叉树的前序遍历字符串、中序遍历字符串、后序遍历字符串、结点数目、二叉树高度(上述每一个…

计算UDP报文CRC校验的总结

概述 因公司项目需求&#xff0c;遇到需要发送带UDP/IP头数据包的功能&#xff0c;经过多次尝试顺利完成&#xff0c;博文记录以备忘。 环境信息 操作系统 ARM64平台的中标麒麟Kylin V10 工具 tcpdump、wireshark、vscode 原理 请查看大佬的博文 UDP伪包头定义&#x…

MQ - 消息系统

消息系统 1、消息系统的演变 在大型系统中&#xff0c;会需要和很多子系统做交互&#xff0c;也需要消息传递&#xff0c;在诸如此类系统中&#xff0c;你会找到源系统&#xff08;消息发送方&#xff09;和 目的系统&#xff08;消息接收方&#xff09;。为了在这样的消息系…

数据结构和算法-哈夫曼树以相关代码实现

文章目录 总览带权路径长度哈夫曼树的定义哈夫曼树的构造法1法2 哈夫曼编码英文字母频次总结实验内容&#xff1a; 哈夫曼树一、上机实验的问题和要求&#xff08;需求分析&#xff09;&#xff1a;二、程序设计的基本思想&#xff0c;原理和算法描述&#xff1a;三、调试和运行…

Matter学习笔记(3)——交互模型

一、简介 1.1 交互方式 交互模型层定义了客户端和服务器设备之间可以执行哪些交互。发起交互的节点称为发起者&#xff08;通常为客户端设备&#xff09;&#xff0c;作为交互的接收者的节点称为目标&#xff08;通常为服务器设备&#xff09;。 节点通过以下方式进行交互&a…

Spring Initial 脚手架国内镜像地址

官方的脚手架下载太慢了&#xff0c;并且现在没有了Java8的选项&#xff0c;所以找到国内的脚手架镜像地址&#xff0c;推荐给大家。 首先说官方的脚手架 官方的脚手架地址为&#xff1a; https://start.spring.io/ 但是可以看到&#xff0c;并没有了Java8的选项。 所以推荐…

3dMax拼图生成工具Puzzle2D使用教程

Puzzle2D for 3dsMax拼图生成工具使用教程 Puzzle2D简介&#xff1a; 2D拼图随机生成器&#xff08;英文&#xff1a;Puzzle2D&#xff09; &#xff0c;是一款由#沐风课堂#用MAXScript脚本语言开发的3dsMax建模小工具&#xff0c;可以随机创建2D可编辑样条线拼图图形。可批量…

【tensorflow学习-选择动作】 学习tensorflow代码调用过程

a actor.choose_action(s) def choose_action(self, s):s s[np.newaxis, :]return self.sess.run(self.action, {self.s: s}) # get probabilities for all actions输入&#xff1a;s 输出&#xff1a;self.sess.run(self.action, {self.s: s}) &#xff1a;a

解决:UnboundLocalError: local variable ‘js’ referenced before assignment

解决&#xff1a;UnboundLocalError: local variable ‘js’ referenced before assignment 文章目录 解决&#xff1a;UnboundLocalError: local variable js referenced before assignment背景报错问题报错翻译报错位置代码报错原因解决方法今天的分享就到此结束了 背景 在使…

实战案例:chatglm3 基础模型多轮对话微调

chatglm3 发布了&#xff0c;这次还发了base版本的模型&#xff0c;意味着我们可以基于这个base模型去自由地做SFT了。 本项目实现了基于base模型的SFT。 base模型 https://huggingface.co/THUDM/chatglm3-6b-base由于模型较大&#xff0c;建议离线下载后放在代码目录&#…

OSG编程指南:专栏内容介绍及目录

1、专栏介绍 OpenSceneGraph&#xff08;OSG&#xff09;场景图形系统是一个基于工业标准 OpenGL 的软件接口&#xff0c;它让程序员能够更加快速、便捷地创建高性能、跨平台的交互式图形程序。本专栏基于 OSG 3.6.5版本进行源码的编写及扩展&#xff0c;也通用于其他OSG版本的…