rabbimq之java.net.SocketException: Connection reset与MissedHeartbeatException分析

一、前言

在android前端中接入了rabbitmq消息队列来处理业务,在手机网络环境错综复杂,网络信号不稳定,可能导致mq的频繁断开与连接,在日志中,发现有很多这样的日志,java.net.SocketException: Connection reset,接下来通过源码调试来分析下该错误可能产生的原因。MissedHeartbeatException则是在客户端在多次未收到服务端的消息后,认为服务端已经断开,则抛出该异常。

二、分析

java.net.SocketException: Connection reset在网络搜了一圈,基本上说的是客户端连接着mq,但是服务端已经断开与客户端的连接,此时客户端还在执行接收数据操作,就会发生该错误。

三、MQ的心跳机制

MQ在创建连接的时候则会进行初始化开启心跳服务initializeHeartbeatSender();

  private void initializeHeartbeatSender() {this._heartbeatSender = new HeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);}

在rabbitmq中,客户端会间隔1/2的心跳周期来定时发送心跳

    /*** Sets the heartbeat in seconds.*/public void setHeartbeat(int heartbeatSeconds) {synchronized(this.monitor) {if(this.shutdown) {return;}// cancel any existing heartbeat taskif(this.future != null) {this.future.cancel(true);this.future = null;}if (heartbeatSeconds > 0) {// wake every heartbeatSeconds / 2 to avoid the worst case// where the last activity comes just after the last heartbeatlong interval = SECONDS.toNanos(heartbeatSeconds) / 2;ScheduledExecutorService executor = createExecutorIfNecessary();Runnable task = new HeartbeatRunnable(interval);this.future = executor.scheduleAtFixedRate(task, interval, interval, TimeUnit.NANOSECONDS);}}}

发送心跳,此时如果发生IO异常,这边没处理

    private final class HeartbeatRunnable implements Runnable {private final long heartbeatNanos;private HeartbeatRunnable(long heartbeatNanos) {this.heartbeatNanos = heartbeatNanos;}@Overridepublic void run() {try {LogUtils.log("心跳定时器发送");long now = System.nanoTime();if (now > (lastActivityTime + this.heartbeatNanos)) {frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));frameHandler.flush();}} catch (IOException e) {// ignore}}}

结合官方文档和客户端源代码,心跳默认超时时间是60秒,并且每隔30秒进行一次心跳检查,如果超过两次心跳检查都没有确定节点检查,则会关闭连接

3.1测试

测试用例中, 将心跳周期设置为30秒

public static void main(String[] args) {String queueName="123456";ExecutorService executor= Executors.newFixedThreadPool(10); ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.0.11.211");factory.setPort(5672);factory.setUsername("admin");factory.setVirtualHost("/");factory.setPassword("admin");factory.setConnectionTimeout(5000);       factory.setAutomaticRecoveryEnabled(false); factory.setTopologyRecoveryEnabled(false);   factory.setRequestedHeartbeat(30);executor.submit(() -> {try {Connection connection = factory.newConnection();LogUtils.log("连接创建成功");connection.addShutdownListener(cause -> {LogUtils.log("断开连接:"+cause.getMessage()+" msg=>:"+cause.getCause());});Channel channel = connection.createChannel();LogUtils.log("创建通道成功:" + channel.getChannelNumber());channel.basicQos(30);channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {try {String message = new String(body, "UTF-8");LogUtils.log("消息:"+message);channel.basicReject(envelope.getDeliveryTag(), false);} catch (Exception e) {LogUtils.log("消费者异常,e:" + e.getMessage()+" consumerTag:"+consumerTag);}}});channel.addShutdownListener(cause -> {LogUtils.log("消费者断开连接:" + cause.getMessage() + " msg=>:" + cause.getCause().toString());});} catch (Exception e) {LogUtils.log("发生异常:"+e);e.printStackTrace();}});}

然后将心跳的发送业务关闭

 private final class HeartbeatRunnable implements Runnable {private final long heartbeatNanos;private HeartbeatRunnable(long heartbeatNanos) {this.heartbeatNanos = heartbeatNanos;}@Overridepublic void run() {try {LogUtils.log("心跳定时器发送");long now = System.nanoTime();if (now > (lastActivityTime + this.heartbeatNanos)) {//  frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));//   frameHandler.flush();}} catch (IOException e) {// ignore}}}

运行后,如下:

2023-09-25 09:35:02.976=>连接创建成功
2023-09-25 09:35:02.987=>创建通道成功:1
2023-09-25 09:35:17.948=>心跳定时器发送
2023-09-25 09:35:32.948=>心跳定时器发送
2023-09-25 09:35:47.949=>心跳定时器发送
2023-09-25 09:36:02.949=>心跳定时器发送
2023-09-25 09:36:17.948=>心跳定时器发送
2023-09-25 09:36:32.948=>心跳定时器发送
2023-09-25 09:36:32.960=>消费者断开连接:connection error msg=>:java.net.SocketException: Connection reset
2023-09-25 09:36:32.962=>断开连接:connection error msg=>:java.net.SocketException: Connection reset

结果分析,服务端在3个心跳周期未检测到客户端的心跳后,则会默认客户端已经断线,则将其断开。

四、MissedHeartbeatException分析

在客户端连接MQ成功后,则开始数据服务的读取this._frameHandler.initialize(this);

    private void startIoLoops() {if (executorService == null) {Thread nioThread = Environment.newThread(threadFactory,new NioLoop(socketChannelFrameHandlerFactory.nioParams, this),"rabbitmq-nio");nioThread.start();} else {this.executorService.submit(new NioLoop(socketChannelFrameHandlerFactory.nioParams, this));}}

读取线程业务方法,如果frame不为空,则丢失心跳这边重置为0次,反之则开始计数丢失次数

private void readFrame(Frame frame) throws IOException {LogUtils.log("开始读取数据");if (frame != null) {_missedHeartbeats = 0;if (frame.getType() == AMQP.FRAME_HEARTBEAT) {LogUtils.log("读取数据:心跳"); } else {if (frame.getChannel() == 0) { // the special channel_channel0.handleFrame(frame);} else {if (isOpen()) { ChannelManager cm = _channelManager;if (cm != null) {ChannelN channel;try {channel = cm.getChannel(frame.getChannel());} catch(UnknownChannelException e) { LOGGER.info("Received a frame on an unknown channel, ignoring it");return;}channel.handleFrame(frame);}}}}} else {    LogUtils.log("开始读取数据frame为空"); handleSocketTimeout();}}

超时机制,如果进入该业务,则_missedHeartbeats 会自动加1,如果超过一定次数,则会跑出MissedHeartbeatException

    private void handleSocketTimeout() throws SocketTimeoutException {if (_inConnectionNegotiation) {throw new SocketTimeoutException("Timeout during Connection negotiation");}if (_heartbeat == 0) { // No heart-beatingreturn;}LogUtils.log("handleSocketTimeout-------_missedHeartbeats心跳:"+_missedHeartbeats);if (++_missedHeartbeats > (1)) {throw new MissedHeartbeatException("Heartbeat missing with heartbeat = " +_heartbeat + " seconds, for " + this.getHostAddress());}}

为了方便测试,将心跳设置为10s,将_missedHeartbeats 判断大于1则抛出MissedHeartbeatException异常

4.1测试
2023-09-25 10:21:16.565=>开始读取数据
2023-09-25 10:21:16.651=>开始读取数据
2023-09-25 10:21:16.658=>开始读取数据
2023-09-25 10:21:16.658=>连接创建成功
2023-09-25 10:21:16.669=>开始读取数据
2023-09-25 10:21:16.670=>创建通道成功:1
2023-09-25 10:21:16.671=>开始读取数据
2023-09-25 10:21:16.675=>开始读取数据
2023-09-25 10:21:19.177=>开始读取数据
2023-09-25 10:21:19.177=>开始读取数据frame为空
2023-09-25 10:21:19.177=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:21.659=>开始读取数据
2023-09-25 10:21:21.659=>读取数据:心跳
2023-09-25 10:21:24.160=>开始读取数据
2023-09-25 10:21:24.160=>开始读取数据frame为空
2023-09-25 10:21:24.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:26.659=>开始读取数据
2023-09-25 10:21:26.660=>读取数据:心跳
2023-09-25 10:21:29.161=>开始读取数据
2023-09-25 10:21:29.161=>开始读取数据frame为空
2023-09-25 10:21:29.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:31.661=>开始读取数据
2023-09-25 10:21:31.661=>读取数据:心跳
2023-09-25 10:21:34.161=>开始读取数据
2023-09-25 10:21:34.161=>开始读取数据frame为空
2023-09-25 10:21:34.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:36.662=>开始读取数据
2023-09-25 10:21:36.662=>开始读取数据frame为空
2023-09-25 10:21:36.662=>handleSocketTimeout-------_missedHeartbeats心跳:1
2023-09-25 10:21:36.668=>消费者断开连接:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211
2023-09-25 10:21:36.671=>断开连接:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211

上面可知,服务端会在心跳周期向客户端发送心跳,如果在客户端没收到任何消息时间段内,MissedHeartbeatException超过一定次数,则将跑出该异常,官方默认是2*4=8

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/91967.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

yolov5分割+检测c++ qt 中部署,以opencv方式(详细代码(全)+复制可用)

1&#xff1a;版本说明&#xff1a; qt 5.12.10 opencv 4.5.3 &#xff08;yolov5模型部署要求opencv>4.5.0&#xff09; 2&#xff1a;检测的代码 yolo.h #pragma once #include<iostream> #include<cmath> #include<vector> #include <opencv2/…

【QandA C++】内存分段和内存分页等重点知识汇总

目录 内存分段 内存分页 内存分段 程序是由若干个逻辑分段组成的&#xff0c;如可由代码分段、数据分段、栈段、堆段组成。不同的段是有不同的属性的&#xff0c;所以就用分段的形式把这些段分离出来。 分段机制下&#xff0c;虚拟地址和物理地址是如何映射的&#xff1f; …

毅速课堂:3D打印随形水路在小零件注塑中优势明显

小零件注塑中的冷却不均匀问题常常导致烧焦现象的发生。这主要是因为传统机加工方法无法制造出足够细小的水路&#xff0c;以适应小零件的复杂形状。而3D打印技术的引入&#xff0c;尤其是随形水路的设计&#xff0c;为解决这一问题提供了新的解决方案。 3D打印随形水路技术的优…

TS编译选项——编译TS文件同时对JS文件进行编译

一、允许对JS文件进行编译 我们在默认情况下编译TS项目时是不能编译js文件的&#xff0c;如下图中的hello.js文件并未编译到dist目录下&#xff08;这里配置了编译文件放到dist目录下&#xff09; 如果我们想要实现编译TS文件同时对JS文件进行编译&#xff0c;就需要在tsconfi…

列出使用Typescript的一些优点?

使用Typescript有以下优点&#xff1a; 类型安全&#xff1a;Typescript是一种静态类型语言&#xff0c;它要求在编码阶段明确定义变量和函数的类型。这种类型安全可以减少在运行时出现错误的可能性&#xff0c;并提高代码的可读性和可维护性。代码可读性和可维护性&#xff1…

使用U3D、pico开发VR(二)——添加手柄摇杆控制移动

一、将unity 与visual studio 相关联 1.Edit->Preference->External tool 选择相应的版本 二、手柄遥控人物转向和人物移动 1.添加Locomotion System组件 选择XR Origin&#xff1b; 2.添加Continuous Move Provider&#xff08;Action-based&#xff09;组件 1>…

Android - kts文件配置应用签名

升级最新的AndroidStudio后&#xff0c;gradle配置文件从Groovy 迁移到 KTS&#xff0c;这里把自己配置应用签名遇到的问题及注意事项分享下。 Google官方说明地址将 build 配置从 Groovy 迁移到 KTS 配置后的代码如下&#xff1a; signingConfigs {create("keyStore&q…

PHP 反序列化漏洞:手写序列化文本

文章目录 参考环境序列化文本Scalar Type整数浮点数布尔值字符串 Compound Type数组数据结构序列化文本 对象数据结构序列化文本 Special TypeNULL数据结构序列化文本 手写序列化文本过程中的注意事项个数描述须于现实相符序列化文本前缀的大小写变化符号公共属性 参考 项目描…

编程每日一练(多语言实现)基础篇:求总数问题

文章目录 一、实例描述二、技术要点三、代码实现3.1 C 语言实现3.2 Python 语言实现3.3 Java 语言实现3.4 JavaScript 语言实现 一、实例描述 集邮爱好者把所有的邮票存放在三个集邮册中&#xff0c;在A册内存放全部的十分之二&#xff0c;在B册内存放不知道是全部的七分之几&…

MyBatis的一级缓存和二级缓存:原理和作用

MyBatis的一级缓存和二级缓存&#xff1a;原理和作用 引言 在数据库访问中&#xff0c;缓存是一种重要的性能优化手段&#xff0c;它可以减少数据库查询的次数&#xff0c;加快数据访问速度。MyBatis作为一款流行的Java持久层框架&#xff0c;提供了一级缓存和二级缓存来帮助…

基于Java的大学生就业招聘系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

【数据结构】排序算法(一)—>插入排序、希尔排序、选择排序、堆排序

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.直接插入排序 2.希尔排序 3.直接选择排…

Synchronized 原 理

Synchronized 其 原 理 是 什 么 ? synchronized 是 Java 中实现互斥同步的一种机制。当查看被 synchronized 修饰的代码块编译后的字节码,会发现编译器生成了 monitorenter 和 monitorexit 两个字节码指令。 这两个指令的作用如下: monitorenter:当虚拟机执行到 monitor…

OpenCV之分水岭算法(watershed)

Opencv 中 watershed函数原型&#xff1a; void watershed( InputArray image, InputOutputArray markers ); 第一个参数 image&#xff0c;必须是一个8bit 3通道彩色图像矩阵序列&#xff0c;第一个参数没什么要说的。关键是第二个参数 markers&#xff0c;Opencv官方文档的说…

全网最全Python系列教程(非常详细)---集合讲解(学Python入门必收藏)

&#x1f9e1;&#x1f9e1;&#x1f9e1;这篇是关于Python中集合的讲解&#xff0c;涉及到以下内容&#xff0c;欢迎点赞和收藏&#xff0c;你点赞和收藏是我更新的动力&#x1f9e1;&#x1f9e1;&#x1f9e1; 1、集合是什么&#xff1f; 2、集合应该怎么去定义&#xff1f…

搭建前端框架

在终端进入web目录&#xff0c;然后创建vuecrud工程 创建工程并引入ElementUI和axios手把手教学>传送门:VueCLI脚手架搭建

C进阶-字符串和内存函数

文章目录 一、求字符串长度二、长度不受限制的字符串函数三、长度受限制的字符串函数介绍四、字符串查找五、错误信息报告六、字符操作七、内存操作函数 前言 C语言中对字符和字符串的处理很是频繁&#xff0c;但是C语言本身是没有字符串类型的&#xff0c;字符串通常放在常量…

力扣 -- 718. 最长重复子数组

解题步骤&#xff1a; 参考代码&#xff1a; class Solution { public:int findLength(vector<int>& nums1, vector<int>& nums2) {int m nums1.size();int n nums2.size();//多开一行&#xff0c;多开一列vector<vector<int>> dp(m 1, ve…

Ghostscript 在 Linux 和 Windows 系统的应用与问题解决

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

钱小雨--进

i-love-you 今日&#xff1a;小雨 地点&#xff1a;钱塘江 2023.10.01