RabbitMQ - 如保证消息的可靠性?

目录

一、消息可靠性

1.1、生产者消息确认(生产者角度)

1.1.1、理论

1.1.2、实践

1.2、消息持久化(消息角度)

1.2.1、理论

1.3、消费者消息确认(消费者角度)

1.3.1、理论

1.3.2、实践

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论


一、消息可靠性


1.1、生产者消息确认(生产者角度)

1.1.1、理论

在生产者这边,RabbitMQ 提供了 消息确认机制 来确保生产者的消息到达队列。

具体的,生产者将消息发送给 MQ 之后,会返回一个结果给生产者,表示消息是否处理成功,具体有以下两种响应:

  1. publish-confirm 响应
    1. 消息成功投递到交换机,返回 ack.
    2. 消息未投递到交换机(比如交换机不存在,或者是交换机名字写错了),返回 nack.
  2. publish-return 响应
    1. 消息投递到交换机,但是没有路由到队列(比如指定的队列名字写错了),返回 ack,以及路由失败的原因.

最后生产者这边的回调接收到响应后,根据不同的 ack 执行不同的“策略”(类似于你去买书,然后拿到书以后具体要干啥,都由你决定).

Ps:确认机制发送消息时,需要给每一个消息设置一个全局唯一的 id, 以区分不同消息,避免 ack 冲突.

1.1.2、实践

a)再 publisher 微服务的 application.yml 中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated publisher-returns: true template:mandatory: true

配置说明:

  1. publish-confirm-type:开启publisher-confirm,这里支持两种类型,
    1. simple(不推荐,类似死等,占用资源):同步等待confirm结果,直到超时.
    2. correlated(推荐):异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback.
  2. publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback.
  3. template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息.

b)每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString());});}
}

Ps:ApplicationContextAware 就是 Spring 容器启动时的要执行的通知接口,通过 setApplicationContext 方法实现具体的通知.

c)生产者发送消息,指定 ID,消息  ConfirmCallback

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {// 消息体String message = "hello, spring amqp!";// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){ // ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 发送消息rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

1.2、消息持久化(消息角度)

1.2.1、理论

MQ 默认时内存存储消息,通过开启持久化功能(设置 durable = true),就可以将消息持久化到文件中,保证保证消息不丢失.

Ps:消息要持久化的前提是交换机(不一定,但最好是)和队列是持久化的.

1.2.2、实践

a)交换机持久化

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false);
}

b)队列持久化

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

c)消息持久化

    public void testDurableMessage() {//1.构造一个持久的消息Message message = MessageBuilder.withBody("hello".getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("simple.queue", message);}

Ps:delivery_mode = 2 就表示消息要持久化.

1.3、消费者消息确认(消费者角度)

1.3.1、理论

RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ收到ack回执后才会删除该消息.

SpringAMQP 允许配置三种确认模式:

  • manual:手动ack,需要在消费者执行的消息代码结束后,调用api发送ack。
  • auto:自动ack,由 spring 监测消费者的执行的消费代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,然后会将消息重新加入到队列,再发送给消费者,然后再次异常...,无限循环.
  • none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除

1.3.2、实践

这里只需要配置以下 application.yml 文件,添加以下配置:

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论

刚刚讲到,消费者消费确认,SpringAMQP 提供了三种确认模式,其中 auto 这种方式,在消费者执行消费代码遇到异常时,会重新将消息加入到队列中,然后发送给消费者,再次异常,无限循环,导致 mq 的消息处理飙升,带来不必要的压力.

假设消费任务如下:

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到消息:" + msg);System.out.println("开始消费!");System.out.println(1/0);System.out.println("消费完成!");}
}

我们可以利用 Spring 的 retry 机制,在消费者出现异常时,利用本地重试,而不是无限制的加入到 mq 队列,只需要对消费者的配置文件进行以下配置:

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 3 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 4 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式以后,若重试次数耗尽,并且消息依然失败,则需要有 MessageRecoverer 接口来处理,他包含三种不同的实现:

  1. RejectAndDontRequeueRecoverer(默认方式):重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  3. RepublishMessageRecoverer(推荐方式):重试耗尽后,将失败消息投递到指定的交换机,再由交换机投递到指定的队列.
     

上述第三种方式比较推荐,如下图:

1.4.2、实践

这里就测试以下推荐方案 RepublishMessageRecoverer

a)首先要定义用来接收失败消息的交换机、队列、绑定关系,最后定义 RepublishMessageRecoverer(Bean 的方式注入,覆盖 Spring 默认的方案):

@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true);}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}}

b)定义消费者执行的消费任务

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到消息:" + msg);System.out.println("开始消费!");System.out.println(1/0);System.out.println("消费完成!");}
}

c)启动消费者,如下:

d)查看失败队列中具体信息(异常栈信息和信息信息)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/82244.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Matlab图像处理-Lab模型

Lab模型 Lab模型是由CIE(国际照明委员会)制定的一种彩色模型。该模型与设备无关,弥补了RGB模型和CMYK模型必须依赖于设备颜色特性的不足; 另外,自然界中的任何颜色都可以在Lab空间中表现出来,也就是说RGB和…

20230917后台面经总结

1.ping底层原理 Ping 是 ICMP 的一个重要应用,主要用来测试两台主机之间的连通性。Ping 的原理是通过向目的主机发送 ICMP Echo 请求报文,目的主机收到之后会发送 Echo 回答报文。Ping 会根据时间和成功响应的次数估算出数据包往返时间以及丢包率。 基…

Unity之手游UI的点击和方向移动

一 Button的点击 1.1 新建UI -> Button 1.2 在Button上面右击添加空物体 1.3 创建脚本挂载到空物体上面 脚本内容添加点击方法,来控制物体的显示隐藏 using System.Collections; using System.Collections.Generic; using Unity.VisualScripting; using Unit…

Nginx 文件解析漏洞复现

一、漏洞说明 Nginx文件解析漏洞算是一个比较经典的漏洞&#xff0c;接下来我们就通过如下步骤进行漏洞复现&#xff0c;以及进行漏洞的修复。 版本条件&#xff1a;IIS 7.0/IIS 7.5/ Nginx <8.03 二、搭建环境 cd /vulhub/nginx/nginx_parsing_vulnerability docker-compos…

【C++基于多设计模式下的同步异步日志系统】

文章目录 [toc] 1 :peach:项目介绍:peach:2 :peach:开发环境:peach:3 :peach:核心技术:peach:4 :peach:环境搭建:peach:5 :peach:日志系统介绍:peach:5.1 :apple:为什么需要日志系统&#xff1f;:apple:5.2 :apple:日志系统技术实现:apple:5.2.1 :lemon:同步写日志:lemon:5.2.2…

【excel密码】excel文件加密的三种方式

想要给Excel文件进行加密&#xff0c;方法有很多&#xff0c;今天分享三种Excel加密方法给大家。 一、打开密码 设置了打开密码的excel文件&#xff0c;打开文件就会提示输入密码才能打开excel文件&#xff0c;只有输入了正确的密码才能打开并且编辑文件&#xff0c;如果密码…

Unity实现简易太阳系

开发环境&#xff1a;Unity 2022.3.5f1c1 Visual Studio 2022 太阳系相关星体&#xff1a;太阳、八大行星、月球 模拟星系&#xff1a;太阳系、地月系 功能&#xff1a;支持行星以太阳为中心&#xff0c;任意轴进行公转&#xff0c;此处演示同一平面。 a1-a8为公转轴&#xff…

《ADS2011射频电路设计与仿真实例》功率放大器设计的输入输出匹配

徐兴福这本书的6.6 Smith圆图匹配这一节中具体匹配时&#xff0c;直接给出了电容与串联微带的值&#xff0c;没有给出推导过程&#xff0c;我一开始以为是省略了详细推导过程&#xff0c;后来发现好像基本上是可以随便自己设的。以输入匹配&#xff08;书本6.6.4输入匹配电路的…

景联文科技:数据供应商在新一轮AI热潮中的重要性

景联文科技是AI基础行业的头部数据供应商&#xff0c;可协助人工智能企业解决整个人工智能链条中数据标注环节的相对应问题。 随着全球新一轮AI热潮来袭&#xff0c;大量训练数据已成为推动AI算法模型进步和演化的不可或缺的重要因素。数据的质量和数量直接影响了模型训练和性能…

现在全国融资融券两融利率最低是多少?哪家证券公司券商费率低?

融资融券是指投资者通过向券商借入资金&#xff08;融资&#xff09;或借入证券&#xff08;融券&#xff09;&#xff0c;以达到获得更高收益、降低交易风险、提高资金利用效率的目的。通过融资&#xff0c;投资者可以用借入的资金买入更多的证券&#xff1b;通过融券&#xf…

多目标优化算法:基于非支配排序的海象优化算法(NSWOA)MATLAB

一、海象优化算法WOA 海象优化算法&#xff08;Walrus Optimization Algorithm&#xff0c;WOA&#xff09;由Trojovsk等人于2023年提出&#xff0c;该算法模拟海象的进食&#xff0c;迁移&#xff0c;逃跑和对抗捕食者的过程&#xff0c;WOA包含探索、迁移和开发三个阶段&…

cms之帝国cms安装

内容摘要 帝国网站管理系统&#xff0c;英文名称为EmpireCMS&#xff0c;简称“帝国CMS”&#xff0c;本文将介绍帝国网站管理系统的安装方法。 前言&#xff1a; 本文安装教程是以帝国CMS7.5版本为基础进行图文讲解。 各位看官&#xff0c;一定要按照每个步骤去执行&#xf…

【Django入门】第一个Django项目

Django&#xff0c;广为人知的Python Web框架&#xff0c;以其强大而又灵活的特点脱颖而出。其宣传口号是&#xff1a;“为完美主义者开发的框架”。这篇文章将为你揭示创建第一个Django项目的魔法以及Django项目的基本结构。 为什么选择Django&#xff1f; 在深入学习前&…

基于SSM的博客系统开发

文章目录 前言1.技术选型&#xff1a;2.主要功能&#xff1a;3.项目展示&#xff1a;前台页面&#xff1a;后台页面&#xff1a; 总结 前言 提示&#xff1a;人类与强权的斗争&#xff0c;就是记忆与遗忘的斗争。 --米兰昆德拉《笑忘录》 1.技术选型&#xff1a; 开发工具&am…

vue 01 创建一个简单vue页面

去vue官网下载vue.js 引用vue.js vue语法 一个vue实例&#xff0c;绑定一个容器&#xff0c;一对一关系 <!DOCTYPE html> <html><head><meta charset"UTF-8"/><title>初始Vue</title><script type"text/javascript&qu…

【JAVASE】图书管理系统

⭐ 作者&#xff1a;小胡_不糊涂 &#x1f331; 作者主页&#xff1a;小胡_不糊涂的个人主页 &#x1f4c0; 收录专栏&#xff1a;浅谈Java &#x1f496; 持续更文&#xff0c;关注博主少走弯路&#xff0c;谢谢大家支持 &#x1f496; 图书管理系统 1. 设计思路图2. 创建 boo…

MySQL常见面试题(一)

&#x1f600;前言 在数据库管理系统中&#xff0c;存储引擎起着核心的角色&#xff0c;它决定了数据管理和存储的方式。MySQL作为一个领先的开源关系型数据库管理系统&#xff0c;提供了多种存储引擎来满足不同的需求和优化不同的应用。除了选择合适的存储引擎&#xff0c;数据…

四川百幕晟科技:抖店精选联盟怎么使用?

近年来&#xff0c;电商平台的兴起让很多人纷纷加入进来&#xff0c;希望通过在网上销售产品来赚取更多的利润。在这个竞争激烈的市场中&#xff0c;如何找到稳定的渠道来推广自己的产品成为了每个卖家的追求。抖店精选联盟是一个不错的选择&#xff0c;可以帮助卖家快速提升销…

Mybatis学习笔记8 查询返回专题

Mybatis学习笔记7 参数处理专题_biubiubiu0706的博客-CSDN博客 1.返回实体类 2.返回List<实体类> 3.返回Map 4.返回List<Map> 5.返回Map<String,Map> 6.resultMap结果集映射 7.返回总记录条数 新建模块 依赖 目录结构 1.返回实体类 如果返回多条,用…

PDF文件的页眉页脚无法删除的原因和三种替代方法

大家好&#xff01; 今天六分职场为大家介绍一个PDF的常用操作。有的时候我们需要为PDF文件添加页眉页脚&#xff0c;但如果我们这个PDF文档是从其他地方参考的&#xff0c;经常会发现无法直接编辑或者删除PDF文件中页眉页脚。 不用担心&#xff0c;我们使用WPS的PDF软件&…