[RabbitMQ] 重试机制+TTL+死信队列

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 重试机制
    • 1.1 重试配置
    • 1.2 配置交换机队列
    • 1.3 发送消息
    • 1.4 消费消息
    • 1.5 运行测试
    • 1.6 手动确认
  • 2. TTL
    • 2.1 设置消息TTL
    • 2.2 设置消息队列的TTL
    • 2.3 两者的区别
  • 3. 死信队列
    • 3.1 死信队列的概念
    • 3.2 代码示例
      • 3.2.1 声明队列和交换机
      • 3.2.2 正常队列与死信交换机的绑定
      • 3.2.3 制造死信产生的条件
      • 3.2.4 发送消息
      • 3.2.5 测试死信
    • 3.3 常见面试题

1. 重试机制

在消息传递的过程中,可能会遇到各种问题,比如网络故障,服务不可用,资源不足等,这些问题都可能会导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败之后重新发送.
我们也可以对重试机制设置重试次数.超过了重试的限制次数,如果没有对队列进行死信队列的绑定,那么该消息就会发生丢失.

1.1 重试配置

  rabbitmq:host: 182.92.204.253port: 5672username: jiangruijiapassword: *****virtual-host: /listener:simple:acknowledge-mode: autoretry:initial-interval: 5000msenabled: truemax-attempts: 5
  • initial-interval: 表示初始失败的等待时间为5s
  • enabled: 表示开启消费者失败重试
  • max-attempts: 最大重试次数,包括首次发送
  • 注意: 重试机制在自动确认模式之下才会生效,如果配置为手动确认,则重试机制不会生效.

1.2 配置交换机队列

public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE = "retry_exchange";
@Bean
public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constant.RETRY_EXCHANGE).durable(true).build();
}
@Bean
public Queue retryQueue(){return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
@Bean
public Binding retryBinding(@Qualifier("retryExchange") DirectExchange exchange,@Qualifier("retryQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("retry");
}

1.3 发送消息

@RequestMapping("/retry")
public String retry(){rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE,"retry","retry消息");return "发送成功";
}

1.4 消费消息

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message) throws UnsupportedEncodingException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());int i = 3/0;System.out.println("处理完成");}
}

1.5 运行测试

使用Postman调用接口,观察控制台信息
在这里插入图片描述
我们看到由于消费者发生异常,消息在不停地进行重试,重试5次之后,抛出了异常.
如果我们对异常进行捕获,则不会进行重试.

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message) throws UnsupportedEncodingException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {int i = 3/0;}catch (Exception e){System.out.println("处理失败");}System.out.println("处理完成");}
}

在这里插入图片描述

1.6 手动确认

如果我们把确认接收机制改为手动确认机制

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message, Channel channel) throws IOException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {Thread.sleep(1000);int i = 3/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){System.out.println("处理失败");channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);}System.out.println("处理完成");}
}

运行结果:
在这里插入图片描述
我们看到,在手动控制模式的时候,重试的次数不会像自动控制那样直接生效,因为是否重试以及何时重试更多地取决于应用程序和消费者的实现逻辑.

2. TTL

TTL,即过期时间,RabbitMQ可以对消息和队列设置TTL.
当消息到达存活时间之后,还没有被消费,就会被自动清除.

2.1 设置消息TTL

目前有两种方法可以设置消息的TTL.一是设置队列的TTL,队列中所有消息都有相同的过期时间,二是设置每条消息的过期时间,每条消息都可以有不同的TTL.如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准.
首先我们针对每条消息设置TTL.方法就是我们在发送消息的时候对消息的expiretion参数进行设置,单位为毫秒.
我们先来配置交换机和队列.

public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE = "ttl_exchange";
@Bean
public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).durable(true).build();
}
@Bean
public Queue ttlQueue(){return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
@Bean
public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ttl");
}

发送消息

@RequestMapping("/ttl")
public String ttl(){String msg = "ttl消息";Message message = new Message(msg.getBytes(StandardCharsets.UTF_8));message.getMessageProperties().setExpiration("10000");//设置消息过期时间为10srabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl",message);return "发送信息";
}

运行程序,观察结果:

  1. 发送消息之后,Ready消息为1
    在这里插入图片描述
  2. 过了10s之后,发现消息被删除
    在这里插入图片描述

2.2 设置消息队列的TTL

设置队列TTL的方法是在创建队列时,加入x-message-ttl参数实现,单位为毫秒.
设置队列和绑定关系

@Bean
public Queue ttlQueue2(){Map<String,Object> map = new HashMap<>();map.put("x-message-ttl",10000);//设置10s过期return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(map).build();
}
@Bean
public Binding ttl2Binding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ttl2");
}

队列的声明也可以简写为:

@Bean
public Queue ttlQueue2(){return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(10000).build();
}

发送消息:

@RequestMapping("/ttl2")
public String ttl2(){rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","ttl2消息");return "发送成功";
}

运行程序之后观察结果:
运行之后发现,新增了一个队列,队列有一个TTL标识.
在这里插入图片描述
使用Postman调用接口发送消息:
在这里插入图片描述
10s之后发现队列中的消息被删除:
在这里插入图片描述

2.3 两者的区别

设置队列的ttl属性方法,一旦消息过期,就会立即从队列中删除.如果消息设置ttl,即使消息过期,也不会立即从队列中删除,而是在即将投递到消费者之前进行判定,如果过期了,才进行删除.
这两种方法处理过期消息的原理如下:
设置队列的过期时间,队列中已经过期的消息肯定在队列头部,RabbitMQ只需要定期扫描队头是否有过期的消息即可.
而设置消息的TTL的方式,每条消息的过期时间不同,如果定期扫描整个队列,效率是非常低的,所以不如等到此消息即将被消费时再判定是否过期.

3. 死信队列

3.1 死信队列的概念

死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
有死信,就会有死信队列,当一个消息在一个队列中变为死信之后,它能被重新被发送到另一个交换机中,这个交换机就是DLX(Dead Letter Exchange),绑定DLX的队列,就被称为死信队列(DLQ,Dead Letter Queue).
在这里插入图片描述
消息变为死信一般是由于一下的几种情况:

  1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
  2. 消息过期
  3. 队列达到最大长度

3.2 代码示例

3.2.1 声明队列和交换机

交换机和队列的声明包含两部分:

  • 正常声明的交换机和队列
  • 声明死信队列和死信的交换机
    死信的交换机和队列和普通的交换机队列在声明的时候没有什么区别.
public static final String NORMAL_QUEUE = "normal_queue";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DLX_EXCHANGE = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
@Bean
public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
@Bean
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange,@Qualifier("normalQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("normal");
}
@Bean
public DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();
}
@Bean
public Queue dlxQueue(){return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
@Bean
public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange exchange,@Qualifier("dlxQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx");
}

3.2.2 正常队列与死信交换机的绑定

当一个队列中存在死信的时候,RabbitMQ会自动把这个消息重新发布到设置的DLX交换机上,进而被路由到另一个队列,即死信队列.
绑定的时候需要为普通队列设置x-dead-letter-exchangex-dead-letter-routing-key两个参数,第一个参数是绑定的死信交换机,第二个参数是死信队列的路由键.

@Bean
public Queue normalQueue(){Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE);map.put("x-dead-letter-routing-key","dlx");return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(map).build();
}

上面的map可以简写为:

@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").build();
}

3.2.3 制造死信产生的条件

我们通过设置队列长度限制或者是设置消息的过期时间来制造死信.如果队列中的消息过期,或者是队列中的消息超过队列的长度,全部会被路由到死信队列.

@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").ttl(10000).maxLength(10L).build();
}

3.2.4 发送消息

@RequestMapping("/normal")
public String normal(){rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal消息");return "发送成功";
}

3.2.5 测试死信

  1. 启动程序之后观察队列
    在这里插入图片描述
    我们发现队列normal_queue有5个标签,则会五个标签分别代表该队列的5个属性:
    • D: durable的缩写,设置持久化.
    • TTL: Time to Live,队列的TTL.
    • Lim: 队列设置了长度(x-max-length).
    • DLX: 死信队列的交换机
    • DLK: 死信队列的路由键
  2. 测试到达过期时间
    接下来我们给正常队列中发送一条消息.
    在这里插入图片描述
    10秒钟之后,队列过期,消息也会过期,所以消息就会进入死信队列.
    在这里插入图片描述
    在这里插入图片描述
  3. 测试超出队列长度
    我们给队列中一次性发送20条消息
    在这里插入图片描述
    我们发现其中的10条消息会直接进入死信队列.
  4. 测试消息被拒收
    编写消费者代码,并抛出异常,普通队列消费者确认接收机制使用手动应答.
@Component
public class NormalListener {@RabbitListener(queues = Constant.NORMAL_QUEUE)public void normalListener(Message message, Channel channel) throws IOException, InterruptedException {System.out.printf("接收到消息%s,tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {System.out.println("处理消息");int i = 3/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch (Exception e){System.out.println("消息处理失败");Thread.sleep(1000);//拒绝接收之后不放回原队列中,则会放入死信队列channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}
}
@Component
public class DLXListener {@RabbitListener(queues = Constant.DLX_QUEUE)public void dlxListener(Message message) throws UnsupportedEncodingException {System.out.printf("死信队列接收到消息:%s,tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());}
}

在这里插入图片描述

3.3 常见面试题

  1. 死信队列的概念
    死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
  2. 来源
    1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
    2. 消息过期
    3. 队列达到最大长度
  3. 死信队列的应用场景
    它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统.
    一般的应用场景还有:
    1. 消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理.
    2. 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占用系统资源.
    3. 日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/887310.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ChatGPT 桌面版发布了,如何安装?

本章教程教大家如何进行安装。 一、下载安装包 官网地址地址&#xff1a;https://openai.com/chatgpt/desktop/ 支持Windows和MacOS操作系统 二、安装步骤 Windows用户下载之后&#xff0c;会有一个exe安装包&#xff0c;点击运行安装即可。 注意事项&#xff0c;如果Windows操…

uniapp vue2项目迁移vue3项目

uniapp vue2项目迁移vue3项目&#xff0c;必须适配的部分 一、main.js 创建应用实例 // 之前 - Vue 2 import Vue from vue import App from ./App Vue.config.productionTip false // vue3 不再需要 App.mpType app // vue3 不再需要 const app new Vue({ ...App }) …

Brain.js 用于浏览器的 GPU 加速神经网络

Brain.js 是一个强大的 JavaScript 库&#xff0c;它允许开发者在浏览器和 Node.js 环境中构建和训练神经网络 。这个库的目的是简化机器学习模型的集成过程&#xff0c;使得即使是没有深厚机器学习背景的开发者也能快速上手 。 概述 Brain.js 提供了易于使用的 API&#xff…

群核科技首次公开“双核技术引擎”,发布多模态CAD大模型

11月20日&#xff0c;群核科技在杭州举办了第九届酷科技峰会。现场&#xff0c;群核科技首次正式介绍其技术底层核心&#xff1a;基于GPU高性能计算的物理世界模拟器。并对外公开了两大技术引擎&#xff1a;群核启真&#xff08;渲染&#xff09;引擎和群核矩阵&#xff08;CAD…

oracle会话追踪

一 跟踪当前会话 1.1 查看当前会话的SID,SERIAL# #在当前会话里执行 示例&#xff1a; SQL> select distinct userenv(sid) from v$mystat; USERENV(SID) -------------- 1945 SQL> select distinct sid,serial# from v$session where sid1945; SID SERIAL# …

算法-快速排序-Python版详解

原题如下&#xff1a; 给定你一个长度为 n 的整数数列。请你使用快速排序对这个数列按照从小到大进行排序。并将排好序的数列按顺序输出。 输入格式 输入共两行&#xff0c;第一行包含整数 n。 第二行包含 n 个整数&#xff08;所有整数均在 1∼10^9 范围内&#xff09;&am…

strlwr(arr);的模拟实现(c基础)

hi , I am 36 适合对象c语言初学者 strlwr(arr)&#xff1b;函数是把arr数组变为小写字母,并返回arr 链接介绍一下strlwr(arr)&#xff1b;(c基础)-CSDN博客 下面进行My__strlwr(arr);模拟实现 #include<stdio.h> //返回值为arr(地址),于是用指针变量,原数组为字符型…

Hadoop分布式文件系统(一)——HDFS简介

目录 1. HDFS设计目标2. HDFS组件3. HDFS数据复制4. HDFS健壮性4.1 磁盘数据错误&#xff0c;心跳检测和重新复制4.2 集群均衡4.3 数据完整性4.4 元数据磁盘错误4.5 快照 5. HDFS数据组织5.1 数据块存储5.2 流水线复制5.3 文件的删除和恢复 参考 1. HDFS设计目标 1.错误检测和快…

基于UDP和TCP实现回显服务器

目录 一. UDP 回显服务器 1. UDP Echo Server 2. UDP Echo Client 二. TCP 回显服务器 1. TCP Echo Server 2. TCP Echo Client 回显服务器 (Echo Server) 就是客户端发送什么样的请求, 服务器就返回什么样的响应, 没有任何的计算和处理逻辑. 一. UDP 回显服务器 1. UD…

STM32完全学习——使用标准库完成PWM输出

一、TIM2初始化 我这里使用的是STM32F407ZGT6这个芯片&#xff0c;我这里使用的是定时器TIM2来完成PWM输出&#xff0c;由于这里没有使用中断&#xff0c;因此不需要初始化NVIC&#xff0c;下面先来进行定时器的相关初始化 TIM_TimeBaseInitTypeDef TIM_TimeBaseInitStruct;R…

Qt Qt::UniqueConnection 底层调用

在这里插入图片描述 步骤1&#xff1a; 1&#xff1a;判断槽函数连接方式&#xff0c; 以及信号对象是否有效2&#xff1a; 信号计算格式是否 大于 signal_index 目前调试 signal_index 不太清楚怎末计算的&#xff08;有清楚的帮忙街道&#xff09;3&#xff1a;获取槽函数对…

7-10 解一元二次方程

7-10 解一元二次方程 分数 20 全屏浏览 切换布局 作者 李祥 单位 湖北经济学院 请编写程序&#xff0c;解一元一次方程 ax2bxc0 。 已知一元二次方程的求根公式为&#xff1a; 要求&#xff1a; 若 a0&#xff0c;则为一元一次方程。 若 b0&#xff0c;则方程有唯一解&…

Oracle - 多区间按权重取值逻辑 ,分时区-多层级-取配置方案(三)

本篇紧跟第一篇&#xff0c; 和 第二篇无关 Oracle - 多区间按权重取值逻辑 &#xff0c;分时区-多层级-取配置方案 Oracle - 多区间按权重取值逻辑 &#xff0c;分时区-多层级-取配置方案(二) 先说需求&#xff1a; 某业务配置表&#xff0c;按配置的时间区间及组织层级取方…

(免费送源码)计算机毕业设计原创定制:Java+JSP+HTML+JQUERY+AJAX+MySQL springboot计算机类专业考研学习网站管理系统

摘 要 大数据时代下&#xff0c;数据呈爆炸式地增长。为了迎合信息化时代的潮流和信息化安全的要求&#xff0c;利用互联网服务于其他行业&#xff0c;促进生产&#xff0c;已经是成为一种势不可挡的趋势。在大学生在线计算机类专业考研学习网站管理的要求下&#xff0c;开发一…

Varjo:垂直起降机混合现实培训解决方案

混合电动垂直起降机&#xff08;VTOL&#xff09;作为一种新型的航空运输机具有超越传统汽车的安全性、与飞机相当的速度以及无与伦比的灵活起降功能。电动垂直起降机能够在建筑顶部、直升机场或是没有跑道的地区起飞或降落&#xff0c;且排放要远远低于由航空汽油驱动的传统飞…

Android 实现悬浮球的功能

Android 实现悬浮球的功能 在 Android 中&#xff0c;实现悬浮球可以通过以下方式实现&#xff0c;常见的方法是使用 WindowManager 创建一个悬浮窗口。以下是具体的实现步骤&#xff1a; 1. 配置权限 在 AndroidManifest.xml 中添加悬浮窗权限&#xff1a; <uses-permis…

[Python3学习笔记-基础语法] Python3 基础语法

本篇文章详细介绍Python3的基础语法&#xff0c;主要包括编码、标识符、Python保留字、注释、行缩进、多行语句、Number类型、字符串、空行、print打印等。 这些是Python最基础的东西&#xff0c;掌握好了才能更好的学习后续的内容。 有兴趣共同结伴学习Python的朋友&#xff0…

RabbitMQ3:Java客户端快速入门

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

Gradio学习笔记记录

安装指令&#xff1a;pip install gradio方法介绍 Interface》用于构建一些简单的页面&#xff0c;可以直接用这个指令搞定 形式》接收三个参数分别为处理函数、输入、输出三部分&#xff0c;呈现一般左/上为输入&#xff0c;右或下为输出 fn&#xff1a;将用户界面 &#xff0…

✅ Qt流式布局

Qt流式布局 前段时间&#xff0c;曾经对某个软件的一个“流式布局”有点感兴趣&#xff0c;什么叫“流式布局”呢&#xff1f;请看下图: 简而言之&#xff0c;流式布局就是布局应能够根据界面尺寸的变化自动调整其内部控件的位置。然而&#xff0c;Qt 提供的标准布局&#xff…