文章目录
- 1. 什么是消息队列
- 1.1 消息队列概述
- 1.2 使用消息队列的优势
- 1.3 使用消息队列的劣势
- 1.4 常见的消息队列产品对比
- 2. RabbitMQ 基本概念
- 2.1 RabbitMQ 概述
- 2.2 RabbitMQ 的概念模型
- 2.2.1 Message
- 2.2.2 Publisher
- 2.2.3 Exchange
- 2.2.4 Binding
- 2.2.5 Queue
- 2.2.6 Connection
- 2.2.7 Channel
- 2.2.8 Virtual Host
- 2.2.9 Consumer
- 2.2.10 Broker
- 2.3 RabbitMQ 的六种工作模式
- 2.3.1 工作队列模式
- 2.3.2 发布/订阅模式
- 2.3.3 路由模式
- 2.3.4 主题模式
- 2.3.5 RPC 模式
- 2.3.6 消息头模式
- 3. RabbitMQ 高级特性
- 3.1 消息投递时序图
- 3.2 消息的可靠投递
- 3.2.1 publishConfirm 机制
- 3.2.2 returnCallback 机制
- 3.2.3 消息确认 ACK 机制
- 3.2.3.1 消息确认 ACK 机制的基本概念
- 3.2.3.2 Spring AMQP 实现手动确认 ACK 机制
- 3.3 死信队列
- 3.4 延时队列
- 3.4.1 TTL + 死信队列实现延时队列
- 3.4.2 x-delay-message-exchange 插件实现延时队列
- 4. RabbitMQ 如何解决问题
- 4.1 使用 RabbitMQ 集群实现高可用
- 4.2 如何保证消息不被重复消费
- 4.3 如何确保消息不丢失
- 4.3.1 消息从生产者到 broker 之间的防丢失
- 4.3.2 消息在 broker 中的防丢失
- 4.3.3 消息从 broker 到消费者之间的防丢失
- 4.4 如何保证消息传递的顺序性
1. 什么是消息队列
1.1 消息队列概述
消息队列(Message Queue
,以下简称 MQ
)是在消息的传输过程中保存消息的容器
,多用于分布式系统服务间的通信。
1.2 使用消息队列的优势
使用消息队列主要有解耦、异步、削峰的优势。
- 应用解耦:可以提高系统的容错性和可维护性
- 异步提速:提升用户体验和系统吞吐量
- 削峰填谷:提高系统的稳定性
1.3 使用消息队列的劣势
使用消息队列主要有系统可用性降低、复杂度提高的劣势。
- 系统可用性降低:系统引入的外部依赖越多,则系统的稳定性越低。一旦
MQ
宕机,则系统中依赖MQ
实现的功能全部不可用。所以MQ
一定要高可用
。 - 系统的复杂度提高:引入
MQ
后需要考虑的问题会比较多且比较棘手,主要有以下几个方面的问题:- 如何保证消息不被重复消费?
- 如何处理消息丢失的情况?
- 如何保证消息传递的顺序性?
1.4 常见的消息队列产品对比
市面上常见的消息队列产品主要有 RabbitMQ
、ActiveMQ
、RocketMQ
、Kafka
。
特性 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级 (高于 ActiveMQ) | 万级 (次于 RabbitMQ) | 10 万级 ,支持高吞吐 | 10 万级 ,支持高吞吐 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等级旗下,可以支撑大量的 topic | topic 到几十/几百的级别时,吞吐量会大幅度下降。在同等机器下,如果要支持大规模的 topic,需要增加更多的机器资源 | ||
协议支持 | AMQP 、XMPP、SMTP、STOMP | AMQP、OpenWire、STOMP、REST、XMPP | 自定义 | 自定义协议,社区封装了 HTTP 协议支持 |
时效性 | 微秒(us ) 级,RabbitMQ 的最大特点,延迟最低 | 毫秒(ms )级 | 毫秒(ms )级 | 毫秒(ms )级内 |
可用性 | 高,基于主从架构可实现高可用 | 高,基于主从架构可实现高可用 | 非常高,分布式架构 | 非常高,分布式架构,一个数据多个副本,少数机器宕机不会丢失数据,不会导致不可用 |
消息可靠性 | 基本不丢 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到 0 丢失 | 经过参数优化配置,可以做到 0 丢失 |
功能支持 | 基于 erlang 开发,并发能力很强,性能极好,延时最低,社区活跃,管理界面功能丰富 | 老牌产品,成熟度高,文档多 | MQ 功能较为完善,基于分布式架构,扩展性好 | 功能较为简单,只支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
2. RabbitMQ 基本概念
2.1 RabbitMQ 概述
RabbitMQ
是实现了高级消息队列协议(AMQP
)的开源消息中间件。主要的特点包括:
- 可靠性(Reliability):
RabbitMQ
使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 - 灵活的路由(Flexible Routing):在消息进入队列之前,通过
Exchange
来路由消息。对于典型的路由功能,RabbitMQ
已经提供了一些内置的Exchange
来实现。针对更复杂的路由功能,可以将多个Exchange
绑定在一起,也可以通过插件机制实现自己的Exchange
。 - 消息集群(Clustering):多个
RabbitMQ
服务器可以组成一个集群,形成一个逻辑Broker
。 - 高可用(Highly Available Queues):队列可以在集群中的机器上进行同步,使得在部分节点出问题的情况下队列仍然可用。
- 多种协议(Multi-protocol):
RabbitMQ
支持多种消息队列协议,比如STOMP
、MQTT
。 - 多语言客户端(Many Clients):
RabbitMQ
几乎支持所有的常用编程语言。 - 管理界面(Management UI):
RabbitMQ
提供了一个易用的用户界面。 - 跟踪机制(Tracing):如果消息异常,
RabbitMQ
提供了消息跟踪机制,可以方便定位问题。 - 插件机制(Plugin System):
RabbitMQ
提供插件机制,可以根据自己的需求获得或自己编写插件以满足定制化需求。
2.2 RabbitMQ 的概念模型
2.2.1 Message
Message
即消息。消息是不具名的,由**消息头(header)和消息体(payload)**组成。
- 消息体由二进制数组承载,保证了传输的高效性和一定的安全性。
- 消息头由一系列的可选属性组成。包括 路由键(routing-key)、消息优先级(priority)、**是否需要持久化(delivery-mode)**等组成。
2.2.2 Publisher
Publisher
即消息的生产者。也是一个向 broker
发布消息的客户端应用程序。
2.2.3 Exchange
Exchange
即交换机,用来接收生产者发送的消息并将这些消息路由给与其绑定的队列。
Exchange
分发消息根据类型不同,分发策略随之不同,一共由四种类型:
direct
:只有消息中的路由键(Routing Key)和绑定关系(Binding)完全匹配时,交换机才会将消息分发到对应的队列中。fanout
:每条发送到fanout
类型的交换机上的消息都会被分发到所有绑定到这个交换机的队列上去。fanout
类型的交换机不处理路由键的匹配逻辑,只是简单地将消息分发到所有绑定的队列上去。由于在分发过程中少处理了路由键匹配的逻辑,fanout
类型的交换机转发消息效率是最高的。topic
:在分发过程中需要处理路由键匹配的逻辑,匹配逻辑如下:- 单词与单词之间用
.
隔开 *
匹配一个单词#
匹配 0 个或多个单词*
、#
只能单独用作占位符,不能与单词贴着使用
- 单词与单词之间用
headers
:在分发过程中不需要处理路由键匹配的逻辑,而使用**绑定参数(Arguments
)**来进行匹配,匹配逻辑如下:Routing Key
不参与路由逻辑- 如果绑定关系中
Arguments
中的x-match = all
时,表示发送的消息属性中的header
中的所有键值对与绑定关系中声明的Arguments
中的所有键值对都相等才能匹配,可以理解为消息属性中的header
中的所有键值对包含绑定关系中声明的Arguments
中的所有键值对。 - 如果绑定关系中
Arguments
中的x-match = any
时,表示发送的消息属性中的header
中任一键值对与Arguments
中相等就能匹配。
2.2.4 Binding
Binding
即队列与交换机的绑定关系,可以理解为交换机和队列的关系中间表。Binding
包括以下两部分:
Routing Key
:即路由键,可以理解为消息的路由关键字。在交换机类型为direct
或topic
时,将参与路由逻辑。Arguments
:即绑定参数,可以理解为消息的路由参数,键值对的形式。在交换机类型为Headers
时,将参与路由逻辑。
2.2.5 Queue
Queue
即队列,用来保存消息直到发送给消费者。它是存放消息的容器,一个消息可以被投入到一个或多个队列。消息将一直呆在队列中,等待消费者将其从队列中取走。
it's essentially a large message buffer. —— 本质上队列就是一个巨大的消息缓冲区。
RabbitMQ
官网上如是介绍 Queue
。
队列有几个非常重要的属性:
TTL
:即Time-To-Live
属性,队列的最大存活时间。通过在声明队列时,设置x-message-ttl
参数(单位:ms
)来完成。队列中的所有消息都会遵守这个时间的限制。- 最大长度限制:顾名思义,即队列的大小。这个最大长度可以是针对队列中就绪状态消息数量的限制,也可以是针对队列中所有就绪状态消息的总字节数的限制,且两个限制可以同时存在。(注意,这里说的大小都是针对队列中就绪状态的消息来进行统计的。)
- 消息数量限制:通过设置
x-max-length
来实现 - 消息总字节数限制:通过设置
x-max-length-bytes
来实现(单位:字节,只计算消息头的字节数,不计算消息头、消息属性占用的字节数)
- 消息数量限制:通过设置
- 队列溢出行为:当队列中的就绪状态消息超出了消息的最大长度限制后,将会有队列溢出行为发生,由
x-overflow
进行设置drop-head
:删除队列头部的消息reject-publish
:丢掉最近接收的消息reject-publish-dlx
:将消息发往死信队列
2.2.6 Connection
Connection
即网络连接。
2.2.7 Channel
Channel
即信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真是的 TCP
连接内的虚拟连接,AMQP
命令都是通过信道发出去的,不管是发布消息,订阅队列还是接收消息,这些动作都是通过信道完成的。因为对于 OS
来说建立和销毁 TCP
都是非常昂贵的开销,所以引入了信道的概念,以复用 TCP
连接。
2.2.8 Virtual Host
Virtual Host
即虚拟主机,可以理解为虚拟消息服务器。每个 Virtual Host
相当于一个独立的 RabbitMQ
服务器,每个 Virtual Host
之间的数据是相互隔离的,不能互通。
可以将 Virtual Host
类比为 MySQL
中的 DataBase
的概念。
2.2.9 Consumer
Consumer
即消息的消费者,表示一个从 broker
中获取消息的客户端应用程序。
2.2.10 Broker
Broker
即消息队列服务实体。
2.3 RabbitMQ 的六种工作模式
2.3.1 工作队列模式
工作队列模式实现:
- 生产者和消费者需要指定同一个交换机
- 生产者发送时指定路由名称为队列名称
- 消费者将队列绑定至交换机上,无需指定路由键
工作队列模式下,多个消费者监听同一个队列的时候,一条消息只会有一个消费者接收到。
2.3.2 发布/订阅模式
发布订阅模式实现:
- 生产者和所有消费者需要指定同一个交换机
- 指定交换机类型为
fanout
- 生产者发送时无需指定
Routing Key
- 消费者绑定各自队列到交换机上
发布订阅模式下,所有绑定了交换机的队列都会收到消息;但如果多个消费者监听了同一个队列,一条消息还是只会有一个消费者收到。
2.3.3 路由模式
路由模式实现:
- 生产者和所有消费者需要指定同一个交换机
- 指定交换机类型为
direct
- 生产者发送时指定
Routing Key
- 消费者将各自需要监听的队列通过自定义
Routing Key
绑定到交换机上
路由模式下,只有与生产者指定了相同的 Routing Key 的队列才会收到消息;如果两个队列绑定的 Routing Key 相同,则都会收到消息。
2.3.4 主题模式
主题模式实现:
- 生产者和所有消费者需要指定同一个交换机
- 指定交换机类型为
topic
- 生产者发送时指定
Routing Key
- 消费者将各自需要监听的队列通过自定义
Routing Key
匹配格式绑定到交换机上。- 多个单词用
.
隔开,否则视为一个单词 #
匹配零个或多个单词,*
只能匹配一个单词
- 多个单词用
主题模式下,只有生产者的 Routing Key 与队列绑定的 Routing Key 格式匹配成功的队列才能收到消息。如果几个队列的 Routing Key 格式都能匹配上,则都会收到消息。
2.3.5 RPC 模式
RPC 模式 Spring AMQP
实现
- 客户端和服务端需要指定同一个交换机
- 指定交换机类型为
direct
- 客户端发送请求时指定
Routing Key
,且使用RabbitMQ.convertAndSendAndRecive
方法将请求入参传入,此方法将返回一个Object
对象,即为RPC
的调用结果(在未获取结果之前,发送的线程将一直处于阻塞状态)。 - 服务端将需要监听的队列通过指定
Routing Key
绑定到交换机上,且将@RabbitListener
标注的方法的返回参数设置为RPC
调用的返回结果。
RPC 模式可以实现 RPC 调用的功能。但在未使用 Spring AMQP 的情况下,需要手动指定 RPC 的回调队列,且需要手动监听回调队列,并从中获取指定 correlationId 的返回结果,Spring AMPQ 已经自动实现了这些功能。
2.3.6 消息头模式
消息头模式与路由模式类似,不过消息头模式并不使用路由键来进行匹配,而是使用绑定参数来进行匹配。匹配规则见Exchange 类型之 header
3. RabbitMQ 高级特性
3.1 消息投递时序图
RabbitMQ
的消息投递时序图如下所示:
3.2 消息的可靠投递
在使用消息队列时,由于消息链路上涉及到三个终端,则如何避免消息丢失或者投递失败是必须要考虑的问题。RabbitMQ
提供了三种机制来控制消息的投递可靠性:
- 生产者发送确认机制:发送确认分为两部分:
- 确认生产者发送的消息是否到达
broker
(或者说是否到达交换机,默认只要到达了broker
就一定会到达交换机),即publishConfirm
机制 - 确认交换机的消息是否由路由规则到达了队列,即
returnCallback
机制
- 确认生产者发送的消息是否到达
- 消费者接收确认机制:确认消费者是否成功消费了队列中的消息,即消息确认
ACK
机制
3.2.1 publishConfirm 机制
当消息到达 broker
之后,会触发 confirmCallback
回调,确认消息已经到达 broker
,默认只要到达了 broker
就一定会到达交换机。
在 SpringBoot
应用中,需要开启配置:
spring:rabbitmq:# 高版本写法publisherConfirmType: CORRELATED# 低版本写法:publisher-confirms: true
然后在 RabbitTemplate
中调用 setConfirmCallback
设置具体回调逻辑。
3.2.2 returnCallback 机制
当交换机按照路由规则投递消息至队列失败时,会触发 returnCallback
回调,将消息退回或者直接丢弃。
在 SpringBoot
应用中,需要开启配置:
spring:rabbitmq:publisher-returns: true
然后在 RabbitTemplate
中调用 setReturnsCallback
设置具体回调逻辑。
注意,在 AMPQ
协议中,mandatory
标识位用于控制消息的退回逻辑,在 SpringBoot
应用中对应:
- 当
RabbitTemplate
中的mandatory
属性为true
时,将会将消息退回给生产者 - 当
RabbitTemplate
中的mandatory
属性为false
时,将直接把消息丢弃
3.2.3 消息确认 ACK 机制
3.2.3.1 消息确认 ACK 机制的基本概念
消息确认 ACK
机制,即消费者接收到 broker
投递的消息后,与 broker
的一个应答机制。消费者接收到 broker
投递的消息后,会出现以下几种情况:
- 消费者在接收到消息后,正常返回给
broker
一个ACK
的应答,broker
在接收到这个应答后,将消息从队列中删除 - 消费者自身出现了宕机/逻辑出错导致没有正常返回
broker
一个正常的应答,或者使用了nack
或者reject
应答,则根据requeue
参数的不同执行不同的逻辑(reject
应答对应的requeue
参数值为true
)- 如果
requeue
参数值为false
,则broker
会直接丢弃这条消息 - 如果
requeue
参数值为true
,则会将这条消息发送给正在监听这个队列的其他消费者,如果只有当前消费者,则会继续把这条消息投递给当前消费者。
- 如果
nack 和 reject 的主要区别在于,reject 只能拒绝单个消息,但 nack 可以拒绝小于指定 deliveryTag 的所有消息。deliveryTag 是 AMQP 中针对一个消息的序号值,同一消息每次从 broker 发出后,该序号值都会 +1。
注意,如果在消费者的消费逻辑中没有捕获异常,或者在手动确认模式下一直没有回复 ACK
,那么消息将会不停地重新分发,将极大地影响 broker
的运行稳定性。
3.2.3.2 Spring AMQP 实现手动确认 ACK 机制
Spring AMQP
中,ACK
一共有三种确认模式
- AcknowledgeMode.
AUTO
:自动确认 - AcknowledgeMode.
MANUAL
:手动确认 - AcknowledgeMode.
NONE
:不确认
一般来说,我们在实现消息可靠性投递时,最好将确认模式修改为手动确认模式,手动处理ACK
。配置文件如下:
spring:rabbitmq:listener:# spring amqp 的监听容器有两种,simple 和 direct,可以根据不同的场景选择不同的监听容器simple:acknowledge-mode: manual
在完成上面的设置之后,需要在监听方法入参中填入 Channel
对象:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;@RabbitListener(//在这里完善你的绑定关系)/*** <p>消息监听方法</p>** @param header 消息头键值对* @param message 消息体* @param channel 信道*/public void onListening(@Headers Map<String, Object> header, @Payload Message message, Channel channel) {// do something here//消息处理完成后调用 ack 方法//这里的 deliveryTag 从消息中获得channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);// 或者可以使用 nack 方法,第二个参数是是否批量处理,第三个参数即为 requeue 参数//channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);}
3.3 死信队列
首先需要理解一下什么叫死信。正常的消息可以按照消息投递时序图走完整个流程,但是出现以下几种情况后,正常消息就变成了死信:
- 消息被拒绝(
basicReject
或basicNack
),且requeue
值为false
- 超过了队列指定的
x-message-ttl
属性值,或者超过了消息设定的expiration
值时 - 队列超过了最大长度限制,且队列溢出行为参数为
reject-publish-dlx
时
死信队列,实际上就是一个接收死信的队列,其与正常的交换机,队列完全一致。使用方法:
- 在队列属性上添加
x-dead-letter-exchange
和x-dead-letter-routing-key
参数,指定死信出现时,将死信转发过去的目标死信队列 - 在队列属性上,添加
x-overflow
参数,并指定值为reject-publish-dlx
死信队列应用场景:
- 当发生消费异常时,可以方便地将出错的消息发送至死信队列,随后调用业务逻辑将当时出错的数据持久化到数据库(或者执行其他的操作)。
- 通过这样的方式,不用去翻看应用日志就可以知道找出出错的消息,也方便重新投递,有助于排查线上问题。
3.4 延时队列
有时候我们会延时执行任务的需求,即在过一段时间之后,再执行任务。
AMQP
协议,RabbitMQ
原生并没有直接支持延时队列的功能,目前使用 RabbitMQ
来实现延时队列有两种方式:
- 通过**
TTL
和死信队列**结合来实现延时队列 - 通过
RabbitMQ
的**x-delay-message-exchange
插件**来实现延时队列
3.4.1 TTL + 死信队列实现延时队列
通过**TTL
和死信队列**结合来实现延时队列
- 首先声明一个队列,队列上绑定死信队列,并设置队列的超时时间
- 不设置任何消费者监听此队列
- 生产者将消息发送到队列上
- 由于没有消费者监听队列,则发送到队列中的消息经过超时时间之后,就会转发到死信队列上,由死信队列完成相应的延时处理业务逻辑
此种方法存在时序性的弊端,因为队列的特殊性(FIFO
),在同一个队列消息过期时间不同的情况下,可能会出现先到期的后被执行延时逻辑的可能性。
3.4.2 x-delay-message-exchange 插件实现延时队列
通过 RabbitMQ
的**x-delay-message-exchange
插件**来实现延时队列
broker
集成x-delay-message-exchange
插件,使用Docker
进行安装:
docker pull feeld/x-delayed-rabbitmq
- 声明延时队列交换机,并指定交换机类型为
x-delayed-message
,可以通过指定x-delayed-type
来指定实际交换机的类型 - 将队列绑定到该延时队列交换机上
- 发送消息时,通过设置消息头参数
x-delay
或者MessageProperties.setDelay()
方法可以设置延时时间(单位:ms
),如果两者都设置,则以最后一次设置的时间为准
4. RabbitMQ 如何解决问题
这里说的问题,即前面提到的使用消息队列的劣势中举例的一些问题
4.1 使用 RabbitMQ 集群实现高可用
RabbitMQ
有两种集群模式:
- 普通集群模式:
- 镜像集群模式:
4.2 如何保证消息不被重复消费
消息重复消费,即同一条消息被消费者的消费逻辑处理了多遍,即消息重复重复消费的问题。可能出现的原因如下:
- 消息路由方式选择错误:错误地使用了消息分发路由模式,使本该路由给单个消费者的消息分发给了多个消费者,每个消费者都执行了一次消息消费的逻辑。
- 消息确认机制没有正常工作:当出现网络波动或其他情况,使生产者/消费者与
broker
之间的消息确认机制没有正常工作时,则可能出现消息重复消费的问题。
这里我们只讨论消息确认机制没有正常工作的处理方法:
- 在消息中增加一个
ID
字段用来标识消息的唯一性 - 在消费者消费消息后,将收到的消息的
ID
在存储层中进行查询- 如果能查询出来,则说明被处理过,即消息已经被消费过,执行比较更新或者其他逻辑
- 如果查询不出来,则说明没有被处理过,即消息还没有被消费过,则执行正常的处理逻辑,并将这条消费记录存在存储层中
4.3 如何确保消息不丢失
从消息投递时序图我们可以知道,消息的整个生命周期会经历生产者,broker
,消费者三个终端,则消息的防丢失需要从以下几个方面来进行考虑:
- 消息从生产者到
broker
之间的防丢失 - 消息在
broker
中的防丢失 - 消息从
broker
到消费者之间的防丢失
4.3.1 消息从生产者到 broker 之间的防丢失
生产者将消息发送到 broker
时,因为网络等原因,消息可能会发送失败,即出现了消息丢失。一般有两种方式来解决:
RabbitMQ
提供的事务机制,在Spring AMQP
中可以通过设置
rabbitTemplate.setChannelTransacted(true);
来启用事务。当消息没有正常到达 broker
时,会抛出异常,生产者可以捕获异常来感知到消息并没有正常到达 broker
,从而执行重发等逻辑。但是缺点是开启事务后,每个 channel
会采用同步阻塞的方式来等待事务的完成,导致** broker 的吞吐量和性能会大幅下降**。
publishConfirm
机制:
即发布确认模式:
一旦使用了 publishConfirm 模式,所有发布的消息都会被指派一个唯一的 ID,一旦消息到达了 broker 之后,broker 就会发送一个 ack 消息给生产者,标识这个消息已经正常到达了 broker;如果消息是持久化的,那么确认消息会在消息写入磁盘之后发出。
Spring AMQP
的相关配置见 publishConfirm 机制。相比于事务模式,发布确认模式的性能比较优秀。
4.3.2 消息在 broker 中的防丢失
将消息、交换机、队列的都设置为持久化,则可以避免消息在 broker
中的丢失。
- 消息持久化:生产者发送消息时设置
deliveryMode
属性为2
,即可将消息设置为持久化,在Spring AMQP
中设置:
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
进行消息持久化的设置
- 交换机/队列持久化:在声明交换机/队列时,设置
durable
为true
即可。
4.3.3 消息从 broker 到消费者之间的防丢失
消息从 broker
到消费者之间的防丢失,需要考虑以下两个方面:
- 消息没有路由到队列:由于消息不能匹配任何的队列路由逻辑,在生产者设置了
mandatory
为true
且开启了returnCallback
配置时,broker
将会把消息退回给生产者 - 消息没有从队列到达消费者:
- 在开启了队列持久化和消息持久化后,如果消息和队列没设置过期时间,即使
broker
宕机,消息也不会丢失,消息最终还是会发送到消费者手中 - 在开启了队列持久化和消息持久化后,如果消息和队列存在过期时间,则在队列上配置死信队列,将过期的消息发送到死信队列中进行处理
- 在开启了队列持久化和消息持久化后,如果消息和队列没设置过期时间,即使
- 消息到达了消费者,但是还没有被消费逻辑处理:
- 将消息
ACK
改为手动确认模式,在消息完成处理逻辑后,手动发送ACK
确认消费者收到消息 - 如果消息处理逻辑中出现异常,则将消息发往死信队列,或者设置
requeue
属性为true
,将消息发往其他队列
- 将消息
4.4 如何保证消息传递的顺序性
当消息有顺序状态时,由于存在多个消费者,则可能会出现消息最终处理顺序错乱的问题。
举例说明:进入队列的消息是 1,2,3,如果有多个消费者监听了这个队列,那么由于网络或者自身处理速度等因素影响之后,可能会出现消费者的处理顺序为 3,2,1 的情况
解决办法:
- 当消息需要强一致顺序时,将具有顺序性的多条消息打包成一条消息发往
broker
- 当消息不需要强一致顺序时,可以在生产端给每一个消息增加一个自增
ID
,或者一个消息产生的时间戳,用作消息顺序标记字段,在消息消费的处理逻辑上进行消息顺序标记字段的判断- 如果接收到的消息的顺序字段比当前处理过的消息当前顺序要大,则正常消费
- 如果接收到的消息的顺序字段比当前处理过的消息当前顺序要小,则直接丢弃或者做一些其他逻辑处理