一: RabbitMQ 高级特性
前面主要讲解了 RabbitMQ 的概念和应用。RabbitMQ 实现了 AMQP 0-9-1 规范,并在此基础上进行了多项扩展。在 RabbitMQ 官方网站中详细介绍了其特性,我们将其中一些重要且常用的特性挑选出来进行讲解。
1.1 消息确认
生产者发送的消息在到达消费者端后,可能会出现以下几种情况:
情况 | 描述 |
---|---|
消息处理成功 | 消费端成功处理消息,完成相应的业务逻辑。 |
消息处理异常 | 消费端在处理消息时发生异常,可能导致消息未被正确消费。 |
RabbitMQ 在向消费者发送消息后,会立即将该消息从队列中删除,但如果消费者处理消息时出现异常,则可能导致消息丢失。为了解决这一问题,RabbitMQ 提供了消息确认机制,用于确保消息被消费者成功接收并正确处理。在消费者订阅队列时,可以通过设置 autoAck 参数来控制消息确认机制,根据该参数的不同,消息确认机制分为两种模式。
确认模式 | 描述 | 适用场景 |
---|---|---|
自动确认 | 当 autoAck 等于 true 时,RabbitMQ 会在消息发送后立即将其置为确认,并从内存或磁盘中删除,无论消费者是否真正消费成功。 | 适用于对消息可靠性要求不高的场景。 |
手动确认 | 当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式调用 Basic.Ack 命令,确认后才删除消息。 | 适用于对消息可靠性要求较高的场景,确保消息被正确处理后再删除。 |
// 创建一个 DefaultConsumer 对象,用于处理接收到的消息
DefaultConsumer consumer = new DefaultConsumer(channel) {// 当有消息送达时会触发该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 输出接收到的消息内容,将字节数组转换为字符串System.out.println("接收到消息: " + new String(body));}
};// 开始监听队列,指定队列名称、是否自动确认消息以及消费者对象
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, false, consumer);
当 autoAck 参数设置为 false 时,对于 RabbitMQ 服务端而言,队列中的消息会分为两部分:一部分是尚未投递给消费者的消息,另一部分是已投递但尚未收到消费者确认的消息。如果 RabbitMQ 长时间未收到消费者的确认信号,并且该消费者已断开连接,则 RabbitMQ 会将消息重新放回队列,等待重新投递给下一个消费者,或者可能再次投递给原来的消费者。
从 RabbitMQ 的 Web 管理平台上也可以看到当前队列中 Ready 状态和 Unacked 状态的消息数
状态 | 描述 |
---|---|
Ready | 等待投递给消费者的消息数。 |
Unacked | 已投递给消费者但尚未收到消费者确认信号的消息数。 |
1.1.1 自动确认
自动确认之前已经提到过,这里不再赘述。
1.1.2 手动确认
消费者在收到消息后,可以选择确认消息、拒绝消息或跳过消息。RabbitMQ 提供了多种确认应答方式,消费者可以通过调用其对应的 channel 方法进行操作,主要包括以下三种方式:
确认类型 | 描述 | 方法 |
---|---|---|
肯定确认 | RabbitMQ 知道消息已被成功处理,可以将其丢弃。 | Channel.basicAck(long deliveryTag, boolean multiple) |
否定确认 | 消费者调用方法通知 RabbitMQ 拒绝该消息,可选择是否重新入队。 | Channel.basicReject(long deliveryTag, boolean requeue) |
批量否定确认 | 如果需要批量拒绝消息,可以使用 Basic.Nack 命令,通知 RabbitMQ 拒绝多条消息,可选择是否重新入队。 | Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue) |
下面是参数解释:
参数 | 作用 |
---|---|
deliveryTag | 消息的唯一标识,用于标识 RabbitMQ 中的每条消息。它是一个由 RabbitMQ 生成的单调递增的 64 位长整型值,每个通道(Channel)独立维护,确保唯一性,消费者在确认、拒绝或重新入队消息时,必须通过对应的 deliveryTag 来标识消息。 |
multiple | 是否启用批量操作,用于减少网络流量。在消息确认或拒绝时,如果设置为 true,则会批量操作所有小于或等于指定 deliveryTag 的消息。如果设置为 false,则仅对当前指定的 deliveryTag 消息进行操作。 |
requeue | 表示拒绝这条消息后如何处理,控制被拒绝的消息是否重新入队,用于消息的再投递。true: 消息会重新进入队列,等待被其他消费者或同一消费者再次消费。false: 消息会直接从队列中移除,不再被重新投递。适用于无法处理的消息或需要丢弃的场景。 |
我们将基于 SpringBoot 演示消息的确认机制。与直接使用 RabbitMQ Java Client 库相比,Spring-AMQP 对消息确认机制的使用方式有所不同,但是也提供了三种策略来实现消息确认。
确认模式 | 描述 | 特点 |
---|---|---|
AcknowledgeMode.NONE | 消息一旦投递给消费者,不管是否成功处理,RabbitMQ 都会自动确认并移除消息。如果消费者处理失败,消息可能会丢失。 | 适用于对消息可靠性要求较低的场景。 |
AcknowledgeMode.AUTO | 默认模式。消费者在成功处理消息时会自动确认,但如果处理过程中抛出异常,则不会确认消息。 | 提供了一定的可靠性,但在异常情况下消息会回到队列。 |
AcknowledgeMode.MANUAL | 手动确认模式。消费者需要在成功处理消息后显式调用 basicAck 方法确认。如果消息未被确认,RabbitMQ 会重新投递消息。 | 提高消息处理的可靠性,确保消息不会因处理失败而丢失。适用于高可靠性场景。 |
如果需要配置的话在配置文件中进行配置即可,下面演示一种确认模式的使用:
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
- 生产者代码:
public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
@Configuration
public class RabbitMQConfig {// 声明交换机@Bean("ackExchange")public Exchange ackExchange() {return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}// 声明队列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}// 绑定队列和交换机@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}
}
@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息到指定交换机和路由键@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");return "发送成功!";}
}
- 消费者代码
@Component
public class AckQueueListener {// 监听队列 Constant.ACK_QUEUE@RabbitListener(queues = Constant.ACK_QUEUE)public void listenerQueue(Message message, Channel channel) throws Exception {try {// 获取消息内容和 deliveryTagString messageBody = new String(message.getBody(), "UTF-8");long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息: %s, deliveryTag: %d%n", messageBody, deliveryTag);// 模拟处理失败int num = 3 / 0;System.out.println("处理完成");} catch (Exception e) {// 处理异常逻辑,日志记录或其他操作System.err.println("消息处理失败: " + e.getMessage());// 根据业务需求,这里可以选择是否重新入队或丢弃消息channel.basicNack(deliveryTag, false, true);}}
}
1.2 持久性
前面我们讨论了如何在消费端处理消息时确保消息不丢失,但如果 RabbitMQ 服务停止或崩溃后,如何确保生产者发送的消息不丢失呢?默认情况下,RabbitMQ 在退出或崩溃时会丢失队列和消息,除非明确配置其持久化机制。RabbitMQ 的持久化包括三个部分:交换机的持久化、队列的持久化和消息的持久化。
1.2.1 交换机持久化
交换器的持久化通过在声明交换机时将 durable 参数设置为 true 来实现,这会将交换机的属性保存到服务器中。当 RabbitMQ 服务器发生意外或关闭后,重启时交换机会自动恢复,无需重新创建,相当于一直存在。如果未设置持久化,则在 RabbitMQ 重启后,交换机的元数据会丢失。对于长期使用的交换机,建议将其设置为持久化,以确保其可靠性和持久性。
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
1.2.2 队列的持久化
队列的持久化通过在声明队列时将 durable 参数设置为 true 来实现。如果队列未设置持久化,在 RabbitMQ 服务重启后,该队列会被删除,同时其中的消息也会丢失(队列消失,消息无处存储)。队列的持久化可以保证队列本身的元数据在异常情况下不会丢失,但不能保证队列中的消息不丢失。要确保消息的可靠性,还需将消息设置为持久化。我们之前创建的队列都是持久化队列。
QueueBuilder.durable(Constant.ACK_QUEUE).build(); // 持久化队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build(); // 非持久化队列
1.2.3 消息的持久化
要实现消息的持久化,需要 MessageProperties 中的 deliveryMode 设置为 2,即 MessageDeliveryMode.PERSISTENT。这样可以确保消息在服务器重启或出现异常时不会丢失。
public enum MessageDeliveryMode {NON_PERSISTENT, // 非持久化PERSISTENT // 持久化
}
设置队列和消息的持久化后,RabbitMQ 服务重启后消息依旧存在。如果仅设置队列持久化,重启后消息会丢失;如果仅设置消息持久化,重启后队列消失,消息也无法存储。因此,单独设置消息持久化而不设置队列持久化是没有意义的,二者需同时设置才能确保消息可靠性。
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", // 内容类型 (ContentType)null, // 内容编码 (ContentEncoding)null, // 头信息 (Headers)2, // deliveryMode: 持久化 (2 表示消息持久化)0, // 优先级 (Priority)null, // CorrelationIdnull, // ReplyTonull, // Expirationnull, // MessageIdnull, // Timestampnull, // Typenull, // UserIdnull, // AppIdnull // ClusterId
);
// 发送非持久化消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()
);// 发送持久化消息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化msg.getBytes()
);
RabbitMQ 默认会将消息视为持久化消息,除非队列被声明为非持久化或消息在发送时被标记为非持久化。但是需要注意,将所有消息都设置为持久化会显著影响 RabbitMQ 的性能,因为将消息写入磁盘的速度远远慢于写入内存的速度。对于可靠性要求不高的消息,可以选择不进行持久化处理,以提升系统的整体吞吐量。我们在选择是否持久化消息时,需要在消息的可靠性和系统吞吐量之间进行权衡。如果使用 RabbitTemplate 发送持久化消息,代码示例如下:
// 要发送的消息内容
String message = "This is a persistent message";// 创建一个 Message 对象,并设置为持久化
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message messageObject = new Message(message.getBytes(), messageProperties);// 使用 RabbitTemplate 发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject
);
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的。
问题 | 描述 | 解决方法 |
---|---|---|
消费者未处理完成时宕机 | 如果消费者在订阅队列时将 autoAck 参数设置为 true,那么消息在被消费者接收后,尚未处理完毕就发生宕机,消息将丢失。 | 将 autoAck 参数设置为 false,并采用手动确认方式,确保消息被正确处理后再确认。详细可参考消息确认章节。 |
消息未及时落盘时 RabbitMQ 宕机 | 持久化的消息在正确写入 RabbitMQ 后,还需要一段时间才能真正存入磁盘。在此过程中,RabbitMQ 并不会为每条消息都立即调用内核的 fsync 方法进行同步存盘,而是可能暂时存储在操作系统缓存中。如果此时 RabbitMQ 节点发生宕机或重启,未落盘的消息会丢失。 | 1.开启 RabbitMQ 的发布确认模式,确保消息被可靠写入磁盘后再确认发送成功。 2. 引入事务机制 3. 引入 RabbitMQ 的仲裁队列 (后⾯再讲), |
1.3 发送发确认
持久化的消息在正确写入 RabbitMQ 后,还需要一段时间才能真正存入磁盘。在此过程中,RabbitMQ 并不会为每条消息都立即调用内核的 fsync 方法进行同步存盘,而是可能暂时存储在操作系统缓存中。如果此时 RabbitMQ 节点发生宕机或重启,未落盘的消息会丢失,为了解决这一问题,RabbitMQ 提供了两种机制来确保消息投递的可靠性。
解决方案 | 描述 |
---|---|
事务机制 | 通过事务机制确保消息可靠投递,但性能消耗较大,实际工作中使用较少。 |
发送方确认(publisher confirm) | 通过 confirm 机制实现消息的可靠投递,是实际工作中常用的解决方案。 |
控制消息可靠性投递方式 | 描述 |
---|---|
confirm 确认模式 | 用于确认消息是否成功到达 RabbitMQ 服务器。 |
return 退回模式 | 用于在消息无法路由到指定队列时将消息退回给生产者,确保消息不会意外丢失。 |
1.3.1 confirm 确认模式
在 confirm 确认模式下,生产者在发送消息时可以设置一个 ConfirmCallback 监听器,无论消息是否成功到达交换机,该监听器都会被触发。如果消息成功到达交换机,则 ACK(确认标识)为 true;如果未到达,则 ACK 为 false。
- 配置 RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual # 消息接收确认publisher-confirm-type: correlated # 消息发送确认
- 设置确认回调逻辑并发送消息:无论消息确认成功还是失败,都会触发 ConfirmCallback 的 confirm 方法。如果消息成功发送到 Broker,则 ack 为 true;如果消息发送失败,则 ack 为 false,同时 cause 提供失败的原因。
@Configuration
public class RabbitTemplateConfig {// 配置带 ConfirmCallback 的 RabbitTemplate@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置 ConfirmCallback 回调函数,通过 Lambda 表达式实现 ConfirmCallback 接口的 confirm 方法。rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.printf("消息接收成功, id: %s%n", correlationData != null ? correlationData.getId() : "null");} else {System.out.printf("消息接收失败, id: %s, cause: %s%n",correlationData != null ? correlationData.getId() : "null", cause);}});return rabbitTemplate;}
}
@RestController
@RequestMapping("/producer")
public class ConfirmController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;// 发送消息并带有确认机制@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");// 发送消息confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData);return "消息发送确认完成";}
}
区别 | ConfirmListener | ConfirmCallback |
---|---|---|
所属库 | RabbitMQ Java Client 库 | Spring AMQP 框架 |
使用场景 | 用于直接与 RabbitMQ 服务器交互 | 专为 Spring 环境设计,简化与 RabbitMQ 的交互过程 |
方法 | 提供两个方法:handleAck 和 handleNack | 提供一个方法:confirm |
功能 | 处理消息的确认和否定确认事件 | 处理消息确认的回调 |
集成性 | 需要手动与 RabbitMQ 的 Channel 进行交互 | 与 Spring 框架无缝集成,支持依赖注入和配置管理 |
使用场景推荐 | 非 Spring 项目中直接使用 RabbitMQ Java Client 库 | Spring Boot 或 Spring 应用中推荐使用 ConfirmCallback |
1.3.2 return 退回模式
在 RabbitMQ 的 return 退回模式中,消息到达 Exchange 后会根据路由规则匹配将消息投递到队列。如果消息无法被任何队列消费,可以选择将消息退回给发送者。此时,可以通过设置一个返回回调方法,对被退回的消息进行处理。
- 配置 RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual # 消息接收确认publisher-confirm-type: correlated # 消息发送确认
- 设置返回回调逻辑并发送消息,当消息无法被路由到任何队列时会被退回给发送者,此时触发通过 setReturnCallback 设置的回调方法进行处理。
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置强制退回机制rabbitTemplate.setMandatory(true);// 设置 ReturnsCallback 回调方法rabbitTemplate.setReturnsCallback(returned -> System.out.printf("消息被退回: %s%n", returned));return rabbitTemplate;
}
@RestController
@RequestMapping("/producer")
public class MessageController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;// 测试消息退回@RequestMapping("/msgReturn")public String msgReturn() {// 创建 CorrelationData 对象CorrelationData correlationData = new CorrelationData("2");// 发送消息confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm11", "message return test...", correlationData);return "消息发送成功";}
}
1.4 总结一下 RabbitMQ 如何保证消息的可靠传输
先展示一张 RabbitMQ 消息传递的示意图:
场景 | 可能原因 | 解决办法 |
---|---|---|
生产者将消息发送到 RabbitMQ 失败 | 网络问题等 | 参考本章节发送方确认-confirm确认模式 |
消息在交换机中无法路由到指定队列 | 代码或配置层面错误,导致消息路由失败 | 参考本章节发送方确认-return模式 |
消息队列自身数据丢失 | 消息到达 RabbitMQ 后,RabbitMQ 服务器宕机导致消息丢失 | 开启 RabbitMQ 持久化机制,消息会写入磁盘;在服务器恢复后,RabbitMQ 会自动读取之前存储的数据。 在极端情况下(消息未持久化时服务器宕机),可能会导致少量数据丢失,可以通过集群方式提升可靠性。 |
消费者异常导致消息丢失 | 消息到达消费者后未及时消费,消费者宕机或消费者逻辑问题 | 参考本章节消息确认。启用消费者手动确认机制,当消费者确认消息成功后才会删除消息,从而避免消息丢失。 除此之外可配置重试机制,确保消息消费的可靠性。 |
1.5 重试机制
在消息传递过程中,可能会因网络故障、服务不可用或资源不足等问题导致消息处理失败。为了解决这些问题,RabbitMQ 提供了重试机制,允许消息在处理失败后重新发送,如果对异常进⾏捕获那么就不会进⾏重试。然而,如果失败是由程序逻辑错误引起的,多次重试也无济于事,此时可以通过设置重试次数来避免无限重试的情况。
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto # 消息接收确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000 # 初始失败等待时长为5秒max-attempts: 5 # 最大重试次数(包括首次消费)
在手动确认模式下,重试次数的限制不会像自动确认模式那样直接生效,因为是否重试以及何时重试更多取决于应用程序的逻辑和消费者的实现。在自动确认模式下,RabbitMQ 会在消息投递给消费者后自动确认消息。如果消费者在处理消息时抛出异常,RabbitMQ 会根据配置的重试参数自动将消息重新入队,从而实现重试。重试次数和重试间隔等参数可以直接在 RabbitMQ 的配置中设定,RabbitMQ 会负责执行这些重试策略。
而在手动确认模式下,消费者需要显式地对消息进行确认。如果消费者在处理消息时遇到异常,可以选择不确认消息,让消息重新入队。此时,重试的控制权完全由应用程序掌握,而不是依赖 RabbitMQ 的内部机制。应用程序可以通过自身逻辑和结合 RabbitMQ 的高级特性,灵活实现自定义的重试策略。
1.6 TTL
TTL(Time to Live,过期时间)是 RabbitMQ 提供的一种机制,可以为消息或队列设置存活时间。当消息超过指定的存活时间且未被消费时,会被自动清除。这一机制常用于一些特定场景,例如在电商平台中,下单后超过24小时未付款,订单会自动取消;或者申请退款后超过7天未处理,则系统会自动退款。
目前有两种方式可以设置消息的 TTL:一种是为队列设置 TTL,此时队列中的所有消息共享相同的过期时间;另一种是为每条消息单独设置 TTL,使每条消息的过期时间可以不同。如果同时使用这两种方式,消息的实际 TTL 以两者中较小的数值为准。
1.6.1 设置消息的TTL
@RestController
@RequestMapping("/producer")
public class TtlController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息并设置 TTL@RequestMapping("/ttl")public String sendTtlMessage() {String ttlTime = "10000"; // 10秒// 发送消息并设置过期时间rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {// messagePostProcessor 是一个消息后处理器,MessageProperties 包含了消息的元数据信息,例如内容类型、优先级、消息ID以及过期时间等。setExpiration(ttlTime) 是 MessageProperties 类中的一个方法,用于设置消息的过期时间。messagePostProcessor.getMessageProperties().setExpiration(ttlTime);return messagePostProcessor;});return "发送成功!";}
}
1.6.2设置队列的TTL:
设置队列 TTL 可以在创建队列时通过添加参数 x-message-ttl 实现,单位为毫秒。
@Configuration
public class TtlQueueConfig {// 声明队列 ttlQueue2,设置过期时间为 20 秒@Bean("ttlQueue2")public Queue ttlQueue2() {return QueueBuilder.durable(Constant.TTL_QUEUE2) // 设置队列持久化.ttl(20 * 1000) // 设置 TTL 为 20 秒.build();}// 或者通过参数设置过期时间为 20 秒@Bean("ttlQueue2Alt")public Queue ttlQueue2Alt() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-message-ttl", 20000); // 20 秒过期return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(arguments).build();}// 队列和交换机绑定@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange,@Qualifier("ttlQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
}
运行后会发现新增了一个队列,该队列的 Features 中显示有一个 TTL 标识,表示已成功设置消息的过期时间。
1.6.3 两种方式的区别
设置方式 | 描述 | 原因 |
---|---|---|
设置队列 TTL 属性 | 消息过期后会立即从队列中删除。 | 队列中的过期消息一定位于队列头部,RabbitMQ 只需定期从队列头开始扫描并删除过期消息,效率较高。 |
设置消息 TTL 属性 | 消息过期后不会立即从队列中删除,而是在消息投递给消费者之前判定是否过期,若过期则删除。 | 每条消息的过期时间不同,若需要删除所有过期消息需要扫描整个队列,成本较高,因此延迟到消费时再判定是否过期。 |
1.7 死信队列
死信是指因各种原因无法被正常消费的消息。当消息在一个队列中变成死信后,会被重新发送到一个专门的交换机,称为死信交换机(Dead Letter Exchange,DLX)。绑定到死信交换机的队列被称为死信队列(Dead Letter Queue,DLQ)。消息通常因以下原因变成死信:
原因 | 描述 |
---|---|
消息被拒绝 | 消费者使用 Basic.Reject 或 Basic.Nack 拒绝消息,并且 requeue 参数设置为 false。 |
消息过期 | 消息的 TTL(存活时间)到期后未被消费。 |
队列达到最大长度 | 队列中的消息数量达到最大限制,新消息无法写入,导致消息变成死信。 |
死信交换机和死信队列本质上与普通的交换机和队列没有区别,它们只是被专门用来处理无法被正常消费的消息。
//死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
@Configuration
public class DLXConfig {// 声明死信交换机@Bean("dlxExchange")public Exchange dlxExchange() {return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true) // 设置为持久化.build();}// 声明死信队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DLX_QUEUE) // 设置为持久化.build();}// 绑定死信队列和死信交换机@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,@Qualifier("dlxQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dlx") // 设置路由键.noargs();}
}
上述代码只是声明了一个队列和交换机,此时他们还不是死信的,我们需要通过下面的代码指定死信队列和死信交换机:
@Bean("normalQueue")
public Queue normalQueue() {// 设置队列参数Map<String, Object> arguments = new HashMap<>();// 这两行代码指定哪些队列和交换机是死信的arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME); // 绑定死信交换机arguments.put("x-dead-letter-routing-key", "dlx"); // 设置死信队列的路由键// 创建普通队列并设置参数return QueueBuilder.durable(Constant.NORMAL_QUEUE) // 设置队列持久化.withArguments(arguments) // 添加参数.build();
}
死信队列和死信交换机的指定可以简写为:
return QueueBuilder.durable(Constant.NORMAL_QUEUE) // 声明普通队列并设置持久化.deadLetterExchange(Constant.DLX_EXCHANGE_NAME) // 设置死信交换机.deadLetterRoutingKey("dlx") // 设置死信队列的路由键.build(); // 构建队列
队列 Features 说明:
特性 | 描述 |
---|---|
D | durable 的缩写,表示队列设置为持久化。 |
TTL | Time to Live,表示队列设置了 TTL(过期时间)。 |
Lim | 表示队列设置了长度限制(x-max-length)。 |
DLX | 表示队列设置了死信交换机(x-dead-letter-exchange)。 |
DLK | 表示队列设置了死信路由键(x-dead-letter-routing-key)。 |
对于 RabbitMQ 来说,死信队列是一个非常有用的特性。它可以在消息无法被消费者正常消费时,将这些消息放入死信队列中进行处理。应用程序可以通过消费死信队列中的消息,分析异常原因,从而改善和优化系统的稳定性和可靠性。
在支付场景中,当用户完成支付后,支付系统会将支付结果发送到订单系统。为了避免在消息传递或处理过程中因异常导致支付信息丢失,可以使用死信队列机制。当消息处理失败时,异常消息会被投递到死信队列。订单系统的其他消费者可以监听死信队列,对这些消息进行进一步处理,例如生成工单进行人工核实或执行补偿操作,确保支付信息的完整性和系统的可靠性,死信队列还有别的应用场景:
应用场景 | 描述 |
---|---|
消息重试 | 将死信消息重新发送到原队列或其他队列进行重试处理,尝试再次消费消息。 |
消息丢弃 | 直接丢弃无法处理的消息,避免这些消息占用系统资源,影响正常消息的处理。 |
日志收集 | 将死信消息作为日志收集起来,用于后续的分析、问题定位和系统优化。 |
1.8 延迟队列
延迟队列(Delayed Queue)是一种消息队列机制,消息在发送后不会立即被消费者获取,而是经过指定的延迟时间后才可供消费者消费,延迟队列有广泛的应用场景,例如:
应用场景 | 描述 |
---|---|
智能家居 | 用户希望通过手机远程控制智能设备,在指定时间执行操作。通过延迟队列,指令会在设定时间到达后推送到设备。 |
日常管理 | 预定会议后,可在会议开始前 15 分钟通过延迟队列发送提醒通知给参会人。 |
提高用户活跃度 | 用户注册成功后,延迟 7 天发送短信提醒,提升用户活跃度或回访率。 |
其他场景 | 根据实际需求,可将延迟队列用于需要定时触发的各类业务逻辑中。 |
RabbitMQ 本身不直接支持延迟队列功能,通常可以通过以下两种方式实现:
方法 | 优点 | 缺点 |
---|---|---|
基于死信实现的延迟队列 | 灵活,不需要额外插件支持 | 存在消息顺序问题 需要额外逻辑处理死信队列的消息,增加系统复杂性 |
基于插件实现的延迟队列 | 通过插件直接创建延迟队列,简化延迟消息的实现 避免了死信队列的时序问题 | 需要依赖特定插件,增加运维工作 插件适用范围有限,仅支持特定版本的 RabbitMQ |
通过结合 TTL 和死信队列的方式可以实现模拟延迟队列的功能。例如在一个应用中,需要让每条消息延迟 10 秒后被消费。生产者将消息通过 normal_exchange 发送到 normal_queue 队列,并为该队列设置消息的 TTL 为 10 秒。当消息在 normal_queue 中过期后,会被转移到绑定了死信交换机的 dlx_queue 队列。消费者订阅 dlx_queue 队列,从而在延迟 10 秒后消费到这条消息。
注意事项:当发送 20 秒过期的消息后,再发送 10 秒过期的消息,会发现 10 秒过期的消息也是在 20 秒后才进入死信队列。这是因为 RabbitMQ 只会检查队首消息是否过期,如果队首消息未过期,即使后续消息已达到过期时间,也不会被立即丢弃并转移到死信队列。这会导致第一个消息的延时时间较长时,第二个消息的延时时间即使较短,也无法优先执行。
因此,在使用 TTL + 死信队列实现延迟任务队列时,需要确保业务中每个任务的延迟时间是一致的。如果业务需要为不同的任务类型设置不同的延迟时间,则需要为每种延迟时间分别创建单独的消息队列,以避免消息处理顺序受到影响。
1.8.1 延迟队列插件
RabbitMQ 官方提供了一个延迟插件来实现延迟功能,详情可参考 RabbitMQ 官方文档。安装插件后,可以通过 RabbitMQ 管理平台验证是否成功:在新建交换机时,查看是否出现延迟消息选项(Delayed Message)。如果该选项可用,说明延迟消息插件已成功运行。
1.8.2 基于插件延迟队列实现延迟队列
- 声明交换机、队列和绑定关系
@Configuration
public class DelayedConfig {// 声明延迟交换机@Bean("delayedExchange")public Exchange delayedExchange() {return ExchangeBuilder.directExchange(Constant.DELAYED_EXCHANGE_NAME).durable(true) // 设置为持久化.delayed() // 开启延迟功能.build();}// 声明队列@Bean("delayedQueue")public Queue delayedQueue() {return QueueBuilder.durable(Constant.DELAYED_QUEUE) // 设置为持久化.build();}// 绑定队列和延迟交换机@Bean("delayedBinding")public Binding delayedBinding(@Qualifier("delayedExchange") Exchange exchange,@Qualifier("delayedQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delayed") // 设置路由键.noargs();}
}
- 编写生产者代码
@RestController
@RequestMapping("/producer")
public class DelayedMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送带延迟的消息@RequestMapping("/delay2")public String sendDelayedMessages() {// 发送 20 秒延迟的消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME,"delayed","delayed test 20s... " + new Date(),messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(20000); // 设置 20 秒延迟return messagePostProcessor;});// 发送 10 秒延迟的消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME,"delayed","delayed test 10s... " + new Date(),messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(10000); // 设置 10 秒延迟return messagePostProcessor;});return "发送成功!";}
}
- 编写消费者代码
@Component
public class DelayedQueueListener {// 监听延迟队列的消息@RabbitListener(queues = Constant.DELAYED_QUEUE)public void listenerDLXQueue(Message message, Channel channel) throws Exception {// 打印接收到的消息和当前时间System.out.printf("%tc 死信队列接收到消息: %s%n", new Date(), new String(message.getBody(), "UTF-8"));}
}
1.9 事务
RabbitMQ 基于 AMQP 协议实现,该协议支持事务机制,因此 RabbitMQ 也具备事务功能。同时,Spring AMQP 提供了对事务操作的支持。通过 RabbitMQ 的事务机制,开发者可以确保消息的发送和接收具有原子性,即要么全部成功,要么全部失败。
- 配置事务管理器
@Configuration
public class TransactionConfig {// 配置 RabbitMQ 事务管理器@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}// 配置支持事务的 RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 启用事务支持return rabbitTemplate;}
}
- 声明队列
@Bean("transQueue")
public Queue transQueue() {return QueueBuilder.durable("trans_queue") // 声明持久化队列.build();
}
- 编写生产者代码
@RestController
@RequestMapping("/trans")
public class TransactionProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送事务消息@Transactional@RequestMapping("/send")public String send() {// 发送第一条消息rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");// 模拟异常int a = 5 / 0;// 发送第二条消息rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");return "发送成功";}
}
如果不添加 @Transactional,即使代码中发生异常,消息1仍然会成功发送,当添加了 @Transactional 后,发生异常时,事务将回滚,导致消息1和消息2都不会发送成功。
1.10 消息分发
当 RabbitMQ 队列有多个消费者时,消息会被分发给不同的消费者,每条消息只会发送给一个订阅了该队列的消费者。这种方式非常适合扩展处理能力,当负载加重时,可以通过增加更多的消费者来分担消息处理任务。然而,默认情况下,RabbitMQ 以轮询方式分发消息,而不考虑消费者是否已处理并确认消息。这可能会导致部分消费者处理速度慢、消息堆积,而其他消费者处理速度快却处于空闲状态,从而降低系统整体的吞吐量。
为了解决这一问题,可以使用 channel.basicQos(int prefetchCount) 方法来限制每个消费者未确认消息的最大数量。例如,当消费者调用了 channel.basicQos(5) 后,RabbitMQ 会为该消费者计数,每发送一条消息计数加1,每确认一条消息计数减1。当未确认消息数量达到5时,RabbitMQ 将停止向该消费者发送新消息,直到其确认了部分消息。这种机制类似于 TCP/IP 中的滑动窗口,能够有效均衡消费者的负载,提高系统效率,消息分发的常见应用场景如下:
应用场景 | 描述 |
---|---|
限流 | 通过 channel.basicQos 方法设置每个消费者未确认消息的最大数量,防止某些消费者处理过慢导致消息堆积。 |
非公平分发 | 默认情况下,RabbitMQ 以轮询方式分发消息,而不考虑消费者的处理能力,这种方式称为非公平分发也叫负载均衡。 |
1.10.1 限流
在订单系统中,订单系统每秒最多可处理 5000 个请求,正常情况下能够满足需求,但在秒杀高峰期,请求量瞬间激增到每秒 1 万个,如果这些请求全部通过 MQ 发送到订单系统,可能会导致系统崩溃。为了解决这一问题,RabbitMQ 提供了限流机制,通过设置 prefetchCount 参数,控制消费者每次从队列中预取的消息数量,从而实现流量控制和负载均衡。同时,限流机制要求将消息应答方式设置为手动应答,以确保消费者处理完消息后再拉取新的消息。
- 配置 prefetch 参数, 设置应答方式为⼿动应答:
# ack 确认方式:开启手动 ack
listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5 # 每次预取 5 条消息
- 配置交换机和队列
@Configuration
public class QosConfig {// 声明限流交换机@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME) // 直连交换机.durable(true) // 持久化.build();}// 声明队列@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constant.QOS_QUEUE) // 持久化队列.build();}// 绑定队列和交换机@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange,@Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("qos") // 路由键.noargs();}
}
- 发送消息, ⼀次发送 20 条消息
@RestController
@RequestMapping("/qos")
public class QosController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息@RequestMapping("/send")public String sendQosMessages() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME, "qos", "qos test..." + i);}return "发送成功!";}
}
- 编写消费者代码
@Component
public class QosQueueListener {// 监听指定队列的消息@RabbitListener(queues = Constant.QOS_QUEUE)public void listenerQueue(Message message, Channel channel) throws Exception {// 获取消息的 deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();// 打印接收到的消息和 deliveryTagSystem.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);// 手动签收(可启用)channel.basicAck(deliveryTag, true);}
}
1.10.2 负载均衡
我们可以通过设置 prefetch=1 的方式,实现消息的负载均衡。比如,在有两个消费者的情况下,如果一个消费者处理任务非常快,而另一个处理较慢,默认情况下,RabbitMQ 只在消息进入队列时分发消息,不考虑消费者未确认消息的数量,可能会导致一个消费者一直繁忙,而另一个消费者空闲。通过设置 prefetch=1,可以让 RabbitMQ 一次只分配一条消息给消费者,直到该消费者处理并确认当前消息后,才会发送下一条消息。如果某个消费者繁忙,RabbitMQ 会将消息分派给下一个空闲的消费者,从而更好地实现负载均衡。
- 配置 prefetch 参数, 设置应答方式为⼿动应答
# 配置 ack 确认方式:开启手动 ack 和预取限制
listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 每次预取 1 条消息
@Component
public class QosQueueListener {// 指定监听队列的名称,消费者1@RabbitListener(queues = Constant.QOS_QUEUE)public void listenerQosQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();// 打印接收到的消息System.out.printf("接收到消息: %s, deliveryTag: %d%n",new String(message.getBody(), "UTF-8"), deliveryTag);// 手动签收channel.basicAck(deliveryTag, true);}// 指定监听队列的名称,消费者2@RabbitListener(queues = Constant.QOS_QUEUE)public void listenerQueue2(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();// 打印接收到的消息System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n",new String(message.getBody(), "UTF-8"), deliveryTag);// 模拟处理流程慢Thread.sleep(100);// 手动签收channel.basicAck(deliveryTag, true);}
}
接收到消息: qos test...1, deliveryTag: 1
消费者2接收到消息: qos test...0, deliveryTag: 1
接收到消息: qos test...2, deliveryTag: 2
接收到消息: qos test...3, deliveryTag: 3
接收到消息: qos test...4, deliveryTag: 4
接收到消息: qos test...5, deliveryTag: 5
消费者2接收到消息: qos test...6, deliveryTag: 2
接收到消息: qos test...7, deliveryTag: 6
接收到消息: qos test...8, deliveryTag: 7
接收到消息: qos test...9, deliveryTag: 8
接收到消息: qos test...10, deliveryTag: 9
消费者2接收到消息: qos test...11, deliveryTag: 3
接收到消息: qos test...12, deliveryTag: 10
接收到消息: qos test...13, deliveryTag: 11
接收到消息: qos test...14, deliveryTag: 12
接收到消息: qos test...15, deliveryTag: 13
消费者2接收到消息: qos test...16, deliveryTag: 4
接收到消息: qos test...17, deliveryTag: 14
接收到消息: qos test...18, deliveryTag: 15
接收到消息: qos test...19, deliveryTag: 16
deliveryTag 出现重复是因为两个消费者使用的是不同的 Channel,每个 Channel 上的 deliveryTag 都是独立计数的,因此在不同的 Channel 中可能会出现相同的 deliveryTag 值。