RabbitMQ死信队列

RabbitMQ死信队列

1、过期时间TTL

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被

删除。RabbitMQ可以对消息和队列设置TTL,目前有两种方法可以设置:

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。

  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者TTL较小的那个数值为准。消息在队列的生存时间一旦超

过设置的TTL值,就称为dead message被投递到死信队列,消费者将无法再收到该消息。

1.1 设置队列TTL

pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version><relativePath/></parent><groupId>com.example</groupId><artifactId>spring-boot-rabbitmq-ttl</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq-ttl</name><description>spring-boot-rabbitmq-ttl</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置类

package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TTLRabbitMQConfiguration {// 1.声明注册direct模式的交换机@Beanpublic DirectExchange ttldirectExchange() {return new DirectExchange("ttl_direct_exchange", true, false);}// 2.队列的过期时间@Beanpublic Queue directttlQueue() {//设置过期时间Map<String, Object> args = new HashMap<>();//这里一定是int类型args.put("x-message-ttl", 5000);return new Queue("ttl.direct.queue", true, false, false, args);}@Beanpublic Binding ttlBingding() {return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");}
}

Service

package com.example.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder() {//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();System.out.println("订单生产成功:" + orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "ttl_direct_exchange";String routingKey = "ttl";// 队列中会产生一条消息并且5秒钟后会消失rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);}
}

启动类

package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitmqDemoApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqDemoApplication.class, args);}}

配置文件

# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030

测试

package com.example;import com.example.service.OrderService;
import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {@Autowiredprivate OrderService orderService;@Testpublic void test() {orderService.makeOrder();}}
订单生产成功:8a965457-330b-4e2b-9087-a40cbfdee033

在这里插入图片描述

在这里插入图片描述

1.2 设置消息TTL

配置类

package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TTLRabbitMQConfiguration1 {//1.声明注册direct模式的交换机@Beanpublic DirectExchange ttlMessageDirectExchange() {return new DirectExchange("ttl_message_direct_exchange", true, false);}@Beanpublic Queue directttlMessageQueue() {return new Queue("ttl.message.direct.queue", true, false, false);}@Beanpublic Binding ttlMessageBingding() {return BindingBuilder.bind(directttlMessageQueue()).to(ttlMessageDirectExchange()).with("ttlmessage");}
}

Service

package com.example.service;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class OrderService1 {@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder() {//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();System.out.println("订单生产成功:" + orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "ttl_message_direct_exchange";String routingKey = "ttlmessage";//给消息设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {public Message postProcessMessage(Message message) {//这里就是字符串message.getMessageProperties().setExpiration("5000");message.getMessageProperties().setContentEncoding("UTF-8");return message;}};rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, messagePostProcessor);}
}

测试类

package com.example;import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest1 {@Autowiredprivate OrderService1 orderService1;@Testpublic void test1() {orderService1.makeOrder();}}
订单生产成功:9eb9e120-1379-4e54-bc43-1944b3c22713

在这里插入图片描述

在这里插入图片描述

2、死信队列

DLX,全称 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变

成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列。消

息变成死信,可能是由于以下原因:

  • 消息被拒绝

  • 消息过期

  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队

列的属性,当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由

到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可。

pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version><relativePath/></parent><groupId>com.example</groupId><artifactId>spring-boot-rabbitmq-dlx</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq-dlx</name><description>spring-boot-rabbitmq-dlx</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置类

package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadRabbitMqConfiguration {//1.声明注册direct模式的交换机@Beanpublic DirectExchange deadDirect() {return new DirectExchange("dead_direct_exchange", true, false);}//2.队列的过期时间@Beanpublic Queue deadQueue() {return new Queue("dead.direct.queue", true);}@Beanpublic Binding deadbinds() {return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");}
}
package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TTLRabbitMQConfiguration {//1.声明注册direct模式的交换机@Beanpublic DirectExchange ttldirectExchange() {return new DirectExchange("ttl_direct_exchange", true, false);}//2.队列的过期时间@Beanpublic Queue directttlQueue() {//设置过期时间Map<String, Object> args = new HashMap<>();// ttl队列最大可以接受5条消息,超过的条数也是会被移入死信队列,过期之后依然会被移入死信队列// args.put("x-max-length",5);// 这里一定是int类型args.put("x-message-ttl", 5000);// 死信队列的交换机args.put("x-dead-letter-exchange", "dead_direct_exchange");// fanout不需要配置// 路由args.put("x-dead-letter-routing-key", "dead");return new Queue("ttl.direct.queue", true, false, false, args);}@Beanpublic Binding ttlBingding() {return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");}}

Service

package com.example.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder() {//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();System.out.println("订单生产成功:" + orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "ttl_direct_exchange";String routingKey = "ttl";// 队列中会产生一条消息并且5秒钟后会消失rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);}
}

启动类

package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitmqDemoApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqDemoApplication.class, args);}}

配置文件

# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030

测试类

package com.example;import com.example.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {@Autowiredprivate OrderService orderService;@Testpublic void test() {orderService.makeOrder();}}
订单生产成功:74adb096-734c-4484-8947-032ef1ee7c5d

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/51615.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】文件操作 -- 详解

一、什么是文件 磁盘上的文件是文件。 1、为什么要使用文件 举个例子&#xff0c;当我们想实现一个 “通讯录” 程序时&#xff0c;在通讯录中新建联系人、删除联系人等一系列操作&#xff0c;此时的数据存储于内存中&#xff0c;程序退出后所有数据都会随之消失。为了让通讯录…

第60步 深度学习图像识别:误判病例分析(Pytorch)

基于WIN10的64位系统演示 一、写在前面 上期内容基于Tensorflow环境做了误判病例分析&#xff08;传送门&#xff09;&#xff0c;考虑到不少模型在Tensorflow环境没有迁移学习的预训练模型&#xff0c;因此有必要在Pytorch环境也搞搞误判病例分析。 本期以SqueezeNet模型为…

窗口看门狗

从下往上看: 1. 时钟设置 RCC_APB1PeriphClockCmd(RCC_APB1Periph_WWDG,ENABLE);//使能独立看门狗时钟 WWDG_SetPrescaler(WWDG_Prescaler_8);//看门狗预分频器WWDG counter clock (PCLK1/4096)/8 2.设置窗口值 实际就是设置WWDG_CR的低七位值, 但是这个值要大于0x40(也就是…

【分享】小型园区组网场景

小型园区组网图 在小型园区中&#xff0c;S2700&S3700通常部署在网络的接入层&#xff0c;S5700&S6700通常部署在网络的核心&#xff0c;出口路由器一般选用AR系列路由器。 接入交换机与核心交换机通过Eth-Trunk组网保证可靠性。 每个部门业务划分到一个VLAN中&#…

Vulnhub: DriftingBlues: 2靶机

kali&#xff1a;192.168.111.111 靶机&#xff1a;192.168.111.207 信息收集 端口扫描 nmap -A -sC -v -sV -T5 -p- --scripthttp-enum 192.168.111.207 80端口的/blog目录为wordpress wpscan收集wordpress用户和爆破密码 wpscan --url http://driftingblues.box/blog -e…

非凸联合创始人李佐凡受邀出席复旦DSBA项目座谈会

8月17日&#xff0c;非凸科技联合创始人&CTO李佐凡受邀参加复旦管院数据科学与商业分析专业硕士&#xff08;DS&BA&#xff09;项目发展座谈会&#xff0c;与学校教授、老师在生源背景、课程教学、职业发展、学生培养和企业合作方面进行深入交流&#xff0c;旨在更好地…

【C++练习】普通方法+利用this 设置一个矩形类(Rectangle), 包含私有成员长(length)、 宽(width), 定义一下成员函数

题目 设置一个矩形类(Rectangle), 包含私有成员长(length)、 宽(width), 定义成员函数: void set_ len(int l); //设置长度 设置宽度void set_ wid(int w); 获取长度: int get len(); 获取宽度: int get _wid); 显示周长和面积: v…

民族传统文化分享系统uniapp 微信小程序

管理员、用户可通过Android系统手机打开系统&#xff0c;注册登录后可进行管理员后端&#xff1b;首页、个人中心、用户管理、知识分类管理、知识资源管理、用户分享管理、意见反馈、系统管理&#xff0c;用户前端&#xff1b;首页、知识资源、用户分享、我的等。 本系统的使用…

GO-vscode远程开发和调试

本文内容主要包括&#xff1a; 概述&#xff1a; 主要就是把代码放到服务器上然后远程去开发和调试 工具&#xff1a; vscode 远程端&#xff1a; linux 一.安装远程插件 vscode安装Remote - SSH&#xff0c;Remote Explorer&#xff0c;Remote Development&#xff0c…

卷积过程详细讲解

1&#xff1a;单通道卷积 以单通道卷积为例&#xff0c;输入为&#xff08;1,5,5&#xff09;&#xff0c;分别表示1个通道&#xff0c;宽为5&#xff0c;高为5。假设卷积核大小为3x3&#xff0c;padding0&#xff0c;stride1。 卷积过程如下&#xff1a; 相应的卷积核不断…

企微配置回调服务

1、企微配置可信域名 2、企微获取成员userID 3、企微获取用户敏感数据 4、企微配置回调服务 文章目录 一、简介1、概述2、相关文档地址 二、企微配置消息服务器1、配置消息接收参数2、参数解析3、参数拼接规则 三、代码编写—使用已有库1、代码下载2、代码修改3、服务代码编写 …

基于swing的超市信息管理系统java jsp仓库进销存mysql源代码

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 基于swing的超市信息管理系统 系统有1权限&#xff1…

Linux驱动开发(Day5)

思维导图&#xff1a; 不同设备号文件绑定&#xff1a;

回归预测 | MATLAB实现SA-ELM模拟退火算法优化极限学习机多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现SA-ELM模拟退火算法优化极限学习机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现SA-ELM模拟退火算法优化极限学习机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本…

ElasticSearch-集成ik分词器

本文已收录于专栏 《中间件合集》 目录 背景介绍版本选择优势说明集成过程1.下载安装包2.解压安装包3.重启ElasticSearch服务3.1通过ps -ef | grep elastic查看正在启动的es进程号3.2使用kill -9 xxx 杀死进程3.3使用 ./elasticsearch 启动es服务 分词测试细粒度分词方式分词请…

CTFshow 限时活动 红包挑战9 详细题解

CTFshow红包挑战9 题目源码开源了。源码如下&#xff1a; common.php <?phpclass user{public $id;public $username;private $password;public function __toString(){return $this->username;}}class cookie_helper{private $secret "*************"; /…

【Java架构-包管理工具】-Maven私服搭建-Nexus(三)

本文摘要 Maven作为Java后端使用频率非常高的一款依赖管理工具&#xff0c;在此咱们由浅入深&#xff0c;分三篇文章&#xff08;Maven基础、Maven进阶、私服搭建&#xff09;来深入学习Maven&#xff0c;此篇为开篇主要介绍Maven私服搭建-Nexus 文章目录 本文摘要1. Nexus安装…

本地搭建CFimagehost私人图床【公网远程访问】

文章目录 1.前言2. CFImagehost网站搭建2.1 CFImagehost下载和安装2.2 CFImagehost网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 4.公网访问测…

Redis数据结构之Set

Set 类型是一个无序并唯一的键值集合&#xff0c;它的存储顺序不会按照插入的先后顺序进行存储。Redis 中集合是通过哈希表实现的&#xff0c;所以添加&#xff0c;删除&#xff0c;查找的复杂度都是 O(1)。相对于列表&#xff0c;集合也有两个特点&#xff1a;无序、不可重复 …

js reverse实现数据的倒序

2023.8.25今天我学习了如何在数组顺序进行倒序排列&#xff0c;如&#xff1a; 原数组为&#xff1a; 我们只需要对数组使用reverse()方法 let demo [{id: 1, name: 一号},{id: 2, name: 二号},{id: 3, name: 三号},]demo.reverse()console.log(demo) 扩展&#xff1a; 当我…