【MQ】Spring3 中 RabbitMQ 的使用与常见场景

一、初识 MQ

传统的单体架构,分布式架构的同步调用里,无论是方法调用,还是 OpenFeign 难免会有以下问题:

  1. 扩展性差(高耦合,需要依赖对应的服务,同样的事件,不断有新需求,这个事件的业务代码会越来越臃肿,这些子业务都写在一起)
  2. 性能下降(等待响应,最终整个业务的响应时长就是每次远程调用的执行时长之和)
  3. 级联失败(如果是一个事务,一个服务失败就会导致全部回滚,若是分布式事务就更加麻烦了,但其实一些行为的失败不应该导致整体回滚)
  4. 服务宕机(如果服务调用者未考虑服务提供者的性能,导致提供者因为过度请求而宕机)

但如果不是很要求同步调用,其实也可以用异步调用,如果是单体架构,你可能很快能想到一个解决方案,就是阻塞队列实现消息通知:

在这里插入图片描述

但是在分布式架构下,可能就需要一个中间件级别的阻塞队列,这就是我们要学习的 Message Queue 消息队列,简称 MQ,而现在流行的 MQ 还不少,在实现其基本的消息通知功能外,还有一些不错的扩展

以 RabbitMQ 和 Kafka 为例:

RabbitMQKafka
公司/社区RabbitApache
开发语言ErlangScala & Java
协议支持AMQP,XMPP,SMTP,STOMP自定义协议
可用性
单机吞吐量一般非常高(Kafka 亮点)
消息延迟微秒级毫秒以内
消息可靠性一般

消息延迟指的是,消息到队列,并在队列中“就绪”的时间与预期时间的差距,其实就是数据在中间件中流动的耗时,预期时间可以是现在、几毫秒后、几秒后、几天后…

据统计,目前国内消息队列使用最多的还是 RabbitMQ,再加上其各方面都比较均衡,稳定性也好,因此我们课堂上选择 RabbitMQ 来学习。

二、RabbitMQ 安装

Docker 安装 RabbitMQ:

mkdir /root/mq
cd /root/mqdocker rm mq-server -f
docker rmi rabbitmq:3.8-management -f
docker volume rm mq-plugins -fdocker pull rabbitmq:3.8-management# 插件数据卷最好还是直接挂载 volume,而不是挂载我们的目录
docker run \
--name mq-server \
-e RABBITMQ_DEFAULT_USER=xxx \
-e RABBITMQ_DEFAULT_PASS=xxx \
--hostname mq1 \
-v mq-plugins:/plugins \
-p 15672:15672 \
-p 5672:5672 \
-d rabbitmq:3.8-management

三、RabbitMQ 基本知识

(1)架构

15672:RabbitMQ 提供的管理控制台的端口

5672:RabbitMQ 的消息发送处理接口

用户名密码就是安装时,启动容器时指定的用户名密码

MQ 对应的就是这里的消息代理 Broker:

在这里插入图片描述

RabbitMQ 详细架构图:

在这里插入图片描述

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的 exchange、queue

现在你可能只认识生产者、消费者、队列,其他是什么呢?

其实你可以理解为 MQ 也是存储东西的,存储的就是消息,virtual host 就是数据库,queue 就是表,消息就是一行数据,而 MQ 有特殊的机制,消息先通过 exchange 再决定前往哪个 queue

管理控制台的使用就不多说了

(2)五大模式

这只是最常见的五种模式:

  1. 简单模式

在这里插入图片描述

  1. 工作模式

在这里插入图片描述

  1. 发布订阅模式

关联交换机的队列都能收到一份消息,广播

在这里插入图片描述

  1. 路由模式

关联交换机时,提供 routing key(可以是多个,队列之间可以重复),发布消息时提供一个 routing key,由此发送给指定的队列

在这里插入图片描述

值得注意的是,简单模式和工作模式,其实也是有交换机的,任何队列都会绑定一个默认交换机 "",类型是 direct,routing key 为队列的名称

  1. 主题模式

在这里插入图片描述

路由模式的基础上,队列关联交换机时 routing key 可以是带通配符的

routing key 的单词通过 . 分割, # 匹配 n 个单词(n ≥ 0),* 只匹配一个单词

例如 #.red:

  • 可以匹配的 routing key:p1.red、red、p2.p1.red

在发布消息时,要使用具体的 routing key,交换机发送给匹配的队列

(3)数据隔离

  1. 隔离 virtual host

在这里插入图片描述

  1. 隔离用户(赋予访问权限)

在这里插入图片描述

四、RabbitMQ 基本使用 Spring AMQP

引入 RabbitMQ 相关的 SDK,可以通过创建连接 Connection、创建通道 Channel,用 Channel 进行操作,接受消息也差不多,不过多演示:

public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("xx.xx.xx.xx");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("xxx");factory.setPassword("xxx");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

但比较麻烦,Spring AMQP 框架可以自动装配 RabbitMQ 的操作对象 RabbitTemplate,这样我们就可以更方便的操作 MQ,并充分发挥其特性

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

默认包含 RabbitMQ 的实现,如果你想对接其他 AMQP 协议的 MQ,得自己实现其抽象封装的接口

(1)发送消息

注意,下面是 Spring3 的写法,所以会有点不一样,可能看不懂,稍后解释!

消息发送器封装:

@Repository
@RequiredArgsConstructor
@Slf4j
public class RabbitMQSender {private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);}private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("处理 ack 回执失败, {}", ex.getMessage());return null;};private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即执行// setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};};private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32());}/*** @param exchange 交换机* @param routingKey routing key* @param msg 消息* @param delay 延迟时间(如果是延迟交换机,delay 才有效)* @param maxRetries 最大重试机会* @param <T> 消息的对象类型*/private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData = newCorrelationData();MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器@Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c -> {if(c.isAck()) {log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());} else {log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());if(retryCount >= maxRetries) {log.error("次数到达上限 {}", maxRetries);return;}retryCount++;log.warn("开始第 {} 次重试", retryCount);CorrelationData cd = newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);}public void sendMessage(String exchange, String routingKey, Object msg) {send(exchange, routingKey, msg, 0, 0);}public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){send(exchange, routingKey, msg, delay, 0);}public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {send(exchange, routingKey, msg, 0, maxReties);}public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {send(exchange, routingKey, msg, delay, maxReties);}}

(2)接受消息

监听器:

  • RabbitTemplate 是可以主动获取消息的,也可以不实时监听,但是一般情况都是监听,有消息就执行
  • 监听的是 queue,若 queue 不存在,就会根据注解创建一遍
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "xxx"),exchange = @Exchange(name = "xxx", delayed = "true"),key = {"xxx"}
))
public void xxx(X x) {}

(3)声明交换机与队列

可以通过 @Bean 创建 Bean 对象的方式去声明,可以自行搜索,我更喜欢监听器注解的形式,而且 Bean 的方式,可能会因为配置不完全一样,导致其他配置类的交换机队列无法声明(现象如此,底层为啥我不知道)

(4)消息转换器

消息是一个字符串,但为了满足更多需求,需要将一个对象序列化成一个字符串,但默认的序列化实现貌似用的是 java 对象的序列化,这种方式可能得同一个程序的 java 类才能反序列化成功,所以我们应该选择分布式的序列化方式,比如 json

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageConverterConfig {@Beanpublic MessageConverter messageConverter(){// 1. 定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);// 2. 配置自动创建消息 id,用于识别不同消息jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);return jackson2JsonMessageConverter;}
}

这里的 JsonUtil.OBJECT_MAPPER,就是框架的或者自己实现的 ObjectMapper

(5)配置文件

spring:rabbitmq:host: ${xxx.mq.host} # rabbitMQ 的 ip 地址port: ${xxx.mq.port} # 端口username: ${xxx.mq.username}password: ${xxx.mq.password}virtual-host: ${xxx.mq.virtual-host}publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true # 若是 false 则直接丢弃了,并不会发送者回执listener:simple:prefetch: 1 # 预取为一个(消费完才能拿下一个)concurrency: 2 # 消费者最少 2 个线程max-concurrency: 10 # 消费者最多 10 个线程auto-startup: true # 为 false 监听者不会实时创建和监听,为 true 监听的过程中,若 queue 不存在,会再根据注解进行创建,创建后只监听 queue,declare = "false" 才是不自动声明default-requeue-rejected: false # 拒绝后不 requeue(成为死信,若没有绑定死信交换机,就真的丢了)acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)retry: # 这个属于 spring amqp 的 retry 机制enabled: false # 不开启失败重试
#          initial-interval: 1000
#          multiplier: 2
#          max-attempts: 3
#          stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false

五、常见问题

在这里插入图片描述

(1)RabbitMQ 如何保证消息可靠性

保证消息可靠性、不丢失。主要从三个层面考虑

如果报错可以先记录到日志中,再去修复数据(保底)

1、生产者确认机制

生产者确认机制,确保生产者的消息能到达队列

  1. publisher-confirm,针对的是消息从发送者到交换机的可靠性,成功则进行下一步,失败返回 NACK
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;@PostConstruct
public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);
}private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("处理 ack 回执失败, {}", ex.getMessage());return null;
};private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即执行// setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};
};private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32());
}/*** @param exchange 交换机* @param routingKey routing key* @param msg 消息* @param delay 延迟时间(如果是延迟交换机,delay 才有效)* @param maxRetries 最大重试机会* @param <T> 消息的对象类型*/
private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData = newCorrelationData();MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器@Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c -> {if(c.isAck()) {log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());} else {log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());if(retryCount >= maxRetries) {log.error("次数到达上限 {}", maxRetries);return;}retryCount++;log.warn("开始第 {} 次重试", retryCount);CorrelationData cd = newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
}

Spring3 的 RabbitMQ Confirm,需要配置为 correlated,发送消息时提供 CorrelationData,也就是与消息关联的数据,包括发送者确认时的回调方法

在这里插入图片描述

要想提供 Confirm 的回调办法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 对象(新的 JUC 工具类,可以查一查如何使用)

配置后,在未来根据回调函数进行处理(当然也可以直接设置在 RabbitTemplate 对象的 ConfirmCallBack)

还可以自己实现消息的发送者重试:

在这里插入图片描述

  1. publisher-returns,针对的是消息从交换机到队列的可靠性,成功则返回 ACK,失败触发 returns 的回调方法
@Component
@RequiredArgsConstructor
@Slf4j
public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {// 不存在 routing key 对应的队列,那在我看来转发到零个是合理的现象,但在这里也认为是路由失败(MQ 认为消息一定至少要进入一个队列,之后才能被处理,这就是可靠性)(反正就是回执了,你爱咋处理是你自己的事情)@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 可能一些版本的 mq 会因为是延时交换机,导致发送者回执,只要没有 NACK 这种情况其实并不是不可靠(其实我也不知道有没有版本会忽略)// 但是其实不忽略也不错,毕竟者本来就是特殊情况,一般交换机是不存储的,但是这个临时存储消息// 但这样也就代表了,延时后消息路由失败是没法再次处理的(因为我们交给延时交换机后就不管了,可靠性有 mq 自己保持)MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties();// 这里的 message 并不是原本的 message,是额外的封装,x-delay 在 publish-returns 里面封装到 receiveDelay 里了Integer delay = messageProperties.getReceivedDelay();// 如果不是延时交换机,却设置了 delay 大于 0,是不会延时的,所以是其他原因导致的(以防万一把消息记录到日志里)if(Objects.nonNull(delay) && delay.compareTo(0) > 0) {log.info("交换机 {}, 路由键 {} 消息 {} 延迟 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));return;}log.warn("publisher-returns 发送者回执(应答码{},应答内容{})(消息 {} 成功到达交换机 {},但路由失败,路由键为 {})",returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());}
}

RabbitMQSender:

private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;private final PublisherReturnsCallBack publisherReturnsCallBack;@PostConstruct
public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);// 设置统一的 publisher-returns(confirm 也可以设置统一的,但最好还是在发送时设置在 future 里)// rabbitTemplate 的 publisher-returns 同一时间只能存在一个// 因为 publisher confirm 后,其实 exchange 有没有转发成功,publisher 没必要每次发送都关注这个 exchange 的内部职责,更多的是“系统与 MQ 去约定”rabbitTemplate.setReturnsCallback(publisherReturnsCallBack);
}

同理你也可以按照自己的想法进行重试…

在测试练习阶段里,这个过程是异步回调的,如果是单元测试,发送完消息进程就结束了,可能就没回调,程序就结束了,自然就看不到回调时的日志

如果既没有 ACK 也没有 NACK,也没有发布者回执,那就相当于这个消息销声匿迹了,没有任何的回应,那么就会抛出异常,我们可以处理这个异常,比如打印日志、重发之类的…

private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("处理 ack 回执失败, {}", ex.getMessage());return null;
};

2、持久化

消息队列的数据持久化,确保消息未消费前在队列中不会丢失,其中的交换机、队列、和消息都要做持久化

默认都是持久化的

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、消费者确认

队列的消息出队列,并不会立即删除,而是等待消费者返回 ACK 或者 NACK

消费者要什么时候发送 ACK 呢?

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

如果出现这种场景,就是不可靠的,所以应该是消息处理后,再发送 ACK

Spring AMQP 有三种消费者确认模式:

  1. manual,手段 ack,自己用 rabbitTemplate 去发送 ACK/NACK(这个比较麻烦,不用 RabbitListener 接受消息才必须用这个)
  2. auto,配合 RabbitListener 注解,代码若出现异常,NACK,成功则 ACK
  3. none,获得消息后直接 ACK,无论是否执行成功

出现 NACK 后要如何处理(此过程还在我们的服务器):

  1. 拒绝(默认)
  2. 重新入队列
  3. 返回 ACK,消费者重新发布消息指定的交换机
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageRecovererConfig {@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成为死信(默认)
//        return new ImmediateRequeueMessageRecoverer(); // nack、requeue
//        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、发送给指定的交换机,confirm 机制需要设置到 rabbitTemplate 里}}

Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)retry: # 这个属于 spring amqp 的 retry 机制enabled: false # 不开启失败重试initial-interval: 1000 # 第一次重试时间间隔multiplier: 3 # 每次重试间隔的倍数max-attempts: 4 # 最大接受次数stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false

解释:第一次失败,一秒后重试、第二次失败,三秒后重试,第三次失败,九秒后重试,第四次失败就没机会了(SpringAMQP会抛出异常AmqpRejectAndDontRequeueException)

失败之后根据对应的处理策略进行处理

(2)死信交换机

消息过期、消息执行失败并且不重试也不重新入队列,堆积过多等情况,消息会成为死信,若队列绑定死信交换机,则转发给死信交换机,若没有则直接丢弃

队列1 -> 死信交换机 -> 队列2,这个过程是消息队列内部保证的可靠性,消息也没有包含原发送者的信息,甚至连接已经断开了,所以没有 publisher-confirm 也没有 publisher-returns

在这里插入图片描述

这个机制和 republish 有点像,但是有本质的区别,republish 是消费者重发,而这里是队列将死信转发给死信交换机

死信的情况:

  1. nack && requeue == false
  2. 超时未消费
  3. 队列满了,由于队列的特性,队列头会先成为死信

(3)延迟功能如何实现

刚才提到死信的诞生可能是超时未消费,那么其实这个点也可以简单的实现一个延迟队列:

队列为一个不被监听的专门用来延迟消息发送的缓冲带,其死信交换机才是目标交换机

message.getMessageProperties().setExpiration("1000");

设置的是过期时间,其本意并不是延迟,是可以实现延迟~

在这里插入图片描述

另外,队列本身也能设置 ttl 过期时间,但并不是队列的过期时间(显然不合理,截止后无论啥都丢了,冤不冤啊,至少我想不到这种场景),而是队列中的消息存活的最大时间,消息的过期时间和这个取一个最小值才是真实的过期时间

值得注意的是,虽然能实现延时消息的功能,但是

  1. 实现复杂
  2. 延迟可能不准确,因为队列的特性,如果队列头未出队列,哪怕其后者出现死信,也只能乖乖等前面的先出去之后才能前往死信交换机(例如消息的 ttl 分别为 9s、3s、1s,最终三个消息会被同时转发,因为“最长寿的”排在了前面)

这种方式的顺序优先级大于时间优先级

而 RabbitMQ 也提供了一个插件,叫 DelayExchange 延时交换机,专门用来实现延时功能

Scheduling Messages with RabbitMQ | RabbitMQ

  • 请自行上网下载

延时交换机的声明:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}

延时消息的发送:

private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即执行message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};
};

这里设置的是 Delay,不是过期时间,哪怕超过了时间也不叫做死信

期间一直存在延时交换机的硬存里,延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

(4)消息堆积如何解决

死信的成因还可能是堆叠过多

我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,解决方案也所有很多的

  1. 提高消费者的消费能力 ,可以使用多线程消费任务
  2. 增加更多消费者,提高消费速度,使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息
  3. 扩大队列容积,提高堆积上限

但是,RabbitMQ 队列占的是内存,间接性的落盘,提高上限最终的结果很有可能就是反复落库,特别不稳定,且并没有解决消息堆积过多的问题

我们可以使用 RabbitMQ 惰性队列,惰性队列的好处主要是

  1. 接收到消息后直接存入磁盘而非内存,虽然慢,但没有间歇性的 page-out,性能比较稳定
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存,正常消费后就删除了
  3. 基于磁盘存储,消息上限高,支持数百万条的消息存储

声明方式:

而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues :策略的作用对象,是所有的队列
  • x-queue-mode 参数的值为 lazy
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "xxx"),value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")),key = "xxx"
))

交换机、队列扩展属性叫参数,消息的拓展属性叫头部,扩展属性一般都以 x- 开头(extra)

消息堆积问题的解决方案?

  • 队列上绑定多个消费者,提高消费速度
  • 使用惰性队列,可以再mq中保存更多消息

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的 page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

(5)高可用如何保证

RabbitMQ 在服务大规模项目时,一般情况下不会像数据库那样存储的瓶颈,用惰性队列已经是很顶天了的,其特性和用途不会有太极端的存储压力

更多的是在并发情况下,处理消息的能力有瓶颈,可能出现节点宕机的情况,而避免单节点宕机,数据丢失、无法提供服务等问题需要解决,也就是需要保证高可用性

Erlang 是一种面向并发的语言,天然支持集群模式,RabbitMQ 的集群有两种模式:

  1. 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
  2. 镜像集群:是一种主从集群,在普通集群的基础上,添加了主从备份的功能,提高集群的数据可用性

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险(虽然重启能解决,但那不是强一致,而是最终一致),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用 Raft 协议确保主从的数据一致性

1、普通集群

各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):

  1. 交换机的信息
  2. 队列的信息

但不包括队列中的消息(动态的数据不同步)

监听队列的时候,如果监听的节点不存在该队列(只是知道元数据),当前节点会访问队列所在的节点,该节点返回数据到当前节点并返回给监听者

队列所在节点宕机,队列中的消息就会“丢失”(是在重启之前,这个消息就消失无法被处理的意思)

在这里插入图片描述

如何部署,上网搜搜就行

2、镜像集群

各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):

  1. 交换机的信息
  2. 队列的信息

本质是主从模式,创建队列的节点为主节点,其他节点为镜像节点,队列中的消息会从主节点备份到镜像节点中

注意

  • 像 Redis 那样的主从集群,同步都是全部同步来着
  • 但 RabbitMQ 集群的主从模式比较特别,他的粒度是队列,而不是全部

也就是说,一个队列的主节点,可能是另一个队列的镜像节点,所以分析某个场景的时候,要确认是哪个队列,单独进行观察分析讨论

  • 不同队列之间只有交互,不会相互影响数据同步

针对某一个队列,所有写操作都在主节点完成,然后同步给镜像节点,读操作任何一个都 ok

主节点宕机,镜像节成为新的主节点

在这里插入图片描述

镜像集群有三种模式:

  1. exactly 准确模式,指定副本数 count = 主节点数 1 + 镜像节点数,集群会尽可能的维护这个数值,如果镜像节点出现故障,就在另一个节点上创建镜像,比较建议这种模式,可以设置为 N/2 + 1
  2. all 全部模式,count = N,主节点外全部都是镜像节点
  3. nodes 模式,指定镜像节点名称列表,随机一个作为主节点,如果列表里的节点都不存在或不可用,则创建队列时的节点作为主节点,之后访问集群,列表中的节点若存在才会创建镜像节点

没有镜像节点其实就相当于普通模式了

如何配置上网搜搜就行,比较麻烦,需要设置策略,以及匹配的队列(不同队列分开来讨论,可以设置不同的策略)

3、仲裁队列

RabbitMQ 3.8 以后,推出了新的功能仲裁队列来

  1. 代替镜像集群,都是主从模式,支持主从数据同步,默认是 exactly count = 5
  2. 约定大于配置,使用非常简单没有复杂的配置,队列的类型选择 Quorum 即可
  3. 底层采用 Raft 协议确保主从的数据强一致性

Spring Boot 配置:

在这里插入图片描述

仲裁队列声明:

@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "xxx"),value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")),key = "xxx"
))

队列不声明默认就是普通集群,这里声明的仲裁队列也只是针对一个队列

(6)消息重复消费问题

在保证MQ消息不重复的情况下,MQ 的一条消息被消费者消费了多次

消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常或者是服务宕机,MQ 迟迟没有接收到 ACK 也没有 NACK,此时 MQ 不会将发送的消息删除,按兵不动,消费者重新监听或者有其他消费者的时候,交由它消费,而这条消息如果在之前就消费过了的话,则会导致重复消费

解决方案:

  1. 消息消费的业务本身具有幂等性,再次处理相同消息时不会产生副作用,一些时候可能需要用到分布式锁去维护幂等性
    • 比如一个订单的状态设置为结束,那重复消费的结果一致
  2. 记录消息的唯一标识,如果消费过了的,则不再消费
    • 消费成功将 id 缓存起来,消费时查询缓存里是否有这条消息
    • 设置允许的缓存时间时,你不必想得太极端,一般很快就有消费者继续监听拿到消息,哪怕真有那个情况,这里带来的损失大概率可以忽略不记了,一切要结合实际情况!

有时候两种方案没有严格的界定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/895150.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EasyExcel 导出合并层级单元格

EasyExcel 导出合并层级单元格 一、案例 案例一 1.相同订单号单元格进行合并 合并结果 案例二 1.相同订单号的单元格进行合并2.相同订单号的总数和总金额进行合并 合并结果 案例三 1.相同订单号的单元格进行合并2.相同订单号的商品分类进行合并3.相同订单号的总数和总金额…

cs106x-lecture3(Autumn 2017)

打卡cs106x(Autumn 2017)-lecture3 1、streamErrors Suppose an input file named streamErrors-data.txt contains the following text: Donald Knuth M 76 Stanford U. The code below attempts to read the data from the file, but each section has a bug. Correct th…

C++模板编程——typelist的实现

文章最后给出了汇总的代码&#xff0c;可直接运行 1. typelist是什么 typelist是一种用来操作类型的容器。和我们所熟知的vector、list、deque类似&#xff0c;只不过typelist存储的不是变量&#xff0c;而是类型。 typelist简单来说就是一个类型容器&#xff0c;能够提供一…

windows通过网络向Ubuntu发送文件/目录

由于最近要使用树莓派进行一些代码练习&#xff0c;但是好多东西都在windows里或虚拟机上&#xff0c;就想将文件传输到树莓派上&#xff0c;但试了发现u盘不能简单传送&#xff0c;就在网络上找到了通过windows 的scp命令传送 前提是树莓派先开启ssh服务&#xff0c;且Window…

字节跳动后端一面

&#x1f4cd;1. Gzip压缩技术详解 Gzip是一种流行的无损数据压缩格式&#xff0c;它使用DEFLATE算法来减少文件大小&#xff0c;广泛应用于网络传输和文件存储中以提高效率。 &#x1f680; 使用场景&#xff1a; • 网站优化&#xff1a;通过压缩HTML、CSS、JavaScript文件来…

三维模拟-机械臂自翻车

机械仿真 前言效果图后续 前言 最近在研究Unity机械仿真&#xff0c;用Unity实现其运动学仿真展示的功能&#xff0c;发现一个好用的插件“MGS-Machinery-master”&#xff0c;完美的解决了Unity关节定义缺少液压缸伸缩关节功能&#xff0c;内置了多个场景&#xff0c;讲真的&…

USB子系统学习(四)用户态下使用libusb读取鼠标数据

文章目录 1、声明2、HID协议2.1、描述符2.2、鼠标数据格式 3、应用程序4、编译应用程序5、测试6、其它 1、声明 本文是在学习韦东山《驱动大全》USB子系统时&#xff0c;为梳理知识点和自己回看而记录&#xff0c;全部内容高度复制粘贴。 韦老师的《驱动大全》&#xff1a;商…

史上最快 Python版本 Python 3.13 安装教程

Python3.13安装和配置 一、Python的下载 1. 网盘下载地址 (下载速度比较快&#xff0c;推荐&#xff09; Python3.13.0下载&#xff1a;Python3.13.0下载地址&#xff08;windows&#xff09;3.13.0下载地址&#xff08;windows&#xff09; 点击下面的下载链接&#xff0c…

AWS Fargate

AWS Fargate 是一个由 Amazon Web Services (AWS) 提供的无服务器容器计算引擎。它使开发者能够运行容器化应用程序&#xff0c;而无需管理底层的服务器或虚拟机。简而言之&#xff0c;AWS Fargate 让你只需关注应用的容器本身&#xff0c;而不需要管理运行容器的基础设施&…

vue3+vite+eslint|prettier+elementplus+国际化+axios封装+pinia

文章目录 vue3 vite 创建项目如果创建项目选了 eslint prettier从零教你使用 eslint prettier第一步&#xff0c;下载eslint第二步&#xff0c;创建eslint配置文件&#xff0c;并下载好其他插件第三步&#xff1a;安装 prettier安装后配置 eslint (2025/2/7 补充) 第四步&am…

vLLM V1 重磅升级:核心架构全面革新

本文主要是 翻译简化个人评读&#xff0c;原文请参考&#xff1a;vLLM V1: A Major Upgrade to vLLM’s Core Architecture vLLM V1 开发背景 2025年1月27日&#xff0c;vLLM 开发团队推出 vLLM V1 alpha 版本&#xff0c;这是对框架核心架构的里程碑式升级。基于过去一年半的…

Jupyter Notebook自动保存失败等问题的解决

一、未生成配置文件 需要在命令行中&#xff0c;执行下面的命令自动生成配置文件 jupyter notebook --generate-config 执行后会在 C:\Users\用户名\.jupyter目录中生成文件 jupyter_notebook_config.py 二、在网页端打开Jupyter Notebook后文件保存失败&#xff1b;运行代码…

使用wpa_supplicant和wpa_cli 扫描wifi热点及配网

一&#xff1a;简要说明 交叉编译wpa_supplicant工具后会有wpa_supplicant和wpa_cli两个程序生产&#xff0c;如果知道需要连接的wifi热点及密码的话不需要遍历及查询所有wifi热点的名字及信号强度等信息的话&#xff0c;使用wpa_supplicant即可&#xff0c;否则还需要使用wpa_…

【真一键部署脚本】——一键部署deepseek

目录 deepseek一键部署脚本说明 0 必要前提 1 使用方法 1.1 使用默认安装配置 1.1 .1 使用其它ds模型 1.2 使用自定义安装 2 附录&#xff1a;deepseek模型手动下载 3 脚本下载地址 deepseek一键部署脚本说明 0 必要前提 linux环境 python>3.10 1 使用方法 1.1 …

5.2Internet及其作用

5.2.1Internet概述 Internet称为互联网&#xff0c;又称英特网&#xff0c;始于1969年的美国ARPANET&#xff08;阿帕网&#xff09;&#xff0c;是全球性的网络。 互连网指的是两个或多个不同类型的网络通过路由器等网络设备连接起来&#xff0c;形成一个更大的网络结构。互连…

“图像识别分割算法:解锁视觉智能的关键技术

嘿&#xff0c;各位朋友&#xff01;今天咱们来聊聊图像识别分割算法。这可是计算机视觉领域里特别厉害的一项技术&#xff0c;简单来说&#xff0c;它能让机器“看懂”图像中的不同部分&#xff0c;并把它们精准地分出来。想象一下&#xff0c;机器不仅能识别出图里有猫还是狗…

AJAX项目——数据管理平台

黑马程序员视频地址&#xff1a; 黑马程序员——数据管理平台 前言 功能&#xff1a; 1.登录和权限判断 2.查看文章内容列表&#xff08;筛选&#xff0c;分页&#xff09; 3.编辑文章&#xff08;数据回显&#xff09; 4.删除文章 5.发布文章&#xff08;图片上传&#xff0…

html转PDF文件最完美的方案(wkhtmltopdf)

目录 需求 一、方案调研 二、wkhtmltopdf使用 如何使用 文档简要说明 三、后端服务 四、前端服务 往期回顾 需求 最近在做报表类的统计项目&#xff0c;其中有很多指标需要汇总&#xff0c;网页内容有大量的echart图表&#xff0c;做成一个网页去浏览&#xff0c;同时…

示例:JAVA调用deepseek

近日&#xff0c;国产AI DeepSeek在中国、美国的科技圈受到广泛关注&#xff0c;甚至被认为是大模型行业的最大“黑马”。在外网&#xff0c;DeepSeek被不少人称为“神秘的东方力量”。1月27日&#xff0c;DeepSeek应用登顶苹果美国地区应用商店免费APP下载排行榜&#xff0c;在…

.NET周刊【2月第1期 2025-02-02】

国内文章 dotnet 9 已知问题 默认开启 CET 导致进程崩溃 https://www.cnblogs.com/lindexi/p/18700406 本文记录 dotnet 9 的一个已知且当前已修问题。默认开启 CET 导致一些模块执行时触发崩溃。 dotnet 使用 ColorCode 做代码着色器 https://www.cnblogs.com/lindexi/p/…