RabbitMQ高级

文章目录

  • 一.消息可靠性
    • 1.生产者消息确认
    • 2.消息持久化
    • 3.消费者确认
    • 4.消费者失败重试


MQ的一些常见问题

1.消息可靠性问题:如何确保发送的消息至少被消费一次

2.延迟消息问题:如何实现消息的延迟投递

3.高可用问题:如何避免单点的MQ故障而导致的不可用问题

4.消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

一.消息可靠性

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • -发送时丢失:

    • 生产者发送的消息未送达exchange

    • 消息到达exchange后未到达queue

  • MQ宕机,queue将消息丢失

  • consumer接收到消息后未消费就宕机

1.生产者消息确认

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认

消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack

  • publisher-return,发送者回执
    消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

在这里插入图片描述

简单来说:在publisher-confirm下的nack是消息投递到交换机失败返回的信息;在publisher-confirm下的ack是消息成功到达了消费者;在publisher-return下的ack是消息到达了交换机但是路由失败的返回信息


SpringAMQP实现生产者确认

一.想要实现生产者消息确认机制,需要在配置文件编写开启代码,即在微服务的application.yml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated publisher-returns: true template:mandatory: true

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型

    • simple:同步等待confirm结果,直到超时

    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback(推荐使用)

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

二.配置ReturnCallback

ReturnCallback是消息到达交换机但是没有成功进行路由的回调函数(作用于全局)

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString());});}
}

对于代码中的ApplicationContext是负责管理和组织Spring应用中的各个组件,如bean、配置文件等.

通过实现ApplicationContextAware这个接口,bean可以获取对ApplicationContext的引用,并因此获得访问应用上下文中的其他bean、资源和容器特性的能力,所以说实现了接口等同于获取到了bean容器,就可以获取到 rabbitTemplate并进行设置唯一的ReturnCallback

在回调函数中,消息路由失败会返回很多信息,其中使用路由键,消息的交换机的名称可以实现重发消息

三.在生产者类中发送消息并同时实现ConfirmCallback

ConfirmCallback同样是回调函数,与ReturnCallback不同的是ConfirmCallback可以创建多次

ConfirmCallback是对消息还没有进入到交换机就丢失的一种消息返回策略,当丢失后,执行回调函数并可以记录消息的失败原因和UUID,成功也是同理

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {// 消息体String message = "hello, spring amqp!";// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){ // ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 发送消息rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

需要注意的是:在手动添加交换机的过程中,想要使用通配符"#"的话,应该设置交换机为topic类型!

总结:

SpringAMQP中处理消息确认的几种情况:

  • publisher-comfirm:

    • 消息成功发送到exchange,返回ack

    • 消息发送失败,没有到达交换机,返回nack

    • 消息发送过程中出现异常,没有收到回执

  • 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback


2.消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

1.交换机持久化:

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false);
}

2.队列持久化:

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

3.消息持久化,SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 .build();

但在springamqp中,其实已经在声明交换机和队列的时候将其持久化了,发送消息的方法convertAndSend()内部也将消息做了持久化,了解消息持久化的设置方法可以将以后不是很重要的交换机,队列,消息设置为非持久化


3.消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。

  • auto(推荐):自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,抛出异常后,会不断重新发送消息即失败重试机制

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

配置方式是修改application.yml文件,添加下面配置:

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

4.消费者失败重试

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)

第三种处理策略是将重试失败的消息重新投递到指定的交换机,然后在投递到指定的队列中,形成了一个交换机-队列的错误消息存放容器,在这个容器中存放不仅有错误消息,还有错误消息头的异常栈信息

在这里插入图片描述

实现方式:

1.定义接收失败消息的交换机、队列及其绑定关系:

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true); 
}
@Bean
public Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

2.定义RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

总结:如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/611222.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

了解不同方式导入导出的速度之快

目录 一、用工具导出导入 Navicat(速度慢) 1.1、导入: 共耗时: 1.2、导出表 共耗时: 二、用命令语句导出导入 2.1、mysqldump速度快 导出表数据和表结构 共耗时: 只导出表结构 导入 共耗时&…

【前沿技术杂谈:ChatGPT】ChatGPT——热潮背后的反思

【前沿技术杂谈:ChatGPT】ChatGPT——热潮背后的反思 缘起:无中生有,涅槃重生人工智能技术人工智能的发展史无中生有内容自动生成技术的发展代表企业OpenAI-GPT系列技术的发展历程ChatGPT新特点 热潮:万众瞩目,群雄逐鹿…

Super关键字

与this关键字用法相同,但super关键字指的是父类的对象 我们常用super.来调用父类对象的属性或者方法 super关键字出来调用父亲的属性还可以调用父亲的方法,方式也是super. super() super()表示调用父类的无参构造 super(参数列表)表示调用父类的有参…

Java中的网络编程

文章目录 网络基础知识IP 地址端口协议 Java 中网络编程InetAddress(静态类)UDP 通信原理UDP 发送数据步骤UDP 接收数据步骤UDP 发送接收案例 TCP 通信原理TCP 发送数据步骤TCP 接收数据步骤TCP 发送接收案例 网络基础知识 概述:在网络通信协…

认识Linux指令之与时间相关的指令

01.date命令 date 指定格式显示时间: date %Y:%m:%d date 用法:date [OPTION]... [FORMAT] 1.在显示方面 在显示方面,使用者可以设定欲显示的格式,格式设定为一个加号后接数个标记,其中常用的标记列表如下 %H : …

【揭秘APT攻击】——内网渗透实战攻略,带你领略网络安全的绝密世界!

🌈个人主页: Aileen_0v0 🔥热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 💫个人格言:"没有罗马,那就自己创造罗马~" 目录 介绍 什么是内网? 什么是内网渗透? 内网渗透的目的: 内网…

天鹅目标检测数据集VOC格式280张

天鹅,一种优雅而美丽的鸟类,以其洁白的羽毛、优美的身姿和动人的歌声而闻名。 天鹅属于鸟纲、鸭科,是一种大型水禽。它们的羽毛通常是白色、黑色或灰色,非常光滑且富有光泽。天鹅的头部和颈部非常细长,呈现出优雅的曲…

烟雾识别摄像机

烟雾识别摄像机是一种具有智能识别功能的监控设备,它能够通过图像识别技术检测和识别烟雾,提供实时监测和报警功能。这种摄像机通常应用于各种场所,如工厂、仓库、办公楼、酒店等,起到了重要的安全监测作用。 烟雾识别摄像机的工作…

新手练习项目 4:简易2048游戏的实现(C++)

名人说:莫听穿林打叶声,何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder) 目录 一、效果图二、代码(带注释)三、说明 一、效果图 二、代码(带…

光伏方案设计有什么注意点?

光伏方案设计是实现光伏发电系统高效运行的关键环节。在进行光伏方案设计时,有几个重要的注意点需要特别关注。 首先,要充分考虑地理位置和气候条件。不同地区的日照时数、太阳辐射强度和日照角度都有所不同,这些因素直接影响光伏发电系统的发…

FS【1】:SSP

文章目录 前言1. Abstract2. Introduction2.1. Motivation2.1.1. Few-shot Segmentation (FSS) Task2.1.2. Few-shot Segmentation (FSS) Problem 2.2. Contribution 3. Methods3.1. Motivation3.2. Overview of the architecture4.3. Self-support Prototype4.4. Adaptive Sel…

C++:多态究竟是什么?为何能成为面向对象的重要手段之一?

C:多态究竟是什么?为何能成为面向对象的重要手段之一? 前言一、多态的概念二、多态的定义及实现2.1 多态的构成条件2. 2 虚函数2.3 虚函数的重写2.3.1 虚函数重写的例外1:协变(基类与派生类虚函数返回值类型不同)2.3.2 虚函数重写…

领域驱动模型之各层实体严格分层处理

为什么要分层处理呢? 在领域驱动模型中,分为应用层(application)、领域层(domain)、基础设施层(infrastructure)。各层只能处理和访问自己所属层的 entity 或者 dto 对象&#xff0…

基于ssm的理财通的设计与实现+jsp论文

摘 要 在如今社会上,关于信息上面的处理,没有任何一个企业或者个人会忽视,如何让信息急速传递,并且归档储存查询,采用之前的纸张记录模式已经不符合当前使用要求了。所以,对理财信息管理的提升&#xff0c…

针对大规模服务日志敏感信息的长效治理实践

文章目录 1 背景2 目标与措施3 实施3.1 脱敏工具类3.2 JSON脱敏3.3 APT自动脱敏3.3.1 本地缓存问题3.3.2 JDK序列化问题 3.4 弃用方案 4 规划5 总结 1 背景 近年来,国家采取了多项重要举措来加强个人数据保护,包括实施《中华人民共和国网络安全法》和《…

TypeError: Cannot read properties of undefined (reading ‘namespace‘)

项目场景: 背景: Java 项目中使用 activi 流程引擎, 创建一个带有排他网关的 申请审核流程, 创建之后 查看 流程图出现 如下所示的 错误信息。 前端页面 不显示 任何 流程图信息。 问题描述 问题: 例如&#xff1…

鸿蒙系列--属性动画

一、定义 当组件的通用属性发生改变时而产生的属性渐变效果 说明: 当组件的通用属性发生改变时,组件状态由初始状态逐渐变为结束状态的过程中,会创建多个连续的中间状态,逐帧播放后,就会形成动画 二、创建 给组件(如…

SCS模型(径流曲线法)概述

目录 1.介绍:2.计算公式:参考文献:小结: 1.介绍: SCS模型(径流曲线法)是由美国农业部水土保持局(Soil Conservation Service) 基于经验提出,最初用于预测在农业用地小型流域降雨所累…

【算法每日一练]-dfs (保姆级教程 篇9) #俄罗斯方块 #ABC Puzzle #lnc的工资

目录 今日知识点: 二维图形的状态压缩,存下所有的合法状态然后暴力遍历 dfs的优化剪枝 二项式定理 俄罗斯方块 ABC Puzzle lnc的工资 俄罗斯方块 322D 题意:在4*4方格中分别给出3个俄罗斯方块,问是否可以经过旋转&#xf…

C/C++动态内存管理

文章目录 前言1.C/C内存分布2.C语言中动态内存管理方式:malloc/calloc/realloc/free3.C内存管理方式3.1 new/delete操作内置类型3.2 new和delete操作自定义类型 4. operator new与operator delete函数4.1 operator new与operator delete函数 5. new和delete的实现原…