RocketMQ一行代码造成消息发送失败

这是我的第 198 期分享

作者 | 丁威

来源 | 中间件兴趣圈(ID:dingwpmz_zjj)

分享 | Java中文社群(ID:javacn666)

1、问题现象


首先接到项目反馈使用 RocketMQ 会出现如下错误:

错误信息关键点:MQBrokerException:CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE]broker busy,start flow control for a while,period in queue:205ms,size of queue:880。

由于项目组并没有对消息发送失败做任何补偿,导致丢失消息发送失败,故需要对这个问题进行深层次的探讨,并加以解决。

2、问题分析


首先我们根据关键字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查询,去探究在什么时候会抛出如上错误。根据全文搜索如下图所示:

该方法是在 BrokerFastFailure 中定义的,通过名称即可以看成其设计目的:Broker端快速失败机制。

Broker 端快速失败其原理图如下:


  • 消息发送者向 Broker 发送消息写入请求,Broker 端在接收到请求后会首先放入一个队列中(SendThreadPoolQueue),默认容量为 10000。

  • Broker 会专门使用一个线程池(SendMessageExecutor)去从队列中获取任务并执行消息写入请求,为了保证消息的顺序处理,该线程池默认线程个数为1。

如果 Broker 端受到垃圾回收等等因素造成单条写入数据发生抖动,单个 Broker 端积压的请求太多从而得不到及时处理,会极大的造成客户端消息发送的时间延长。

设想一下,如果由于 Broker 压力增大,写入一条消息需要500ms甚至超过1s,并且队列中积压了5000条消息,消息发送端的默认超时时间为3s,如果按照这样的速度,这些请求在轮到 Broker 执行写入请求时,客户端已经将这个请求超时了,这样不仅会造成大量的无效处理,还会导致客户端发送超时。

故 RocketMQ 为了解决该问题,引入 Broker 端快速失败机制,即开启一个定时调度线程,每隔10毫秒去检查队列中的第一个排队节点,如果该节点的排队时间已经超过了 200ms,就会取消该队列中所有已超过 200ms 的请求,立即向客户端返回失败,这样客户端能尽快进行重试,因为 Broker 都是集群部署,下次重试可以发送到其他 Broker 上,这样能最大程度保证消息发送在默认 3s 的时间内经过重试机制,能有效避免某一台 Broker 由于瞬时压力大而造成的消息发送不可用,从而实现消息发送的高可用。

从 Broker 端快速失败机制引入的初衷来看,快速失败后会发起重试,除非同一时刻集群内所有的 Broker 都繁忙,不然消息会发送成功,用户是不会感知这个错误的,那为什么用户感知了呢?难道 TIMEOUT_ CLEAN _ QUEUE 错误,Broker 不重试?

为了解开这个谜团,接下来会采用源码分析的手段去探究真相。接下来将以消息同步发送为例揭示其消息发送处理流程中的核心关键点。

MQ Client 消息发送端首先会利用网络通道将请求发送到 Broker,然后接收到请求结果后并调用 processSendResponse 方法对响应结果进行解析,如下图所示:

在这里返回的 code 为 RemotingSysResponseCode . SYSTEM_BUSY。

我们从 proccessSendResponse 方法中可以得知如果 code 为 SYSTEM_BUSY,该方法会抛出 MQBrokerException,响应 code 为 SYSTEM_BUSY,其错误描述为开头部分的错误信息。

那我们沿着该方法的调用链路,可以找到其直接调用方:DefaultMQProducerImpl 的 sendKernelImpl,我们重点考虑如果底层方法抛出  MQBrokerException 该方法会如何处理。

其关键代码如下图所示:

可以看出在 sendKernelImpl 方法中首先会捕捉异常,先执行注册的钩子函数,即就算执行失败,对应的消息发送后置钩子函数也会执行,然后再原封不动的将该异常向上抛出。

sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法调用,下面是其核心实现截图:

从这里可以看出 RocketMQ 消息发送高可用设计一个非常关键的点,重试机制,其实现是在 for 循环中 使用 try catch 将 sendKernelImpl 方法包裹,就可以保证该方法抛出异常后能继续重试。从上文可知,如果 SYSTEM_BUSY 会抛出 MQBrokerException,但发现只有上述几个错误码才会重试,因为如果不是上述错误码,会继续向外抛出异常,此时 for 循环会被中断,即不会重试。

这里非常令人意外的是连 SYSTEM_ERROR 都会重试,却没有包含 SYSTEM_BUSY,显然违背了快速失败的设计初衷,故笔者断定,这是 RocketMQ 的一个BUG,将 SYSTEM_BUSY 遗漏了,后续会提一个 PR,增加一行代码,将 SYSTEM_BUSY 加上即可。

问题分析到这里,该问题应该就非常明了。

3、解决方案


如果大家在网上搜索 TIMEOUT_CLEAN_QUEUE 的解决方法,大家不约而同提出的解决方案是增加 waitTimeMillsInSendQueue 的值,该值默认为 200ms,例如将其设置为 1000s 等等,以前我是反对的,因为我的认知里 Broker 会重试,但现在发现 Broker 不会重试,所以我现在认为该 BUG未解决的情况下适当提高该值能有效的缓解。

但这是并不是好的解决方案,我会在近期向官方提交一个PR,将这个问题修复,建议大家在公司尽量对自己使用的版本进行修改,重新打一个包即可,因为这已经违背了 Broker 端快速失败的设计初衷。

但在消息发送的业务方,尽量自己实现消息的重试机制,即不依赖 RocketMQ 本身提供的重试机制,因为受制于网络等因素,消息发送不可能百分之百成功,建议大家在消息发送时捕获一下异常,如果发送失败,可以将消息存入数据库,再结合定时任务对消息进行重试,尽最大程度保证消息不丢失。

END

Redis的自白:我为什么在单线程的这条路上越走越远?

人人都能看懂的 6 种限流实现方案!(纯干货)

关注公众号发送”进群“,磊哥拉你进读者群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/546129.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生命游戏(game of life)

生命游戏 (game of life )是一款非常著名的游戏。它包括一个二维 矩形世界,这个世界中的每个方格居住着一个活着的或死了的细胞。一个细胞在下一个时刻生死取决于相邻八个方格中活着的或死了的细胞的数量。如果相邻方格活着的细胞数量过多,这…

setpriority_Java Thread类的最终void setPriority(int priority)方法(带示例)

setpriority线程类最终void setPriority(int priority) (Thread Class final void setPriority(int priority)) This method is available in package java.lang.Thread.setPriority(int priority). 软件包java.lang.Thread.setPriority(int priority)中提供了此方法。 This me…

汇编级UART串口初始化与打印

用于新PCB板调试开发,在系统最开始(内存初始化之前),尽快打印字符,验证CPU是否正常启动。 以freescale QorIQ 处理器兼容的UART为例,符合16550串口标准: /*UART DEBUG*/ /*#define CCSBAR_RESET…

Java 中的 String 有没有长度限制?

这是我的第 199 期分享作者 | Hollis来源 | Hollis(ID:hollischuang) 分享 | Java中文社群(ID:javacn666)关于String有没有长度限制的问题,我之前单独写过一篇文章分析过,最近我又抽…

c语言编程输入a是输出为a_C ++编程基本输入,输出,数据类型,声明能力倾向问题和解答...

c语言编程输入a是输出为aThis section contains C programming Basic Input, Output, Data types, Declaration etc Aptitude Questions and Answers with explanations. 本节包含C 编程的基本输入,输出,数据类型,声明等,以及有关…

关联数组(associative array)

关联数组(associative array )是一种常用的抽象数据类型。它有很多别名,例如associative container , map , mapping , dictionary , finite map , table,index 等。它的特点是由一个关键字和其他各种属性组成的集合。典型的操作包括插入,删除…

开源 免费 java CMS - FreeCMS2.1 菜单管理

2019独角兽企业重金招聘Python工程师标准>>> 项目地址:http://www.freeteam.cn/ 菜单管理 FreeCMS在设计时定位于面向二次开发友好,所以FreeCMS提供了菜单管理功能,二次开发人员可以自由增加新的功能菜单到FreeCMS。 为了让后台…

本来想用“{{”秀一波,结果却导致了内存溢出!

这是我的第 200 期分享作者 | 王磊来源 | Java中文社群(ID:javacn666)转载请联系授权(微信ID:GG_Stone)生活中的尴尬无处不在,有时候你只是想简单的装一把,但某些“老同志”总是在不…

在Ruby中使用&运算符(new_array- arr&old_Array)创建数组实例

In the last articles, we have gone through many methods through which we can create Array Instances but you all must know that those all were Public class methods and now in the upcoming articles, we will be learning about Public instance methods. 在上一篇…

Run Length Encoding

游程编码 (Run Length Encoding ) 是一种简单的编码方法,通常用于控制论中对二值图像编码。ACM有一道题目就是关于该编码。见tzu 1149 或poj 1782 。虽然是简单题,我却花了好大功夫才搞定,功力还是不足阿。 程序代码如下: #incl…

局部变量竟然比全局变量快 5 倍?

这是我的第 201 期分享作者 | 王磊来源 | Java中文社群(ID:javacn666)转载请联系授权(微信ID:GG_Stone)喽,大家好,磊哥的性能优化篇又来了!其实写这个性能优化类的文章初…

FreeMarker笔记 前言第1章 入门

简介 简介 FreeMarker是一款模板引擎:一种基于模板的、用来生成输出文本(任何来自于HTML格式的文本用来自动生成源代码)的通用工具。它是为Java程序员提供的一个开发包或者说是类库。它不是面向最终用户,而是为程序员提供的可以嵌…

优先级调度算法动态优先级_与优先级调度有关的问题及其解决方案

优先级调度算法动态优先级We are already familiar with what Priority Scheduling is. It is one of the most used process scheduling algorithm used in operating systems, in which every process is assigned with a priority. According to this algorithm, the proces…

hdoj 1013 Digital Roots

链接&#xff1a;zoj 1115 或 hdoj 1013 或poj 1519 虽说是水题&#xff0c;却几经波折才搞定。该题目中的数字可能非常大&#xff0c;所以不能使用整型数&#xff0c;只能采用字符变量 代码如下&#xff1a; #include <stdio.h>int digitalRoot(int n); int digitS…

厉害了,3万字的MySQL精华总结 + 面试100问!

这是我的第 202 期分享作者 | 派大新来源 | JavaKeeper&#xff08;ID&#xff1a;JavaKeeper&#xff09;分享 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;❝写在之前&#xff1a;不建议那种上来就是各种面试题罗列&#xff0c;然后背书式的去记忆&#x…

网页视频播放器代码大全 + 21个为您的站点和博客提供的免费视频播放器

推荐 使用 极酷 Web在线播放器。网页中嵌入视频代码综合全然版 1.avi格式 代码片断例如以下&#xff1a;  程序代码 <objectid"video"width"400"height"200"border"0"classid"clsid:CFCDAA03-8BE4-11cf-B84B-0020AFBBCCFA&q…

codejam题目_嵌套深度-Google CodeJam 2020资格回合问题解决方案

codejam题目Problem statement: 问题陈述&#xff1a; Given a string of digits S, insert a minimum number of opening and closing parentheses into it such that the resulting string is balanced and each digit d is inside exactly d pairs of matching parentheses…

hdoj 1015 Safecracker

见hdoj 1015 或 zoj 1403 或tzu 1308 我使用了枚举法&#xff0c;代码写的很罗嗦&#xff0c;可能还是深度优先搜索写起来更清晰。 /* hdoj 1015 */ #include <stdio.h> #include <string.h>#define MAX (125) #define RESLEN 5 int bubSort(int a[], int n); in…

漫话:为什么计算机起始时间是1970年1月1日?

这是我的第 203 期分享作者 | 漫画编程来源 | 漫画编程&#xff08;ID&#xff1a;mhcoding&#xff09;分享 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;问题复现1970-01-01对于开发者来说都是不陌生的&#xff0c;有些系统对于时间的处理如果不够好的话&…

puppeteer执行js_使用Node.js和Puppeteer与表单和网页进行交互– 1

puppeteer执行jsHi guys! Today lets look at another powerful function of the puppeteer API using Node.js. 嗨&#xff0c;大家好&#xff01; 今天&#xff0c;让我们看看使用Node.js的puppeteer API的另一个强大功能。 In the first part of this section, lets look a…