MQ如何防止消息被重复消费?

被询问如何防止MQ消息被重复消费时,其实是在考察候选人对消息队列、分布式系统设计以及容错机制的理解,通过这些问题,可以全面了解候选人在处理MQ消息重复消费问题时的思考方式、技术能力和实践经验,从而评估其是否适合担任相关岗位。

MQ实现策略

MQ提供了以下几种方式来防止消息被重复消费:

1.消费者手动确认消息

在消费者消费消息后,通过调用basic.ack()方法手动确认消息已被消费。这样一来,RabbitMQ就会从队列中删除该消息,防止消息被重复消费。

java代码示例:

// 创建连接和频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 设置消息确认模式为手动确认
channel.basicQos(1);
// 定义消息消费者
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {// 模拟消费消息String message = new String(body, "UTF-8");System.out.println("Received message: " + message);// 手动确认消息消费channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 消息消费失败,进行异常处理channel.basicNack(envelope.getDeliveryTag(), false, true);}}
};
// 开始消费消息
channel.basicConsume(queueName, false, consumer);

2.消息去重(使用乐观锁)

在消费者消费消息前,可以将消息的唯一标识保存在数据库或缓存中。在消费者接收到消息后,先检查数据库或缓存中是否存在该消息的唯一标识,如果存在,则表示该消息已经被消费过,可以忽略;如果不存在,则表示该消息是新的,可以进行消费。

java代码示例:

// 创建连接和频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义消息消费者
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {// 模拟消费消息String message = new String(body, "UTF-8");System.out.println("Received message: " + message);// 判断消息是否已经消费过,可以通过数据库或缓存进行判断if (!isMessageConsumed(message)) {// 进行消息消费consumeMessage(message);}// 手动确认消息消费channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 消息消费失败,进行异常处理channel.basicNack(envelope.getDeliveryTag(), false, true);}}
};
// 开始消费消息
channel.basicConsume(queueName, false, consumer);

3.使用消息的全局唯一标识

可以在消息的属性中添加一个全局唯一标识,例如UUID,确保每条消息都具有唯一性。消费者在消费消息时,可以通过检查全局唯一标识来判断消息是否已经被消费过。

java代码示例:

// 创建连接和频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义消息消费者
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {// 模拟消费消息String message = new String(body, "UTF-8");System.out.println("Received message: " + message);// 判断消息是否已经消费过,可以通过全局唯一标识进行判断String messageId = properties.getMessageId();if (!isMessageConsumed(messageId)) {// 进行消息消费consumeMessage(message);// 将消息的全局唯一标识保存到数据库或缓存中saveMessageId(messageId);}// 手动确认消息消费channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 消息消费失败,进行异常处理channel.basicNack(envelope.getDeliveryTag(), false, true);}}
};
// 开始消费消息
channel.basicConsume(queueName, false, consumer);

4.设置消息的过期时间

可以为消息设置一个过期时间,在消费者消费消息时,先判断消息是否已经过期,如果已经过期,则不进行消费。

java代码示例:

// 创建连接和频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();// 设置队列的消息过期时间
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 5000); // 设置消息过期时间为5秒
channel.queueDeclare(queueName, true, false, false, arguments);// 发布消息
String message = "Hello, RabbitMQ!";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("5000") // 设置消息的过期时间为5秒.build();
channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);// 关闭连接和频道
channel.close();
connection.close();

通过在队列声明时设置x-message-ttl参数来设置队列的消息过期时间。然后,通过发布消息时设置expiration属性来设置消息的过期时间。这里将消息的过期时间设置为5秒。需要注意的是,RabbitMQ的消息过期时间精度是毫秒级别的,可以通过设置整数或字符串形式的时间间隔来指定过期时间。如果同时设置了队列和消息的过期时间,以较小的那个为准。

5.使用幂等操作

对于一些幂等操作,可以将操作的唯一标识保存在数据库或缓存中。在消费者消费消息时,先检查操作的唯一标识是否存在,如果存在,则表示该操作已经执行过,可以忽略;如果不存在,则表示该操作是新的,可以进行执行。

java代码示例:

// 创建连接和频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();// 定义消息消费者
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {// 模拟消费消息String message = new String(body, "UTF-8");System.out.println("Received message: " + message);// 判断消息是否已经处理过,可以通过幂等操作来判断if (!isMessageProcessed(message)) {// 进行消息处理processMessage(message);// 标记消息已处理,将消息的唯一标识保存到数据库或缓存中saveProcessedMessage(message);}// 手动确认消息消费channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 消息消费失败,进行异常处理channel.basicNack(envelope.getDeliveryTag(), false, true);}}
};// 开始消费消息
channel.basicConsume(queueName, false, consumer);// 关闭连接和频道
channel.close();
connection.close();

通过isMessageProcessed()方法判断消息是否已经被处理过。如果消息已经被处理过,则忽略该消息;如果消息还未被处理过,则进行消息处理,并将消息的唯一标识保存起来。这里的processMessage()方法是具体的消息处理逻辑,saveProcessedMessage()方法将消息的唯一标识保存到数据库或缓存中,以便后续判断消息是否已经被处理过。需要根据具体的业务逻辑实现isMessageProcessed()、processMessage()saveProcessedMessage()方法来实现幂等操作。

消息重复消费的原因多种多样,不可避免。所以只能从消费者端入手,只要能保证消息处理的幂等性就可以确保消息不被重复消费。SS 而幂等性的保证又有很多方案:

  1. 给每一条消息都添加一个唯一id,在本地记录消息表及消息状态,处理消息时基于数据库表的id唯一性做判断

  2. 消息去重使用乐观锁,同样是记录消息表,利用消息状态字段实现基于乐观锁的判断,保证幂等

  3. 基于业务本身的幂等性。比如根据id的删除、查询业务天生幂等;新增、修改等业务可以考虑基于数据库id唯一性、或者乐观锁机制确保幂等。本质与消息表方案类似。

  4. 设置消息的过期时间,可以为消息设置一个过期时间,在消费者消费消息时,先判断消息是否已经过期,如果已经过期,则不进行消费。

  5. 消费者手动确认消息,在消费者消费消息后,通过调用basic.ack()方法手动确认消息已被消费。这样一来,RabbitMQ就会从队列中删除该消息,防止消息被重复消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/717243.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Puzzles

题目链接&#xff1a;Submit - Codeforces​​​​​​ 解题思路&#xff1a; 题目大概意思就是在一个数组里找n个数里的最大值减最小值的最小值&#xff0c;先排序&#xff0c;然后将第i n - 1项减去第i项与最小值作比较&#xff0c;输出最小值即可&#xff0c;注意循环结束…

NTP网络校时服务器(GPS北斗卫星校时系统)应用场景

NTP网络校时服务器&#xff08;GPS北斗卫星校时系统&#xff09;应用场景 NTP网络校时服务器&#xff08;GPS北斗卫星校时系统&#xff09;应用场景 随着大数据、云计算时代的到来,各行业信息化建设的不断提升,信息化下的各个系统不再单独处理各自业务,而是趋于协同工作,因此,各…

YOLOv应用开发与实现

一、背景与简介 YOLO&#xff08;You Only Look Once&#xff09;是一种流行的实时目标检测系统&#xff0c;其核心思想是将目标检测视为回归问题&#xff0c;从而可以在单个网络中进行端到端的训练。YOLOv作为该系列的最新版本&#xff0c;带来了更高的检测精度和更快的处理速…

代码随想录day34||● 860.柠檬水找零 ● 406.根据身高重建队列 ● 452. 用最少数量的箭引爆气球

860. 柠檬水找零 - 力扣&#xff08;LeetCode&#xff09; class Solution { public:bool lemonadeChange(vector<int>& bills) {int five0,ten0,twenty0;for(int bill:bills){if(bill5)five;if(bill10){if(five<0)return false;ten;five--;}if(bill20){if(ten&g…

【框架】MyBatis 框架重点解析

MyBatis 框架重点解析 1. MyBatis 执行流程 会话工厂生产的 SqlSession 对象提供了对数据库执行SQL命令所需的所有方法&#xff0c;包括但不限于以下功能&#xff1a; 数据库操作&#xff1a;SqlSession可以执行查询&#xff08;select&#xff09;、插入&#xff08;insert&a…

腾讯云幻兽帕鲁游戏存档迁移教程,本地单人房迁移/四人世界怎么迁移存档?

腾讯云幻兽帕鲁游戏存档迁移的方法主要包括以下几个步骤&#xff1a; 登录轻量云控制台&#xff1a;首先&#xff0c;需要登录到轻量云控制台&#xff0c;这是进行存档迁移的前提条件。在轻量云控制台中&#xff0c;可以找到接收存档的服务器卡片&#xff0c;并点击进入实例详情…

Jmeter 安装

JMeter是Java的框架&#xff0c;因此在安装Jmeter前需要先安装JDK&#xff0c;此处安装以Windows版为例 1. 安装jdk&#xff1a;Java Downloads | Oracle 安装完成后设置环境变量 将环境变量JAVA_HOME设置为 C:\Program Files\Java\jdk1.7.0_25 在系统变量Path中添加 C:\Pro…

股票技术指标(包含贪婪指数)

股票技术指标是用于分析股票价格和成交量数据&#xff0c;以便预测未来市场走势的工具。技术分析师使用这些指标来识别市场趋势、价格模式、交易信号和投资机会。技术指标通常基于数学公式&#xff0c;并通常在股票价格图表上以图形形式表示。 技术指标主要分为以下几类&#x…

A Brief Introduction of the Tqdm Module in Python

DateAuthorVersionNote2024.02.28Dog TaoV1.0Release the note. 文章目录 A Brief Introduction of the Tqdm Module in PythonIntroductionKey FeaturesInstallation Usage ExamplesBasic UsageAdvanced Usage A Brief Introduction of the Tqdm Module in Python Introducti…

力扣hot100:42.接雨水

什么时候能用双指针&#xff1f; &#xff08;1&#xff09;对撞指针&#xff1a; ①两数和问题中可以使用双指针&#xff0c;先将两数和升序排序&#xff0c;可以发现规律&#xff0c;如果当前两数和大于target&#xff0c;则右指针向左走。 ②接雨水问题中&#xff0c;左边最…

【算法集训】基础算法:枚举

一、基本理解 枚举的概念就是把满足题目条件的所有情况都列举出来&#xff0c;然后一一判定&#xff0c;找到最优解的过程。 枚举虽然看起来麻烦&#xff0c;但是有时效率上比排序高&#xff0c;也是一个不错的方法、 二、最值问题 1、两个数的最值问题 两个数的最小值&…

Vscode安装,ssh插件与配置

原因 发现很多新人在练习linux&#xff0c;可是只有windows机的时候&#xff0c;一般都是下载虚拟机&#xff0c;然后在虚拟机上安装ubuntu等linux平台。每次需要在linux中写代码&#xff0c;就打开ubuntu&#xff0c;然后在终端上用vim写代码&#xff0c;或者先编辑代码文本&…

css实现上下左右居中

css实现子盒子在父级盒子中上下左右居中 几种常用的上下左右居中方式 HTML代码部分 <div class"box"><img src"./img/77.jpeg" alt"" class"img"> </div>css部分 方式一 利用子绝父相和margin:auto实现 <sty…

内存管理 -----分段分页

分段 分段&#xff1a;程序的分段地址空间&#xff0c;分段寻址方案 两个问题 分段 &#xff1a;是更好分离和共享 左边是有序的逻辑地址&#xff0c;右边是无序的物理地址&#xff0c;然后需要有一种映射的关系&#xff08;段关联机制&#xff09; 各个程序的分配相应的地址…

Gin入门指南:从零开始快速掌握Go Web框架Gin

官网:https://gin-gonic.com/ GitHub:https://github.com/gin-gonic 了解 Gin Gin 是一个使用 Go 语言开发的 Web 框架,它非常轻量级且具有高性能。Gin 提供了快速构建 Web 应用程序所需的基本功能和丰富的中间件支持。 以下是 Gin 框架的一些特点和功能: 快速而高效:…

【简说八股】面试官:你知道什么是IOC么?

回答 Spring的IOC&#xff08;Inversion of Control&#xff0c;控制反转&#xff09;是Spring框架的核心特性之一。它通过将对象的创建和依赖关系的管理交给Spring容器来实现&#xff0c;降低了组件之间的耦合性&#xff0c;使得代码更加灵活、可维护。 在传统的开发模式中&…

Sora模型风口,普通人如何抓住-最新AI系统ChatGPT网站源码,AI绘画系统

一、前言说明 PandaAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧。已支持…

边缘计算与任务卸载基础知识

目录 边缘计算简介任务卸载简介参考文献 边缘计算简介 边缘计算是指利用靠近数据生成的网络边缘侧的设备&#xff08;如移动设备、基站、边缘服务器、边缘云等&#xff09;的计算能力和存储能力&#xff0c;使得数据和任务能够就近得到处理和执行。 一个典型的边缘计算系统为…

前端按钮动画

效果示例 代码示例 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevic…

OSCP靶场--Resourced

OSCP靶场–Resourced 考点(1.rpc枚举 2.crackmapexec密码喷洒&#xff0c;hash喷洒 3.ntds.dit system提取域hash 4.基于资源的约束委派攻击rbcd) 1.nmap扫描 ## ┌──(root㉿kali)-[~/Desktop] └─# nmap -sV -sC -p- 192.168.188.175 --min-rate 2000 Starting Nmap 7.9…