文章目录
- 消息ttl过期成为死信
- 队列达到最大长度成为死信
- MyOrder.java
- RabbitMQDirectConfig.java
- OrderProducer.java
- PayConsumer.java
- DeadOrderConsumer.java
- application.yaml
死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的原因:
消息 TTL (Time To Live ) : x-message-ttl
队列达到最大长度(队列满了无法再添加数据到 mq 中) : x-max-length
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
消息ttl过期成为死信
map.put("x-message-ttl",2000); // 消息存活时间1s
rabbitmq中,设置了死信队列。a消息设置了ttl,a消息已经被消费,但消费者未给通知,a消息ttl过期后不会被送到死信队列
在RabbitMQ中
死信队列的概念:
RabbitMQ的死信队列是一种用于处理失败或无法路由的消息的机制。当消息处理失败、过期、被拒绝或无法路由时,这些消息可以被发送到死信队列。
消息的TTL(Time To Live):
TTL表示消息的过期时间。在RabbitMQ中,可以对消息设置TTL,意味着消息在一定时间内如果没有被消费,则会被认为是死信。但这里的关键是,TTL的判断通常发生在消息即将被投递给消费者之前。
消息被消费的情况:
如果a消息已经被消费,但消费者未给出确认通知(即未发送ack确认),那么这条消息在RabbitMQ内部的状态仍然是未确认的。然而,这并不影响消息的TTL判断。一旦消息被成功地从队列中取出并传递给消费者,TTL机制就不再对其起作用,因为此时消息已经处于消费者的控制之下。
死信队列与TTL的关联:
当消息的TTL过期时,如果这条消息还在队列中等待消费,那么它会被标记为死信并发送到死信队列(如果配置了死信队列的话)。但是,如果消息已经被消费,即使消费者没有发送确认通知,它也不会因为TTL过期而被送到死信队列。
综上所述:
a消息设置了TTL,并且已经被消费,但消费者未给出确认通知。
在这种情况下,即使a消息的TTL过期,它不会被送到死信队列。
原因是消息已经被消费,RabbitMQ认为该消息已经不在其控制之下,因此TTL机制不再适用。
a消息在已经被消费的情况下,不会因为TTL过期而被送到死信队列。
队列达到最大长度成为死信
MyOrder.java
package com.example.direct;import java.io.Serializable;public class MyOrder implements Serializable {private String orderId;private String orderNumber;private String customerName;private Integer productId;private String productName;private Float productPrice;private Integer productCount;private Float orderPrice;public MyOrder(){}public MyOrder(String orderId, String orderNumber, String customerName, Integer productId, String productName, Float productPrice, Integer productCount, Float orderPrice) {this.orderId = orderId;this.orderNumber = orderNumber;this.customerName = customerName;this.productId = productId;this.productName = productName;this.productPrice = productPrice;this.productCount = productCount;this.orderPrice = orderPrice;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getOrderNumber() {return orderNumber;}public Integer getProductId() {return productId;}public void setProductId(Integer productId) {this.productId = productId;}public void setOrderNumber(String orderNumber) {this.orderNumber = orderNumber;}public String getCustomerName() {return customerName;}public void setCustomerName(String customerName) {this.customerName = customerName;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public Float getProductPrice() {return productPrice;}public void setProductPrice(Float productPrice) {this.productPrice = productPrice;}public Integer getProductCount() {return productCount;}public void setProductCount(Integer productCount) {this.productCount = productCount;}public Float getOrderPrice() {return orderPrice;}public void setOrderPrice(Float orderPrice) {this.orderPrice = orderPrice;}@Overridepublic String toString() {return "MyOrder{" +"orderId=" + orderId +", orderNumber='" + orderNumber + '\'' +", customerName='" + customerName + '\'' +", productName='" + productName + '\'' +", productPrice=" + productPrice +", productCount=" + productCount +", orderPrice=" + orderPrice +'}';}
}
RabbitMQDirectConfig.java
package com.example.direct;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQDirectConfig {// 1. 创建交换机
// @Bean
// public DirectExchange newDirectExchange(){
// return new DirectExchange("myDirectExchangeAAA",true,false);
// }//2. 创建队列
// @Bean
// public Queue newQueueA(){
// return new Queue("queueAAA",true);
// }//3. 绑定队列到交换机中
// @Bean
// public Binding bindingA(){
// return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
// }//==================死信//1. 创建交换机@Beanpublic DirectExchange newExchange(){return new DirectExchange("normalExchange",true,false);}//2. 创建队列@Beanpublic Queue newQueue(){Map<String ,Object> map = new HashMap<>();//map.put("x-message-ttl",2000); // 消息存活时间1smap.put("x-max-length",6); // 队列达到最大长度 为6map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字return new Queue("normalQueueA",true,false,false,map);}//3. 绑定@Beanpublic Binding binding(){return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1");}//4. 创建死信交换机@Beanpublic DirectExchange newDeadExchange(){return new DirectExchange("deadExchange",true,false);}//5. 创建死信队列@Beanpublic Queue newDeadQueue(){return new Queue("deadQueueA",true,false,false);}//6. 绑定@Beanpublic Binding bindingDead(){return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2");}}
OrderProducer.java
package com.example.direct;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;@RestController
@RequestMapping("a")
public class OrderProducer {@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/submitOrder")public String submitOrder(){Map<String,Object> map = new HashMap<>();map.put("orderNumber","2222");//Stringmap.put("productId",1111);//Integerfor(int i=0;i<=130;i++){String orderId = UUID.randomUUID().toString().replace("-","");map.put("orderId",orderId);rabbitTemplate.convertAndSend("normalExchange", "key1", map);}return "生产者下单成功";}}
PayConsumer.java
package com.example.direct;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
public class PayConsumer {@RabbitHandler@RabbitListener(queues = "normalQueueA")public void process(Map map, Channel channel, Message message) throws IOException {try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("支付服务接收到的消息:" + map);String orderId = (String)map.get("orderId");//StringInteger productId = (Integer)map.get("productId");//IntegerString orderNum = (String)map.get("orderNumber");//StringSystem.out.println("支付服务接收到的orderId:" + orderId);System.out.println("支付服务接收到的productId:" + productId);System.out.println("支付服务接收到的orderNum:" + orderNum);//告诉broker,消息已经被确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
DeadOrderConsumer.java
package com.example.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class DeadOrderConsumer {// 获得死信队列中的消息@RabbitHandler@RabbitListener(queues = "deadQueueA")public void process(Map map){System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map);String orderId = (String)map.get("orderId");//StringInteger productId = (Integer)map.get("productId");//IntegerString orderNum = (String)map.get("orderNumber");//StringSystem.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId);System.out.println("取消支付后,从死信队列中接收到的productId:" + productId);System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum);}
}
application.yaml
server:servlet:context-path: /app
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated # 确认交换机已经接收到生产者的消息了publisher-returns: true # 消息已经到了队列(交换机与队列绑定成功的)listener:simple:acknowledge-mode: manual # 手动消息确认concurrency: 1 #消费者数量max-concurrency: 1 #消费者最大数量prefetch: 1 #消费者每次从队列中取几个消息