Spring AMQP 随笔 8 Retry MessageRecoverer ErrorHandler

0. 列位,响应式布局好麻烦的 …

有意思的,chrome devtool 在调试响应式的分辨率的时候,比如说在 宽度远远大于 768 的时候,按说浏览器也知道大概率是 web端方式打开,样式也是如此渲染,但一些事件(没有鼠标悬停,还是维持触控的感觉)却还是移动端上面的。


前面看过了 spring amqp 的官方文档,但是它的文档不是按一个个 完整的流程 写的,还是老一套 按目录(或者叫 纲目?)拆解 出来的。这就导致,我了解了 RetryTemplate, MessageRecoverer, ErrorHandler 的作用,但是很难想象他们一起工作的样子。

所以,临时起意,走一遍源码,好久没有做这个事儿了。


本文不会按照这几个组件 加载,运行的顺序展开。主打一个 debug 的顺序打开

1. 翻开源码前

debug源码,首先需要写一个极简的demo,我这边的做法是:

  • 配置: 关闭 拒绝时重排(这个跟本文无关,只是补充一下)
  • 配置:开启 消费端 重试
  • 代码:在消费端抛出一个异常,诱发重试
  • 现象:重试耗尽后,消息 转投 死信队列

1.1 最简单的,先见名知意

建议看一下 接口定义中 文档注释,比较详细

本文默认读者有 RetryTemplate 的经验

  • MessageRecoverer 消息的恢复器(… 这个确实很抽象,作用的位置也深,实现非常简单)
    如果猜不出来它的作用,建议看下他的几个实现类,数量很少,代码就几行
  • ErrorHandler 错误处理器

1.2 容易纠结的点

因为,大概看了一下 运行时堆栈,基本跟 MessageRecoverer 不沾边。所以,比较纠结—— Retry 拦截器链 跟 ErrorHandler 的关系,到底是 一先一后的顺序,还是 内外嵌套的结构

解开这个,只需要 轻点debug,看一下运行时堆栈信息即可

// user method annotated by @RabbitListener
onSubscribeTestQueue:93, ExampleQueueClientConfiguration (org.pajamas.example.starter.integration.module.client)invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
doInvoke:169, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:119, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:77, HandlerAdapter (org.springframework.amqp.rabbit.listener.adapter)
invokeHandler:263, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
invokeHandlerAndProcessResult:209, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#doInvokeListener(org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener, com.rabbitmq.client.Channel, java.lang.Object)
onMessage:148, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
doInvokeListener:1670, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
actualInvokeListener:1589, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
invokeListener:-1, 1153028279 (org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$1982)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
invokeJoinpointUsingReflection:344, AopUtils (org.springframework.aop.support)
invokeJoinpoint:198, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:163, ReflectiveMethodInvocation (org.springframework.aop.framework)
doWithRetry:97, RetryOperationsInterceptor$1 (org.springframework.retry.interceptor)
doExecute:329, RetryTemplate (org.springframework.retry.support)
execute:225, RetryTemplate (org.springframework.retry.support)
invoke:122, RetryOperationsInterceptor (org.springframework.retry.interceptor)
proceed:186, ReflectiveMethodInvocation (org.springframework.aop.framework)// aop拦截器链:可以看到只有 Retry 一个advice (上面这一坨都是)
invoke:215, JdkDynamicAopProxy (org.springframework.aop.framework)
invokeListener:-1, $Proxy256 (org.springframework.amqp.rabbit.listener)// 这里是 aop 跟 amqp.listenerContainer 之间的分界线
// 在这里停留,看一下 我们代码抛出给 spring.retry 的异常 到这里 怎么是往下走的?
// step into ...
invokeListener:1577, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
doExecuteListener:1568, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
executeListener:1512, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
doReceiveAndExecute:994, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
receiveAndExecute:941, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
access$1600:85, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
mainLoop:1319, SimpleMessageListenerContainer$AsyncMessageProcessingConsumer (org.springframework.amqp.rabbit.listener)
run:1225, SimpleMessageListenerContainer$AsyncMessageProcessingConsumer (org.springframework.amqp.rabbit.listener)
run:748, Thread (java.lang)

Note: 因为只有一个advice, 即 Retry 的拦截器,所以,Spring.Retry 与 Spring.Amqp.ErrorHandler 必然是 一先一后 的顺序结构

2. 源码,启动!

// org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#executeListener/*** Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).* @param channel the Rabbit Channel to operate on* @param data the received Rabbit Message* @see #invokeListener* @see #handleListenerException*/
protected void executeListener(Channel channel, Object data) {if (!isRunning()) {if (logger.isWarnEnabled()) {logger.warn("Rejecting received message(s) because the listener container has been stopped: " + data);}throw new MessageRejectedWhileStoppingException();}Object sample = null;if (this.micrometerHolder != null) {sample = this.micrometerHolder.start();}try {// aop >>> user code doExecuteListener(channel, data);if (sample != null) {this.micrometerHolder.success(sample, data instanceof Message? ((Message) data).getMessageProperties().getConsumerQueue(): queuesAsListString());}}// ListenerExecutionFailedException(shorten as 'LEFE') extends AmqpException ( extends RuntimeException )// 	cause here is AmqpRejectAndDontRequeueException(shorten as 'ARADRE')catch (RuntimeException ex) {if (sample != null) {this.micrometerHolder.failure(sample, data instanceof Message? ((Message) data).getMessageProperties().getConsumerQueue(): queuesAsListString(), ex.getClass().getSimpleName());}Message message;if (data instanceof Message) {message = (Message) data;}else {message = ((List<Message>) data).get(0);}// if message.properties.finalRetryForMessageWithNoId(default false) then rethrowcheckStatefulRetry(ex, message);// ConditionalRejectingErrorHandler.handleError(t)// 	if is-not-ARADRE && exceptionStrategy.isfatal(t)// 	 	if //			this.errorHandler.discardFatalsWithXDeath(default true) //			&& is-a-LEFE//			&& t has fail message//	  			throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present")//	 	else//			throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal");//	else //		do nothing ...handleListenerException(ex);// step to throw ex;}
}// 最后这一个异常,直接跳转到 catch 的地方,已经到 main looooooop了
// org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
// catch ARADRE but do nothing ... (bacuse of it wrapped in LEFE had been handled)
// return main looooooop
  • 这里很多注释,是我自己写的,如果没有看过官方文档,也许会对一些名词(比如那些个 异常类型)有些陌生

3. 考虑到读者 也许没有心情 看官方文档

大概 解释一下 为什么要有这几个 异常类型

官网不舍得放图,么事,我不惜笔墨的

在这里插入图片描述

在这里插入图片描述

这样看,有没有体会到:官方文档没有把这俩放在同一个流程中

4. 伪代码,启动!

现在,这个顺序结构已经有了

try {// aop(retry) -> invoke listener mothod
} catch {// listenContainer.ErrorHandler ...
}

5. 那么好 …

当然,源码继续启动,看一下几个关键的时刻

5.1 Retry max-attempts exhausted

代码位置,可以在控制台的异常日志找到,或者 跟踪用户抛出的异常,一层层往外抛出时候,总会被这里捕获一次的

// org.springframework.retry.support.RetryTemplate#handleRetryExhausted// invoke spring.retry.recover callback// org.springframework.retry.interceptor.RetryOperationsInterceptor.ItemRecovererCallback#recover(retryContext)// org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean#recover(args[], retryContext.latest-throwable)// throw new LEFE(this.messageSupplier.get(), new ARADRE(latest-throwable), message);
/* args: [logger,messageRecoverer, (default RejectAndDontRequeueRecoverer)retryTemplate]
*/
  • MessageRecoverer 做了 new ARADRE(user ex)
  • spring.amqp 的 一个 retry recover-callback 做了 new LEFE(new ARADRE(user ex))

现在对 MessageRecoverer 的理解不难了,因为这个案例中,我们抛出的异常 不在其配置的strategy 里面(不会因为这种失败,而触发重排), 这就是 ARADRE 的语义

5.1.1 Locate MessageRecoverer

debug可以看到 这个 MessageRecoverer 不是显式声明出来(step into 被直接跳过了),我们看不到他的声明位置,说明是一个lambda 表达式

可以通过所在类 StatelessRetryOperationsInterceptorFactoryBean,找进去

	protected Object recover(Object[] args, Throwable cause) {// this.messageRecovererMessageRecoverer messageRecoverer = getMessageRecoverer();Object arg = args[1];if (messageRecoverer == null) {this.logger.warn("Message(s) dropped on recovery: " + arg, cause);}else if (arg instanceof Message) {messageRecoverer.recover((Message) arg, cause);}else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) {((MessageBatchRecoverer) messageRecoverer).recover((List<Message>) arg, cause);}return null;}

FactoryBean 是在 Spring.amqp configuration 加载配置的时候用的,这是泛泛的说法,不难理解

5.1.2 回过头看 configuration 加载配置的地方

// org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#simpleRabbitListenerContainerFactory
// org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer#configure
// org.springframework.boot.autoconfigure.amqp.AbstractRabbitListenerContainerFactoryConfigurer#configure(T, org.springframework.amqp.rabbit.connection.ConnectionFactory, org.springframework.boot.autoconfigure.amqp.RabbitProperties.AmqpContainer)protected void configure(T factory, ConnectionFactory connectionFactory,RabbitProperties.AmqpContainer configuration) {Assert.notNull(factory, "Factory must not be null");Assert.notNull(connectionFactory, "ConnectionFactory must not be null");Assert.notNull(configuration, "Configuration must not be null");factory.setConnectionFactory(connectionFactory);if (this.messageConverter != null) {factory.setMessageConverter(this.messageConverter);}factory.setAutoStartup(configuration.isAutoStartup());if (configuration.getAcknowledgeMode() != null) {factory.setAcknowledgeMode(configuration.getAcknowledgeMode());}if (configuration.getPrefetch() != null) {factory.setPrefetchCount(configuration.getPrefetch());}if (configuration.getDefaultRequeueRejected() != null) {factory.setDefaultRequeueRejected(configuration.getDefaultRequeueRejected());}if (configuration.getIdleEventInterval() != null) {factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis());}factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());ListenerRetry retryConfig = configuration.getRetry();if (retryConfig.isEnabled()) {RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless(): RetryInterceptorBuilder.stateful();RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers).createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);builder.retryOperations(retryTemplate);// set message recovererMessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer: new RejectAndDontRequeueRecoverer();builder.recoverer(recoverer);// step into ...// 这里构造 args[]factory.setAdviceChain(builder.build());}
}// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder#build
// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder.StatelessRetryInterceptorBuilder#build
// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder#applyCommonSettings

6. 总结一下

就这样,吃饭!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/840934.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【IC设计】牛客网-序列检测习题总结

文章目录 状态机基础知识VL25 输入序列连续的序列检测VL26 含有无关项的序列检测VL27 不重叠序列检测VL28 输入序列不连续的序列检测参考资料 状态机基础知识 VL25 输入序列连续的序列检测 timescale 1ns/1ns module sequence_detect(input clk,input rst_n,input a,output re…

csdn的insCode怎么用IDE和linux终端

1.进入insCode&#xff0c;选择工作台 找到我的项目&#xff0c;没有项目的话可以新建一个。 选择在IDE中编辑&#xff0c;界面如下&#xff1a; 右边有个终端&#xff0c;点击即可出现linux的xterm终端。

边用边充电影响寿命吗?看看计算机指令组成与操作类型

计算机指令集体系结构之指令 指令由操作码和地址码字段组成。 操作码指明了指令要完成的操作。 长度可以固定&#xff1a;比如RISC&#xff08;reduced instruction set computer&#xff09;精简指令集计算机 与之对应的RISC&#xff08;复杂指令集计算机&#xff09;&…

福昕PDF使用技巧

因为突然间学校的企业版WPS突然很多功能就不能使用了&#xff0c;所以转向福昕PDF。 一、合并文件 添加需要合并的文件&#xff0c;可以使用ctrla等方式全选 找到最上方的“合并文件” 二、文本注释

IDEA打开项目报错

IDEA打开项目报错&#xff1a; Cannot read scheme C:\Users\xxxxxx\AppData\Roaming\JetBrains\IntelliJIdea2023.2\qaplug_profiles\Default.xmljava.lang.AbstractMethodError: Receiver class com.soldevelo.qaplug.scanner.AnalysisProfileManager$2 does not define or i…

【讲解下PDM,PDM是什么?】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

【汽车之家注册/登录安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…

借助Kong记录接口的请求和响应内容

和APISIX类似&#xff0c;Kong也是一个Api GateWay。 运行在调用Api之前&#xff0c;以插件的扩展方式为Api提供管理, 如 鉴权、限流、监控、健康检查等. Kong是基于Lua语言、Nginx以及OpenResty开发的&#xff0c;拥有动态路由、负载均衡、高可用、高性能、熔断&#xff08;基…

通过RAG架构LLM应用程序

在之前的博客文章中&#xff0c;我们已经描述了嵌入是如何工作的&#xff0c;以及RAG技术是什么。本节我们我们将使用 LangChain 库以及 RAG 和嵌入技术在 Python 中构建一个简单的 LLM 应用程序。 我们将使用 LangChain 库在 Python 中构建一个简单的 LLM 应用程序。LangChai…

自己手写一个单向链表【C风格】

//单链表 #include <iostream> #define MAX_SIZE 20 #define OK 1 #define ERROR 0 #define TRUE 1 #define FALSE 0typedef int ElemType;//元素的类型 typedef int Status;//返回状态typedef struct Node {ElemType data;//链表中保存的数据struct Node* next;//指向下…

【CSP CCF记录】201909-1 小明种苹果

题目 过程 #include<bits/stdc.h> using namespace std; int N,M; long long tree[1010]; int main() {cin>>N>>M;long long result0,max0;//result剩余苹果&#xff0c;max最大疏果个数 int id0;//id最大疏果的果树编号 for(int i1;i<N;i){long long b0…

构建php环境

目录 php简介 官网php安装包 选择下载稳定版本 &#xff08;建议使用此版本&#xff0c;文章以此版本为例&#xff09; 安装php解析环境 准备工作 安装依赖 zlib-devel 和 libxml2-devel包。 安装扩展工具库 安装 libmcrypt 安装 mhash 安装mcrypt 安装php 选项含…

Gin框架学习笔记(六)——gin中的日志使用

gin内置日志组件的使用 前言 在之前我们要使用Gin框架定义路由的时候我们一般会使用Default方法来实现&#xff0c;我们来看一下他的实现&#xff1a; func Default(opts ...OptionFunc) *Engine {debugPrintWARNINGDefault()engine : New()engine.Use(Logger(), Recovery())…

uniapp微信小程序解决type=“nickname“获取昵称,v-model绑定值为空问题!

解决获取 type"nickname"值为空问题 文章目录 解决获取 type"nickname"值为空问题效果图Demo解决方式通过表单收集内容通过 uni.createSelectorQuery 效果图 开发工具效果图&#xff0c;真机上还会显示键盘输入框 Demo 如果通过 v-model 结合 blur 获取不…

【Linux】写时拷贝技术COW (copy-on-write)

文章目录 Linux写时拷贝技术(copy-on-write)进程的概念进程的定义进程和程序的区别PCB的内部构成 程序是如何被加载变成进程的&#xff1f;写时复制&#xff08;Copy-On-Write, COW&#xff09;写时复制机制的原理写时拷贝的场景 fork与COWvfork与fork Linux写时拷贝技术(copy-…

VUE3 学习笔记(十)查看vue版本

命令&#xff1a; npm list vue(空) (在项目的根目录下执行以下命令即可查看项目所使用的vue版本) npm list vue version(空) npm info vue (全局查看vue版本号&#xff0c;详细) npm list vue -g(全局查看vue版本号&#xff0c;简单) npm view vue version(查看项目依赖的vue…

开源博客项目Blog .NET Core源码学习(26:App.Hosting项目结构分析-14)

后台管理页面的系统管理下主要包括用户管理、角色管理、按钮管理和菜单管理&#xff0c;其中创建用户时要指定角色&#xff0c;创建角色时需指定菜单权限&#xff0c;按钮管理也是基于各菜单项进行设置&#xff0c;只有菜单管理相对独立&#xff0c;因此本文学习并分析App.Host…

蓝桥杯【第15届省赛】Python B组 32.60 分

F 题列表越界访问了……省一但没什么好名次 测评链接&#xff1a;https://www.dotcpp.com/oj/train/1120/ C 语言网真是 ** 测评&#xff0c;时间限制和考试的不一样&#xff0c;E 题给我整时间超限&#xff1f; A&#xff1a;穿越时空之门 100&#x1f3c6; 【问题描述】 随…

使用梦畅闹钟,结合自定义bat、vbs脚本等实现定时功能

梦畅闹钟-每隔一段时间运行一次程序 休息五分钟bat脚本&#xff08;播放音乐视频&#xff0c;并锁屏&#xff09; chcp 65001 echo 回车开始休息5分钟 pause explorer "https://www.bilibili.com/video/BV1RT411S7Tk/?p47" timeout /t 3 /nobreak rundll32.exe use…

什么是SSL证书?如何选择SSL证书?

在浏览网站的时候&#xff0c;你会不会有这样一些疑问。 为什么有的网站是http://开头&#xff0c;有的却是https://&#xff1f;它们有什么区别吗&#xff1f; 经常访问的网站&#xff0c;浏览器突然提示“安全证书过期”&#xff0c;提醒你不要浏览该网址&#xff1f; 这一切…