【RabbitMQ | 第六篇】消息重复消费问题及解决方案

在这里插入图片描述

文章目录

  • 6.消息重复消费问题
    • 6.1问题介绍
    • 6.2解决思路
    • 6.3将该消息存储到Redis
      • 6.3.1将id存入string(单消费者场景)
        • (1)实现思路
        • (2)问题
      • 6.3.2将id存入list中(多消费场景)
        • (1)实现思路
      • 6.3.3将id以key增量存入string中并设置过期时间
        • (1)实现思路
    • 6.4总结

6.消息重复消费问题

6.1问题介绍

什么是消息重复消费?首先我们来看一下消息的传输流程。消息生产者–>MQ–>消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在 两个阶段

1 :生产者多发送了消息给MQ;

2 :MQ的一条消息被消费者消费了多次

具体场景如下:

  1. 生产者发送消息给MQ在MQ确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
  2. 消费者消费成功后,给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。

6.2解决思路

  1. 发送消息时让每个消息携带一个全局的唯一ID
  2. 在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:
    • 消费者获取到消息后先根据id去查询redis/db是否存在该消息
    • 如果不存在,则正常消费,消费完毕后写入redis/db
    • 如果存在,则证明消息被消费过,直接丢弃

6.3将该消息存储到Redis

6.3.1将id存入string(单消费者场景)

(1)实现思路
  • 将id号存入value中,并且value类型为string
  • 即以队列名称为key,以消息id为值
  • 每次消息过来都覆盖之前的消息
    @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以@RabbitHandlerpublic void receiveMessage1(Message message) throws UnsupportedEncodingException {//获取唯一idString messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"utf-8");//获取redis中该队列名称对应的value值String messageRedisValue = redisUtil.get("queueName4","");//检验唯一id是否存在if (messageRedisValue.equals(messageId)) {//存在return;}System.out.println("消息:"+msg+", id:"+messageId);//以队列为key,id为valueredisUtil.set("queueName4",messageId);}
(2)问题
  1. 并发冲突:如果多个消费者同时操作 Redis 中的已消费消息列表,由于 Redis 是单线程处理命令,可能会出现并发冲突导致数据不一致或丢失问题。特别是在高并发情况下,使用字符串类型的 ID 可能会增加并发冲突的风险
  2. 内存占用:字符串类型的 ID 在内存中占用空间相对较大,尤其是对于大量消息的情况下,会增加 Redis 的内存占用。
  3. 比较效率:字符串类型的 ID 比较起来相对复杂,需要进行字符串比较操作。

6.3.2将id存入list中(多消费场景)

(1)实现思路
  • 以该队列名称为key,id为value
  • 适合多消费场景的原因:
    • 顺序性:List 是一个有序集合,可以按照消息的顺序存储消息 ID。在多消费者场景下,保持消息的顺序通常是很重要的,以确保消息按照正确的顺序被消费。
    • 原子性操作:Redis 的 List 提供了多个原子性操作,比如从列表两端推入/弹出元素,这些操作可以确保多个消费者同时访问列表时不会出现数据竞争和并发问题。
    • 支持阻塞操作:List 提供了阻塞式的弹出操作(如 BLPOP、BRPOP),可以在没有消息时阻塞等待新消息的到来,这对于实现消费者轮询机制非常有用。
@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"utf-8");//获取List<String> messageRedisValue = redisUtil.lrange("queueName4");if (messageRedisValue.contains(messageId)) {return;}System.out.println("消息:"+msg+", id:"+messageId);redisUtil.lpush("queueName4",messageId);//存入list
}

6.3.3将id以key增量存入string中并设置过期时间

(1)实现思路

消息id为key消息内容为value存入string中,设置过期时间( 可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)

    @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以@RabbitHandlerpublic void receiveMessage2(Message message) throws UnsupportedEncodingException {String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"utf-8");String messageRedisValue = redisUtil.get(messageId,"");if (msg.equals(messageRedisValue)) {return;}System.out.println("消息:"+msg+", id:"+messageId);//以id为key,消息内容为value,过期时间10分钟redisUtil.set(messageId,msg,10L);}

6.4总结

该篇文章介绍了消息重复消费问题及解决方案,问题可能产生的两个阶段(生产消息多发、消费者重复消息);解决方案:将消息发送时携带一个唯一id,消费方拿到消息时先去reids/db中有没有该数据,若没有则可以消费,否则不可以消费;并介绍了基于Redsi解决消息重复消费问题,①以队列名称为key,消息id为value,且value为string类型(适合只有一个消费方)②以队列名称为key,消息id为value,且value为list类型(适合有多个消费方场景)③以消息id为key,内容为value,并设置过期时间

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/757856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++提高笔记(七)---STL常用算法(排序、拷贝和替换、算术生成、集合)

&#xff08;由于上篇笔记篇幅过长&#xff0c;故开新篇 继续记录算法笔记&#xff09; 2.3常用排序算法 学习目标&#xff1a;掌握常用的排序算法 算法简介&#xff1a; sort //对容器内元素进行排序 random_shuffle //洗牌 指定范围内的元素随机调整次序 merg…

【进阶版讲解如何系统地自学Python?】一篇文章带你了解

如何系统地自学Python? 1. 前言2. 确定学习目标3. 学习Python的基础4. 实践编程5. 理解面向对象编程6. 掌握Python库和框架7. 开始一个个人项目8. 加深理解9. 参与社区10. 不断挑战自己11. 总结和回顾 1. 前言 自学Python需要计划、资源和实践&#xff0c;以下是一个系统性自…

springboot + neo4j 功能使用

集成 添加依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-neo4j</artifactId></dependency> spring:# neo4j 图数据库neo4j:uri: bolt://localhost:7687authentication:username: …

首页效果炫酷的wordpress免费主题模板

视频背景免费WP主题 简洁大气的视频背景wordpress主题&#xff0c;找大视频背景的主题可以看看这个。 https://www.wpniu.com/themes/193.html 红色全屏大图WP主题 非常经典的一款免费wordpress主题&#xff0c;红色全屏大图满足多行业使用。 https://www.wpniu.com/themes…

贴片电感的工艺结构原理及选型参数总结

🏡《总目录》 目录 1,概述2,工作原理3,结构特点3.1,耐电流叠加特性3.2,耐冲击噪音低3.3,全屏蔽结构绿色环保性好3.4,损耗低、耐热性好3.5,作业频带宽4,工艺流程4.1,线圈绕制4.2,磁芯制作4.3,导线制作4.4,封装

蓝桥杯算法基础(11):十大排序算法(冒泡排序)c语言般版

十大排序算法合集&#xff08;c语言般&#xff09; 冒泡排序 选择排序 插入排序 希尔排序 快速排序 归并排序 堆排序 计数排序 桶排序 基数排序 分类: 交换类 1.冒泡排序 2.快速排序 分配类 1.计数排序 2.基数排序 选择类 1.选择排序 归并类 1.归并排序 插入类 1.插入…

9.登入页面

登入页面 在pages中新建页面login 修改代码 <template><view></view> </template><script setup></script><style lang"scss"></style>添加头像组件 官网 https://vkuviewdoc.fsq.pub/components/avatar.html …

Oracle中使用coe_load_sql_profile脚本固定执行计划

coe_load_sql_profile.sql 是Oracle数据库环境下用于迁移或固定SQL执行计划的一个脚本&#xff0c;它可以帮助DBA将特定SQL语句的高效执行计划转化为SQL Profile&#xff0c;并将其应用到目标数据库中。 SQL Profile是一种Oracle数据库中用来指导优化器选择特定执行计划的方法。…

【靶机测试--PHOTOGRAPHER: 1【php提权】】

前期准备 靶机下载地址&#xff1a; https://vulnhub.com/entry/photographer-1%2C519/ 信息收集 nmap 扫描同网段 ┌──(root㉿kali)-[/home/test/桌面] └─# nmap -sP 192.168.47.0/24 --min-rate 3333 Starting Nmap 7.92 ( https://nmap.org ) at 2024-03-19 07:37 …

SpringCloud Alibaba Nacos 服务注册和配置中心

一、前言 接下来是开展一系列的 SpringCloud 的学习之旅&#xff0c;从传统的模块之间调用&#xff0c;一步步的升级为 SpringCloud 模块之间的调用&#xff0c;此篇文章为第十二篇&#xff0c;即介绍 SpringCloud Alibaba Nacos 服务注册和配置中心。 二、Nacos 简介 2.1 为…

SpringBoot 监控 SQL 运行情况

Druid 数据库连接池相信很多小伙伴都用过,个人感觉 Druid 是阿里比较成功的开源项目了,不像 Fastjson 那么多槽点,Druid 各方面一直都比较出色,功能齐全,使用也方便,基本的用法就不说了,今天我们来看看 Druid 中的监控功能。 准备工作 首先我们来创建一个 Spring Boot…

Android API 30及更高版本网络权限设置

目录 一、网络权限设置二、配置步骤1、在 AndroidManifest.xml 文件中添加网络权限声明2、在 AndroidManifest.xml 文件中的 application 节点下配置网络安全策略 一、网络权限设置 在 Android API 30 及更高版本中&#xff0c;Google 引入了更严格的网络安全策略&#xff0c;…

wireshark数据捕获实验简述

Wireshark是一款开源的网络协议分析工具&#xff0c;它可以用于捕获和分析网络数据包。是一款很受欢迎的“网络显微镜”。 实验拓扑图&#xff1a; 实验基础配置&#xff1a; 服务器&#xff1a; ip:172.16.1.88 mask:255.255.255.0 r1: sys sysname r1 undo info enable in…

YOLOv5目标检测学习(6):源码解析之:训练部分train.py

文章目录 前言一、导入相关包与配置二、主函数main2.1 checks&#xff1a;检查rank值来判断是否打印参数、检查git仓库、检查包的安装2.2 判断是否恢复上一次模型训练提问&#xff1a;opt.data, opt.cfg, opt.hyp, opt.weights, opt.project各是什么&#xff1f; 2.3 DDP mode&…

【数据结构】哈希表与哈希桶

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.概念 2.哈希冲突…

mysql查询条件包含IS NULL、IS NOT NULL、!=、like %* 、like %*%,不能使用索引查询,只能使用全表扫描,是真的吗???

不知道是啥原因也不知道啥时候, 江湖上流传着这么一个说法 mysql查询条件包含IS NULL、IS NOT NULL、!、like %* 、like %*%,不能使用索引查询&#xff0c;只能使用全表扫描。 刚入行时我也是这么认为的&#xff0c;还奉为真理&#xff01; 但是时间工作中你会发现还是走索引…

Linux 常用运维使用指令

查询占用 CPU 最高的前 10 个进程 ps aux|grep -v PID|sort -rn -k 3|head 查询占用内存最大的前 10 个进程 ps aux|grep -v PID|sort -rn -k 4|head Linux du 获取文件比较大的前十 du -h / | sort -rh | head -n 10 解释&#xff1a; du: 磁盘使用情况命令。 -h: 参数…

C++基础入门(命名空间,函数,引用)

文章目录 前言1,命名空间2,函数函数重载缺省参数内联函数 3,引用尾声 前言 欢迎来到这篇关于C的入门博客&#xff01;C是一门强大而又广泛应用的编程语言&#xff0c;作为一门面向对象的编程语言&#xff0c;C可以让你更好地组织和管理代码&#xff0c;提高代码的重用性和可维…

带你学会深度学习之卷积神经网络[CNN] - 3

前言 本文不讲述如泛化&#xff0c;前向后向传播&#xff0c;过拟合等基础概念。 本文图片来源于网络&#xff0c;图片所有者可以随时联系笔者删除。 CNN&#xff0c;常用于计算机视觉&#xff0c;是计算机视觉方面常见的基础模型&#xff0c;后面发展的有很多其他变种&…

实验7-3-6 字符串转换成十进制整数(PTA)

题目&#xff1a; 输入一个以#结束的字符串&#xff0c;本题要求滤去所有的非十六进制字符&#xff08;不分大小写&#xff09;&#xff0c;组成一个新的表示十六进制数字的字符串&#xff0c;然后将其转换为十进制数后输出。如果在第一个十六进制字符之前存在字符“-”&#…