RocketMQ源码分析之消息重试(真相竟然是延时消息)

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

普通消息

消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息

这里是官网定义的消息重试次数以及时间间隔。

有没有发现一个问题,这个重试消息的时间间隔和延时消息的时间间隔这么类似?

图片来源官网

待会我们进行源码分析就知道了。这里先卖个关子

client源码分析

ConsumeMessageConcurrentlyService

普通消息消费以及重试的逻辑在ConsumeMessageConcurrentlyService中,如果是顺序消息则是ConsumeMessageOrderlyService

具体的逻辑是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run

由于今天我们的重点关注点是消息如何重试的。所以我们在消息消费的一些其他细节会略过

首先消息消费的状态由ConsumeConcurrentlyStatus 这个枚举控制

注释很清晰

  • CONSUME_SUCCESS 成功消费
  • RECONSUME_LATER 消费时间,等待重试

可以看到消息消费主要是在

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

这里的listener就是实现了MessageListener接口的类。也就是我们在编写consumer需要传入的一个对象

比如像这种

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");

需要注意的是虽然消费状态只有成功和失败。但是消费结果却是由ConsumeReturnType这个枚举类定义的

  • SUCCESS 成功
  • TIME_OUT 超时
  • EXCEPTION 异常
  • RETURNNULL 消费结果返回null
  • FAILED 消费结果返回失败

可以看到这里把消费成功还是失败再次作了细分

          if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}

我们继续回归主线,看看如果消费状态为RECONSUME_LATER会作什么处理

            if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}

这里可以看到processConsumeResult方法对消息结果进行了处理。我们来看看这个方法

processConsumeResult
        switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// Maybe message is expired and cleaned, just ignore it.if (!consumeRequest.getProcessQueue().containsMessage(msg)) {log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "+ "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),msg.getQueueId(), msg.getQueueOffset());continue;}boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}

这里可以看到如果是BROADCASTING(广播消息)则只是打印了warnlog,什么处理也不会干

我们来看看集群消息

注意这里的循环条件

for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)

如果所有消息消费成功则 ackIndex = consumeRequest.getMsgs().不会有消息进行下面的逻辑处理。如果有消息消费失败,才会进行下面的处理

看看下面的处理逻辑

                    boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}
  1. 发送消费失败消息给broker
  2. 如果消息发送给broker失败则将消息丢到msgBackFailed。然后再client自己进行消息重新消费,重试次数reconsumeTimes + 1

我们来看看sendMessageBack的处理逻辑

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {int delayLevel = context.getDelayLevelWhenNextConsume();// Wrap topic with namespace before sending back message.msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg, e);}return false;}

这里有实际delayLevel延时级别一直是0。

实际发送消息到broker

this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));

我们看看sendMessageBack的具体逻辑

private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {boolean needRetry = true;try {if (brokerName != null && brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)|| mq != null && mq.getBrokerName().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {needRetry = false;sendMessageBackAsNormalMessage(msg);} else {String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());}} catch (Throwable t) {log.error("Failed to send message back, consumerGroup={}, brokerName={}, mq={}, message={}",this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);if (needRetry) {sendMessageBackAsNormalMessage(msg);}} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}}

这里实际走的消息发送逻辑consumerSendMessageBack

consumerSendMessageBack就是简单的消息发送,没有复杂的逻辑

我们通过RequestCode.CONSUMER_SEND_MSG_BACK 找到broker的处理逻辑

broker源码分析

处理RequestCode.CONSUMER_SEND_MSG_BACK请求的主要是SendMessageProcessor

我们进入到
org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack方法看看

这里就有延时消息的处理。我们在上面一直说过延时消息的等级一直是0.我们看看是如何处理的

如果重试次数没有超过最大重试次数。并且延时消息等级为0.

则延时消息等级为 重试次数+3

这里就清楚知道了为什么消息重试的时间和延时消息是如何相似了吧。

没错 消息重试就是用延时消息实现的 哈哈

然后最大重试次数在哪获取的呢?答案就是SubscriptionGroupConfig

也就是订阅信息的元数据

元数据再broker的文件就是subscriptionGroup.json

格式就是如下

 "GID_xiaozoujishu":{"brokerId":0,"consumeBroadcastEnable":true,"consumeEnable":true,"consumeFromMinEnable":true,"consumeMessageOrderly":false,"consumeTimeoutMinute":15,"groupName":"GID_xiaozoujishu","groupRetryPolicy":{"type":"CUSTOMIZED"},"groupSysFlag":0,"notifyConsumerIdsChangedEnable":true,"retryMaxTimes":16,"retryQueueNums":1,"whichBrokerWhenConsumeSlowly":1}

这里可以看到一个细节就是重试策略是CUSTOMIZED(CustomizedRetryPolicy)

也就是我们对应的重试时间

还有一个策略就是ExponentialRetryPolicy也就是自定义重试次数

不过最大也就只能指定32次

还有一个细节就是重试次数在哪增加的?实际也是在org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack这个方法

总结

普通消息重试默认是16次,顺序消息默认是Integer.MAX_VALUE,消息消费失败会以延时消息的方式投递到broker,超过延时消息的最大值则以2小时为重试间隔。现在提供了不同的重试策略,默认是CustomizedRetryPolicy

如果broker投递失败则会在本地再次消费该消息.重试次数的元数据存储在subscriptionGroup.json.

重试次数的增加有两种

  1. 发送到broker后在broker进行reconsumeTimes+1(broker重试)
  2. 如果发送broker失败则在本地消费进行reconsumeTimes+1(client重试)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/650035.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker 存储管理

文章目录 docker 存储管理容器存储方案docker 容器存储解决方案 docker 存储驱动基本概述存储驱动的选择原则主流的 docker 存储驱动docker 版本支持的存储驱动 overlay2 存储驱动OverlayFSoverlay2 存储驱动要求配置 docker 使用 overlay2 驱动 overlay2 存储驱动的工作机制Ov…

Linux的 .bashrc 有什么作用?

一、.bashrc 是什么? 有什么用&#xff1f; .bashrc是一个存储在你的home目录下的隐藏文件&#xff0c;它用来配置和自定义你的终端环境和行为。 每次你启动一个新的终端时&#xff0c;.bashrc文件就会被执行&#xff0c;加载你设置的环境变量&#xff0c;别名&#xff0c;函数…

林浩然矩阵江湖历险记

林浩然矩阵江湖历险记 Lin Haoran’s Matrix Adventures 在那充满神秘色彩的矩阵世界里&#xff0c;林浩然面对的挑战是驯服一个具有六个个性元素的23矩阵——“小三儿”。这个矩阵由两行三列组成&#xff0c;每一个元素都像是棋盘上的一枚棋子&#xff0c;它们紧密排列在一起&…

朴素贝叶斯分类算法

1.分类算法 分类算法是有监督学习的一个核心问题&#xff0c;他从数据中学习一个分类决策函数或分类模型&#xff0c;对新的输入进行预测&#xff0c;输出变量取有限个离散值。 &#x1f30d;分类算法的内容是要求给定特征&#xff0c;让我们得出类别。 那么如何由指定特征&…

【C++】入门基础

前言&#xff1a;C是在C的基础之上&#xff0c;容纳进去了面向对象编程思想&#xff0c;并增加了许多有用的库&#xff0c;以及编程范式等。熟悉C语言之后&#xff0c;对C学习有一定的帮助&#xff0c;因此从今天开始们将进入&#xff23;的学习。 &#x1f496; 博主CSDN主页:…

如何阅读xml电子发票

xml电子发票是官方给出的电子存档的文件格式&#xff0c;本质是文本&#xff0c;所以文件很小&#xff0c;大量发票存储&#xff0c;能够更加凸显优势。 但是xml电子发票不方便阅读&#xff0c;因为里面是xml格式&#xff0c;对于财务人员来讲&#xff0c;看“代码”简直太难了…

Linux报 “no route to host” 异常 ping: sendmsg: No route to host

公司有台服务器迁移机房后跟另一台服务器相互ping不通&#xff0c;但是两台服务器都能上网能ping其他机器&#xff0c;其他机器都能ping通这两台服务器。检查两台服务器没有防火墙规则拦截&#xff0c;交换机上也没检查到acl过滤。 下图是迁移机房的服务器ping截图 下图是nfs服…

【云原生】认识docker容器操作命令

目录 一、容器操作命令 1、创建容器 2、删除容器以及停止容器运行 3、查看容器的运行状态 4、查看容器的详细信息 5、将容器的文件传输到宿主机以及将宿主机的文件传输到容器中 6、批量删除容器 7、进入容器 二、容器的迁移 1、先在容器中创建测试文件 2、将容器存储…

Scrum敏捷研发管理全流程/scrum管理工具

Leangoo领歌是一款永久免费的专业的敏捷开发管理工具&#xff0c;提供端到端敏捷研发管理解决方案&#xff0c;涵盖敏捷需求管理、任务协同、进展跟踪、统计度量等。 Leangoo领歌上手快、实施成本低&#xff0c;可帮助企业快速落地敏捷&#xff0c;提质增效、缩短周期、加速创新…

【详解】贪吃蛇游戏----上篇(介绍控制台和API等知识)

目录 知识点&#xff1a; Win32 API 宽字符的打印 控制台操作&#xff1a; &#xff08;1&#xff09;调整控制台大小 &#xff08;2&#xff09;控制台屏幕上的坐标COORD GetStdHandle GetConsoleCursorInfo CONSOLE_CURSOR_INFO SetConsoleCursorInfo SetConsoleC…

Cesium工具应用

文章目录 0.引言1.场景截图2.卷帘对比3.反选遮罩4.鹰眼视图5.指南针与比例尺6.坐标测量7.距离测量8.面积测量9.热力图10.视频投影11.日照分析12.淹没分析13.通视分析14.可视域分析15.缓冲区分析16.地形开挖17.要素聚合18.开启地下模式19.开启等高线20.坡度坡向21.填挖方量计算2…

排序(插入排序)

现在&#xff0c;我们学习了之前数据结构的部分内容&#xff0c;即将进入一个重要的领域&#xff1a;排序&#xff0c;这是一个看起来简单&#xff0c;但是想要理清其中逻辑并不简单的内容&#xff0c;让我们一起加油把&#xff01; 排序的概念及其运用 排序的概念 排序&…

解释性人工智能(XAI)—— AI 决策的透明之道

在当今数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;已经成为我们生活中不可或缺的一部分。AI 系统的决策和行为对我们的生活产生了深远的影响&#xff0c;从医疗保健到金融服务再到自动驾驶汽车。 然而&#xff0c;有时候 AI 的决策似乎像黑盒子一样难以理解&am…

[C#]winform部署yolov5实例分割模型onnx

【官方框架地址】 https://github.com/ultralytics/yolov5 【算法介绍】 YOLOv5实例分割是目标检测算法的一个变种&#xff0c;主要用于识别和分割图像中的多个物体。它是在YOLOv5的基础上&#xff0c;通过添加一个实例分割模块来实现的。 在实例分割中&#xff0c;算法不仅…

Redis2-事务 连接Java 整合springboot 注解缓存

一、订阅和发布 Redis 发布订阅 (pub/sub) 是一种消息通信模式&#xff1a;发送者 (pub) 发送消息&#xff0c;订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。 Redis的发布和订阅 客户端订阅频道发布的消息 频道发布消息 订阅者就可以收到消息 发布订阅的代…

ENVI下基于知识决策树提取地表覆盖信息

基于知识的决策树分类是基于遥感影像数据及其他空间数据,通过专家经验总结、简单的数学统计和归纳方法等,获得分类规则并进行遥感分类。分类规则易于理解,分类过程也符合人的认知过程,最大的特点是利用的多源数据。 决策树分类主要的工作是获取规则,本文介绍使用CART算法…

NQA测试机制—UDP Jitter测试

概念 UDP Jitter是以UDP报文为承载&#xff0c;通过记录在报文中的时间戳信息来统计时延、抖动、丢包的一种测试方法。Jitter&#xff08;抖动时间&#xff09;是指相邻两个报文的接收时间间隔减去这两个报文的发送时间间隔。 UDP Jitter测试的过程如下&#xff1a; 1. 源端&a…

shell编程之循环语句与函数

一 echo命令 echo -n 表示不换行输出 echo -e 表示输出转义符 常用的转义符 二 date date查看当前系统时间 -d 你描述的日期&#xff0c;显示指定字符串所描述的时间&#xff0c;而非当前时间 %F 完整日期格式&#xff0c;等价于 %Y-%m-%d % T 时间&#xff08;24小时…

1.26学习总结

连通性判断 DFS连通性判断步骤&#xff1a; 1.从图上任意一点u开始遍历&#xff0c;标记u已经走过 2.递归u的所有符合连通条件的邻居点 3.递归结束&#xff0c;找到了的所有与u的连通点&#xff0c;就是一个连通块 4.然后重复这个步骤找到所有的连通块 BFS连通性判断步骤…

linux 查看zookeeper server运行版本号

zookeeper版本查看运行命令&#xff1a;echo stat|nc localhost 2181 显示如下图所示&#xff1a; Zookeeper version: 3.4.5-cdh6.3.2--1, built on 11/08/2019 13:15 GMT Clients: /127.0.0.1:44814[0](queued0,recved1,sent0) Latency min/avg/max: 0/0/0 Received: 9 Se…