【RabbitMQ】死信(延迟队列)的使用

目录

一、介绍

1、什么是死信队列(延迟队列)

2、应用场景

3、死信队列(延迟队列)的使用

4、死信消息来源

二、案例实践

1、案例一

2、案例二(消息接收确认 )

3、总结


一、介绍

1、什么是死信队列(延迟队列)

  •         死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
  •         死信队列(Dead Letter Queue)延迟队列(Delay Queue)是两种不同的队列类型,但在实际应用中它们可以结合使用。
  •         死信队列是当消息在队列中因为过期、被拒绝等原因无法正常处理时,会被重新发送到另一个交换机上,这个交换机就是死信交换机。死信队列可以用于实现重试机制、日志审计等特殊应用逻辑。

        延迟队列则是一种特殊的队列类型,它允许将消息延迟指定的时间后才能被消费者消费。这种队列通常用于处理那些需要在特定时间点被处理的任务,例如定时任务、限时优惠等。在RabbitMQ中,可以通过设置消息的TTL(生存时间)来实现延迟队列的功能。当消息在队列中超过了TTL,它就会被移除并被发送到指定的死信交换机,进而被路由到死信队列中。

结合使用死信队列和延迟队列可以实现一些复杂的应用逻辑。

例如:

        可以将某个需要延迟处理的消息发送到延迟队列中,并在消息过期之前将其存储在死信队列中。这样,当消息从延迟队列中移除时,它会被自动发送到死信队列中,然后由消费者消费并执行相应的操作。这种结合使用的方式可以提供更高的灵活性和可靠性,使得系统能够更好地应对各种异常情况。

2、应用场景

死信队列(Dead Letter Queue)和延迟队列(Delay Queue)在以下应用场景中表现优异:

        这些场景都是通过结合使用死信队列和延迟队列来提高系统的可靠性、鲁棒性和灵活性。通过合理地设置死信队列和延迟队列的参数,可以实现各种复杂的业务逻辑和异常处理机制。

  1. 库存解锁服务:例如,当一个商品被锁定且无法被购买时,可以将其放入死信队列中,并在一定时间后重新发送到原始队列进行处理,从而解锁库存。
  2. 定时关单功能:例如,用户在商城下单成功并点击去支付后,如果在指定时间内未支付,系统可以自动将订单放入死信队列中,并在一定时间后重新发送到原始队列进行处理。
  3. 保证订单业务的消息数据不丢失:当消息消费发生异常时,可以将消息投入死信队列中,以便后续等到环境好了之后,再消费死信队列中的消息。
  4. 实现重试机制:当某个消息处理失败时,可以将它放入死信队列中,并在一段时间后重新发送到交换机进行重试。这样可以提高系统的鲁棒性,确保消息能够被正确处理。
  5. 处理定时任务和秒杀活动:使用延迟队列可以将消息延迟到特定的时间后进行处理,例如定时任务、秒杀活动等。这样可以确保在特定的时间点执行相应的操作。
  6. 日志处理和审计:将日志消息发送到死信队列中,可以在日志发生异常时进行记录和审计,以便分析和排查问题。

3、死信队列(延迟队列)的使用

使用死信队列(Dead Letter Queue)和延迟队列(Delay Queue)可以提高系统的可靠性和灵活性,特别是在处理异常情况、重试机制和定时任务等方面。

需要注意的是,在使用死信队列和延迟队列时,需要考虑系统的可用性和性能。如果大量的消息被放入死信队列中,可能会导致系统资源的过度消耗。因此,需要合理配置死信队列和延迟队列的大小和数量,以及监控和管理系统的性能和资源使用情况。

  1. 定义死信队列和延迟队列:在RabbitMQ中,可以在定义队列时指定队列类型为死信队列或延迟队列。例如,使用RabbitMQ的命令行工具或管理界面创建死信队列或延迟队列。
  2. 配置交换机和队列属性:在绑定交换机和队列时,可以设置一些属性来控制消息的路由和消费。例如,可以设置消息的TTL(生存时间)来实现延迟队列的功能,或者设置消息的优先级和延迟时间等属性。
  3. 发送消息到队列:使用RabbitMQ的生产者将消息发送到定义的队列中。如果需要将消息放入延迟队列中,可以在发送消息时设置相应的延迟时间。
  4. 处理消息:消费者从队列中获取消息并进行处理。如果消息无法正常处理,可以将它放入死信队列中。在处理消息时,可以根据需要设置重试机制,以便在消息处理失败时重新发送消息到队列中进行重试。
  5. 监控和管理:使用RabbitMQ的管理界面或命令行工具监控和管理死信队列和延迟队列的状态和消息。可以查看队列的大小、消息的延迟时间、消费者数量等信息,并根据需要进行调整和管理。

4、死信消息来源

  •  消息 TTL 过期
  • 队列满了,无法再次添加数据
  • 消息被拒绝(reject 或 nack),并且 requeue =false

二、案例实践

1、案例一

生产者

在生产者的Config里面添加队列 。

 //    =============死信队列===============// 定义QueueA Bean,返回QueueA实例@Beanpublic Queue QueueA() {Map<String, Object> config = new HashMap<>();//message在该队列queue的存活时间最大为5秒config.put("x-message-ttl", 5000);//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)config.put("x-dead-letter-exchange", "deadExchange");//x-dead-letter-routing-key参数是给这个DLX指定路由键config.put("x-dead-letter-routing-key", "deadQueue");// 返回QueueA实例return new Queue("QueueA", true, true, false, config);}// 定义DirectExchangeA Bean,返回DirectExchangeA实例@Beanpublic DirectExchange DirectExchangeA() {// 返回DirectExchangeA实例return new DirectExchange("DirectExchangeA");}// 定义bindingA Bean,将QueueA绑定到DirectExchangeA,并设置路由键为A.A@Beanpublic Binding bindingA() {// 将QueueA绑定到DirectExchangeAreturn BindingBuilder// 设置路由键为A.A.bind(QueueA()).to(DirectExchangeA()).with("A.A");}// 定义QueueB Bean,返回QueueB实例@Beanpublic Queue QueueB() {// 返回QueueB实例return new Queue("QueueB");}// 定义DirectExchangeB Bean,返回DirectExchangeB实例@Beanpublic DirectExchange DirectExchangeB() {// 返回DirectExchangeB实例return new DirectExchange("DirectExchangeB");}// 定义bindingB Bean,将QueueB绑定到DirectExchangeB,并设置路由键为B.B@Beanpublic Binding bindingB() {// 将QueueB绑定到DirectExchangeBreturn BindingBuilder// 设置路由键为B.B.bind(QueueB()).to(DirectExchangeB()).with("B.B");}

消费者

在消费者里面添加QueueAQueueB 

QueueA

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void Queue02(String msg) {log.warn("QueueA,接收到信息:" + msg);}}

QueueB

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueB")
public class ReceiverQB {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void Queue02(String msg) {log.warn("QueueB,接收到信息:" + msg);}}

编写Controller层

    @RequestMapping("send5")public String send5() {rabbitTemplate.convertAndSend("DirectExchangeA", "A.A", "Hello,A");return "🐉";}

结果:

2、案例二(消息接收确认 )

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

配置文件
        默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:rabbitmq:listener:simple:acknowledge-mode: manual

在消费者里的更改ReceiverQA 

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void QueueA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.warn("QueueA,接收到信息:" + msg);channel.basicAck(tag, false);}}
  • 需要注意的 basicAck 方法需要传递两个参数

    • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

    • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

3、总结

  • 持久化

    • exchange要持久化

    • queue要持久化

    • message要持久化

  • 消息确认

    • 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)

    • 生产者和Server(broker)之间的消息确认

    • 消费者和Server(broker)之间的消息确认

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/647544.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity学习之坦克游戏制作(2)游戏场景的制作

文章目录 1. 基础场景的搭建2. 游戏主面板2.1 拼出面板2.2 创建新面板2.3 设置面板复用2.4 退出界面 3. 坦克基类3.1 创建基类脚本3.1.1 基类基本属性3.1.2 抽象开火函数3.1.3 受伤虚函数3.1.4 死亡虚函数 4 玩家——基础移动旋转摄像机跟随4.1 玩家对象脚本4.2 控制坦克移动4.…

移动端打包成功后禁止生成 report.html 文件,并不自动打开该文件

目录 【问题】移动端 npm run build 打包后生成并打开 report.html 文件package.json 文件vue.config.js 代码 【解决】打包后去除 report.html 文件vue.config.js 代码 参考 【问题】移动端 npm run build 打包后生成并打开 report.html 文件 package.json 文件 {"name&…

蓝牙----蓝牙协议栈L2CAP

蓝牙协议栈----L2CAP L2CAP的功能术语介绍L2CAP信道L2CAP的工作模式经典蓝牙的分段和分解过程 L2CAP—逻辑链路控制和适配层协议 L2CAP的功能 经典蓝牙的L2CAP层实现了协议复用、数据分段与重组、封装调度等操作。BLE的L2CAP层是经典蓝牙L2CAP层的简化版本&#xff1a; 在基…

爬虫js逆向分析——x平台(实现)

爬虫js逆向分析——x平台&#xff08;实现&#xff09; &#xff08;仅供学习&#xff0c;本案例只是分析流程没有账号&#xff09;网址&#xff1a;https://xuexi.chinabett.com/ 1.分析请求包格式 打开控制台&#xff0c;并勾选保存日志&#xff0c;然后点击登录看发送了什…

ICSpector:一款功能强大的微软开源工业PLC安全取证框架

关于ICSpector ICSpector是一款功能强大的开源工业PLC安全取证框架&#xff0c;该工具由微软的研究人员负责开发和维护&#xff0c;可以帮助广大研究人员轻松分析工业PLC元数据和项目文件。 ICSpector提供了方便的方式来扫描PLC并识别ICS环境中的可疑痕迹&#xff0c;可以用于…

Unity配置表xlsx/xls打包后读取错误问题

前言 代码如下&#xff1a; //文本解析private void ParseText(){//打开文本 读FileStream stream File.Open(Application.streamingAssetsPath excelname, FileMode.Open, FileAccess.Read, FileShare.Read);//读取文件流IExcelDataReader excelRead ExcelReaderFactory…

vue实现甘特图

目录 实现效果 一、安装依赖 二、使用 二、绕过license 实现效果 一、安装依赖 npm i --save vue-gantt-schedule-timeline-calendar 实现甘特图需先安装上述依赖&#xff0c;安装依赖实际上是通过gantt-schedule-timeline-calendar来实现的。所以node_module中因包含以下…

JQuery下载和一些语法

最近学了六种jQuery选择器&#xff0c;我想把学过案例和知识结合起来&#xff0c;给大家分享下&#xff01; 那么既然学jQuery选择器&#xff0c;肯定要先了解下jQuery是什么吧&#xff01;jQuery是一个快速、简洁的JavaScript框架&#xff0c;相当于用jQuery能更加高效的创建…

【快影】怎么制作卡拉OK字幕

您好&#xff0c;您添加了字幕之后可以添加动画&#xff0c;选择卡拉OK&#xff0c;其中 卡拉OK1是支持修改颜色的&#xff0c;卡拉OK2只支持修改文字的底色。

Denoising diffusion implicit models 阅读笔记2

Denoising diffusion probabilistic models (DDPMs)从马尔科夫链中采样生成样本&#xff0c;需要迭代多次&#xff0c;速度较慢。Denoising diffusion implicit models (DDIMs)的提出是为了在复用DDPM训练的网络的前提下&#xff0c;加速采样过程。 加速采样的基本思路是&#…

geemap学习笔记052:影像多项式增强

前言 下面介绍的主要内容是应用Image.polynomial() 对图像进行多项式增强。 1 导入库并显示地图 import ee import geemap ee.Initialize() Map geemap.Map(center[40, -100], zoom4)2 多项式增强 # 使用函数 -0.2 2.4x - 1.2x^2 对 MODIS 影像进行非线性对比度增强。# L…

创建第一个 Spring 项目(IDEA社区版)

文章目录 创建 Spring 项目创建一个普通的 Maven 项目添加 Spring 依赖IDEA更换国内源 运行第一个 Spring 项目新建启动类存储 Bean 对象将Bean注册到Spring 获取并使用 Bean 对象 创建 Spring 项目 创建一个普通的 Maven 项目 首先创建一个普通的 Maven 项目 添加 Spring 依…

全面解析开源大语言模型:BLOOM

大型语言模型 &#xff08;LLM&#xff09; 的兴起一直是自然语言处理 &#xff08;NLP&#xff09; 领域的一个决定性趋势&#xff0c;导致它们在各种应用程序中的广泛采用。然而&#xff0c;这种进步往往是排他性的&#xff0c;大多数由资源丰富的组织开发的 LLM 仍然无法向公…

什么是servlet

什么是servlet 什么是servlet Servlet&#xff08;Server Applet&#xff09;是 Java Servlet 的简称&#xff0c;称为小服务程序或服务连接器&#xff0c;用 Java 编写的服务器端程序&#xff0c;具有独立于平台和协议的特性&#xff0c;主要功能在于交互式地浏览和生成数据…

java数据结构与算法刷题-----LeetCode769. 最多能完成排序的块

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 解题思路 这道题可以理解为&#xff0c;只能保证块内有序的情况下&#xf…

大模型学习笔记一:大模型应用开发基础

文章目录 一、大模型一些概念介绍 一、大模型一些概念介绍 1&#xff09;产品和大模型的区别&#xff08;产品通过调用大模型来具备的能力&#xff09; 2&#xff09;AGI定义 概念&#xff1a;一切问题可以用AI解决 3&#xff09;大模型通俗原理 根据上文&#xff0c;猜测下…

WordPress反垃圾评论插件Akismet有什么用?如何使用Akismet插件?

每次我们成功搭建好WordPress网站后&#xff0c;都可以在后台 >> 插件 >> 已安装的插件&#xff0c;在插件列表中可以看到有一个“Akismet反垃圾邮件&#xff1a;垃圾邮件保护”的插件&#xff08;个人觉得是翻译错误&#xff0c;应该是反垃圾评论&#xff09;。具…

vue实现在线Excel表格功能

目录 1.安装x-data-spreadsheet xlsx 2.引入 3.使用 1.安装x-data-spreadsheet xlsx npm i x-data-spreadsheet xlsx2.引入 import zhCN from "x-data-spreadsheet/src/locale/zh-cn"; import Spreadsheet from "x-data-spreadsheet"; import * as X…

【c++】高精度算法(洛谷刷题2024)乒乓球详解

系列文章目录 第一题 乒乓球 视频&#xff1a;http://【洛谷题单 - 算法 - 高精度】https://www.bilibili.com/video/BV1Ym4y1s7BD?vd_source66a11ab493493f42b08b31246a932bbb 目录 系列文章目录 第一题 乒乓球 前言 一、题目以及引领思考 二、题解与代码 1.输入输出案例 …

C语言实现快速排序算法(附带源代码)

快速排序 在区间中随机挑选一个元素作基准&#xff0c;将小于基准的元素放在基准之前&#xff0c;大于基准的元素放在基准之后&#xff0c;再分别对小数区与大数区进行排序。 动态效果过程演示&#xff1a; 快速排序&#xff08;Quick Sort&#xff09;是一种常用的排序算法&…