消息队列的幂等问题解决方案

消息队列的幂等性问题是指在处理重复消息时,保证消息被多次消费但只产生一次影响。由于网络延迟、消费端异常等原因,消息可能会被重复投递或消费,因此消息处理的幂等性是保证系统数据一致性的重要环节。

1. 解决幂等问题常见方案

1.1 唯一请求ID(去重ID)

每个消息都包含一个唯一的 ID,生产者在生成消息时给每个消息赋予一个唯一的标识(如 UUID)。
消费者在处理消息前,先检查该消息的 ID 是否已经处理过。如果已经处理过,则直接丢弃;如果没有处理过,则进行处理并记录这个 ID。
记录方式可以是数据库表或缓存(如 Redis),用于存储已经处理过的消息 ID。

1.2 数据库唯一约束

在数据库中为相关字段设置唯一约束。例如,在处理丁丹时,可以在订单表中为订单号设置唯一索引。
在插入数据时,如果因为唯一约束导致插入失败,则说明该消息已经被处理过,这样就保证了幂等性。

1.3 状态检查

在处理消息时,首先查询当前状态,看是否已经达到目标状态。
只有当状态符合预期时,才进行下一步处理,并更新状态。
这种方式适用于多步骤的业务逻辑,每个步骤都有明确的状态变化。

1.4 使用Redis原子操作

利用Redis的SETNX(SET if Not Exists)操作。先尝试将消息的唯一ID设置到Redis中,如果成功(返回1),则表明是第一次处理,可以继续处理消息;如果失败(返回0),则说明消息已经被处理过。
使用Redis的TTL功能,可以给记录的消息ID设置过期时间,避免Redis中记录无限增长。

1.5 幂等性业务逻辑

将业务逻辑设计为幂等的,即多次执行相同的操作不会影响最终结果。例如,扣减库存操作可以设计为将库存设置为剩余数量而非从库存中减去数量
使用数学运算的性质。例如,订单金额计算可以直接将订单的最终状态写入,而不是进行增量计算。

1.6 实现示例

以使用唯一请求ID和Redis为例:
1.生产者
生成消息时,附加一个唯一的ID,如UUID。

String uniqueId = UUID.randomUUID().toString();
Message message = new Message(uniqueId, data);

2.消费者
在消费消息时,先检查Redis中是否存在该消息ID。

String uniqueId = message.getUniqueId();
boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(uniqueId, "1", 10, TimeUnit.MINUTES);if (isFirstProcess) {// 处理消息processMessage(message);
} else {// 消息已经处理过,直接丢弃
}

注意事项
去重 ID 的有效期:在使用 Redis 等缓存进行去重时,要注意设置合理的过期时间,以免 Redis 中存储过多的已处理消息 ID。
数据一致性:幂等操作的目标是保证数据的一致性。在实际实现时,要结合业务需求确保数据的正确性,避免因重复消费导致的状态不一致。
性能影响:为保证幂等性而查询数据库或缓存可能带来性能开销,因此需要根据业务场景选择合适的实现方案。

2. RocketMQ 和 RabbitMQ 的幂等性实现示例

2.1 RocketMQ幂等实现

RocketMQ 提供了消息幂等性处理的功能,通过消费者端的逻辑来确保消息的幂等性。
1. 生产者发送消息
生成一个唯一的业务 ID(如订单号、交易 ID)并将其作为消息的属性发送到 RocketMQ。

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.start();String uniqueId = UUID.randomUUID().toString();
Message message = new Message("TopicTest", "TagA", uniqueId, data.getBytes());SendResult sendResult = producer.send(message);
producer.shutdown();

2. 消费者处理消息
消费者在处理消息前,检查唯一业务 ID 是否已经处理过。
使用 Redis 的 SETNX 或数据库的唯一约束来判断是否已处理。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.subscribe("TopicTest", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {String uniqueId = msg.getKeys();// 使用 Redis 判断是否已处理过boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(uniqueId, "1", 10, TimeUnit.MINUTES);if (isFirstProcess) {// 处理消息processMessage(msg);} else {// 消息已经处理过,直接忽略}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});consumer.start();

2.2 RabbitMQ 幂等性实现

在 RabbitMQ 中,幂等性处理主要在消费者端完成,类似于 RocketMQ。
1. 生产者发送消息
生产者发送消息时,同样附带一个唯一的业务 ID。

String uniqueId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueId);Message message = new Message(data.getBytes(), messageProperties);
rabbitTemplate.send("exchange", "routingKey", message);

2. 消费者处理消息
在消费者处理消息前,检查唯一业务 ID 是否已存在。
可以通过 Redis 或数据库来进行幂等性检查。

@RabbitListener(queues = "queueName")
public void processMessage(Message message) {String uniqueId = message.getMessageProperties().getMessageId();// 使用 Redis 判断是否已处理过boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(uniqueId, "1", 10, TimeUnit.MINUTES);if (isFirstProcess) {// 处理消息processBusinessLogic(message);} else {// 消息已经处理过,直接忽略}
}

3. 电商支付模块的幂等性问题

在电商系统中,支付模块的幂等性尤为重要,因为重复支付会导致资金错误和用户体验问题。常见的支付幂等性处理策略包括:
1.唯一交易 ID
每个支付请求都生成一个唯一的交易 ID,并将其作为支付请求的唯一标识。
支付服务在接收到请求时,先查询数据库中是否已经存在该交易 ID。如果存在,说明支付请求已经处理过;如果不存在,才进行支付处理并记录该交易 ID。
2. 数据库事务
利用数据库的事务和唯一约束,保证同一个交易 ID 的支付记录只能插入一次。
通过数据库的原子性操作,确保支付的原子性和幂等性。
3. 支付状态检查
在处理支付请求时,首先检查当前订单的支付状态。
如果订单已经是支付成功状态,则直接返回成功;如果未支付或支付中,则进行下一步支付操作。

public synchronized boolean processPayment(String orderId, BigDecimal amount) {// 查询订单状态Order order = orderRepository.findById(orderId);if (order.getStatus() == OrderStatus.PAID) {// 已支付,直接返回return true;}// 进行支付操作boolean paymentSuccess = paymentGateway.pay(orderId, amount);if (paymentSuccess) {// 更新订单状态order.setStatus(OrderStatus.PAID);orderRepository.save(order);}return paymentSuccess;
}

4. 幂等性 Token
在发起支付请求时,生成一个幂等性 Token 并将其发送到支付服务端。
支付服务端接收到请求后,使用该 Token 判断该请求是否已经处理过。

实现示例

1.支付请求
在发起支付请求时,生成唯一交易 ID,并在支付请求中附带该 ID。

String transactionId = UUID.randomUUID().toString();
PaymentRequest paymentRequest = new PaymentRequest(orderId, amount, transactionId);

2. 支付服务端
支付服务端接收到支付请求后,检查交易 ID 是否已经处理过。
使用数据库或 Redis 记录已经处理过的交易 ID。

public boolean processPayment(PaymentRequest request) {String transactionId = request.getTransactionId();boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(transactionId, "1", 10, TimeUnit.MINUTES);if (isFirstProcess) {// 执行支付逻辑boolean success = paymentGateway.pay(request.getOrderId(), request.getAmount());if (success) {// 支付成功,更新订单状态updateOrderStatus(request.getOrderId(), OrderStatus.PAID);}return success;} else {// 重复请求,直接返回成功return true;}
}

通过在 RocketMQ 和 RabbitMQ 中使用唯一 ID 来确保消息处理的幂等性,以及在电商支付模块中利用唯一交易 ID 和状态检查,可以有效避免重复处理带来的问题,保证系统数据的一致性和正确性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/54368.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决【WVP服务+ZLMediaKit媒体服务】加入海康摄像头后,能发现设备,播放/点播失败,提示推流超时!

环境介绍 每人搭建的环境不一样,情况不一样,但是原因都是下面几种: wvp配置不当网络端口未放开网络不通 我搭建的环境: WVP服务:windows下,用idea运行的源码 ZLM服务:虚拟机里 问题描述 1.…

【人工智能学习笔记】5 计算机视觉基础

计算机视觉概述 定义:计算机视觉(Computer Vision)是一门研究如何使机器“看”的科学,也可以看作是研究如何使人工系统从图像活多维数据中“感知”的科学终极目标:计算机视觉成为机器认知世界的基础,终极目…

superset 解决在 mac 电脑上发送 slack 通知的问题

参考文档: https://superset.apache.org/docs/configuration/alerts-reports/ 核心配置: FROM apache/superset:3.1.0USER rootRUN apt-get update && \apt-get install --no-install-recommends -y firefox-esrENV GECKODRIVER_VERSION0.29.0 RUN wget -q https://g…

【高级篇】ENC编码器如何挂载Windows共享目录进行录像

【高级篇】ENC编码器如何挂载Windows共享目录进行录像 Windows共享目录前提条件1、打开控制面板,点击 程序 菜单2、点击 启用或关闭Windows功能 菜单3、如下图,勾选SMB1.0/CIFS文件共享支持,并点击确认按钮,然后根据提示重启电脑 创建共享目录…

如何利用Samba跨平台分享Ubuntu文件夹

1.安装Samba 终端输入sudo apt install samba 2.配置Samba 终端输入sudo vim /etc/samba/smb.conf 打开配置文件 滑动文件到最底下 输入以下内容 [Share] # 要共享的文件夹路径 path /home/xxx/sambashare read only no browsable yes编辑完成后按一下Esc按键后输入:wq回…

ABAP-Swagger 一种公开 ABAP REST 服务的方法

ABAP-Swagger An approach to expose ABAP REST services 一种公开 ABAP REST 服务的方法 Usage 1: develop a class in ABAP with public methods 2: implement interface ZIF_SWAG_HANDLER, and register the public methods(example method zif_swag_handler~meta) 3: …

Docker 以外置数据库方式部署禅道

2.安装步骤 2.1.参考资料 禅道官网文档: https://www.zentao.net/book/zentaopms/docker-1111.html https://www.zentao.net/book/zentaopms/405.html 2.2.详细步骤 ssh 登录服务器创建目录 /opt/zentao /opt/zentao/data /opt/zentao/db cd /opt mkdir zentao mkdir zentao…

开源免费的NAS系统-TrueNAS CORE上创建CentOS7虚拟机

目录 文章目录 目录1、说明2、准备工作2.1、准备安装镜像2.1、创建用户2.2、开启 ssh 服务2.3、设置用户权限2.4、上传系统镜像2.5、 添加虚拟机 3、开始安装系统3.1、启动虚拟机3.2、选择语言3.3、配置网络3.4、设置 root 密码3.5、删除光驱3.6、重启虚拟机3.7、使用 ssh 连接…

C++ | Leetcode C++题解之第414题第三大的数

题目&#xff1a; 题解&#xff1a; class Solution { public:int thirdMax(vector<int> &nums) {int *a nullptr, *b nullptr, *c nullptr;for (int &num : nums) {if (a nullptr || num > *a) {c b;b a;a &num;} else if (*a > num &&am…

【Python】练习:控制语句(二)第4关

第4关&#xff1a;控制结构综合实训 第一题第二题&#xff08;※&#xff09;第三题&#xff08;※&#xff09;第四题&#xff08;※&#xff09;第五题&#xff08;※&#xff09;第六题&#xff08;※&#xff09; 第一题 #第一题def rankHurricane(velocity):#请在下面编写…

记录|C#的资源路径设置的资料整理

目录 前言一、在这里插入图片描述 https://bbs.csdn.net/topics/360001606 二、三、添加到资源文件中四、获得图片的三种路径方法五、给资源文件添加文件夹更新时间 前言 参考文章&#xff1a; 原本以为C# winform中进行图片等文件的路径的读取是直接可以按照资源文件中显示的来…

【ArcGIS微课1000例】0121:面状数据共享边的修改方法

文章目录 一、共享边概述二、快速的修改办法1. 整形共享边2. 修改边3. 概化边缘一、共享边概述 面状数据共享边指的是两个或多个面状数据(如多边形)共同拥有的边界。在地理信息系统(GIS)、三维建模、大数据分析等领域,面状数据共享边是描述面状空间数据拓扑关系的重要组成…

CORS漏洞及其防御措施:保护Web应用免受攻击

1. 背景- 什么是CORS&#xff1f; 在当今互联网时代&#xff0c;Web 应用程序的架构日益复杂。一个后端服务可能对应一个前端&#xff0c;也可能与多个前端进行交互。跨站资源共享&#xff08;CORS&#xff09;机制在这种复杂的架构中起着关键作用&#xff0c;但如果配置不当&…

Django学习实战篇五(适合略有基础的新手小白学习)(从0开发项目)

前言&#xff1a; 本章中&#xff0c;我们开始引入前端框架Bootstrap 来美化界面。在前面的章节中&#xff0c;我们通过编写后端代码来处理数据。数据之于网站&#xff0c;就相当于灵魂之于人类。而网站的前端就相当于人的形体外貌。其中HTML是骨架&#xff0c;而CSS是皮肤&…

Thymeleaf模版引擎

Thymeleaf是面向Web和独立环境的现代服务器端Java模版引擎&#xff0c;能够处理HTML、XML、JavaScript、CSS甚至纯文本。Thymeleaf旨在提供一个优雅的、高度可维护的创建模版的方式。为了实现这一目标&#xff0c;Thymeleaf建立在自然模版的概念上&#xff0c;将其逻辑注入到模…

2024/9/20 使用QT实现扫雷游戏

有三种难度初级6x6 中级10x10 高级16x16 完成游戏 游戏失败后&#xff0c;无法再次完成游戏&#xff0c;只能重新开始一局 对Qpushbutton进行重写 mybutton.h #ifndef MYBUTTON_H #define MYBUTTON_H #include <QObject> #include <QWidget> #include <QPus…

Kafka 3.0.0集群部署教程

1、集群规划 主机名 ip地址 node.id process.roles kafka1 192.168.0.29 1 broker,controller Kafka2 192.168.0.30 2 broker,controller Kafka3 192.168.0.31 3 broker,controller 将kafka包上传以上节点/app目录下 mkdir /app 解压kafka包 tar -zxvf kafka_…

Spring Boot框架在心理教育辅导系统中的应用案例

目 录 摘 要 I ABSTRACT II 1绪 论 1 1.1研究背景 1 1.2设计原则 1 1.3论文的组织结构 2 2 相关技术简介 3 2.1Java技术 3 2.2B/S结构 3 2.3MYSQL数据库 4 2.4Springboot框架 4 3 系统分析 6 3.1可行性分析 6 3.1.1技术可行性 6 3.1.2操作可行性 6 3.1.3经济可行性 6 3.1.4法律…

Css_动态渐变圆圈旋转效果

1、效果图 2、实现代码 <template><div class"box"><div class"line"></div><div class"lineNew"></div></div> </template><script lang"ts" setup></script><styl…

【图像匹配】基于Harris算法的图像匹配,matlab实现

博主简介&#xff1a;matlab图像代码项目合作&#xff08;扣扣&#xff1a;3249726188&#xff09; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于基于Harris算法的图像匹配&#xff0c;用matlab实现。 一、案例背景和算法介绍 …