RabbitMQ5-死信队列

目录

死信的概念

死信的来源

死信实战

死信之TTl

死信之最大长度

死信之消息被拒


死信的概念

死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息 TTL 过期

    TTL是Time To Live的缩写, 也就是生存时间

  • 队列达到最大长度

    队列满了,无法再添加数据到 mq 中

  • 消息被拒绝

    (basic.reject 或 basic.nack) 并且 requeue=false

死信实战

死信之TTl

消费者 C1:

package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}

生产者:

package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}

消费者 C2:

package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

启动 C1 ,之后关闭消费者,模拟其接收不到消息,再启动 Producer:

启动 C2 消费者,它消费死信队列里面的消息:

死信之最大长度

消费者 C1:

package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//设置正常队列的长度限制params.put("x-max-length",6);//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}

生产者:

package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}

消费者 C2:

package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

死信之消息被拒

拒收消息 "info5"

消费者 C1:

package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");if (message.equals("info5")) {System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println("Consumer01 接收到消息" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//开启手动应答channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});}}

生产者:

package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}

消费者 C2:

package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/67312.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10JavaWeb——SpringBootWeb案例01

前面我们已经讲解了Web前端开发的基础知识&#xff0c;也讲解了Web后端开发的基础(HTTP协议、请求响应)&#xff0c;并且也讲解了数据库MySQL&#xff0c;以及通过Mybatis框架如何来完成数据库的基本操作。 那接下来&#xff0c;我们就通过一个案例&#xff0c;来将前端开发、后…

JAVA 接口、抽象类的关系和用处 详细解析

接口 - Java教程 - 廖雪峰的官方网站 一个 抽象类 如果实现了一个接口&#xff0c;可以只选择实现接口中的 部分方法&#xff08;所有的方法都要有&#xff0c;可以一部分已经写具体&#xff0c;另一部分继续保留抽象&#xff09;&#xff0c;原因在于&#xff1a; 抽象类本身…

ResNeSt: Split-Attention Networks论文学习笔记

这张图展示了一个名为“Split-Attention”的神经网络结构&#xff0c;该结构在一个基数组&#xff08;cardinal group&#xff09;内进行操作。基数组通常指的是在神经网络中处理的一组特征或通道。图中展示了如何通过一系列操作来实现对输入特征的注意力机制。 以下是图中各部…

设计模式Python版 原型模式

文章目录 前言一、原型模式二、原型模式示例三、原型管理器 前言 GOF设计模式分三大类&#xff1a; 创建型模式&#xff1a;关注对象的创建过程&#xff0c;包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模式。结构型模式&#xff1a;关注类和对…

神经网络的通俗介绍

人工神经网络&#xff0c;是一种模仿人类大脑工作原理的数学模型。人类的大脑是由无数的小“工作站”组成的&#xff0c;每个工作站叫做“神经元”。这些神经元通过“电线”互相连接&#xff0c;负责接收、处理和传递信息。 一、人类大脑神经网络 人类大脑的神经网络大概长这…

OpenEuler学习笔记(八):安装OpenEuler

在VMware Workstation中安装OpenEuler 准备工作 下载并安装VMware Workstation虚拟机软件。前往OpenEuler官网下载OpenEuler系统镜像文件。 创建虚拟机 打开VMware Workstation&#xff0c;点击“创建新的虚拟机”&#xff0c;选择“自定义”&#xff0c;点击“下一步”。选择…

Leetcode::119. 杨辉三角 II

119. 杨辉三角 II 已解答 简单 相关标签 相关企业 给定一个非负索引 rowIndex&#xff0c;返回「杨辉三角」的第 rowIndex 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: rowIndex 3 输出: [1,3,3,1]示例 2: 输入: rowIndex 0…

让Android adb支持互联网调试脱离局域网

某些特殊场景下由于不方便&#xff0c;手机不在身边&#xff0c;但需要进行adb调试。 首先可以先开启adb的无线调试模式&#xff0c;我使用的是第二种方式。 在Android手机上安装一个终端模拟器&#xff0c;并赋予root权限&#xff0c;随后执行&#xff1a; setprop service.…

Dest1ny漏洞库:用友 U8-CRM 系统 ajaxgetborrowdata.php 存在 SQL 注入漏洞

用友U8-CRM系统ajaxgetborrowdata.php存在SQL注入漏洞&#xff0c;文件多个方法存在SQL注入漏洞&#xff0c;未经身份验证的攻击者通过漏洞执行任意SQL语句&#xff0c;调用xp_cmdshell写入后门文件&#xff0c;执行任意代码&#xff0c;从而获取到服务器权限。 hunter app.n…

能说说MyBatis的工作原理吗?

大家好&#xff0c;我是锋哥。今天分享关于【Redis为什么这么快?】面试题。希望对大家有帮助&#xff1b; 能说说MyBatis的工作原理吗&#xff1f; MyBatis 是一款流行的持久层框架&#xff0c;它通过简化数据库操作&#xff0c;帮助开发者更高效地与数据库进行交互。MyBatis…

DeepSeek崛起:中国AI新星如何撼动全球资本市场格局

引言 近期&#xff0c;中国人工智能实验室DeepSeek发布的两款开源模型——DeepSeek V3和DeepSeek R1——以其优异的性能和低廉的成本迅速爆火&#xff0c;引发了全球资本市场的震动&#xff0c;尤其对美国资本市场产生了显著影响。DeepSeek R1更是能够在数学、代码和推理任务上…

将5分钟安装Thingsboard 脚本升级到 3.9

稍微花了一点时间&#xff0c;将5分钟安装Thingsboard 脚本升级到最新版本 3.9。 [rootlab5 work]# cat one-thingsboard.shell echo "test on RHEL 8.10 " source /work/java/install-java.shell source /work/thingsboard/thingsboard-rpm.shell source /work/po…

算法刷题Day30

题目链接 描述 解题思路 考点&#xff1a;动态规划 dp[i][j]表示当前坐标的最小路径和dp初始化状态转移&#xff1a; dp[i][j] matrix[i][j] min(dp[i-1][j],dp[i][j-1]) 比较正上方和正左方的路径和哪个小。取小的那条路 代码 import copy class Solution:def minPathS…

大数据Hadoop入门2

目录 第三部分&#xff08;Hadoop MapReduce和Hadoop YARN&#xff09; 1.课程内容-大纲-学习目标 2.理解先分再合、分而治之的思想 3.hadoop团队针对MapReduce的设计构思 4.Hadoop MapReduce介绍、阶级划分和进程组成 5.Hadoop MapReduce官方示例-圆周率PI评估 6.Hadoo…

基于ESP8266的多功能环境监测与反馈系统开发指南

项目概述 本系统集成了物联网开发板、高精度时钟模块、环境传感器和可视化显示模块&#xff0c;构建了一个智能环境监测与反馈装置。通过ESP8266 NodeMCU作为核心控制器&#xff0c;结合DS3231实时时钟、DHT11温湿度传感器、光敏电阻和OLED显示屏&#xff0c;实现了环境参数的…

开发环境搭建-3:配置 JavaScript 开发环境 (fnm+ nodejs + pnpm + nrm)

在 WSL 环境中配置&#xff1a;WSL2 (2.3.26.0) Oracle Linux 8.7 官方镜像 node 官网&#xff1a;https://nodejs.org/zh-cn/download 点击【下载】&#xff0c;选择想要的 node 版本、操作系统、node 版本管理器、npm包管理器 根据下面代码提示依次执行对应代码即可 基本概…

npm:升级自身时报错:EBADENGINE

具体报错信息如下&#xff1a; 1.原因分析 npm和当前的node版本不兼容。 // 当前实际版本: Actual: {"npm":"10.2.4","node":"v20.11.0"}可以通过官网文档查看与自己 node 版本 兼容的是哪一版本的npm&#xff0c;相对应进行更新即可…

WPS数据分析000005

目录 一、数据录入技巧 二、一维表 三、填充柄 向下自动填充 自动填充选项 日期填充 星期自定义 自定义序列 1-10000序列 四、智能填充 五、数据有效性 出错警告 输入信息 下拉列表 六、记录单 七、导入数据 ​编辑 八、查找录入 会员功能 Xlookup函数 VL…

翼星求生服务器搭建【Icarus Dedicated Server For Linux】

一、前言 本次搭建的服务器为Steam平台一款名为Icarus的沙盒、生存、建造游戏,由于官方只提供了Windows版本服务器导致很多热爱Linux的小伙伴无法释怀,众所周知Linux才是专业服务器的唯一准则。虽然Github上已经有大佬制作了容器版本但是容终究不够完美,毕竟容器无法与原生L…

机器学习-线性回归(参数估计之经验风险最小化)

给定一组包含 &#x1d441; 个训练样本的训练集 我们希望能够 学习一个最优的线性回归的模型参数 &#x1d498; 现在我们来介绍线性回归的一种模型参数估计方法&#xff1a;经验风险最小化。 我们前面说过&#xff0c;对于标签 &#x1d466; 和模型输出都为连续的实数值&…