RabbitMQ如何保证消息不丢失

RabbitMQ消息丢失的三种情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。

第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

RabbitMQ消息丢失解决方案 

针对生产者 

1、开启RabbitMQ事务

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务  
channel.txSelect();  
try {  // 这里发送消息  
} catch (Exception e) {  channel.txRollback(); 
// 这里再次重发这条消息
}
// 提交事务  
channel.txCommit();  

缺点:

RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

2、使用confirm机制

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

//开启confirm  
channel.confirm();  
//发送成功回调  
public void ack(String messageId){
}
// 发送失败回调  
public void nack(String messageId){  //重发该消息  
}

针对RabbitMQ

RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。

解决办法:

设置持久化有两个关注点:

第一个是创建 queue 的时候将其设置为持久化

第二个是发送消息的时候将消息设置为持久化

第一种:元配置持久化

在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。

// 1. 创建持久化交换器 如果不存在自动创建
channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
// 2. 创建持久化队列 如果不存在自动创建
channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());

第二种:消息持久化

发送消息的时候将消息的deliveryMode设置为2,或者是

MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

下面是采用springboot整合的rabbitTemplate来实现消息发送 

MessageProperties props = new MessageProperties();props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式Message message = new Message(payload, props);rabbitTemplate.send(exchange, routingKey, message);// 或者直接传递Message对象,其中包含已设置持久化的MessagePropertiesrabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));// 或者使用MessagePostProcessor接口动态设置消息属性rabbitTemplate.convertAndSend(exchange,routingKey,messageBody,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});

下面是采用SpringBoot中通过new来创建连接来实现消息发送

 Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();if (Objects.nonNull(connection)) {log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");Channel channel = connection.createChannel();//消息持久化到磁盘,避免因为突然断电或重启导致消息丢失channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
}

 消费者确认机制

如果上述生产端消息队列都正确投递,那么问题出现在消费端是否可以正确消费?

消费者在成功处理了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。

默认情况下,以下3种原因导致消息丢失:

1、 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;

2、 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;

3、 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。

应对方案:

  • 将自动ack机制改为手动ack机制。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {//接收消息,业务处理//设置手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {//发生异常时,可以选择重新发送消息或进行错误处理// 例如,可以选择负确认(nack),让消息重回队列// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
};
//设置autoAck为false,表示关闭自动确认机制,改为手动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

消息补偿机制 

以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。

如何做消息补偿呢?其实就是将消息入库,通过定时任务重新发送失败的消息。详细流程如下:

  • 生产端发送消息;

  • 确认失败,将消息保存到数据库中,并设置初始状态0;

  • 定时任务以一定频率扫描数据库中status=0 的消息(失败消息);

  • 重发消息,可多次;

  • 重发成功,更新数据库:status=1;

  • 超过固定次数重发仍然失败,人工干预。

标注:

超过最大失败次数后,对于无法被正常消费的消息可移入死信队列

  • 可人工干预手动排查

  • 也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处理等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/52063.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《深入浅出WPF》读书笔记.4名称空间详解

《深入浅出WPF》读书笔记.4名称空间详解 背景 主要讲明名称空间概念,可以理解为命名空间的引用。 xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml" 👆如x可以理解为一些列命名空间的引用。 不一一列举,只讲几个特殊的…

Java基础——注释

在开发中注释是必不可少的,帮助我们更好的标记阅读代码,下面介绍几种常用的注释方式。 一、注释种类 1. 单行注释 使用//一行代码来进行注释,只能注释一行内容 2. 多行注释 使用斜杠星号的方式 /*注释多行代码*/,注释多行代…

2024最新急速暴走小米运动自动刷步卡密版PHP源码

2023最新发布的急速暴走小米运动自动刷步卡密版PHP源码。该源码使用PHPTP6layui-Mini开发,旨在实现小米运动自动刷步功能。该程序支持通过微信修改步数,并采用卡密认证方式,用户只需提交提供的卡密,即可每日自助修改步数。 需要注…

UDP网络攻击

UDP(User Datagram Protocol)作为一种无连接的网络传输协议,以其速度快和资源消耗小的特点,在多种网络服务中发挥着重要作用,UDP的无连接特性也使其成为DDoS攻击的优选协议。 UDP攻击概念 UDP攻击是一种网络攻击手段…

USB HiFi 声卡

USB HiFi声卡就像是你电脑上的一对超级耳朵,它能将你从普通音质中拯救出来,带你进入一个音乐细节丰富的世界。无论你是音乐迷、游戏玩家还是电影发烧友,这些声卡都是你的音频升级好帮手。设计师在设计USB HiFi声卡的过程,就像是调…

Linux虚拟机磁盘管理-添加磁盘

添加磁盘--添加前请选关闭虚拟机 添加步骤: 1.编辑虚拟机设置 2.选择硬盘 3.选择SCSI 4.创建新虚拟磁盘 5.设置磁盘大小 6.点击完成 开机的时候会去读取有几块硬盘,总共我们是有4块硬盘,sda\sdb\sdc\sdd 注意:新加的硬盘实际我们…

华为OD机试(C卷,100分)- 游戏分组

题目描述 部门准备举办一场王者荣耀表演赛,有 10 名游戏爱好者参与,分为两队,每队 5 人。 每位参与者都有一个评分,代表着他的游戏水平。为了表演赛尽可能精彩,我们需要把 10 名参赛者分为示例尽量相近的两队。 一队的实力可以表示为这一队 5 名队员的评分总和。 现在给你…

鸿萌数据恢复服务:SQL Server 中的“PFS 可用空间信息不正确”错误

天津鸿萌科贸发展有限公司从事数据安全服务二十余年,致力于为各领域客户提供专业的数据恢复、数据备份、网络及终端数据安全等解决方案与服务。 同时,鸿萌是国际主流数据恢复软件(Stellar、UFS、R-Studio、ReclaiMe Pro 等)的授权代理商,为专…

爬虫案例3——爬取彩票双色球数据

简介:个人学习分享,如有错误,欢迎批评指正 任务:从500彩票网中爬取双色球数据 目标网页地址:https://datachart.500.com/ssq/ 一、思路和过程 目标网页具体内容如下: ​​​​​ 我们的任务是将上图中…

使用AWS Lambda轻松开启Amazon Rekognition之旅

这是本系列文章的第一篇,旨在通过动手实践,帮助大家学习亚马逊云科技的生成式AI相关技能。通过这些文章,大家将掌握如何利用亚马逊云科技的各类服务来应用AI技术。 那么让我们开始今天的内容吧! 介绍 什么是Amazon Rekognition&…

前端宝典之五:React源码解析之深度剖析Diff算法

本文主要针对React源码进行解析,内容有: 1、Diff算法原理、两次遍历 2、Diff瓶颈及限制 3、Diff更新之单节点和多节点原理 一、Diff源码解析 以下是关于 React Diff 算法的详细解析及实例: 1、React Diff 算法的基本概念和重要性 1.1 概念…

【LeetCode每日一题】——301.删除无效的括号

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 广度优先搜索 二【题目难度】 困难 三【题目编号】 301.删除无效的括号 四【题目描述】 给…

ROS 2中,CMakeList.txt常见语法

在ROS 2中,CMakeList.txt 文件扮演着配置和管理构建过程的重要角色。这个文件遵循CMake的语法,用于定义如何编译和链接源代码。下面是一些在ROS 2项目CMakeList.txt文件中常见的语法和用法。 1. 基本结构和命令 cmake_minimum_required(VERSION )&…

【设计模式】装饰器模式和适配模式

装饰器模式 装饰器模式能够很好的对已有功能进行拓展,这样不会更改原有的代码,对其他的业务产生影响,这方便我们在较少的改动下对软件功能进行拓展。 类似于 router 的前置守卫和后置守卫。 Function.prototype.before function (beforeFn)…

【C++】————智能指针

作者主页: 作者主页 本篇博客专栏:C 创作时间 :2024年8月20日 一,什么是智能指针 在C中没有垃圾回收机制,必须自己释放分配的内存,否则就会造成内存泄露。解决这个问题最有效的方法是使用智能指针&…

异常信息转储预研笔记-堆栈地址转换

addr2line命令 addr2line -e <exec> <addr> -f | xargs cfilt<exec>: 进程名 <addr>&#xff1a;堆栈地址eg&#xff1a; addr2line -e backtrace 0x4009d2 -f | xargs cfilt此方案测试了&#xff0c;不知道什么原因只显示?? ??:0 &#xff0c;而…

Java面试题--分布式锁

分布式锁 你说一下什么是分布式锁 分布式锁是在分布式/集群环境中解决多线程并发造成的一系列数据安全问题.所用到的锁就是分布式锁&#xff0c;这种锁需要被多个应用共享才可以&#xff0c;通常使用Redis和zookeeper来实现。 分布式锁有哪些解决方案 常用的三种方案 基于…

Spring模块详解Ⅱ

目录 Spring Beans模块详解1. 什么是 Bean?2. Spring Bean的配置方式2.1 基于 XML 配置例子&#xff1a; 2.2 基于注解配置例子&#xff1a; 2.3 基于 Java 配置&#xff08;JavaConfig&#xff09;例子&#xff1a; 3. Bean 的生命周期生命周期回调的例子&#xff1a; 4. Bea…

Oracle+ASM+High冗余详解及空间计算

Oracle ASM&#xff08;Automatic Storage Management&#xff09;的High冗余模式是一种提供高度数据保护的策略&#xff0c;它通过创建多个数据副本来确保数据的可用性和安全性。 以下是关于Oracle ASM High冗余的详细解释&#xff1a; 一、High冗余的特点 1.数据冗余度 在Hi…

极速闪存启动:SD与SPI模式的智能初始化指南

最近很多客户朋友在询问我们 CS 创世 SD NAND 能不能使用 SPI 接口&#xff0c;两者使用起来有何区别&#xff0c;下面为大家详细解答。 SD MODE: CS 创世 SD NAND 支持 SD 模式和 SPI 模式&#xff0c;SD NAND 默认为 SD 模式&#xff0c;上电后&#xff0c;其初始化过程如下…