RabbitMQ死信队列
1、过期时间TTL
过期时间TTL
表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被
删除。RabbitMQ
可以对消息和队列设置TTL
,目前有两种方法可以设置:
-
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
-
第二种方法是对消息进行单独设置,每条消息
TTL
可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者TTL
较小的那个数值为准。消息在队列的生存时间一旦超
过设置的TTL
值,就称为dead message
被投递到死信队列,消费者将无法再收到该消息。
1.1 设置队列TTL
pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version><relativePath/></parent><groupId>com.example</groupId><artifactId>spring-boot-rabbitmq-ttl</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq-ttl</name><description>spring-boot-rabbitmq-ttl</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
配置类
package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TTLRabbitMQConfiguration {// 1.声明注册direct模式的交换机@Beanpublic DirectExchange ttldirectExchange() {return new DirectExchange("ttl_direct_exchange", true, false);}// 2.队列的过期时间@Beanpublic Queue directttlQueue() {//设置过期时间Map<String, Object> args = new HashMap<>();//这里一定是int类型args.put("x-message-ttl", 5000);return new Queue("ttl.direct.queue", true, false, false, args);}@Beanpublic Binding ttlBingding() {return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");}
}
Service
package com.example.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder() {//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();System.out.println("订单生产成功:" + orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "ttl_direct_exchange";String routingKey = "ttl";// 队列中会产生一条消息并且5秒钟后会消失rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);}
}
启动类
package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitmqDemoApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqDemoApplication.class, args);}}
配置文件
# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030
测试
package com.example;import com.example.service.OrderService;
import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {@Autowiredprivate OrderService orderService;@Testpublic void test() {orderService.makeOrder();}}
订单生产成功:8a965457-330b-4e2b-9087-a40cbfdee033
1.2 设置消息TTL
配置类
package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TTLRabbitMQConfiguration1 {//1.声明注册direct模式的交换机@Beanpublic DirectExchange ttlMessageDirectExchange() {return new DirectExchange("ttl_message_direct_exchange", true, false);}@Beanpublic Queue directttlMessageQueue() {return new Queue("ttl.message.direct.queue", true, false, false);}@Beanpublic Binding ttlMessageBingding() {return BindingBuilder.bind(directttlMessageQueue()).to(ttlMessageDirectExchange()).with("ttlmessage");}
}
Service
package com.example.service;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class OrderService1 {@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder() {//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();System.out.println("订单生产成功:" + orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "ttl_message_direct_exchange";String routingKey = "ttlmessage";//给消息设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {public Message postProcessMessage(Message message) {//这里就是字符串message.getMessageProperties().setExpiration("5000");message.getMessageProperties().setContentEncoding("UTF-8");return message;}};rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, messagePostProcessor);}
}
测试类
package com.example;import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest1 {@Autowiredprivate OrderService1 orderService1;@Testpublic void test1() {orderService1.makeOrder();}}
订单生产成功:9eb9e120-1379-4e54-bc43-1944b3c22713
2、死信队列
DLX
,全称 Dead-Letter-Exchange
,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变
成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX
,绑定DLX
的队列就称之为死信队列。消
息变成死信,可能是由于以下原因:
-
消息被拒绝
-
消息过期
-
队列达到最大长度
DLX
也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队
列的属性,当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由
到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可。
pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version><relativePath/></parent><groupId>com.example</groupId><artifactId>spring-boot-rabbitmq-dlx</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq-dlx</name><description>spring-boot-rabbitmq-dlx</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
配置类
package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadRabbitMqConfiguration {//1.声明注册direct模式的交换机@Beanpublic DirectExchange deadDirect() {return new DirectExchange("dead_direct_exchange", true, false);}//2.队列的过期时间@Beanpublic Queue deadQueue() {return new Queue("dead.direct.queue", true);}@Beanpublic Binding deadbinds() {return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");}
}
package com.example.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TTLRabbitMQConfiguration {//1.声明注册direct模式的交换机@Beanpublic DirectExchange ttldirectExchange() {return new DirectExchange("ttl_direct_exchange", true, false);}//2.队列的过期时间@Beanpublic Queue directttlQueue() {//设置过期时间Map<String, Object> args = new HashMap<>();// ttl队列最大可以接受5条消息,超过的条数也是会被移入死信队列,过期之后依然会被移入死信队列// args.put("x-max-length",5);// 这里一定是int类型args.put("x-message-ttl", 5000);// 死信队列的交换机args.put("x-dead-letter-exchange", "dead_direct_exchange");// fanout不需要配置// 路由args.put("x-dead-letter-routing-key", "dead");return new Queue("ttl.direct.queue", true, false, false, args);}@Beanpublic Binding ttlBingding() {return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");}}
Service
package com.example.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder() {//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();System.out.println("订单生产成功:" + orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "ttl_direct_exchange";String routingKey = "ttl";// 队列中会产生一条消息并且5秒钟后会消失rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);}
}
启动类
package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitmqDemoApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqDemoApplication.class, args);}}
配置文件
# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030
测试类
package com.example;import com.example.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {@Autowiredprivate OrderService orderService;@Testpublic void test() {orderService.makeOrder();}}
订单生产成功:74adb096-734c-4484-8947-032ef1ee7c5d