图解RocketMQ之生产者如何进行消息重试

大家好,我是苍何。

上一篇留了一个小问题,如果消费者出现异常,消费某一条消息失败,这时候 RocketMQ 会怎么处理呢?

你可能会用你聪明绝顶的脑袋瓜子想,苍何你是不是傻,失败了肯定重试啊,RocketMQ 一定有什么重试机制。

没错,RocketMQ 就是用重试机制来解决消息消费失败问题,那如果我问你重试都有哪些策略呢?生产端和消费端如何进行消息重试呢?

这个时候你肯定一脸懵逼,啥玩意?


别急,我们这一篇就专门来叨叨 RocketMQ 的消息重试机制。

在正式开始学(chui)习(bi)之前,有必要给大家推荐下 RocketMQ 官方的中文社区,阿里官方出品,收藏起来吧:https://rocketmq-learning.com/

RocketMQ官方社区

什么情况下需要消息重试

消息重试的目的是为了保证消息的完整性,防止消息丢失,是业务兜底策略。什么情况下需要消息重试呢?

其实也就是什么情况下会发生消息丢失(毕竟不丢失就不需要重试嘛)

  • 消息自身原因:

例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。

这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功。

所以最好提供一种定时重试机制,即过10秒后再重试。(而不是立马就重试)

消息自身原因

  • 业务处理失败:

依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。

这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

业务处理原因

消息重试能解决哪些问题

我们都知道消息队列一大核心作用就是异步解耦,也就是让上下游服务不耦合依赖,可以通过 mq 来进行业务逻辑关联处理。

但是如果下游服务消费消息出错了,这个时候,如何保证整个调用链路的完整性呢?不可能失败就失败了吧,比如订单消息给到 mq,库存系统消费来扣减库存。

库存系统消费消息的时候如果发生异常,那么对应的这一条订单就没有扣减库存,这是很严重的问题,明明已经卖出去了,库存里面居然还有,颇有一种用不完的感觉😂

这种情况已经严重影响到业务整个链路数据完整性了,也是我们最不希望看到的,所以就需要消息重试。

消息重试主要可以解决以下问题:

  • 临时性故障处理: 当消费者因为网络波动、服务器暂时不可用等临时性问题无法正常消费消息时,重试机制可以在稍后再次尝试投递,提高消息处理的成功率。
  • 业务处理异常: 如果消费者在处理消息时遇到业务逻辑异常,重试机制可以让消费者有机会在稍后再次处理该消息。
  • 提高系统可靠性: 通过多次重试,可以降低因单次失败导致的消息丢失风险,提高整个消息系统的可靠性和稳定性。
  • 应对突发流量: 在消费者遇到突发高流量无法及时处理所有消息时,重试机制可以帮助削峰填谷,让消息在稍后的低峰期被重新处理。
  • 分布式事务处理: 在实现最终一致性的分布式事务中,消息重试可以作为一种补偿机制,确保事务的最终完成。

消息重试主要解决的问题

重试又分生产端的消息重试和消费端的消息重试,我们一起看看吧。

生产端的消息重试

生产者向 RocketMQ 的 broker 发送消息时,因自身原因如网络波动等导致没有发送成功,这时发送端没收到 RocketMQ 的 ACK 心跳包。

消息都没发出去,消费者自然收不到。

在生产端,针对不同的发送方式,会有不同的重试策略。

那好奇的你肯定会问,发送方式又有哪些呢?其实 RocketMQ 提供了 3 种发送方式,分别是:

  • 同步发送
  • 异步发送
  • 单向发送

下面针对不同发送方式简单看下 RocketMQ 提供的重试策略吧:

同步发送

同步发送是 RocketMQ 默认的发送方式,默认调用 producer.Send () 方法不指定回调使用的都是同步发送,每次发送都需要等待响应,配合重试机制这种发送方式的可靠性是极高的,缺点当然是吞吐量低了。

RocketMQ 的同步发送默认 2 次重试(加上第一次发送,共3次尝试),重试间隔是 1 秒。

当然了,我们也可以修改默认重试次数和时间间隔:

// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

异步发送

异步发送指的是生产者发送消息完后,不用等待 RocketMQ 服务端返回 ack,而是通过回调函数处理结果,所以需要指定回调函数,下面我写个发送的简单 demo:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}
});// 等待一段时间以确保异步发送完成
producer.shutdown();

异步发送默认是不会进行消息重试的,因其本身无法立即确定发送状态,要想开启重试,也可以设置 setRetryTimesWhenSendFailed。

对于高吞吐量场景或非关键消息,可以考虑使用异步发送来提高性能

单向发送

单向发送是指不关心发送结果的发送方式,因其不关心发送结果故而也不支持重试,很少有业务会使用单向发送。

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 单向发送
producer.sendOneway(msg);producer.shutdown();

PmHub 生产端消息如何重试

在 PmHub 中抽象了消息组件,所有消息发送都在 pmhub-base-notice 中。

因 PmHub 消息发送业务场景目前主要是基于企业微信的消息推送及流程状态通知,所以我们使用的是默认的发送方式,也即同步发送。

因无自定义重试需求,默认的 1 秒重试 1 次,一共重试 2 次也足够,所以我们并没做过多改动,所以 PmHub 用的是默认的同步发送以及默认的重试策略。

下面是具体的发送代码,大家也可以直接去 GitHub 中下载源码观看更佳:

/**  * 推送到微信topic  * */public static void push2Wx(com.laigeoffer.pmhub.base.notice.domain.entity.Message ob){  try {  String key = IdUtil.simpleUUID();  ObjectMapper objectMapper = new ObjectMapper();  // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。  String endpoint = addr;  // 消息发送的目标Topic名称,需要提前创建。  String topic = WX_TOPIC;  ClientServiceProvider provider = ClientServiceProvider.loadService();  ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);  ClientConfiguration configuration = builder.build();  // 初始化Producer时需要设置通信配置以及预绑定的Topic。  Producer producer = provider.newProducerBuilder()  .setTopics(topic)  .setClientConfiguration(configuration)  .build();  // 普通消息发送。  Message message = provider.newMessageBuilder()  .setTopic(topic)  // 设置消息索引键,可根据关键字精确查找某条消息。  .setKeys(key)  // 设置消息Tag,用于消费端根据指定Tag过滤消息。  .setTag(mqTag)  // 消息体。  .setBody(objectMapper.writeValueAsString(ob).getBytes())  .build();  // 发送消息,需要关注发送结果,并捕获失败等异常。  SendReceipt sendReceipt = producer.send(message);  LogFactory.get().info("Send message successfully, messageId={}", sendReceipt.getMessageId());  producer.close();  } catch (ClientException | IOException e) {  LogFactory.get().error("推送微信消息时发生错误:", e);  }  }

生产端的消息是重试 hold 住了,但如果消费者出现异常,消费某一条消息失败,这时候 RocketMQ 会怎么处理呢?

我们下一篇会专门针对消费端的消息重试做更深入的了解,也是面试重灾区

好啦,今天的分享结束。

我是苍何,这是图解 RocketMQ 教程的第 6 篇,我们下篇见~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/51565.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单据新增,限制单据栏位的录入值,设置过滤条件

希望通过开发实现 单据头的组织栏位,只能选择101开头的组织,实现的效果如下: 代码如下: using Kingdee.BOS.Util; using Kingdee.BOS.Core.DynamicForm.PlugIn; using Kingdee.BOS.Core.DynamicForm.PlugIn.Args; using System.ComponentModel;namespace cux.button.test {…

基于opencv的人脸识别(实战)

前言 经过这几天的学习,我已经跃跃欲试了,相信大家也是,所以我决定自己做一个人脸识别程序。我会把自己的思路和想法都在这篇博客内讲清楚,大家可以当个参考,🌟仅供学习使用🌟。 &#x1f31f…

分享10个好用的论文编辑服务/平台

学境思源,一键生成论文初稿: AcademicIdeas - 学境思源AI论文写作 如果您对自己的学术写作能力存在怀疑,论文编辑服务/平台或许能提供帮助。为了帮助您做出更好的选择,今天的分享我们列出了2024年“全网”最好用的10个论文编辑服…

怎么样建设数字化车间?

建设数字化车间是一个综合性的过程,旨在通过现代信息技术、智能设备和自动化技术对车间进行优化改造,提高生产效率和产品质量。以下是一些关键步骤和要点,用于指导数字化车间的建设: 一、明确建设目标和需求 分析现状&#xff1…

【轨物方案】开关柜在线监测物联网解决方案

随着物联网技术的发展,电力设备状态监测技术也得到了迅速发展。传统的电力成套开关柜设备状态监测方法主要采用人工巡检和定期维护的方式,这种方法不仅效率低下,而且难以保证设备的实时性和安全性。因此,基于物联网技术的成套开关…

Mybatis-Plus-常用的注解:@TableName、@TableId、@TableField、@TableLogic

1、TableName 经过之前的测试,在使用MyBatis-Plus实现基本的CRUD时,我们并没有指定要操作的表,只是在Mapper接口继承BaseMapper时,设置了泛型User,而操作的表为user表由此得出结论,MyBatis-Plus在确定操作…

Python:随机数、随机选择的应用

step1:导入 导入的random相当于是创建了random文件里的的一个对象 import random random() 产生0~1随机数 randint(a,b)产生a~b的整数 闭区间,可以取到a,b random.choice(touple_name)从touple_name(数组、列表..)中随机选择元素 import rand…

Java人力资源招聘社会校招类型招聘小程序

✨💼【职场新风尚!解锁人力资源招聘新神器:社会校招类型招聘小程序】✨ 🎓【校招新体验,一键触达梦想企业】🎓 还在为错过校园宣讲会而懊恼?别怕,社会校招类型招聘小程序来救场&am…

L2TP VPN

目录 一、实验目的 二、实验环境 三、实验内容 1、实验规划: 2、关键内容: 3、实施步骤: 四、实验总结 一、实验目的 1、了解L2TP的实现原理; 2、掌握Client-Initiated场景下的L2TP的配置。 二、实验环境 华为eNSP模拟器…

pytest 测试框架中 setup、teardown 方法不生效

pytest 测试框架中 setup、teardown 方法不生效 源码有改动: 将 setup、teardown改为:setup_method、teardown_method 可生效 def setup_method(self):print("测试用例执行前的初始化,如:打开浏览器,加载网页...")def setup_class…

MybatisPlus(一)

目录 入门: 使用MybatisPlus的基本步骤: 常见注解 常见配置 总结 核心功能 条件构造器 自定义SQL Service接口 IService接口基本用法 IService的Lambda查询 IService的Lambda更新 IService批量新增 入门: 使用MybatisPlus的基本步…

WebSocket程序设计

协议说明 WebSocket 是一种在单个TCP连接上进行全双工通信的协议。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。Websocket主要用在B/S架构的应用程序中,在 WebSocket API 中,浏览器和服务器只…

2024全面解析:从零基础到精通的大模型学习路线,非常详细零基础入门到精通,收藏我这一篇就够了

大模型学习路线规划 第一阶段:基础理论入门 目标:了解大模型的基本概念和背景。 内容: 人工智能演进与大模型兴起。 大模型定义及通用人工智能定义。 GPT模型的发展历程。 第二阶段:核心技术解析 目标:深入学习大模…

python 图片转文字、语音转文字、文字转语音保存音频并朗读

一、python图片转文字 1、引言 pytesseract是基于Python的OCR工具, 底层使用的是Google的Tesseract-OCR 引擎,支持识别图片中的文字,支持jpeg, png, gif, bmp, tiff等图片格式 2、环境配置 python3.6PIL库安装Google Tesseract OCR 3、安…

mac下通过brew安装mysql的环境调试

mac安装mysql 打开终端,运行命令(必须已经装过homebrew哦): 安装brewbin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"已安装brew直接运行:brew install mysql8.0报…

洛科威岩棉板在生产生活中广泛应用,以优秀表现实现隔热性能最大化

无论是在住宅领域还是工业生产领域,隔热保温都扮演着极其重要的角色,选用的材料是否足够出色,决定了大家居家生活的舒适度,以及生产过程中能耗的高低。近些年来,洛科威岩棉优秀的隔热性能逐渐得到了各行各业的青睐&…

HomeDepot commercedesk 平台EDI自测流程

Home Depot Canada 是一家全球知名的家居建材零售公司,在加拿大拥有多家分店。它是美国的家居建材零售巨头 Home Depot 在加拿大的子公司。Home Depot 主要销售各种家庭装修和建筑材料,包括工具、家具、装饰品、电器等。公司提供广泛的产品选择和专业的顾…

机器学习(二十四):信息增益、独热编码和回归树

一、纯度测量方式——熵 第一步,定义:一个子集里,某一类别的数据在子集中的占比 例如,下图这组输入数据,根据耳朵形状划分为两个子集,尖耳朵子集里,有四只猫,1只狗。则是4/5 第二步…

NVIDIA正偷偷复活卡皇泰坦,性能秒杀5090Ti

PC 硬件圈的瓜年年有,但最近似乎格外的多噢! 首先针对 13、14 代酷睿 CPU 不稳定问题,Intel 终于做出了正式回应: 他们在对退回的 CPU 进行大量分析后得出,确认是过高的运行电压和微代码算法错误导致了不稳定情况。 …

2-48 基于matlab的EM算法聚类可视化程序

基于matlab的EM算法聚类可视化程序,通过期望最大化算法(EM)优化类别间距,使得类别间距最大、类内间距最小。输出聚类前后结果及收敛曲线。程序已调通,可直接运行。 2-48 期望最大化算法(EM) 聚类…