RabbitMQ入门指南(九):消费者可靠性

专栏导航

RabbitMQ入门指南

从零开始了解大数据


目录

专栏导航

前言

一、消费者确认机制

二、失败重试机制

三、失败处理策略

四、业务幂等性

1.通过唯一标识符保证操作的幂等性

2.通过业务判断保证操作的幂等性

总结


前言

RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。


当RabbitMQ向消费者投递消息后,了解消费者的处理状态是非常重要的。因为消息的投递并不代表消费者一定能够正确地消费这些消息,可能会出现各种故障:

  • 网络故障:在消息投递过程中,如果RabbitMQ和消费者之间的网络连接出现故障,可能会导致消息无法正确投递给消费者。
  • 消费者宕机:如果消费者在接收消息后突然宕机,那么消息可能无法被正确处理。
  • 消费者处理异常:消费者在接收到消息后,由于处理不当或者出现异常情况,可能会导致消息处理失败。

以上情况都可能导致消息丢失,因此RabbitMQ需要知道消费者的处理状态,以便在消息处理失败时重新投递消息。

一、消费者确认机制

RabbitMQ的消费者确认机制Consumer Acknowledgement)是一种确保消息被成功处理的机制。当消费者处理消息结束后,需要向RabbitMQ发送一个回执,以告知消息的处理状态。这个机制对于确保消息的可靠传递非常重要,因为它可以防止消息在消费者端处理失败而没有被正确处理的情况。

回执有三种可选值:

  • ACK(确认):表示消费者成功处理了消息,RabbitMQ会从队列中删除该消息。
  • NACK(否定确认):表示消息处理失败,RabbitMQ需要再次投递该消息。
  • REJECT(拒绝):表示消息处理失败并且被拒绝,RabbitMQ会从队列中删除该消息。

在实际应用中,一般使用ACK或NACK两种方式。REJECT方式的使用相对较少,通常只在消息格式存在问题,即存在开发错误的情况下使用。因此大多数情况下需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ACK,处理失败时返回NACK。

在consumer服务的application.yml文件中添加配置修改Spring AMQP的ACK处理方式 :

spring:rabbitmq:listener:simple:acknowledge-mode: auto

RabbitMQ 支持三种不同的确认模式,这些模式通过acknowledge-mode属性进行配置:

  • none:关闭ACK。消费者接收到消息后不需要发送任何确认给发送者,发送者将继续发送下一条消息。在这种模式下,如果消费者处理消息失败,消息将会丢失,无法保证消息的可靠性。
  • manual:手动ACK。消费者接收到消息后需要手动发送确认给发送者,发送者才会继续发送下一条消息。在这种模式下,如果消费者处理消息失败,可以手动发送NACK给发送者,告诉发送者这条消息处理失败,以便发送者重新发送消息。这种模式可以保证消息的可靠性,但需要消费者手动处理确认和NACK。
  • auto:自动ACK。Spring AMQP提供了一种自动的消息确认机制。它利用AOP(面向切面编程)对消息处理逻辑做了环绕增强。当业务正常执行时,Spring AMQP会自动返回ACK。当业务出现异常时,根据异常判断返回不同结果:业务异常,自动返回NACK;消息处理或校验异常,自动返回REJECT。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 如果消费者持续出现异常,消息会不断地在队列中重新排队并重新发送,这可能会导致消息处理延迟和队列持续增长,给系统带来不必要的压力。

为了解决这个问题,Spring框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列 。

在consumer服务的application.yml文件中添加配置:

spring:rabbitmq:listener:simple:retry:enabled: trueinitial-interval: 1000msmultiplier: 1max-attempts: 3stateless: true
enabled: true开启消费者失败重试
initial-interval: 1000ms初始的等待时长
multiplier: 1每次重试的等待时长是上次等待时长的倍数
max-attempts: 3最大重试次数
stateless: truetrue表示重试是无状态的,即每次重试都是独立的,不会考虑之前的重试状态。如果业务中包含事务,需要改为false。

通过这样的配置,当消费者出现异常时,消息会在本地进行重试,而不是无限期地重新排队发送。在达到最大重试次数后,SpringAMQP会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。这意味着最后一次处理消息的结果是失败的,并且消息不会被重新排队发送给消费者。

这种失败重试机制可以有效地减少消息处理的延迟和队列的增长,提高系统的稳定性和可用性。当然,在使用失败重试机制时,也需要考虑到业务逻辑和异常处理的合理性,避免因过度重试而导致的问题。

三、失败处理策略

在失败重试机制中,当达到最大重试次数后,消息会被直接丢弃。尽管这在某些场景中可能是可接受的,但对于那些对消息可靠性要求极高的业务来说,这显然是一个潜在的风险点。

Spring AMQP为此提供了强大的支持,允许开发人员自定义重试次数耗尽后的消息处理策略。这个策略是由MessageRecovery接口来定义的,它有三种不同的实现方式:

  • RejectAndDontRequeueRecoverer:当重试次数耗尽后,直接拒绝消息,并丢弃该消息。这是默认的处理方式。
  • ImmediateRequeueMessageRecoverer:当重试次数耗尽后,返回一个NACK给生产者,使消息重新入队,以便再次发送。
  • RepublishMessageRecoverer:当重试次数耗尽后,可以将失败的消息投递到一个指定的交换机和队列中,这个交换机和队列专门用来存放异常的消息。

在处理策略中,一种比较合适的方式是使用RepublishMessageRecoverer。当消息失败后,它会被投递到一个特定的、专门用于存放异常消息的队列中。这个队列可以由人工进行集中处理,使得开发人员可以更精细地处理和诊断问题。

在consumer服务中定义处理失败消息的交换机和队列:

    @Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}

定义一个RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

通过这样的配置,当消息在尝试多次重试后仍然失败时,它们会被自动投递到定义的异常消息队列中。这样就可以集中处理这些异常消息,进行进一步的诊断或处理。这种策略为开发人员在处理复杂分布式系统中的消息问题提供了一种更加专业和灵活的方式。

四、业务幂等性

在计算机科学和软件开发中,幂等性是一个重要的概念。简单来说,如果一个操作或函数不论执行一次还是多次,其结果都是相同的,那么称这个操作或函数是幂等的。在业务处理中,幂等性尤其关键。它可以保证系统的稳定性,确保在某些异常情况下,多次执行某个业务操作不会对业务状态产生不一致的结果。幂等性的重要性在于它能够避免因重复执行操作而产生的数据不一致、状态冲突等问题。在涉及金融交易、库存管理、用户认证等关键领域,幂等性是确保系统稳定和数据准确的重要前提。

1.通过唯一标识符保证操作的幂等性

为每个操作生成唯一的标识符(如ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP的MessageConverter自带了MessageID的功能,只要开启这个功能即可。

Jackson的消息转换器示例:

@Bean
public MessageConverter messageConverter(){// 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送:

    @Testvoid testSendMessage2Queue() {String queueName = "demo.queue";String msg = "Idempotent Test";rabbitTemplate.convertAndSend(queueName, msg);}

运行测试用例,查看结果:

2.通过业务判断保证操作的幂等性

业务判断,是一种基于业务逻辑和状态的检查,以确定是否对重复的请求或消息进行处理。在多种业务场景中,这一策略的思路各有不同。

比如在支付订单的案例中,业务逻辑主要是支付并将订单状态从“未支付”修改为“已支付”(需要防止重复支付)。因此,在执行这一业务时,可以判断订单的状态是否为“未支付”。若状态不是“未支付”,则说明该订单已经被处理过,无需重复处理。与基于唯一标识符的方案相比,业务判断方案无需对原有数据库进行改造,因此更为推荐。

以支付修改订单的业务为例:

    @Overridepublic void markOrderPaySuccess(Long orderId) {// 查询订单Order order = getById(orderId);// 判断订单状态,订单不存在或者订单状态不是1,放弃处理if (order == null || order.getStatus() != 1) {return;}// 尝试更新订单order.setStatus(2);order.setPayTime(LocalDateTime.now());orderService.updateById(order);}

以上代码示例判断和更新是两步动作 ,极小概率下可能存在线程安全问题,所以可以进行以下修改:

    @Overridepublic void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1orderService.lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();}
 

总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容,希望对大家有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/580005.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

词表示:语言与计算的桥梁

目录 前言1 什么是词表示2 独热表示3 上下文表示4 分布式表示结语 前言 在自然语言处理领域,词语的表示是一个基本挑战。将词语转换为计算机可以理解的符号,衡量词语之间的相似度,捕捉它们之间复杂的关系,是使机器能够理解和处理…

Bluetooth Mesh 入门学习干货,参考Nordic资料(更新中)

蓝牙网状网络(Bluetooth mesh)概念 概述 蓝牙Mesh Profile | Bluetooth Technology Website规范(Mesh v1.1 后改名Mesh ProtocolMesh Protocol | Bluetooth Technology WebsiteMesh Protocol)是由蓝牙技术联盟(Bluetooth SIG)开…

EasyExcel实现动态表头(注解实现)

要实现上述动态头,按每日统计,每月统计,每年统计。而时间是一直变化,所以我们需要表头也一直动态生成。 首先,我们需要定义所需要实体类 public class CountDayData {ExcelProperty(value "业务员姓名")p…

css 设置字体渐变色和阴影

一、需求 我们平时写样式的时候可能遇到需要将字体设置成渐变色,这样能使界面整体美化提升,那么css怎么去实现这个功能呢?下面我介绍一种常用的方法,欢迎大家补充 二、渐变实现 先看效果图: 直接上代码: /…

Seem环境安装

创建虚拟环境 conda create -n seem python3.8 conda activate seem 安装相关依赖:(不按照的话会报错) sudo apt-get install openmpi-bin libopenmpi-devconda install gcc_linux-64pip install mpi4py 导入环境 export PYTHONPATH$(pwd…

开发效率之把握需求、减少返工

前言 当年初入软件开发行业的我,拿到需求就莽,要设计没设计,要分析没分析,结果就是没理清楚需求,致使频频返工。 需求没理解对,做得再多再好也白搭。 估算需求把握程度 假如每个IF分支的“是”加一分&…

2024 年全球顶级的 3 款 桌面 PDF 转换工具

桌面 PDF 转换器工具是一种软件应用程序,使用户能够将 PDF 文件与不同的文件格式相互转换。奇客PDF转换、Nitro Pro 和 Foxit PhantomPDF 是市场上最好的桌面 PDF 转换工具。 在选择最好的 PDF 转换器软件时,需要考虑的一个重要因素是它与其他软件的集成…

nvm 的安装及使用 (Node版本管理器)

目录 1、nvm 介绍 2、nvm安装 3、nvm 使用 4、node官网可以查看node和npm对应版本 5、nvm安装指定版本node 6、安装cli脚手架 1、nvm 介绍 NVM 全称 node.js version management ,专门针对 node 版本进行管理的工具,通过它可以安装和切换不同版本的…

数据链路层解读

基本介绍 概述 数据链路层使用的信道主要有两种类型 点对点信道。使用一对一的点对点通信方式的信道。广播信道。使用一对多的广播通信方式的信道。由于广播信道上连接的主机很多,必须使用专用的共享信道协议来协调这些主机的数据发送,因此通信过程比较…

【Windows】共享文件夹拍照还原防火墙设置(入站,出站设置)---图文并茂详细讲解

目录 一 共享文件夹(两种形式) 1.1 普通共享与高级共享区别 1.2 使用 二 拍照还原 2.1 是什么 2.2 使用 三 防火墙设置(入栈,出站设置) 3.1 引入 3.2 入站出站设置 3.2.1入站出站含义 3.3入站设置 3.4安装jdk 3.5使用tomcat进行访…

sql_lab之sqli中的堆叠型注入(less-38)

堆叠注入(less-38) 1.判断注入类型 http://127.0.0.3/less-38/?id1 and 12 -- s 没有回显 http://127.0.0.3/less-38/?id1 and 11 -- s 有回显 则说明是单字节’注入 2.查询字段数 http://127.0.0.3/less-38/?id1 order by 4 -- s 报错 http:/…

智能优化算法应用:基于人工兔算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于人工兔算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于人工兔算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.人工兔算法4.实验参数设定5.算法结果6.参考文…

取证工具volatility插件版学习记录

更新时间:2023年12月18日11:48:29 1. 背景描述 在以前学习过volatility的基础功能,主要是使用volatility独立版进行学习的,前几天遇到一个ctf赛事,需要用到的是volatility的mimikatz模块,因为以前没使用过那个模块&…

【Filament】立方体贴图(6张图)

1 前言 本文通过一个立方体贴图的例子,讲解三维纹理贴图(子网格贴图)的应用,案例中使用 6 张不同的图片给立方体贴图,图片如下。 读者如果对 Filament 不太熟悉,请回顾以下内容。 Filament环境搭建绘制三角…

HTML制作暴雨特效

🎀效果展示 🎀代码展示 <body> <!-- partial:index.partial.html --> <canvas id="canvas-club">

python消费rabbitmq

队列经常用&#xff0c;能保持信息一致性。也能跨语言&#xff0c;java写的生产者&#xff0c;推到队列中&#xff0c;python写的消费者消费。 这里&#xff0c;生成者&#xff0c;我们是java&#xff0c;已经发了一条消息了。 python是使用pika来链接rabbitmq 安装pika pip…

扩展mybatis-plus,保留逻辑删、逻辑查的前提下,扩展硬删除、硬查询

引入相关依赖 <!-- 提示&#xff1a;1. common-mybatis-plus:2100.8.8 中只有4个类文件&#xff0c;是对硬删除、硬查询的扩展支持&#xff0c;如果你不想引入依赖的话&#xff0c;你可以把这四个文件复制到自己的项目中即可2. common-mybatis-plus:2100.8.8 对应mybatis-p…

青少年CTF-qsnctf-Web-include01include02(多种方法-知识点较多-建议收藏!)

PHP常见伪协议 php://filter是PHP中独有的一种协议&#xff0c;它是一种过滤器&#xff0c;可以作为一个中间流来过滤其他的数据流。通常使用该协议来读取或者写入部分数据&#xff0c;且在读取和写入之前对数据进行一些过滤&#xff0c;例如base64编码处理&#xff0c;rot13处…

MongoDB ReplicaSet 部署

文章目录 前言1. 环境准备2. 生成密钥3. 配置参数4. 创建 ReplicaSet5. 副本集维护5.1 新增成员5.2 移除节点5.4 主节点降级5.5 阻止选举5.6 允许副本节点读5.7 延迟观测 6. 连接副本集 后记 前言 本篇文章介绍 MongoDB ReplicaSet 如何搭建&#xff0c;及常用的维护方法。 1…

求简单表达式的值

题目&#xff1a;在键盘输入类似(56-20)/(42)这样的表达式输出结果 此题分为两部分&#xff08;1&#xff09;将表达式转换成后缀表达式&#xff08;2&#xff09;计算后缀表达式的值 需要注意的是本题要定义两个不同的栈 一个数据类型是字符&#xff0c;一个数据类型是doubl…