消息的TTL(Time To Live)就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置存活时间,就是队列没有消费者连着的保留时间。
对每一个单独的消息单独的设置存活时间。超过了这个时间,我们认为这个消息就死了,称之为死信。
如果队列设置了TTL,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者xmessage-ttl属性来设置时间,两者是一样的效果。
死信路由
一个消息在成为死信后,会进死信路由。
记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
普通消息成为死信的条件:死信将被路由到死信交换机、再路由到普通队列,普通队列发送消息给消费者。
被拒收的消息:一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
TTL到期的消息:
消息的TTL到了,消息过期了。被队列丢弃的消息:
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
死信交换机
死信交换机Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
延时队列
先设置消息在TTL后变成死信,再设置队列里死信统一被路由到一个指定的交换机,那个指定的交换机专门处理死信,达成延时队列的效果。
手动ack&异常消息统一放在一个队列处理建议的两种方式
catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
/*** 创建队列,交换机,延迟队列,绑定关系 的configuration* 不会重复创建覆盖* 1、第一次使用队列【监听】的时候才会创建* 2、Broker没有队列、交换机才会创建*/
@Configuration
public class MyRabbitMQConfig {// @RabbitHandler
// public void listen(Message message){
// System.out.println("收到消息:------>"+message);
// }/*** 延时队列* @return*/@Beanpublic Queue orderDelayQueue(){HashMap<String, Object> arguments = new HashMap<>();/*** x-dead-letter-exchange :order-event-exchange 设置死信路由* x-dead-letter-routing-key : order.release.order 设置死信路由键* x-message-ttl :60000*/arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",30000);Queue queue = new Queue("order.delay.queue", true, false, false,arguments);return queue;}/*** 死信队列* @return*/@Beanpublic Queue orderReleaseQueue(){Queue queue = new Queue("order.release.order.queue",true,false,false);return queue;}/*** 死信路由[普通路由]* @return*/@Beanpublic Exchange orderEventExchange(){/** String name,* boolean durable,* boolean autoDelete,* Map<String, Object> arguments* */TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);return topicExchange;}/*** 交换机与延时队列的绑定* @return*/@Beanpublic Binding orderCreateOrderBinding(){/** String destination, 目的地(队列名或者交换机名字)* DestinationType destinationType, 目的地类型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments* */Binding binding = new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);return binding;}/*** 死信路由与普通死信队列的绑定* @return*/@Beanpublic Binding orderReleaseOrderBinding(){Binding binding = new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);return binding;}
}