RabbitMQ生产者的可靠性

目录

MQ使用时会出现的问题

生产者的可靠性

1、生产者重连

2、生产者确认

3、数据持久化

交换机持久化

队列持久化

消息持久化

LazyQueue懒加载


MQ使用时会出现的问题

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

生产者的可靠性

1、生产者重连

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

在生产者yml文件添加配置开启重试机制

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

关掉RabbitMQ服务:

docker stop mq

然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码 

2、生产者确认

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。 不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。 默认两种机制都是关闭状态,需要通过配置文件来开启。

 在生产者yml文件添加配置开启重试机制

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@Configuration
public class MqConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("触发return callback,");log.debug("交换机exchange: {}", returnedMessage.getExchange());log.debug("routingKey: {}", returnedMessage.getRoutingKey());log.debug("消息message: {}", returnedMessage.getMessage());log.debug("replyCode: {}", returnedMessage.getReplyCode());log.debug("replyText: {}", returnedMessage.getReplyText());}});}
}

定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

 /*** 生产者消息确认* @throws InterruptedException*/@Testpublic void testPublisherConfirm() throws InterruptedException {// 准备消息CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.error("消息回调失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.debug("收到confirm callback回执");if (result.isAck()){log.debug("发送消息成功,收到 ack!");}else {log.debug("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 发送消息rabbitTemplate.convertAndSend("mq.direct","blue","hello",cd);Thread.sleep(2000);}

 当路由不对时,提示没有这个路由

 当交换机不对时,提示没有这个交换机

总结:

SpringAMQP中生产者消息确认的几种返回值情况

1、消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK

2、临时消息投递到了MQ,并且入队成功,返回ACK

3、持久消息投递到了MQ,并且入队完成持久化,返回ACK

4、其它情况都会返回NACK,告知投递失败

如何处理生产者的确认消息?
1、生产者确认需要额外的网络和系统资源开销,尽量不要使用
2、如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
3、对于nack消息可以有限次数重试,依然失败则记录异常消息

3、数据持久化

MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题

1、MQ宕机,内存中的消息会丢失
2、内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞小

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

 设置为Durable就是持久化模式,Transient就是临时模式。

队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

 设置为Durable就是持久化模式,Transient就是临时模式。

消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

 @Testpublic void persistence() {// 队列名称String queueName = "simple.queue";// 消息//MessageDeliveryMode.NON_PERSISTENT 非持久化//MessageDeliveryMode.PERSISTENT 持久化Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

 消息写入磁盘

LazyQueue懒加载

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

 控制台配置Lazy模式

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

 通过bean声明

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

也可以基于注解来声明队列并设置为Lazy模式: 

更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。 可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

 也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/124758.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jetson Xavier NX FFmpeg支持硬件编解码

最近在用Jetson Xavier NX板子做视频处理&#xff0c;但是CPU进行视频编解码&#xff0c;效率比较地下。 于是便考虑用硬解码来对视频进行处理。 通过jtop查看&#xff0c;发现板子是支持 NVENC硬件编解码的。 1、下载源码 因为需要对ffmpeg进行打补丁修改&#xff0c;因此需…

【c++|opencv】一、基础操作---2.图像信息获取

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 图像信息获取&#xff0c;roi 1. 图像信息获取 // 获取图像信息#include <iostream> #include <opencv2/opencv.hpp>using namespace cv; …

JVM与Java体系结构

目录 一、Java虚拟机整体架构祥图 二、Java代码执行过程详图 三、汇编语言、机器语言、高级语言关系 四、JVM的架构模型 五、JVM的生命周期 &#xff08;一&#xff09;虚拟机的启动 &#xff08;二&#xff09;虚拟机的执行 &#xff08;三&#xff09;虚拟机的退出 …

使用示例和应用程序全面了解高效数据管理的Golang MySQL数据库

Golang&#xff0c;也被称为Go&#xff0c;已经成为构建强大高性能应用程序的首选语言。在处理MySQL数据库时&#xff0c;Golang提供了一系列强大的库&#xff0c;简化了数据库交互并提高了效率。在本文中&#xff0c;我们将深入探讨一些最流行的Golang MySQL数据库库&#xff…

数据库管理-第113期 Oracle Exadata 04-硬件选择(20231020)

数据库管理-第113期 Oracle Exadata 04-硬件选择&#xff08;2023010290&#xff09; 本周没写文章&#xff0c;主要是因为到上海参加了Oracle CAB/PAB会议&#xff0c;这个放在后面再讲&#xff0c;本期讲一讲Exadata&#xff0c;尤其是存储节点的硬件选择及其对应的一些通用…

mac安装并使用wireshark

mac安装并使用wireshark 1 介绍 我们在日常开发过程中&#xff0c;遇到了棘手的问题时&#xff0c;免不了查看具体网络请求情况&#xff0c;这个时候就需要用到抓包工具。比较著名的抓包工具就属&#xff1a;wireshark、fildder。我这里主要介绍wireshark。 2 安装 以mac安装为…

C#,数值计算——分类与推理Svmpolykernel的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { public class Svmpolykernel : Svmgenkernel { public int n { get; set; } public double a { get; set; } public double b { get; set; } public double d { get; set; …

多线程---synchronized特性+原理

文章目录 synchronized特性synchronized原理锁升级/锁膨胀锁消除锁粗化 synchronized特性 互斥 当某个线程执行到某个对象的synchronized中时&#xff0c;其他线程如果也执行到同一个对象的synchronized就会阻塞等待。 进入synchronized修饰的代码块相当于加锁 退出synchronize…

IDEA MyBatisX插件介绍

一、前言 前几年写代码的时候&#xff0c;要一键生成DAO、XML、Entity基础代码会采用第三方工具&#xff0c;比如mybatis-generator-gui等&#xff0c;现在IDEA或Eclipse都有对应的插件&#xff0c;像IDEA中MyBatisX就是一个比较好用的插件。 二、MyBatisX安装配置使用 MyBa…

GO学习之 通道(nil Channel妙用)

GO系列 1、GO学习之Hello World 2、GO学习之入门语法 3、GO学习之切片操作 4、GO学习之 Map 操作 5、GO学习之 结构体 操作 6、GO学习之 通道(Channel) 7、GO学习之 多线程(goroutine) 8、GO学习之 函数(Function) 9、GO学习之 接口(Interface) 10、GO学习之 网络通信(Net/Htt…

Mac电脑Android Studio和VS Code配置Flutter开发环境(图文超详细)

一、安装Android Studio 官网地址&#xff1a; https://developer.android.google.cn/ 历史版本下载地址&#xff1a; https://developer.android.com/studio/archive?hlzh-cn 二、安装Xcode 到App Store下载安装最新版本&#xff0c;如果MacOS更新不到13.0以上就无法安装…

Go-Python-Java-C-LeetCode高分解法-第十二周合集

前言 本题解Go语言部分基于 LeetCode-Go 其他部分基于本人实践学习 个人题解GitHub连接&#xff1a;LeetCode-Go-Python-Java-C 欢迎订阅CSDN专栏&#xff0c;每日一题&#xff0c;和博主一起进步 LeetCode专栏 我搜集到了50道精选题&#xff0c;适合速成概览大部分常用算法 突…

消息中间件——RabbitMQ(一)Windows/Linux环境搭建(完整版)

前言 最近在学习消息中间件——RabbitMQ&#xff0c;打算把这个学习过程记录下来。此章主要介绍环境搭建。此次主要是单机搭建&#xff08;条件有限&#xff09;&#xff0c;包括在Windows、Linux环境下的搭建&#xff0c;以及RabbitMQ的监控平台搭建。 环境准备 在搭建Rabb…

机器人的触发条件有什么区别,如何巧妙的使用

简介​ 维格机器人触发条件,分为3个,分别是: 有新表单提交时、有记录满足条件时、有新的记录创建时 。 看似3个,其实是能够满足我们非常多的使用场景。 本篇将先介绍3个条件的触发条件,然后再列举一些复杂的触发条件如何用现有的触发条件来满足 注意: 维格机器人所有的…

Linux(Centos7)防火墙端口操作记录

1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误&#xff0c;并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…

除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂…

代码随想录打卡第五十五天|● 300.最长递增子序列 ● 674. 最长连续递增序列 ● 718. 最长重复子数组

300.最长递增子序列 **题目&#xff1a;**给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0…

深度学习| U-Net网络

U-Net网络 基础知识和CNN的关系反卷积ReLU激活函数 U-Net入门U-Net网络结构图为什么需要跳跃连接U-Net的输入U-Net的应用 基础知识 理解U-Net网络结构需要相关知识点。 和CNN的关系 U-Net也是CNN&#xff08;Convolutional Neural Network&#xff0c;卷积神经网络&#xff…

Flutter PopupMenuButton下拉菜单

下拉菜单是移动应用交互中一种常见的交互方式,可以使用下拉列表来展示多个内容标签,实现页面引导的作用。在Flutter开发中,实现下拉弹框主要有两种方式,一种是继承Dialog组件使用自定义布局的方式实现,另一种则是使用官方的PopupMenuButton组件进行实现。 如果没有特殊的…

Android任务栈和启动模式

Andrcid中的任务栈是一种用来存放Activity实倒的容器。任务最大的特点就是先进后出&#xff0c;它主要有两个基本操作&#xff0c;分别是压栈和出栈。通常Andaid应用程序都有一个任务栈&#xff0c;每打开一个Activity时&#xff0c;该Activity就会被压入任务栈。每销毁一个Act…