springboot RabbitMQ客户端连接故障恢复

最近做RabbitMQ故障演练发现RabbitMQ服务器停止后,基于springboot的消费端不可以自动的恢复,队列的消费者消失,消息一直积压到队列中,这种情况肯定是不可接收的;通过研究源代码找到了解决方案。

一、添加自动恢复配置automaticRecovery
 CachingConnectionFactory factory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactoryConfigurer.configure(factory);//设置TCP连接超时时间,默认:60000ms
factory.getRabbitConnectionFactory().setConnectionTimeout(properties.getConnectionTimeout());
//启用或禁用连接自动恢复,默认:false
factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(properties.isAutomaticRecovery());
//设置连接恢复时间间隔,默认:5000ms
factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(properties.getNetworkRecoveryInterval());
//启用或禁用拓扑恢复,默认:true【拓扑恢复功能可以帮助消费者重新声明之前定义的队列、交换机和绑定等拓扑结构】
factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(properties.isTopologyRecovery());
//替换默认异常处理DefaultExceptionHandler
factory.getRabbitConnectionFactory().setExceptionHandler(new DefaultMqExceptionHandler());
//添加连接监听器
factory.addConnectionListener(new DefaultMqConnectionListener(factory));

通过上述配置如果RabbitMQ服务器发生故障,则会自动重启恢复连接及队列的消费者,如果恢复失败则会间隔5000ms再次重试;在这里提一个问题,如果服务重试一直失败,重试的上限是多少?带着这个问题我们分析下源码。

二、RabbitMQ客户端实现连接的自动恢复功能

AutorecoveringConnection#beginAutomaticRecovery是在 RabbitMQ 客户端库层面实现的连接的自动恢复功能。当 RabbitMQ 连接出现故障时,它会尝试重新建立连接,以确保消息传递的可靠性。

  private synchronized void beginAutomaticRecovery() throws InterruptedException {//获取故障恢复连接的间隔时间,实际是设置的:networkRecoveryIntervalfinal long delay = this.params.getRecoveryDelayHandler().getDelay(0);if (delay > 0)  {//等待指定的间隔时间this.wait(delay);}//调用恢复通知监听器this.notifyRecoveryListenersStarted();//获取恢复建立的连接对象final RecoveryAwareAMQConnection newConn = this.recoverConnection();//如果为null则直接返回if (newConn == null) {return;}//连接已经恢复建立,恢复监听器、channel等资源LOGGER.debug("Connection {} has recovered", newConn);this.addAutomaticRecoveryListener(newConn);this.recoverShutdownListeners(newConn);this.recoverBlockedListeners(newConn);this.recoverChannels(newConn);// don't assign new delegate connection until channel recovery is completethis.delegate = newConn;//判断是否恢复拓扑结构,如果开启则开启拓扑结构恢复if (this.params.isTopologyRecoveryEnabled()) {notifyTopologyRecoveryListenersStarted();recoverTopology(params.getTopologyRecoveryExecutor());}this.notifyRecoveryListenersComplete();}

addAutomaticRecoveryListener自动恢复监听器

private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {final AutorecoveringConnection c = this;// this listener will run after shutdown listeners,// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135RecoveryCanBeginListener starter = cause -> {try {if (shouldTriggerConnectionRecovery(cause)) {//开始自动回复c.beginAutomaticRecovery();}} catch (Exception e) {newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);}};synchronized (this) {newConn.addRecoveryCanBeginListener(starter);}
}

init初始化

public void init() throws IOException, TimeoutException {//建立连接,否则抛出异常this.delegate = this.cf.newConnection();//自动回复监听器this.addAutomaticRecoveryListener(delegate);
}
三、消费端实现消息的消费和处理

SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run是应用程序层面实现消息的消费和处理,它负责从RabbitMQ中接收消息并进行相应的逻辑处理:

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count...try {//消费端初始化方法initialize();//当消费端是活跃状态,或者队列非空,或者消费端未被关闭则进入主循环while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}}catch (InterruptedException e) {...}
}

消费端initialize初始化方法:

	private void initialize() throws Throwable { // NOSONARtry {redeclareElementsIfNecessary();//启动消费端初始化this.consumer.start();this.start.countDown();}catch (QueuesNotAvailableException e) {if (isMissingQueuesFatal()) {throw e;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw e;}}catch (FatalListenerStartupException ex) {if (isPossibleAuthenticationFailureFatal()) {throw ex;}else {Throwable possibleAuthException = ex.getCause().getCause();if (!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {throw ex;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw possibleAuthException;}}}catch (Throwable t) { //NOSONARthis.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw t;}if (getTransactionManager() != null) {/** Register the consumer's channel so it will be used by the transaction manager* if it's an instance of RabbitTransactionManager.*/ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());}}

消费端异常等待处理处理:

protected void handleStartupFailure(BackOffExecution backOffExecution) {//获取等待时间间隔,参考FixedBackOff类实现long recoveryInterval = backOffExecution.nextBackOff();if (BackOffExecution.STOP == recoveryInterval) {synchronized (this) {if (isActive()) {logger.warn("stopping container - restart recovery attempts exhausted");stop();}}return;}try {if (logger.isDebugEnabled() && isActive()) {logger.debug("Recovering consumer in " + recoveryInterval + " ms.");}//当前时间加上等待时间long timeout = System.currentTimeMillis() + recoveryInterval;//如果当前时间小于等待时间,则休眠200毫秒,再次尝试while (isActive() && System.currentTimeMillis() < timeout) {Thread.sleep(RECOVERY_LOOP_WAIT_TIME);}}catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException("Irrecoverable interruption on consumer restart", e);}}

FixedBackOff回退等待时间类实现:

public class FixedBackOff implements BackOff {// 默认恢复重试间隔public static final long DEFAULT_INTERVAL = 5000L;//最大重试次数,可以认为无限大public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;// 默认恢复重试间隔private long interval = 5000L;//最大重试次数,可以认为无限大private long maxAttempts = Long.MAX_VALUE;public FixedBackOff() {}public FixedBackOff(long interval, long maxAttempts) {this.interval = interval;this.maxAttempts = maxAttempts;}public void setInterval(long interval) {this.interval = interval;}public long getInterval() {return this.interval;}public void setMaxAttempts(long maxAttempts) {this.maxAttempts = maxAttempts;}public long getMaxAttempts() {return this.maxAttempts;}public BackOffExecution start() {return new FixedBackOffExecution();}private class FixedBackOffExecution implements BackOffExecution {private long currentAttempts;private FixedBackOffExecution() {this.currentAttempts = 0L;}//获取下一次尝试的时间间隔,可以认为一直都是5000mspublic long nextBackOff() {++this.currentAttempts;return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;}public String toString() {String attemptValue = FixedBackOff.this.maxAttempts == Long.MAX_VALUE ? "unlimited" : String.valueOf(FixedBackOff.this.maxAttempts);return "FixedBackOff{interval=" + FixedBackOff.this.interval + ", currentAttempts=" + this.currentAttempts + ", maxAttempts=" + attemptValue + '}';}}
}

总结:综上源码分析可知消费端故障恢复重试等待时间是5000ms,重试次数可以认为是无限制(Long最大值)

mainloop主循环逻辑:

		private void mainLoop() throws Exception { // NOSONAR Exceptiontry {if (SimpleMessageListenerContainer.this.stopNow.get()) {this.consumer.forceCloseAndClearQueue();return;}//接收客户端发送过来的消息,至少获取一条boolean receivedOk = receiveAndExecute(this.consumer); // At least one message receivedif (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {checkAdjust(receivedOk);}long idleEventInterval = getIdleEventInterval();if (idleEventInterval > 0) {if (receivedOk) {updateLastReceive();}else {long now = System.currentTimeMillis();long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();long lastReceive = getLastReceive();if (now > lastReceive + idleEventInterval&& now > lastAlertAt + idleEventInterval&& SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {publishIdleContainerEvent(now - lastReceive);}}}}catch (ListenerExecutionFailedException ex) {// Continue to process, otherwise re-throwif (ex.getCause() instanceof NoSuchMethodException) {throw new FatalListenerExecutionException("Invalid listener", ex);}}catch (AmqpRejectAndDontRequeueException rejectEx) {/**  These will normally be wrapped by an LEFE if thrown by the*  listener, but we will also honor it if thrown by an*  error handler.*/}}

receiveAndExecute接收和处理消息:

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONARPlatformTransactionManager transactionManager = getTransactionManager();if (transactionManager != null) {try {if (this.transactionTemplate == null) {this.transactionTemplate =new TransactionTemplate(transactionManager, getTransactionAttribute());}return this.transactionTemplate.execute(status -> { // NOSONAR null never returnedRabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false),getConnectionFactory(), true);// unbound in ResourceHolderSynchronization.beforeCompletion()try {//接收处理消息return doReceiveAndExecute(consumer);}catch (RuntimeException e1) {prepareHolderForRollback(resourceHolder, e1);throw e1;}catch (Exception e2) {throw new WrappedTransactionException(e2);}});}catch (WrappedTransactionException e) { // NOSONAR exception flow controlthrow (Exception) e.getCause();}}//接收处理消息return doReceiveAndExecute(consumer);}

调用具体的消息监听器消费消息:

	private void doExecuteListener(Channel channel, Object data) {if (data instanceof Message) {Message message = (Message) data;if (this.afterReceivePostProcessors != null) {for (MessagePostProcessor processor : this.afterReceivePostProcessors) {message = processor.postProcessMessage(message);if (message == null) {throw new ImmediateAcknowledgeAmqpException("Message Post Processor returned 'null', discarding message");}}}if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));}else {invokeListener(channel, message);}}else {invokeListener(channel, data);}}

GitHub代码:https://github.com/mingyang66/spring-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/57845.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于AVR128单片机抢答器proteus仿真设计

一、系统方案 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 void timer0_init() //定时器初始化 { TCCR00x07; //普通模式&#xff0c;OC0不输出&#xff0c;1024分频 TCNT0f_count; //初值&#xff0c;定时为10ms TIFR0x01; //清中断标志…

Oracle-day3:子查询、with as语句、聚合函数

一、单行子查询 /*一、单行子查询格式&#xff1a;select <列明表> from 表名(查询select 语句)where 列或表达式 比较运算符(SELECT 列名 FROM 表名 WHERE 条件)-- 子查询&#xff0c;必须要使用小括号括起来---最大值函数&#xff1a;max()最小值函数: min()二、 from…

ChatGPT Prompting开发实战(二)

一、基于LangChain源码react来解析prompt engineering 在LangChain源码中一个特别重要的部分就是react&#xff0c;它的基本概念是&#xff0c;LLM在推理时会产生很多中间步骤而不是直接产生结果&#xff0c;这些中间步骤可以被用来与外界进行交互&#xff0c;然后产生new con…

Oracle ASM (Automatic Storage Management)

[TOC](Oracle ASM (Automatic Storage Management)) Oracle ASM (Automatic Storage Management) 是 Oracle 的一个磁盘管理和文件系统服务&#xff0c;用于简化数据库文件的分布。 在使用 ASM 管理磁盘时&#xff0c;如果想要增加存储空间&#xff0c;您可以向现有的磁盘组添加…

IdentityServer密码长度超长会导致跳转到登录页

应用系统项目的安全要求越来越高&#xff0c;基本都是采取https等加密证书传输&#xff0c;无法使用https的&#xff0c;也是要求不能明文传输内容&#xff0c;因此做一些等保要求&#xff0c;密码需要加密后才能传输给服务端&#xff0c;所以前端会采取一些密码手段&#xff0…

block层:7. 请求下发

blk_dispatch 源码基于5.10 1. blk_mq_sched_dispatch_requests void blk_mq_sched_dispatch_requests(struct blk_mq_hw_ctx *hctx) {// 队列struct request_queue *q hctx->queue;// 队列已停止或者被暂停if (unlikely(blk_mq_hctx_stopped(hctx) || blk_queue_quiesc…

【Android Framework系列】第12章 RecycleView相关原理及四级缓存策略分析

1 RecyclerView简介 RecyclerView是一款非常强大的widget&#xff0c;它可以帮助您灵活地显示列表数据。当我开始学习 RecyclerView的时候&#xff0c;我发现对于复杂的列表界面有很多资源可以参考&#xff0c;但是对于简单的列表展现就鲜有可参考的资源了。虽然RecyclerView的…

『赠书活动 | 第十八期』《深入浅出SSD:固态存储核心技术、原理与实战》

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 『赠书活动 &#xff5c; 第十八期』 本期书籍&#xff1a;《深入浅出SSD&#xff1a;固态存储核心技术、原理与实战》 赠书规则&#xff1a;评论区&#xff1a;点赞&…

BPM在企业扮演什么角色?一文秒懂!

如果将企业各职能部门比作各司其职的器官组织&#xff0c;那工作流程就是将其串联为整体&#xff0c;指挥其发挥作用的中枢神经网络&#xff0c;流程作为企业管理意志的延伸&#xff0c;对企业运营管理的影响至关重要。 但随着企业IT建设的多元化&#xff0c;系统能力边界扩展&…

Java 线程池

线程池 Java 线程池是一种多线程处理技术&#xff0c;它可以在程序中预先创建一定数量的线程&#xff0c;将任务提交到线程池中&#xff0c;线程池会自动调度线程执行任务。通过使用线程池&#xff0c;可以避免反复创建和销毁线程的开销&#xff0c;提高程序性能&#xff0c;同…

monorepo更新组件报错,提示“无法加载文件 C:\Program Files\nodejs\pnpm.ps1,因为在此系统上禁止运行脚本”

解决方法&#xff1a; 第一步&#xff1a;管理员身份运行 window.powershell&#xff0c; win x打开powerShell命令框&#xff0c;进入到对应项目路径。 第二步&#xff1a;执行&#xff1a;get-ExecutionPolicy&#xff0c;显示Restricted&#xff0c;表示状态是禁止的; 第…

面试前的准备:程序员应该如何备战面试

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

盲盒电商小程序

一、准备阶段 在开始制作盲盒小程序前&#xff0c;你需要在乔拓云平台上创建一个账号&#xff0c;并登录到后台管理页面。在后台管理页面&#xff0c;你可以找到商城管理模块&#xff0c;点击进入商城编辑制作页面。 二、小程序商城模板选择与编辑 1.在商城编辑制作页面&#x…

2023.08.27 学习周报

文章目录 摘要文献阅读1.题目2.重点3.引言4.方法5.实验结果6.结论 深度学习Majorization-Minimization算法1.基本思想2.要求3.示意图 总结 摘要 This week, I read a computer science on the prediction of atmospheric pollutants in urban environments based on coupled d…

AUTOSAR汽车电子系统架构标准

AUTOSAR 目录 AUTOSAR RTE SWC和BSW SWC访问代码实现 ARXML&#xff08;AUTOSAR XML&#xff09; Interface Client-Server接口代码实现 AutoSAR OS Application AUTOSAR&#xff08;Automotive Open System Architecture&#xff09;正式发布日期是2003年&#xff0c;…

Linux系统编程:基础知识入门学习笔记汇总

Linux基础shell编程——>Linux 系统编程——>&#xff08;计算机网络&#xff09;——>Linux 网络编程 来源&#xff1a;黑马程序员-Linux系统编程 45小时 评价 这个老师好像讲了很多课程&#xff0c;都还不错我由于赶时间之前学过Linux的Shell编程和Linux的网络编程&…

swagger 2.10.5 整合 spring boot

参考&#xff1a; http://springfox.github.io/springfox/ https://github.com/springfox/springfox http://springfox.github.io/springfox/docs/current/ https://github.com/springfox/springfox-demos https://github.com/springfox/springfox-demos/tree/2.9.2 https://gi…

pandas读取excel,再写入excel

需求是这样的&#xff0c;从一个表读取数据&#xff0c;然后每次执行创建一个新表将值写入 读取这个表 写入到这个表 分别对应的是e、h列数据&#xff0c;代码如下&#xff1a; import pandas as pd import openpyxl import datetime dfpd.read_excel(rC:\Users\admin\Deskt…

设计模式-职责链模式

文章目录 职责链模式模式概述主要角色适用场景实现步骤优点注意事项 定义职责链结构示例总结 职责链模式 职责链模式是一种行为设计模式&#xff0c;它可以将请求的发送者和请求的处理者解耦&#xff0c;并按照预定义的顺序处理请求。职责链模式常用于需要逐级审批或转交处理的…

【电路设计】220V AC转低压DC电路概述

前言 最近因项目需要,电路板上要加上一个交流220V转低压直流,比如12V或者5V这种。一般来说,比较常见也比较简单的做法是使用一个变压器将220V AC进行降压,比如降到22V AC,但是很遗憾的是,支持220V的变压器一般体积很大,而板子留给电源部分的面积又非常有限,所以不得不研…