ActiveMQ断线重连技巧,即通信高可用的配置

最近在做一个内部应用的时候,应用到了ActiveMQ作为服务之间消息传递,解耦服务之间的关联,但是在应用的过程中遇到了连接断线无法重连的问题,下面基于这个问题,深入了解一下ActiveMQ的一些相关原理和知识。

一、前置知识

1.1 基础概念

ActiveMQ中有3个重要的角色:Broker、Producer、Consumer。
Broker为消息代理,它是ActiveMQ服务端角色,接收客户端的链接并提供消息通信的核心服务。
Producer是消息生产者,客户端角色
Consumer是消息消费者,客户端角色。
要保证Producer和Consumer正常通信,主要是通过Broker来实现的,Broker既代理了Producer同时也代理了Consumer,这样Broker才能知道哪些是生产者,哪些是消费者,不至于不同的消息被本不属于它的消费者给消费了。Broker内部的机制我们下一节学习。
那么生产者和消费者如何实现通信的呢?ActiveMQ定义的连接器(connector)就是用来约定ActiveMQ的节点之间如何通信的。

1.2 连接器

ActiveMQ通过网络连接器这种连接机制来实现客户端与服务端之间的通信。ActiveMQ提供了两种连接器:

  1. 传输连接器(transport connector):用于客户端和服务端之间( client-to-broker)的通信。
  2. 网络连接器(network connector):用户集群中多个服务端之间(broker-to-broker)的通信。

1.3 传输连接器包括的协议

  • tcp,默认使用的协议,符合大多数的使用场景。
  • udp,客户端使用udp协议和服务端通信,当客户端和服务端之间存在防火墙可以考虑使用udp协议。
  • vm,当客户端和服务端在同一个JVM中可以考虑使用。直接使用虚拟机本地方法调用,从而避免网络通信的开销。
  • nio,本质上还是tcp,只是使用了java NIO包,某些场景下可能性能更好。
  • ssl,基于tcp提供安全的通信。
  • http/https,允许客户端使用REST或Ajax的方式进行连接,可以通过JS给ActiveMQ发送消息。
  • multicast,客户端使用组播的方式连接到服务端。
  • websocket,可以通过HTML5中的websocket技术连接服务端。
  • amqp,高级消息队列协议,很多消息中间件都支持该协议。ActiveMQ5.8版本开始支持。
  • mqtt,MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,主要应用在loT(物联网)。
  • stomp,STOMP是在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义,就像HTTP在TCP套接字之上添加了请求-响应模型层一样。ActiveMQ5.6版本开始支持

二、ActiveMQ断线发生的场景

ActiveMQ的客户端与ActiveMQ的Broker(消息代理)之间的网络连接发生断开,如果未采用高可用的配置,那么Producer无法向MQ中生产对象,同理Customer也无法消费MQ中的消息,整个业务就会出现暂停。
默认的情况下如果ActiveMQ服务正常,那么所有Client服务启动,都会自动在Broker中进行注册,这样就能实现消息生产和消费。但是如果Client服务正常,ActiveMQ服务宕机了进行重启或当网络不稳定或出现故障导致连接断开时,ActiveMQ是不会主动实现Client与Broker进行重连的,此时所有服务都正常,由于连接未建立,所以整个业务也无法实现消息的生产和消费。

三、ActiveMQ断线重连的实现原理

ActiveMQ的断线重连机制的实现原理主要是基于网络通信和消息重试机制。当ActiveMQ的客户端与Broker之间的连接断开时,客户端会检测到这个事件,并尝试重新建立连接。,ActiveMQ的客户端会检测到连接中断事件,然后触发一个重连机制。客户端会尝试重新连接到一个或多个Broker的URL。在默认情况下,如果连接断开,客户端会新起一个线程,不断的从url参数中获取一个url来重试连接。这种重试机制通常会进行一定的次数限制,以避免无限制的重试导致资源浪费或其他问题。

另外,在重连过程中,ActiveMQ的客户端还会尝试恢复之前未发送成功的消息。这个过程主要是通过持久化消息存储来实现的。在连接断开时,客户端会将未发送成功的消息存储到持久化存储中,如数据库或文件系统等。当客户端成功连接到Broker后,会从持久化存储中恢复这些消息,并进行重新发送。

四、ActiveMQ断线重连的配置

ActiveMQ提供了客户端和服务器端通信高可用的配置,

failover,为客户端提供重连服务端的逻辑,允许配置多个上面介绍的不同协议的连接配置,并随机的从其中选择一个进行连接,如果失败则继续选择其他服务重试。failover的配置格式:failover:(tcp://ip1:61616,tcp://ip2:61616)?initialReconnectDelay=100。

fanout,采用复制的方式将消息发送给多个服务端,配置格式为:fanout:(tcp://localhost:61629,udp://localhost:61639,tcp://localhost:61649)

4.1 failover

failover是一种ActiveMQ提供的失效转移(也叫故障转移)的策略。其原理是如果服务先连接到tcp://ip1:61616这个消息队列,如果因为网络抖动或其他意外情况导致ip1无法连接,failover会自动切换到ip2:61616这个消息队列,实现了消息高可用,如果ip1的网路正常了,failover又会尝试连接回来。但是这与断线重连有什么关系呢?
经过我的验证,如果只配置了一个ActiveMQ,如:failover:(tcp://ip1:61616)?initialReconnectDelay=100 当ip1上的ActiveMQ出现了问题,此时failover无法进行故障转移,他就会在initialReconnectDelay定义的100毫秒后进行ip1的重连,从而导致Client与ip1的Broker重新建立链接,实现了断线重连的功能。

4.2 fanout

采用复制的方式将消息发送给多个服务端,这里面虽然没有断线重连,但是实现了消息发送的高可用,这里面需要注意一点,如果Customer在没有很好的处理消息的情况下,有可能Productor生产了一个消息,发送给多个消息队列,Customer消费了多次消息,导致数据重复,所以需要注意Customer消息消费逻辑的幂等性。

4.3 自定义函数实现断线重连

JMS提供了ExceptionListener接口用于侦听JMS消息链接异常,以下是基于JMS的ExceptionListener接口实现的断线自动重连的示例:

import java.util.Timer;
import java.util.TimerTask;import javax.jms.ExceptionListener;
import javax.jms.JMSException;/*** JMS 重连接实现<br>* 通过实现{@link ExceptionListener}接口侦听连接异常,* 使用定时任务迟延执行重连接尝试直至连接成功* @author guyadong* @since 2.3.8*/
class AutoReconnectAdapter implements ExceptionListener,JmsConstants{private static long START_RECONNECTDELAY = 1;/*** 用于执行自动重连的定时器对象*/private static final Timer reconnectTimer = new Timer("AMQP Reconnect"); /*** 定时重连的延迟时间(秒),从1秒开始,每次增加一倍,最大128*/private long reconnectDelay = START_RECONNECTDELAY; /*** 最大重连延迟时间*/private long maxReconnectDelay = 128;/*** 应用层实现的重连回调接口*/private final JMSReconnectCallback jmsReconnectCallback;public AutoReconnectAdapter(JMSReconnectCallback jmsReconnectCallback) {this.jmsReconnectCallback = jmsReconnectCallback;}@Overridepublic void onException(JMSException exception) {if(null != jmsReconnectCallback) {try {jmsReconnectCallback.onConnectionLost();scheduleReconnectCycle();} catch (Exception e) {logger.error(e.getMessage(),e);}}}/*** 尝试将客户端重新连接到服务器。如果成功,它将确保不再计划重新连接。* 但是,如果连接失败,延迟将增加一倍(最大128秒),并将在延迟后重新安排重新连接。*/private void attemptReconnect() {if(null != jmsReconnectCallback) {try {jmsReconnectCallback.tryReconnecting();// restore to default valuereconnectDelay = START_RECONNECTDELAY;}	catch (Exception e) {if(e instanceof JMSException || e.getCause() instanceof JMSException ) {reconnectDelay = Math.min(reconnectDelay*2, maxReconnectDelay);scheduleReconnectCycle();}else {logger.error(e.getMessage(),e);}}}}/*** 安排在{@link #reconnectDelay}指定的延迟时间后执行重连接尝试*/private void scheduleReconnectCycle() {logger.info("{} Scheduling reconnect timer, delay {} seconds",jmsReconnectCallback.ownerName(), reconnectDelay);reconnectTimer.schedule(new TimerTask() {@Overridepublic void run() {attemptReconnect();}}, reconnectDelay*1000);}
}

为了适应应用层不同的重连接实现需要,通过定义JMSReconnectCallback接口,来让断连接和重连实现抽象化,应用层可以根据自己的需要,实现此接口,执行断开连接和重连的动作

/*** JMS 重连机制回调接口* @author guyadong* @since 2.3.8*/
public interface JMSReconnectCallback{/*** 连接异常侦听* @throws Exception */public void onConnectionLost() throws Exception;/*** 尝试重连动作* @throws Exception 失败抛出异常 JMSException 或异常原因(cause)为JMSException 则继续异步执行重试*/public void tryReconnecting() throws Exception;/*** 返回当前接口对象所属的模块名,用于日志输出*/public String ownerName();
}

完整代码参见码云仓库:https://gitee.com/l0km/simplemq/blob/dev/simplemq-jms/src/main/java/gu/simplemq/jms/AutoReconnectAdapter.java

五、ActiveMQ断线重连的优化策略

断线重连如果过于频繁,也会导致网络与服务器的压力,建议从以下几点进行断线重连的优化:

减少断线重连的频率, 适当提升断线重连中间的延时
优化网络质量, 比如Client与MQ都部署在内网
调整连接超时和重试次数
使用多线程处理连接失败
使用负载均衡和集群提高可用性
优化消息恢复和持久化性能, 确保未被正常处理的消息有恢复机制
选择合适的消息恢复策略
断线重连的日志监控和告警机制
日志监控工具的选择和使用
告警机制的建立和维护,检测到断线之后最好给运维人员发送一条消息

六、总结

消息队列在大型项目的建设过程中被广泛的应用,虽然能够实现削峰异步处理,但是消息队列的引入会增加服务运维的风险和成本。今天总结了Client与ActiveMQ出现了断开链接如何实现自动重连的机制和原理,以及相关高可用配置方法。文章中如有错误,请多多指正,您的指正是我们共同进步的基石。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/209905.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot2 在Java项目中你们是如何配置时间格式响应给前端呢

在 Spring Boot 2 项目中配置时间格式&#xff0c;通常可以通过配置文件&#xff08;application.properties 或 application.yml&#xff09;或者通过 Java 代码进行配置。以下是两种常见的配置方式&#xff1a; 1. 通过配置文件配置时间格式&#xff1a; 在 application.pr…

mybaties plus插入数据,自动回显 机制

结论&#xff1a;mybaties plus会将库里数据自动回显到 要插入的数据上 测试表格 SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS 0;-- 表结构 DROP TABLE IF EXISTS t_stu; CREATE TABLE t_stu (id int NOT NULL COMMENT id,name varchar(255) CHARACTER SET utf8mb4 COLLATE…

【PyTorch】计算设备

文章目录 1. 介绍2. 查询和使用 1. 介绍 CPU设备意味着所有物理CPU和内存&#xff0c; 这意味着PyTorch的计算将尝试使用所有CPU核心。可以用以下方式表示&#xff1a; torch.device(cpu) GPU设备只代表一个GPU和相应的显存。 torch.device(cuda)如果有多个GPU&#xff0c;我们…

Java解决矩阵对角线元素的和问题

Java解决矩阵对角线元素的和问题 01 题目 给你一个正方形矩阵 mat&#xff0c;请你返回矩阵对角线元素的和。 请你返回在矩阵主对角线上的元素和副对角线上且不在主对角线上元素的和。 示例 1&#xff1a; 输入&#xff1a;mat [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a…

为什么流量对店铺转化率重要?亚马逊、速卖通等跨境卖家通过自养号测评提升店铺转化率

亚马逊、速卖通等电商平台卖家非常清楚流量对店铺转化率的重要性&#xff0c;测评补单在跨境电商卖家中扮演着重要的角色&#xff0c;是一种必要的运营手段之一。在追求更好的产品曝光和更高的转化率时&#xff0c;Listing的排名是关键因素之一。而在各个平台的Listing中&#…

正确使用AFX_MANAGE_STATE宏管理MFC模块状态, AFX_MANAGE_STATE宏作用,真的很重要!!!

简介&#xff1a; 在使用 MFC&#xff08;Microsoft Foundation Classes&#xff09;开发 DLL&#xff08;动态链接库&#xff09;时&#xff0c;正确管理 MFC 模块状态是确保功能正常运行的关键。本文将深入探讨使用 AFX_MANAGE_STATE 宏的重要性&#xff0c;以及在 DLL 中正确…

连接Redis报错解决方案

连接Redis报错&解决方案 问题描述&#xff1a;Could not connect to Redis at 127.0.0.1:6379: 由于目标计算机积极拒绝&#xff0c;无法连接。 问题原因&#xff1a;redis启动方式不正确 解决方案&#xff1a; 在redis根目录下打开命令行窗口&#xff0c;输入命令redi…

听GPT 讲Rust源代码--src/tools(12)

File: rust/src/tools/rust-analyzer/crates/rust-analyzer/src/config.rs 在Rust源代码中&#xff0c;rust/src/tools/rust-analyzer/crates/rust-analyzer/src/config.rs文件的作用是定义和解析rust-analyzer的配置文件。该文件包含了各种配置项的数据结构和枚举类型&#xf…

MQTT主题、通配符和最佳实践

MQTT主题在MQTT生态系统非常重要&#xff0c;因为代理&#xff08;broker&#xff09;依赖主题确定哪个客户端接收指定的主题。本文我们将聚集MQTT主题、MQTT通配符&#xff0c;详细讨论使用它们的最佳实践&#xff0c;也会探究SYS主题&#xff0c;提供给代理&#xff08;broke…

【npm | npm常用命令及镜像设置】

npm常用命令及镜像设置 概述常用命令对比本地安装全局安装--save &#xff08;或 -S&#xff09;--save-dev &#xff08;或 -D&#xff09; 镜像设置设置镜像方法切换回npm官方镜像选择镜像源 主页传送门&#xff1a;&#x1f4c0; 传送 概述 npm致力于让 JavaScript 开发变得…

iOS——UIPickerView选择器

UIPickerView UIPickerView是 iOS 开发中常用的用户界面组件之一&#xff0c;用于在垂直方向上显示一个滚动的列表&#xff0c;用户可以通过滚动选择其中的一项。 UIPickerView的协议方法 UIPickerView和UItableView差不多&#xff0c;UIPickerView也要设置代理和数据源。UI…

fl studio2024试用版本如何汉化中文?

fl studio2024全称Fruity Loops Studio2024&#xff0c;这款软件也被人们亲切的称之为水果&#xff0c;它是一款功能强大的音乐创作编辑软件&#xff0c;拥有全功能的录音室&#xff0c;大混音盘以及先进的音乐制作工具&#xff0c;用户通过使用该软件&#xff0c;就可以轻松制…

git上传流程

git安装网址&#xff1a;https://git-scm.com 如果您要将本地文件夹上传到名为"compiling"的GitHub仓库&#xff0c;可以按照以下步骤进行操作&#xff1a; 1.安装无脑下一步 2.cd到想上传的文件夹的上一级目录 2.初始化Git仓库&#xff1a;git init 设置分支&a…

C++特殊类设计

1.设计不能被拷贝的类 解析&#xff1a;拷贝只会放生在两个场景中 拷贝构造函数赋值运算符重载 因此想要让一个类禁止拷贝&#xff0c; 就需让该类不能调用“拷贝构造函数”以及“赋值运算符重载”&#xff0c;而C11提供的delete重载关键字可以让这件事情变得更加简单。 1.1.C9…

stl库之list链表与例题

stl中的list是双向链表&#xff0c;优点在于插入/删除元素方便&#xff0c;缺点是随机访问元素时间长 所需头文件&#xff1a;#include <list> 初始化 list<类型名> 变量名 定义一个int类型的变量a list<int> a; 在末尾插入元素 a.push_back(i); 在开…

LeetCode 每日一题 Day 8 || 简单枚举

2048. 下一个更大的数值平衡数 如果整数 x 满足&#xff1a;对于每个数位 d &#xff0c;这个数位 恰好 在 x 中出现 d 次。那么整数 x 就是一个 数值平衡数 。 给你一个整数 n &#xff0c;请你返回 严格大于 n 的 最小数值平衡数 。 示例 1&#xff1a; 输入&#xff1a;n …

Error: Cannot find module ‘@npmcli/config‘ 最新解决办法

看了网上许多这个问题的小伙伴&#xff0c;都是降级node版本来解决的。但是降级并不是我想要的结果。 真正的解决办法就是更新nvm&#xff0c;将你的nvm升级到最新版本&#xff0c;然后卸载掉npm报错的node版本&#xff0c;重新安装即可使用。 解决办法&#xff1a;更新nvm nv…

2020年第九届数学建模国际赛小美赛B题血氧饱和度的变异性解题全过程文档及程序

2020年第九届数学建模国际赛小美赛 B题 血氧饱和度的变异性 原题再现&#xff1a; 脉搏血氧饱和度是监测患者血氧饱和度的常规方法。在连续监测期间&#xff0c;我们希望能够使用模型描述血氧饱和度的模式。   我们有36名受试者的数据&#xff0c;每个受试者以1 Hz的频率连…

【开源视频联动物联网平台】J2mod库写一个Modbus RTU 服务器

J2Mod是一个Java编写的Modbus通信库&#xff0c;可以用于实现Modbus RTU服务器。以下是一个简单的示例&#xff0c;演示如何使用J2Mod库创建一个Modbus RTU服务器&#xff1a; 添加J2Mod库依赖项&#xff1a; 首先&#xff0c;确保在项目中包含J2Mod库。你可以将J2Mod库添加到…

CSPNet: A New Backbone that can Enhance Learning Capability of CNN(2019)

文章目录 -Abstract1 Introduction2 Related workformer work 3 Method3.1 Cross Stage Partial Network3.2 Exact Fusion Model 4 Experiments5 Conclusion 原文链接 源代码 - 梯度信息重用&#xff08;有别于冗余的梯度信息&#xff09;可以减少计算量和内存占用提高效率&am…