RocketMQ(四):重复消费、消息重试、死信消息的解决方案

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot

RocketMQ(四):重复消费、消息重试、死信消息的解决方案


目录

  • 一、重复消费
    • 1、消息重复的情况
    • 2、MySql唯一索引
    • 3、redis分布式锁
  • 二、消息重试
    • 1、生产者重试
    • 2、消费者重试
  • 三、死信消息
  • 四、消费堆积

一、重复消费

1、消息重复的情况

  • 发送时消息重复
    • 当一条消息已被成功发送到服务端并完成持久化
    • 此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败
    • 如果此时生产者意识到消息发送失败并尝试再次发送消息
    • 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
  • 投递时消息重复
    • 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断
    • 为了保证消息至少被消费一次
    • 消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息
    • 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
    • 当消息队列RocketMQ的Broker 或客户端重启、扩容或缩容
    • 会触发 Rebalance,此时消费者可能会收到重复消息

2、MySql唯一索引

  • 因为 Message ID 有可能出现冲突(重复)的情况
  • 所以用业务唯一标识作为幂等处理的关键依据

生产者

  • 相同的唯一业务编号,发送两次
@Test
void test1() {// 业务唯一编号String key = "1300";Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, key).build();// 相同的key发送两次rocketMQTemplate.syncSend("repeatedTopic", message);rocketMQTemplate.syncSend("repeatedTopic", message);System.out.println("发送完成");
}

消费者

  • 创建user表结构,num_no字段设置为唯一索引
  • 当唯一的业务id插入唯一索引的num_no字段
  • 只能插入一次,第二次会报唯一索引重复
  • 当获取到重复数据,直接返回即可,就不在执行业务代码
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatMysqlListener implements RocketMQListener<MessageExt> {@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(MessageExt message) {// 唯一的业务id(如果是相同的两次请求,则keys值一定相同)String messageKey = message.getKeys();try {jdbcTemplate.execute("INSERT INTO `user` (`num_no`,`name`) VALUES('" + messageKey + "','名称')");} catch (DataAccessException e) {// 该message可能是重复的if (e instanceof DuplicateKeyException) {System.out.println(messageKey+"的业务编号数据重复了,直接return,就算消费了此重复数据");return;}}// 获取消息执行业务System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");}
}

执行结果:

发送完成
获取消息内容:【我是一个带key的消息】执行业务
1300的业务编号数据重复了,直接return,就算消费了此重复数据

3、redis分布式锁

Redisson分布式锁配置

@Configuration
public class RedissonConfig {@Beanpublic Redisson redisson() {Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6390").setPassword("xc@1234").setDatabase(0);return (Redisson) Redisson.create(config);}
}

生产者

@Test
void test1() {// 业务唯一编号String key = "1400";Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, key).build();// 相同的key发送两次rocketMQTemplate.syncSend("repeatedTopic", message);rocketMQTemplate.syncSend("repeatedTopic", message);System.out.println("发送完成");
}

消费者

  • 因为消费者是多线程并发消费
  • 如果遇到相同的唯一业务id,则上锁依次执行
  • 将执行过的唯一业务id放入redis
  • 下次相同业务id进入与redis集合对比,存在则证明已经执行过了
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatRedisListener implements RocketMQListener<MessageExt> {@Autowiredprivate Redisson redisson;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void onMessage(MessageExt message) {// 唯一的业务id(如果是相同的两次请求,则keys值一定相同)String messageKey = message.getKeys();RLock redissonLock = redisson.getLock(messageKey);try {// 添加redisson锁并实现锁续命功能// 默认过期时间是30s,每10s触发一次锁续命功能redissonLock.lock();List<String> topicBusinessKeyList = stringRedisTemplate.opsForList().range("topicBusinessKey",0,-1);if ( ObjectUtils.isNotEmpty(topicBusinessKeyList) && topicBusinessKeyList.contains(messageKey)) {System.out.println(messageKey + "的业务编号数据重复了,直接return,就算消费了此重复数据");return;}// 获取消息执行业务System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");// 讲businessKey存入redisstringRedisTemplate.opsForList().rightPush("topicBusinessKey", messageKey);} finally {redissonLock.unlock();}}
}

执行结果:

发送完成
获取消息内容:【我是一个带key的消息】执行业务
1400的业务编号数据重复了,直接return,就算消费了此重复数据

二、消息重试

1、生产者重试

  • 可以分别设置同步消息和异步消息发送的重试次数
  • 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
  • 默认重试间隔时间为 1 秒,次数为2次
  • 发送消息超时时间默认3000毫秒,如果因为超时,那么便不再尝试重试

application.yml配置文件设置

在这里插入图片描述

2、消费者重试

  • 默认的重试间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 默认多线程模式下,重试16次,设置超过 16 次的重试时间间隔均为每次 2 小时
  • 某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递
  • 在单线程的顺序模式下,重试Integer.MAX_VALUE次,间隔1秒

消费者配置

  • 实现RocketMQPushConsumerLifecycleListener接口,从prepareStart方法中获取消费者并设置它
  • 消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效
@Component
@RocketMQMessageListener(topic = "retryTopic",consumerGroup = "retry-consumer-group"
)
public class RetryListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(MessageExt message) {//获取消息的重试次数System.out.println(message.getReconsumeTimes());System.out.println("消息内容:"+new String(message.getBody()));}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 设置消费者重试次数defaultMQPushConsumer.setMaxReconsumeTimes(2);// 实例名称-控制面板可以看到defaultMQPushConsumer.setInstanceName("消费者1号");}
}

设置重试二次的执行结果:

在这里插入图片描述

三、死信消息

  • 当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
  • 死信队列是死信Topic下分区数唯一的单独队列
  • 死信Topic名称为%DLQ%原消费者组名,死信队列的消息将不会再被消费

上一节的消费者重试两次后,就会将消息放入死信队列

在这里插入图片描述

处理死信消息方式一:

  • 监听死信队列处理消息
@Component
@RocketMQMessageListener(topic = "%DLQ%retry-consumer-group",consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息 签收了System.out.println("记录到特别的位置 文件 mysql 通知人工处理");}
}

处理死信消息方式二:

  • 控制重试次数,重试几次后,直接记录到数据库等等
@Component
@RocketMQMessageListener(topic = "%DLQ%retry-consumer-group",consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer2 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {// 业务处理try {int i = 1 / 0;} catch (Exception e) {// 重试int reconsumeTimes = messageExt.getReconsumeTimes();if (reconsumeTimes >= 3) {// 不要重试了System.out.println("记录到特别的位置 文件 mysql 通知人工处理");}else {throw new RuntimeException("异常");}}}
}

四、消费堆积

一般认为单条队列消息差值>=10w时 算堆积问题

什么情况下会出现堆积

  • 生产太快
    • 生产方可以做业务限流
    • 增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))
  • 消费者消费出现问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/191687.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker Compose及Docker 知识点整理

目录 1、Docker Compose 简介 2、为什么要使用Docker Compose 3、Docker Compose安装使用&#xff08;Linux&#xff09; 3.1 下载 3.2 mkdir docker 文件夹目录 3.3 上传docker-compose到docker文件夹 3.4 移动到 /usr/local/bin 目录下 3.5 添加执行权限 3.6 修改文…

华为云之快速部署FTP站点

华为云之快速部署FTP站点 一、本次实践介绍1.1 实践环境简介1.2 本次实践目的 二、vsftpd介绍2.1 vsftpd简介2.2 vsftpd特点 三、环境准备工作3.1 预置实验环境3.2 查看预置环境信息3.3 登录华为云3.4 查看弹性云服务器状态3.5 查看弹性公网IP地址3.6 ssh登录弹性云服务器3.6 查…

基于c#+mysql+winform学生成绩管理系统-实践作业

基于c#mysqlwinform学生成绩管理系统-实践作业 一、系统介绍二、功能展示四、其它1.其他系统实现五.获取源码 一、系统介绍 分老师与学生两个界面&#xff1b; 老师能查看学生信息并评价&#xff0c;添加&#xff0c;删除学生&#xff1b; 老师能查看学生成绩并修改&#xff0…

C++也要玩反射

C艹也要玩反射 什么是反射为什么不直接用map实现原理代码示例产品子类构造器工厂客户端使用启动时注册运行时使用什么是反射 反射,简单说,就是给定一个字符串(或其他标识),能够成功找到一个对象实例。以上一篇工厂模式的内容为例,程序中读到一个字符串“/index.html”,就…

数据管理系统-week10-数据库安全

文章目录 前言一、什么是数据库安全?二、威胁三、对抗措施四、授权和认证五、访问控制(重点)自由访问控制(DAC)强制访问控制(MAC)补充一个贝尔-lapadula模型六、加密参考文献前言 数据库安全意味着保护数据库免受有意或无意的未经授权的访问,数据库安全需要保护数据库…

整数分频,奇偶分频。

实验目标&#xff1a; 实现任意整数奇偶分频。 /* 二分频电路就是用同一个时钟信号通过一定的电路结构转变成不同频率的时钟信号。 二分频就是通过有分频作用的电路结构&#xff0c;在时钟每触发2个周期时&#xff0c;电路输出1个周期信号。 比如用一个脉冲时钟触发一个计…

Kubernetes入门学习(上)

文章目录 Kubernetes入门学习&#xff08;上&#xff09;介绍云原生 Kubernetes架构基础概念Kubernetes架构控制平面组件Node组件 组件关系 安装Kubernetes基本对象和操作Pod&#xff08;容器集&#xff09;Deployment(部署)与ReplicaSet(副本集)Service&#xff08;服务&#…

第一百八十七回 DropdownButton组件

文章目录 1. 概念介绍2. 使用方法2.1 DropdownButton2.2 DropdownMenuItem 3. 示例代码4. 内容总结5. 经验分享 我们在 上一章回中介绍了"DropdownMenu组件"相关的内容&#xff0c;本章回中将介绍 DropdownButton组件.闲话休提&#xff0c;让我们一起Talk Flutter吧…

使用 kubeadm 部署 Kubernetes 集群(三)kubeadm 初始化 k8s 证书过期解决方案

一、延长k8s证书时间 查看 apiserver 证书有效时间&#xff1a;默认是一年的有效期 [rootxuegod63 ~]# openssl x509 -in /etc/kubernetes/pki/apiserver.crt -noout -text |grep Not 延长证书过期时间 1.把 update-kubeadm-cert.sh 文件上传到 xuegod63 节点 vim update-…

leetcode做题笔记1094. 拼车

车上最初有 capacity 个空座位。车 只能 向一个方向行驶&#xff08;也就是说&#xff0c;不允许掉头或改变方向&#xff09; 给定整数 capacity 和一个数组 trips , trip[i] [numPassengersi, fromi, toi] 表示第 i 次旅行有 numPassengersi 乘客&#xff0c;接他们和放他们…

Unity 下载网络图片的方法,并把图片赋值给UI和物体的方法

Unity 下载网络图片的方法&#xff0c;可使用WWW类或UnityWebRequest类&#xff0c;其中UnityWebRequest是新版的方法。 通常我们下载图片都会转成Texture&#xff0c;然后赋值给UI或者物体。 具体实现方法&#xff1a; using System.Collections; using System.Collections…

深入理解贝叶斯分类与朴素贝叶斯模型(Naive Bayes, NB):从基础到实战

目录 贝叶斯分类 公式 决策规则 优点 贝叶斯分类器的例子——垃圾邮件问题 1. 特征&#xff08;输入&#xff09;&#xff1a; 2. 类别&#xff1a; 3. 数据&#xff1a; 4. 模型训练&#xff1a; 注&#xff1a;类别先验概率 5. 模型预测&#xff1a; 朴素贝叶斯模…

c++ int* 和 *ptr(取对应变量值)

int n 10; int* ptr; // 声明 一个名为ptr的内存 用来保存传入的变量内存地址 ptr &n; // 给已经什么的内存ptr 赋n变量的内存地址值*ptr 20; // 获取名为ptr的内存保存的变量内存地址对应的变量值,然后赋值20int* 表示 ptr 是一个指针变量 开一个ptr名字的内存,用来保存…

【开题报告】基于深度学习的驾驶员危险行为检测系统

研究的目的、意义及国内外发展概况 研究的目的、意义&#xff1a;我国每年的交通事故绝对数量是一个十分巨大的数字&#xff0c;造成了巨大的死亡人数和经济损失。而造成交通事故的一个很重要原因就是驾驶员的各种危险驾驶操作行为。如果道路驾驶员的驾驶行为能够得到有效识别…

【大数据】区分 hdfs dfs -ls 与 hdfs dfs -ls /

&#x1f60a; 如果您觉得这篇文章有用 ✔️ 的话&#xff0c;请给博主一个一键三连 &#x1f680;&#x1f680;&#x1f680; 吧 &#xff08;点赞 &#x1f9e1;、关注 &#x1f49b;、收藏 &#x1f49a;&#xff09;&#xff01;&#xff01;&#xff01;您的支持 &#x…

并行和并发的区别

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1、并发2、并行3、异同点 1、并发 当有多个线程在操作时,如果系统只有一个CPU,则它根本不可能真正同时进行一个以上的线程&#xff0c;它只能把CPU运行时间划分成若…

基于SpringBoot的企业客户管理系统的设计与实现

摘 要 本论文主要论述了如何使用JAVA语言开发一个企业客户管理系统&#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述企业客户管理系统的当前背景以及系统开发的目…

npm ERR! notarget No matching version found for @eslint/eslintrc@^2.1.4.

文章目录 Intro解决流程总结前置信息了解npm 镜像源三个要用到的npm命令 官方源确认查看当前镜像源的详情解决&#xff1a; 切换镜像源后重试重新操作 事后感受 Intro 事由是今天我在用 create-react-app 新建一个用于测试的前端项目。 然后就出现以下报错&#xff1a; wuyuj…

【LeetCode热题100】【双指针】移动零

给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0] 示例 2: 输入: nums [0] 输出…

8.C转python

1.在文件查找中,文件夹才是目录 2.使用pip: python搞了一个网站pypi,把各种的第三方库给收集起来了 使用pip工具就可以直接从pypi里下载你想要的第三方库了 可以直接使用pip工具搜 安装完成后,即可使用import导入相关模块即可进行使用 往后运用pip中的第三方库应该都是在…