[RabbitMQ] 重试机制+TTL+死信队列

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 重试机制
    • 1.1 重试配置
    • 1.2 配置交换机队列
    • 1.3 发送消息
    • 1.4 消费消息
    • 1.5 运行测试
    • 1.6 手动确认
  • 2. TTL
    • 2.1 设置消息TTL
    • 2.2 设置消息队列的TTL
    • 2.3 两者的区别
  • 3. 死信队列
    • 3.1 死信队列的概念
    • 3.2 代码示例
      • 3.2.1 声明队列和交换机
      • 3.2.2 正常队列与死信交换机的绑定
      • 3.2.3 制造死信产生的条件
      • 3.2.4 发送消息
      • 3.2.5 测试死信
    • 3.3 常见面试题

1. 重试机制

在消息传递的过程中,可能会遇到各种问题,比如网络故障,服务不可用,资源不足等,这些问题都可能会导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败之后重新发送.
我们也可以对重试机制设置重试次数.超过了重试的限制次数,如果没有对队列进行死信队列的绑定,那么该消息就会发生丢失.

1.1 重试配置

  rabbitmq:host: 182.92.204.253port: 5672username: jiangruijiapassword: *****virtual-host: /listener:simple:acknowledge-mode: autoretry:initial-interval: 5000msenabled: truemax-attempts: 5
  • initial-interval: 表示初始失败的等待时间为5s
  • enabled: 表示开启消费者失败重试
  • max-attempts: 最大重试次数,包括首次发送
  • 注意: 重试机制在自动确认模式之下才会生效,如果配置为手动确认,则重试机制不会生效.

1.2 配置交换机队列

public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE = "retry_exchange";
@Bean
public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constant.RETRY_EXCHANGE).durable(true).build();
}
@Bean
public Queue retryQueue(){return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
@Bean
public Binding retryBinding(@Qualifier("retryExchange") DirectExchange exchange,@Qualifier("retryQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("retry");
}

1.3 发送消息

@RequestMapping("/retry")
public String retry(){rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE,"retry","retry消息");return "发送成功";
}

1.4 消费消息

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message) throws UnsupportedEncodingException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());int i = 3/0;System.out.println("处理完成");}
}

1.5 运行测试

使用Postman调用接口,观察控制台信息
在这里插入图片描述
我们看到由于消费者发生异常,消息在不停地进行重试,重试5次之后,抛出了异常.
如果我们对异常进行捕获,则不会进行重试.

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message) throws UnsupportedEncodingException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {int i = 3/0;}catch (Exception e){System.out.println("处理失败");}System.out.println("处理完成");}
}

在这里插入图片描述

1.6 手动确认

如果我们把确认接收机制改为手动确认机制

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message, Channel channel) throws IOException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {Thread.sleep(1000);int i = 3/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){System.out.println("处理失败");channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);}System.out.println("处理完成");}
}

运行结果:
在这里插入图片描述
我们看到,在手动控制模式的时候,重试的次数不会像自动控制那样直接生效,因为是否重试以及何时重试更多地取决于应用程序和消费者的实现逻辑.

2. TTL

TTL,即过期时间,RabbitMQ可以对消息和队列设置TTL.
当消息到达存活时间之后,还没有被消费,就会被自动清除.

2.1 设置消息TTL

目前有两种方法可以设置消息的TTL.一是设置队列的TTL,队列中所有消息都有相同的过期时间,二是设置每条消息的过期时间,每条消息都可以有不同的TTL.如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准.
首先我们针对每条消息设置TTL.方法就是我们在发送消息的时候对消息的expiretion参数进行设置,单位为毫秒.
我们先来配置交换机和队列.

public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE = "ttl_exchange";
@Bean
public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).durable(true).build();
}
@Bean
public Queue ttlQueue(){return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
@Bean
public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ttl");
}

发送消息

@RequestMapping("/ttl")
public String ttl(){String msg = "ttl消息";Message message = new Message(msg.getBytes(StandardCharsets.UTF_8));message.getMessageProperties().setExpiration("10000");//设置消息过期时间为10srabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl",message);return "发送信息";
}

运行程序,观察结果:

  1. 发送消息之后,Ready消息为1
    在这里插入图片描述
  2. 过了10s之后,发现消息被删除
    在这里插入图片描述

2.2 设置消息队列的TTL

设置队列TTL的方法是在创建队列时,加入x-message-ttl参数实现,单位为毫秒.
设置队列和绑定关系

@Bean
public Queue ttlQueue2(){Map<String,Object> map = new HashMap<>();map.put("x-message-ttl",10000);//设置10s过期return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(map).build();
}
@Bean
public Binding ttl2Binding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ttl2");
}

队列的声明也可以简写为:

@Bean
public Queue ttlQueue2(){return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(10000).build();
}

发送消息:

@RequestMapping("/ttl2")
public String ttl2(){rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","ttl2消息");return "发送成功";
}

运行程序之后观察结果:
运行之后发现,新增了一个队列,队列有一个TTL标识.
在这里插入图片描述
使用Postman调用接口发送消息:
在这里插入图片描述
10s之后发现队列中的消息被删除:
在这里插入图片描述

2.3 两者的区别

设置队列的ttl属性方法,一旦消息过期,就会立即从队列中删除.如果消息设置ttl,即使消息过期,也不会立即从队列中删除,而是在即将投递到消费者之前进行判定,如果过期了,才进行删除.
这两种方法处理过期消息的原理如下:
设置队列的过期时间,队列中已经过期的消息肯定在队列头部,RabbitMQ只需要定期扫描队头是否有过期的消息即可.
而设置消息的TTL的方式,每条消息的过期时间不同,如果定期扫描整个队列,效率是非常低的,所以不如等到此消息即将被消费时再判定是否过期.

3. 死信队列

3.1 死信队列的概念

死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
有死信,就会有死信队列,当一个消息在一个队列中变为死信之后,它能被重新被发送到另一个交换机中,这个交换机就是DLX(Dead Letter Exchange),绑定DLX的队列,就被称为死信队列(DLQ,Dead Letter Queue).
在这里插入图片描述
消息变为死信一般是由于一下的几种情况:

  1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
  2. 消息过期
  3. 队列达到最大长度

3.2 代码示例

3.2.1 声明队列和交换机

交换机和队列的声明包含两部分:

  • 正常声明的交换机和队列
  • 声明死信队列和死信的交换机
    死信的交换机和队列和普通的交换机队列在声明的时候没有什么区别.
public static final String NORMAL_QUEUE = "normal_queue";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DLX_EXCHANGE = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
@Bean
public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
@Bean
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange,@Qualifier("normalQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("normal");
}
@Bean
public DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();
}
@Bean
public Queue dlxQueue(){return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
@Bean
public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange exchange,@Qualifier("dlxQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx");
}

3.2.2 正常队列与死信交换机的绑定

当一个队列中存在死信的时候,RabbitMQ会自动把这个消息重新发布到设置的DLX交换机上,进而被路由到另一个队列,即死信队列.
绑定的时候需要为普通队列设置x-dead-letter-exchangex-dead-letter-routing-key两个参数,第一个参数是绑定的死信交换机,第二个参数是死信队列的路由键.

@Bean
public Queue normalQueue(){Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE);map.put("x-dead-letter-routing-key","dlx");return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(map).build();
}

上面的map可以简写为:

@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").build();
}

3.2.3 制造死信产生的条件

我们通过设置队列长度限制或者是设置消息的过期时间来制造死信.如果队列中的消息过期,或者是队列中的消息超过队列的长度,全部会被路由到死信队列.

@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").ttl(10000).maxLength(10L).build();
}

3.2.4 发送消息

@RequestMapping("/normal")
public String normal(){rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal消息");return "发送成功";
}

3.2.5 测试死信

  1. 启动程序之后观察队列
    在这里插入图片描述
    我们发现队列normal_queue有5个标签,则会五个标签分别代表该队列的5个属性:
    • D: durable的缩写,设置持久化.
    • TTL: Time to Live,队列的TTL.
    • Lim: 队列设置了长度(x-max-length).
    • DLX: 死信队列的交换机
    • DLK: 死信队列的路由键
  2. 测试到达过期时间
    接下来我们给正常队列中发送一条消息.
    在这里插入图片描述
    10秒钟之后,队列过期,消息也会过期,所以消息就会进入死信队列.
    在这里插入图片描述
    在这里插入图片描述
  3. 测试超出队列长度
    我们给队列中一次性发送20条消息
    在这里插入图片描述
    我们发现其中的10条消息会直接进入死信队列.
  4. 测试消息被拒收
    编写消费者代码,并抛出异常,普通队列消费者确认接收机制使用手动应答.
@Component
public class NormalListener {@RabbitListener(queues = Constant.NORMAL_QUEUE)public void normalListener(Message message, Channel channel) throws IOException, InterruptedException {System.out.printf("接收到消息%s,tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {System.out.println("处理消息");int i = 3/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch (Exception e){System.out.println("消息处理失败");Thread.sleep(1000);//拒绝接收之后不放回原队列中,则会放入死信队列channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}
}
@Component
public class DLXListener {@RabbitListener(queues = Constant.DLX_QUEUE)public void dlxListener(Message message) throws UnsupportedEncodingException {System.out.printf("死信队列接收到消息:%s,tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());}
}

在这里插入图片描述

3.3 常见面试题

  1. 死信队列的概念
    死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
  2. 来源
    1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
    2. 消息过期
    3. 队列达到最大长度
  3. 死信队列的应用场景
    它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统.
    一般的应用场景还有:
    1. 消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理.
    2. 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占用系统资源.
    3. 日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/887310.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ChatGPT 桌面版发布了,如何安装?

本章教程教大家如何进行安装。 一、下载安装包 官网地址地址&#xff1a;https://openai.com/chatgpt/desktop/ 支持Windows和MacOS操作系统 二、安装步骤 Windows用户下载之后&#xff0c;会有一个exe安装包&#xff0c;点击运行安装即可。 注意事项&#xff0c;如果Windows操…

真实网络安全面试场景题

1.公司内部搭建了2台DNS服务器做主辅同步&#xff0c;公司的业务官网地址为 www.chinaddic.com。小明作为网络管理员把域名添加至DNS服务器进行测试。 问题1:使用自己电脑可以正常访问刚添加的域名&#xff0c;但处于同样网络环境同事电脑却访问不了。 出现此问题原因…

uniapp vue2项目迁移vue3项目

uniapp vue2项目迁移vue3项目&#xff0c;必须适配的部分 一、main.js 创建应用实例 // 之前 - Vue 2 import Vue from vue import App from ./App Vue.config.productionTip false // vue3 不再需要 App.mpType app // vue3 不再需要 const app new Vue({ ...App }) …

Brain.js 用于浏览器的 GPU 加速神经网络

Brain.js 是一个强大的 JavaScript 库&#xff0c;它允许开发者在浏览器和 Node.js 环境中构建和训练神经网络 。这个库的目的是简化机器学习模型的集成过程&#xff0c;使得即使是没有深厚机器学习背景的开发者也能快速上手 。 概述 Brain.js 提供了易于使用的 API&#xff…

群核科技首次公开“双核技术引擎”,发布多模态CAD大模型

11月20日&#xff0c;群核科技在杭州举办了第九届酷科技峰会。现场&#xff0c;群核科技首次正式介绍其技术底层核心&#xff1a;基于GPU高性能计算的物理世界模拟器。并对外公开了两大技术引擎&#xff1a;群核启真&#xff08;渲染&#xff09;引擎和群核矩阵&#xff08;CAD…

oracle会话追踪

一 跟踪当前会话 1.1 查看当前会话的SID,SERIAL# #在当前会话里执行 示例&#xff1a; SQL> select distinct userenv(sid) from v$mystat; USERENV(SID) -------------- 1945 SQL> select distinct sid,serial# from v$session where sid1945; SID SERIAL# …

操作系统加码主动防护:数智化有了“安全底座”

开发者圈子里流行着一个有意思的观点&#xff1a;操作系统每过20年左右&#xff0c;就会出现一次跨越式发展机遇。 上世纪六十年代开始的大型机&#xff0c;到上世纪八十年代的个人计算机&#xff0c;再到本世纪初互联网的崛起&#xff0c;无不和上述观点相吻合&#xff1a;操…

算法-快速排序-Python版详解

原题如下&#xff1a; 给定你一个长度为 n 的整数数列。请你使用快速排序对这个数列按照从小到大进行排序。并将排好序的数列按顺序输出。 输入格式 输入共两行&#xff0c;第一行包含整数 n。 第二行包含 n 个整数&#xff08;所有整数均在 1∼10^9 范围内&#xff09;&am…

【Apache paimon】-- 7 -- tag 创建与管理

目录 1、前言 2、操作说明 2.1、自动创建与删除 Tag 2.2、手动创建和删除 tag 2.3、回退到指定Tag 2.4、查询 tag 元数据和数据 2.4.1、查询 tag 元数据 2.4.2、查询 tag 数据 2.4.3、情景1:读取指定 tag 的数据 2.4.4、情景2:读取两个 tag 间的增量数据 3、其他配…

C++笔记之函数入参传递std::unique_ptr 时使用 std::move的场景

C++笔记之函数入参传递std::unique_ptr 时使用 std::move的场景 code review! 参考笔记 C++笔记之unique_ptr转移堆内空间的所有权 文章目录 C++笔记之函数入参传递std::unique_ptr 时使用 std::move的场景一.使用 std::unique_ptr 作为函数参数时的主要场景二.一个完整示例一…

找到最大“葫芦”组合

文章目录 问题描述解题思路分析1. 数据预处理2. 特殊情况处理3. 普通情况计算4. 结果输出 Java代码实现复杂度分析与优化 在经典德州扑克中&#xff0c;“葫芦”是一种较强的牌型。它由五张牌组成&#xff0c;其中三张牌面值相同&#xff0c;另外两张牌面值也相同。本文将探讨一…

strlwr(arr);的模拟实现(c基础)

hi , I am 36 适合对象c语言初学者 strlwr(arr)&#xff1b;函数是把arr数组变为小写字母,并返回arr 链接介绍一下strlwr(arr)&#xff1b;(c基础)-CSDN博客 下面进行My__strlwr(arr);模拟实现 #include<stdio.h> //返回值为arr(地址),于是用指针变量,原数组为字符型…

Hadoop分布式文件系统(一)——HDFS简介

目录 1. HDFS设计目标2. HDFS组件3. HDFS数据复制4. HDFS健壮性4.1 磁盘数据错误&#xff0c;心跳检测和重新复制4.2 集群均衡4.3 数据完整性4.4 元数据磁盘错误4.5 快照 5. HDFS数据组织5.1 数据块存储5.2 流水线复制5.3 文件的删除和恢复 参考 1. HDFS设计目标 1.错误检测和快…

基于UDP和TCP实现回显服务器

目录 一. UDP 回显服务器 1. UDP Echo Server 2. UDP Echo Client 二. TCP 回显服务器 1. TCP Echo Server 2. TCP Echo Client 回显服务器 (Echo Server) 就是客户端发送什么样的请求, 服务器就返回什么样的响应, 没有任何的计算和处理逻辑. 一. UDP 回显服务器 1. UD…

银行卡OCR识别API接口的作用有哪些?

在当今数字化高速发展的时代&#xff0c;各种创新技术不断涌现&#xff0c;为我们的生活带来了极大的便利。其中&#xff0c;银行卡 OCR 识别 API 接口就是一项非常实用的技术&#xff0c;它提高了业务办理的效率&#xff0c;准确性高&#xff0c;便捷性强&#xff0c;安全性高…

STM32完全学习——使用标准库完成PWM输出

一、TIM2初始化 我这里使用的是STM32F407ZGT6这个芯片&#xff0c;我这里使用的是定时器TIM2来完成PWM输出&#xff0c;由于这里没有使用中断&#xff0c;因此不需要初始化NVIC&#xff0c;下面先来进行定时器的相关初始化 TIM_TimeBaseInitTypeDef TIM_TimeBaseInitStruct;R…

Qt Qt::UniqueConnection 底层调用

在这里插入图片描述 步骤1&#xff1a; 1&#xff1a;判断槽函数连接方式&#xff0c; 以及信号对象是否有效2&#xff1a; 信号计算格式是否 大于 signal_index 目前调试 signal_index 不太清楚怎末计算的&#xff08;有清楚的帮忙街道&#xff09;3&#xff1a;获取槽函数对…

7-10 解一元二次方程

7-10 解一元二次方程 分数 20 全屏浏览 切换布局 作者 李祥 单位 湖北经济学院 请编写程序&#xff0c;解一元一次方程 ax2bxc0 。 已知一元二次方程的求根公式为&#xff1a; 要求&#xff1a; 若 a0&#xff0c;则为一元一次方程。 若 b0&#xff0c;则方程有唯一解&…

Oracle - 多区间按权重取值逻辑 ,分时区-多层级-取配置方案(三)

本篇紧跟第一篇&#xff0c; 和 第二篇无关 Oracle - 多区间按权重取值逻辑 &#xff0c;分时区-多层级-取配置方案 Oracle - 多区间按权重取值逻辑 &#xff0c;分时区-多层级-取配置方案(二) 先说需求&#xff1a; 某业务配置表&#xff0c;按配置的时间区间及组织层级取方…

prompt

1解释概念 中文指令&#xff1a;请借助费曼学习法&#xff0c;以简单的语言解释[特定概念]是什么&#xff0c;并提供一个例子来说明它如何应用。 Prompt:Please use the Feynman Learning Technique to explain [specific concept] insimple language,and provide an example …