RabbitMQ消息可靠性问题及解决

说明:在RabbitMQ消息传递过程中,有以下问题:

  • 消息没发到交换机

  • 消息没发到队列

  • MQ宕机,消息在队列中丢失

  • 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除

针对以上问题,提供以下解决方案:

  • 消息确认:确认消息是否发送到交换机、队列;

  • 消息持久化:持久化消息,以防MQ宕机造成消息丢失;

  • 消费者消息确认:确认消费者已正确消费消息,才把消息从队列中删除;

在这里插入图片描述

消息确认

可以使用Rabbit MQ提供的publisher confirm机制来避免消息发送到MQ过程丢失。具体实现是,publisher-confirm(发送者确定)、publisher-return(发送者回执),前者判断消息到交换机、后者判断交换机到队列


publisher-confirm(发送者确定)

  • 消息成功投递到交换机,返回ack;

  • 消息未投递到交换机,返回nack;

publisher-return(发送者回执)

  • 消息投递到交换机,但没有到队列,返回ack,即失败原因;

在生产者端添加配置

spring:rabbitmq:# rabbitMQ相关配置host: 118.178.228.175port: 5672username: rootpassword: 123456virtual-host: /# 开启生产者确认,correlated为异步,simple为同步publisher-confirm-type: correlated# 开启publish-return功能,基于callback机制publisher-returns: true# 开启消息路由失败的策略,true是调用returnCallback方法,false是丢弃消息template:mandatory: true

publisher-return(发送者回执)代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 发送者回执实现*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 回执信息* @param message 信息对象* @param replyCode 回执码* @param replyText 回执内容* @param exchange 交换机* @param routingKey 路由键值*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息发送队列失败=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);}});}
}

publisher-confirm(发送者确定)代码

    @Testpublic void sendExceptionMessage() {// 路由键值String routingKey = "exception";// 消息String message = "This is a exception message";// 给消息设置一个唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 编写confirmCallBack回调函数correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {if (confirm.isAck()) {// 消息发送交换机成功log.debug("消息送达至交换机成功");} else {// 消息发送交换机失败,打印消息log.error("消息未能送达至交换机,ID{},原因{}", correlationData.getId(), confirm.getReason());}}}, new FailureCallback() {// 消息发送交换机异常@Overridepublic void onFailure(Throwable ex) {log.error("消息发送交换机异常,ID:{},原因{}", correlationData.getId(), ex.getMessage());}});rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);}

测试,设置一个不存在的routingKey,被发送者确认(publisher-confirm)捕获到;

// 路由键值
String routingKey = "null";

在这里插入图片描述

设置一个不存在的路由,被发送者回执(publisher-return)捕获到;

rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);

在这里插入图片描述

消息持久化

消息持久化,是指把消息保存到磁盘中,在RabbitMQ宕机或者关机时,重启后,消息仍可以保存下来。消息依赖于交换机、队列,因此持久化消息,同时也需要持久化交换机、队列。

创建一个持久化的交换机、队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息持久化*/
@Configuration
public class DurableConfig {/*** 交换机持久化* @return*/@Beanpublic DirectExchange directExchange(){// 三个参数分别是:交换机名、是否持久化、没有队列与之绑定时是否自动删除return new DirectExchange("durable.direct",true,false);}/*** 队列持久化* @return*/@Beanpublic Queue durableQueue(){return QueueBuilder.durable("durable.queue").build();}/*** 交换机与队列绑定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");}}

发送一个持久化的消息

    /*** 发送持久化消息*/@Testpublic void sendDurableMessage() {String routingKey = "durable";CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))// 设置该消息未持久化消息.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);}

打开RabbitMQ管理平台,可以看到"delivery_mode: 2",表示该消息是持久化消息

在这里插入图片描述

(源码:MessageDeliveryMode类)
在这里插入图片描述

实际上,交换机、队列默认就是持久化的(durable: true),所以不用特意设置;

在这里插入图片描述

消费者消息确认

介绍

消费者消息确认,是为了确保消费者已经消费了消息,才让MQ把该消息删除;

可通过在消费者的配置文件中增加下面这行配置实现,备选项有以下三个:

  • none:关闭ack,表示不做处理,消息发给消费者之后就立即被删除;

  • auto:自动ack,表示由Spring检测代码是否出现异常,出现异常则保留消息,没有异常则删除消息;

  • manual:手动ack,可根据业务手动编写代码,返回ack;

spring:rabbitmq:listener:simple:# 设置消息确认模式acknowledge-mode: none

测试:none

可编写代码测试,下面是生产者代码,发送消息

    /*** 发送普通消息*/@Testpublic void sendNoneMessage() {String directName = "none.direct";String routingKey = "none";String message = "This is a test message";rabbitTemplate.convertAndSend(directName, routingKey, message);}

消费者代码有问题,未能正常消费消息

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "none.queue"),exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),key = {"none"}))public void getNoneMessage(String normalMessage){System.out.println(1/0);System.out.println("normalMessage = " + normalMessage);}

测试结果,程序报错,消息也没能保留下来

在这里插入图片描述
在这里插入图片描述

测试:auto

更改设置为:auto,重试

在这里插入图片描述

但是消息未被删除

在这里插入图片描述

这种情况,在实际开发中是不能允许,可以通过更改消费失败的重试机制解决。

消费失败重试机制

方法一:设置retry

因为消息被消费失败,消息会一直循环重试,无限循环,导致mq的消息处理飙升,带来不必要的压力,这种情况可以通过在消费者端添加以下配置,限制失败重试的条件来解决:

spring:rabbitmq:listener:simple:retry:# 开启消费者失败重试enabled: true# 初次失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,即后一次等待的时间是前一次等待时间的多少倍multiplier: 1# 最多重试次数max-attempts: 3# true 无状态 false 有状态 如果业务中包含事务 改为falsestateless: true

开启后,控制台可以发现,信息不回一直循环打印,而是打印数条后停止,日志信息中有提示“Retry Policy Exhausted”(重试策略已用尽)

在这里插入图片描述
这种通过配置的方式,并不会重试数次后仍保留消息,而是重试数次仍失败,随即丢弃消息,消息丢失,这在实际开发中也是不能被允许的。

方法二:路由存储消息

因此,可以通过下面这个方法,把消费失败的消息,通过交换机路由到另外的队列中存储起来,等业务代码被修复,再路由回来消费。

在这里插入图片描述

代码如下

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 错误消息队列*/
@Configuration
public class ErrorMessageQueueConfig {/*** 创建一个交换机,用于路由消费失败的消息* @return*/@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}/*** 创建一个队列,用于存储消费失败的消息* @return*/@Beanpublic Queue errorQueue(){return new Queue("error.queue");}/*** 绑定* @return*/@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** 路由,当消费失败时,把消费失败的消息路由到此队列中,路由key为"error"* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}

可以看到,消息消费失败后并没有被丢失,而是路由到错误队列中存储了起来。因为错误队列没有设置RabbitListener,所以可以存储消息,等带代码问题被排查出来后,可以再针对该队列设置监听方法,消费这部分错误的消息。

在这里插入图片描述

另外,值得一提的是,消费者这边的控制台会报一个警告,提示路由密钥错误。我们可以理解,在RabbitMQ底层,会把消费失败了的消息,统一路由到一个地方去,而我们这种手动把消费失败的消息路由到自定义的队列中的方式,打破了这种“默认的规则”,所以报了一个这样的警告。这种警告是在可控范围内的。

在这里插入图片描述

总结

RabbitMQ发送消息,为了确保消息的可靠性,保证消息能被交换机、队列收到,消息能被正常消费,而不会因消费失败而丢失,提供了对应的一系列方法,并且最后还提供了两种消费失败重试方法,优化了消费过程,非常Nice。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/6247.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32(HAL库)驱动AD8232心率传感器

目录 1、简介 2、CubeMX初始化配置 2.1 基础配置 2.1.1 SYS配置 2.1.2 RCC配置 2.2 ADC外设配置 2.3 串口外设配置 2.4 GPIO配置 2.5 项目生成 3、KEIL端程序整合 3.1 串口重映射 3.2 ADC数据采集 3.3 主函数代码整合 4 硬件连接 5 效果展示 1、简介 本文通过STM32…

Linux文件处理命令

目录&#xff1a; linux系统与shell环境准备linux常用命令之文件处理Linux系统登录与文件操作 1.linux系统与shell环境准备 Linux 系统简介&#xff1a; Linux 内核最初只是由芬兰人林纳斯托瓦兹&#xff08;Linus Torvalds&#xff09;在赫尔辛基大学上学时出于个人爱好而…

分布式光伏并网防孤岛保护装置AM5SE-IS

分布式光伏并网防孤岛保护装置AM5SE-IS 应用场景 防孤岛原理&#xff1a;防孤岛保护装置检测到并网点有逆功率、频率突变、 等异常数据时&#xff0c;即发生孤岛现象时&#xff0c;装置可配合断路器快速切除并网点&#xff0c;使本站与电网侧快速脱离&#xff0c;保证整个电站…

blender 纹理材质

添加材质纹理需要哪五个节点&#xff1f; 映射节点&#xff1a;调整纹理的位置、大小、缩放&#xff1b; 纹理坐标&#xff1a;怎么映射&#xff0c;以什么方式去映射这张图&#xff0c;换句话说就是如何将 2D 的图片映射到 3D 的图像上&#xff1b;纹理坐标就是以什么坐标方式…

Flutter系列(3):如何将Flutter项目打包成Android安装包

将Flutter项目打包成Android安装包&#xff0c;主要步骤如下&#xff1a; 一、生成key 进入jdk的bin目录下&#xff1a; keytool -genkey -v -keystore D:\key.jks -keyalg RSA -keysize 2048 -validity 10000 -alias key 大概会有密码等参数&#xff0c;根据自身需要&#x…

Mysql数据库

目录 1.数据库 2.数据库分类与常见的数据库 3.SQL 3.1.DDL 数据库操作 表操作 3.2.DML 3.3.DQL 3.4.DCL 管理用户 权限控制 4.Mysql常用的数据类型 1.数据库 数据库:是“按照数据结构来组织、存储和管理数据的仓库”。是一个长期存储在计算机内的、有组织的、可共…

Spring MVC异常处理【单个控制异常处理器、全局异常处理器、自定义异常处理器】

目录 一、单个控制器异常处理 1.1 控制器方法 1.2 编写出错页面 1.3 测试结果 二、全局异常处理 2.1 一个有异常的控制器类 2.2 全局异常处理器类 2.3 测试结果 三、自定义异常处理器 3.1 自定义异常处理器 3.2 测试结果 往期专栏&文章相关导读 1. Maven系列…

只需3步,使用Stable Diffusion无限生产AI数字人视频

效果演示 先看效果&#xff0c;感兴趣的可以继续读下去。 没有找到可以上传视频的地方&#xff0c;大家打开这个网盘链接观看&#xff1a;https://www.aliyundrive.com/s/CRBm5NL3xAE 基本方法 搞一张照片&#xff0c;搞一段语音&#xff0c;合成照片和语音&#xff0c;同…

SpringBoot+jasypt-spring-boot-starter实现配置文件明文加密

1.使用环境 springboot:2.1.4.RELEASE JDK:8 jasypt-spring-boot-starter:3.0.2 2.引入依赖 !-- 配置文件加密 --> <dependency><groupId>com.github.ulisesbocchio</groupId><artifactId>jasypt-spring-boot-starter</artifactId><ver…

uni-app:请求后端数据uni.request

完整代码&#xff1a; onLoad() {uni.request({url: getApp().globalData.position Produce/select_employee,data: {username: getApp().globalData.username,},method: POST,dataType: json,success: res > {this.employee_name res.data.info.employee_name;// consol…

sketch如何在线打开?有没有什么软件可以辅助

Sketch 在线打开的方法有哪些&#xff1f;这个问题和我之前回答过的「Sketch 可以在线编辑吗&#xff1f;」是一样的答案&#xff0c;没有。很遗憾&#xff0c;Sketch 没有在线打开的方法&#xff0c;Sketch 也做不到可以在线编辑。那么&#xff0c;那些广告里出现的设计软件工…

数学建模学习(4):TOPSIS 综合评价模型及编程实战

一、数据总览 需求&#xff1a;我们需要对各个银行进行评价&#xff0c;A-G为银行的各个指标&#xff0c;下面是银行的数据&#xff1a; 二、代码逐行实现 清空代码和变量的指令 clear;clc; 层次分析法 每一行代表一个对象的指标评分 p [8,7,6,8;7,8,8,7];%每一行代表一个…

Docker 基本管理

目录 一、Docker 概述 二、为什么容器越来越受欢迎&#xff1f; 三、Docker 与 虚拟机 的区别 四、 Linux Namespace的6大类型 五、Docker 核心概念 1.镜像 2.容器 3.仓库 六、安装 Docker 1.安装依赖包 2.设置阿里云镜像源&#xff0c;安装Docker 3.查看 docker 版…

PostgreSQL 的事务管理和并发控制机制解析

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

BEVPoolv2 A Cutting-edge Implementation of BEVDet Toward Deployment 论文学习

Github Repo: https://github.com/HuangJunJie2017/BEVDet/tree/dev2.0 Arxiv Paper: https://arxiv.org/abs/2211.17111 1. 解决了什么问题&#xff1f; 多相机 3D 目标检测是自动驾驶领域的基本任务&#xff0c;受到学术界和工业界的大量关注。Lift-Splat-Shoot view trans…

MFC第十九天 记事本项目功能完善和开发、CTabCtrl类与分页模式开发

文章目录 记事本项目功能完善和开发查找界面的记忆功能 、使用F3快捷键自动向下查找功能 的开发单次替换的算法研究 CFileDialog 构造函数详解 应用另存为时选择编码 &#xff08;三种方案&#xff09;vista 样式文件对话框 bVistaStyle 为TRUE时 1pch.hCApp NotePad.cpp 对编码…

视频对比工具(基于python+ffmpeg+airtest实现视频抽帧比较工具)

VideoDiff&#xff1a;基于ffmpeg&#xff0c;实现视频抽帧比较工具 使用场景&#xff1a;在视频渲染模块发生迭代&#xff0c;快速回归测试其产出的视频是否存在问题&#xff0c;从而节省人工回归成本 源码地址&#xff1a;https://github.com/jiangliuer32/VideoDiff 原理图…

centos7中MySQL备份还原策略

目录 一、直接拷贝数据库文件 1.1在shangke主机上停止服务并且打包压缩数据库文件 1.2 在shangke主机上把数据库文件传输到localhost主机上(ip为192.168.33.157) 1.3在localhost主机上停止服务&#xff0c;解压数据库文件 1.4 在localhost主机上开启服务 1.5 测试 二、m…

JVM-Java虚拟机

JVM——Java虚拟机&#xff0c;是Java实现平台无关性的基石。 基本概念&#xff1a;JVM 是可运行 Java 代码的假想计算机 &#xff0c;包括一套字节码指令集、一组寄存器、一个栈、 一个垃圾回收&#xff0c;堆 和 一个存储方法域。JVM 是运行在操作系统之上的&#xff0c;它与…

【Docker】基本概念和底层技术

Docker 1 什么是 Docker Docker 是一种容器技术。只要开发者将其应用和依赖包进行打包&#xff0c;放入到一个轻量级的、可移植的容器中&#xff0c;就能发布到任何流行的 linux 机器上。 Docker 的要素&#xff1a; image 镜像&#xff1a;静态的container 容器&#xff1a…