消息应答
概览
消息应答机制是 RabbitMQ 中确保消息处理的可靠性和一致性的重要机制之一。当消费者从队列中接收到消息并处理完成后,通常需要向 RabbitMQ 发送一个明确的消息应答,以告知 RabbitMQ 消息已经被处理,并可以安全地从队列中移除。
作用
-
确认消息已处理:消费者可以向 RabbitMQ 发送消息应答,以确认已经成功地处理了消息。
-
消息传递的可靠性:通过消息应答机制,可以确保消息在被消费者成功处理后才被从队列中移除,从而保证消息传递的可靠性。
模式
-
自动应答(Automatic Acknowledgement): 在这种模式下,消费者收到消息后立即自动发送确认,告知 RabbitMQ 消息已经被处理。这种模式下,消息一旦被发送给消费者,就会立即从队列中移除,无论消息是否被成功处理。虽然这种模式简单方便,但是可能会导致消息丢失或重复消费的问题,不适用于对消息传递的可靠性有要求的场景。
-
手动应答(Manual Acknowledgement): 在这种模式下,消费者需要显式地向 RabbitMQ 发送消息应答,以确认消息已经被成功处理。消费者在处理完消息后,可以调用
channel.basicAck()
方法发送确认,或调用channel.basicNack()
方法发送拒绝确认。手动应答模式可以确保消息在被消费者成功处理后才被从队列中移除,提高了消息传递的可靠性。
手动应答模式适用于对消息传递的可靠性有要求的场景,例如需要保证消息不丢失、不重复消费的场景。通过手动应答,可以更精确地控制消息的处理过程,提高消息传递的可靠性和一致性。
配置
方式一
全局配置
在springboot的yml文件中配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode
参数用于配置 RabbitMQ 消息监听器的应答模式。它决定了消费者接收到消息后如何向 RabbitMQ 确认消息的处理状态。该参数可以设置为以下几种值:
-
none:表示禁用应答模式。在这种模式下,消费者不会向 RabbitMQ 确认消息的处理状态,RabbitMQ 会认为消息一直处于未处理状态,直到连接关闭。这种模式下,消息可能会被多次消费,可能会导致消息重复处理的问题。
-
auto:表示自动应答模式。在这种模式下,消费者接收到消息后会立即自动发送确认,告知 RabbitMQ 消息已经被处理。这种模式下,消息一旦被发送给消费者,就会立即从队列中移除,无论消息是否被成功处理。自动应答模式简单方便,但可能会导致消息丢失或重复消费的问题,不适用于对消息传递的可靠性有要求的场景。
-
manual:表示手动应答模式。在这种模式下,消费者需要显式地向 RabbitMQ 发送消息应答,以确认消息已经被成功处理。消费者在处理完消息后,可以调用
channel.basicAck()
方法发送确认,或调用channel.basicNack()
方法发送拒绝确认。手动应答模式可以确保消息在被消费者成功处理后才被从队列中移除,提高了消息传递的可靠性。 -
manual_batch:表示批量应答模式。批量应答模式是 RabbitMQ 3.3.0 版本引入的一种新的应答模式。在批量应答模式下,消费者可以一次性确认多个消息的处理结果,从而提高应答的效率。消费者可以通过调用
channel.basicAck()
方法一次性确认多个消息的处理,或者调用channel.basicNack()
方法一次性拒绝多个消息的处理。批量应答模式适用于需要提高消费者效率的场景,但需要注意确保消息处理的一致性和可靠性。
方式二
对某个消费者单独配置
@RabbitListener(queues = {"queue_normal"})public void consumeNormal(String msg, Message message, Channel channel) throws IOException {log.debug("消费者 - 普通队列 - 接收消息:" + msg);try {/*** deliveryTag:表示消息的唯一标识符,用于标识需要确认的消息。* 每个消息都有一个唯一的 deliveryTag,由 RabbitMQ 自动生成。* 在确认消息时,需要指定对应消息的 deliveryTag。* multiple: 表示是否批量确认消息。如果将 multiple 参数设置为 false,* 则只确认指定 deliveryTag 对应的单个消息;如果将 multiple* 参数设置为 true,则表示确认所有 deliveryTag 小于或等于指定值的消息。* 通常情况下,建议将 multiple 参数设置为 false,以避免误操作导致确认了未处理的消息。* requeue:表示是否重新将消息放入队列中。如果将 requeue 参数设置为 true,* 则表示消息将被重新放入队列中,等待被重新消费;如果将 requeue* 参数设置为 false,则表示消息将被丢弃,不会重新放入队列中。通常情况下,* 在确认消息时应将 requeue 参数设置为 false,以确保消息不会被重复消费。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);throw new ServerException("普通队列消费消息失败!");}}
方法:channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)参数详解:
deliveryTag
:表示消息的唯一标识符,用于标识需要确认的消息。每个消息都有一个唯一的deliveryTag
,由 RabbitMQ 自动生成。在确认消息时,需要指定对应消息的deliveryTag
。
multiple
:表示是否批量确认消息。如果将multiple
参数设置为false
,则只确认指定deliveryTag
对应的单个消息;如果将multiple
参数设置为true
,则表示确认所有deliveryTag
小于或等于指定值的消息。通常情况下,建议将multiple
参数设置为false
,以避免误操作导致确认了未处理的消息。
requeue
:表示是否重新将消息放入队列中。如果将requeue
参数设置为true
,则表示消息将被重新放入队列中,等待被重新消费;如果将requeue
参数设置为false
,则表示消息将被丢弃,不会重新放入队列中。通常情况下,在确认消息时应将requeue
参数设置为false
,以确保消息不会被重复消费。
测试
这里用之前的简单模式进行测试,如果不想看前面的,可以自己创建队列和消费者
package com.model.listener;import com.code.exception.ServiceException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** @Author: Haiven* @Time: 2024/4/19 10:29* @Description: TODO*/
@Component
@Slf4j
@RabbitListener(queuesToDeclare = @Queue(value = "${rabbitmq.simple.queue}"))
public class SimpleConsumer {@RabbitHandlerpublic void simpleHandler(String msg, Message message, Channel channel) throws IOException, ServiceException {long msgId = message.getMessageProperties().getDeliveryTag();log.debug("简单模式消费者 - simple consumer - 接收到消息:" + msg);try {//线程睡 4 sTimeUnit.SECONDS.sleep(4);//手动应答channel.basicAck(msgId, false);log.debug("简单模式消费者 - simple consumer - 处理消息成功:" + msg);}catch (Exception e){channel.basicNack(msgId, false, true);log.debug("简单模式消费者 - simple consumer - 处理消息失败:" + msg);throw new ServiceException("简单模式接收消息失败");}}
}
发送消息
后台接收
这里暂时看不出消息是否是第一时间消费完毕,无法确认消息是否由channel.basicAck(msgId, false);方法应答,后续回结合死信队列,会更直观的验证
消息持久化
概览
在 RabbitMQ 中,消息持久化是一种确保消息在服务器宕机或重启后不会丢失的重要机制。通过消息持久化,可以将消息存储到磁盘上,以确保消息的可靠性和持久性。
要实现消息的持久化,需要同时确保消息和队列都被持久化。下面是实现消息持久化的步骤:
1.队列持久化:首先,需要确保队列被声明为持久化。在声明队列时,需要设置 durable
参数为 true
,表示该队列是持久化的。
Springboot在创建队列的时候:
QueueBuilder.durable(name)表示为持久化队列
QueueBuilder.nonDurable(name)表示为非持久化队列
2.消息持久化:然后,在发布消息时,需要将消息标记为持久化。在发布消息时,需要设置 deliveryMode
参数为 2
,表示该消息是持久化的。
发送消息时:
rabbitTemplate.convertAndSend();发送的消息默认就是持久化的
通过以上步骤,队列和消息都被声明为持久化的,确保了消息在服务器宕机或重启后不会丢失。需要注意的是,消息持久化会带来一定的性能开销,因为需要将消息写入磁盘,所以在一些对性能要求较高的场景下,需要权衡考虑是否使用消息持久化机制。
权重分配
概览
在 RabbitMQ 中,权重分配通常指的是在消息队列的消费者之间进行负载均衡,以确保消息能够在多个消费者之间均匀分配,达到最优的消息处理效率。RabbitMQ 并没有直接提供权重分配的功能,但可以通过一些方法实现类似的效果。
实现
-
多个消费者绑定同一个队列:可以将多个消费者绑定到同一个队列上,RabbitMQ 将会循环地将消息发送给不同的消费者,实现简单的轮询分发。这种方式可以实现简单的权重分配,但不能根据消费者的处理能力动态调整权重。
-
手动设置消费者优先级:在消费者处理消息时,可以根据一些策略手动设置消费者的优先级。例如,可以根据消费者的处理能力、负载情况等因素动态调整消费者的优先级,从而实现动态的权重分配。这种方式需要在业务代码中实现逻辑,并且需要维护消费者的优先级信息。
-
使用 Direct Exchange 进行路由:可以使用 Direct Exchange 进行消息的路由,并根据消费者的能力对消息进行标记。然后,消费者根据消息的标记选择性地接收消息,实现动态的权重分配。这种方式需要在消息生产者和消费者之间约定好消息的标记,以及消费者的能力等信息。
-
使用 RabbitMQ 插件:RabbitMQ 社区提供了一些插件,例如 Consistent Hash Exchange、Priority Queue 等,可以实现更复杂的消息路由和权重分配策略。可以根据实际需求选择合适的插件来实现权重分配。
总的来说,权重分配是一个比较复杂的问题,需要根据实际业务需求和系统架构选择合适的方法。在设计消息队列系统时,需要考虑消息的生产和消费速度、消费者的处理能力、系统的稳定性等因素,综合考虑选择合适的权重分配策略。
配置
package com.model.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Author: Haiven* @Time: 2024/4/19 11:51* @Description: TODO*/
@Component
@Slf4j
public class WorkConsumer {@RabbitListener(queues = {"work"}, concurrency = "1")public void consumer01(String msg){log.debug("消费者 -01- 接收消息:" + msg );}@RabbitListener(queues = {"work"}, concurrency = "6")public void consumer02(String msg){log.debug("消费者 -02- 接收消息:" + msg );}
}
@RabbitListener(queues = {"work"}, concurrency = "5")
concurrency
参数设置了消费者实例的数量为 5。通过调整concurrency
参数的值,可以实现不同消费者实例的权重分配。需要注意的是,
concurrency
参数表示每个消费者实例的并发线程数,而不是消费者的数量。例如,如果concurrency
参数设置为 5,则表示每个消费者实例将会启动 5 个并发线程来处理消息,如果需要配置多个消费者实例,可以通过创建多个MyMessageListener
Bean 来实现。
测试
向之前的工作队列发送多条消息
后台接收
后台接收的频次比例为 1:6(之前为轮训分发)