文章目录
- 一 MQ 的作用与基本概念
- 1 流量削峰
- 2 应用解耦
- 3 异步调用
- 4 四个基本概念
- 二 核心模式
- 1 工作队列模式(Work Queue)
- 2 发布/订阅模式(Publish / Subscribe)
- 3 路由模式(Routing)
- 4 主题模式(Topic)
- 三 RabbitMQ 消息机制
- 1 消息应答 & 重新入队
- 2 预取值
- 3 持久化
- 4 发布确认
- 5 保证消息的幂等的两种方式
- 四 交换机
- 1 Fanout
- 2 Direct
- 3 Topic
一 MQ 的作用与基本概念
- 消息队列的主要功能:流量削峰、应用解耦、异步处理
1 流量削峰
- 先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务压垮
2 应用解耦
- 生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性
3 异步调用
- 将用户的请求数据存储到消息队列之后就立即返回结果,随后系统再对消息进行消费
- 是流量削峰的基础
- 使用消息队列进行异步处理之后,需要适当修改业务流程进行配合。比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,再通知用户订单成功,以免交易纠纷
4 四个基本概念
生产者
:产生数据、发送消息的程序
交换机
:接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息(推送到特定队列 / 推送到多个队列 / 丢弃等)
队列
:队列是 RabbitMQ 内部使用的一种数据结构,本质上是一个大的消息缓冲区
消费者
:大多时候是一个等待接收消息的程序
二 核心模式
1 工作队列模式(Work Queue)
- 消费者间是竞争关系,每个消息只被消费一次
- 主要思想是避免立即执行资源密集型任务,把任务封装为消息并将其发送到队列
- 默认采用轮询方式向消费者发送消息,也可以根据消费者的处理能力指定不公平分发
2 发布/订阅模式(Publish / Subscribe)
- 生产者将消息放入交换机,交换机把消息发送到和该交换机绑定的所有消息队列中
- 消费者之间是资源共享的
- 使用的交换机类型为 Fanout
3 路由模式(Routing)
- 使用的交换机类型为 Direct
- binding key 是交换机绑定到队列的 key,例如图中的 orange、black、green
- routing key 是消息携带的 key,交换机将消息路由到和 binding key 匹配的队列中
4 主题模式(Topic)
- 使用的交换机类型为 Topic,在路由模式的 routingKey 的基础上添加了通配符,实现了模糊匹配
三 RabbitMQ 消息机制
1 消息应答 & 重新入队
- 分为自动和手动,自动应答指的是,消息发送后立即被认为已经传送成功,容易发生消息丢失
- 手动应答的回答方式:ACK(处理成功) / NACK(处理失败) / REJECT(拒绝处理)
- 手动应答的好处是可以批量应答(批量指应答 channel 上未应答的消息),并且减少网络拥堵
- 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将获取到消息未完全处理,并对其重新排队
2 预取值
- 对于每个消费者来说,都有一个存放未处理消息的缓冲区。“预取计数”值定义了通道上允许的未确认消息的最大数量,限制缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题
- 通常增加预取计数值,将提高向消费者传递消息的速度
- 据此可以解释,虽然自动应答传输消息速率是最佳的,但在这种情况下已传递但尚未处理的消息的数量也会增加
3 持久化
- 默认情况下 RabbitMQ 退出或由于某种原因崩溃时,忽视队列和消息。确保消息不会丢失需要将队列和消息都标记为持久化
- 持久化并不能完全保证不会丢失消息,还需要配合发布确认
4 发布确认
- 队列持久化 + 消息持久化 + 发布确认 = 消息不丢失
- 生产者将信道设置成
confirm
模式,一旦信道进入confirm
模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了(如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出) - 三种发布确认方式的对比
类型 | 工作方式 | 特点 |
---|---|---|
单个发布确认 | 一次发布一个消息,只有它被确认发布,后续的消息才能继续发布 | 发布速度特别慢;同步(阻塞消息的发布) |
批量发布确认 | 一次发布一批消息,一起确认 | 当发布出现问题时无法定位问题消息,必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息;同步(阻塞消息的发布) |
异步发布确认 | 生产者只需发送消息而不用等待确认,RabbitMQ 收到某个消息,调用 ackCallBack 向生产者确认已收到消息,未收到调用 nackCallBack | 需要哈希表记录消息序号和消息内容;可靠性和效率最高;异步(不阻塞消息的发布) |
5 保证消息的幂等的两种方式
幂等:用户对于同一操作发起的一次请求或者多次请求的结果是一致的
-
唯一ID + 指纹码作为数据库主键,利用数据库主键去重
- 根据消息生成一个全局唯一的ID,然后加上一个指纹码,保障这次操作是绝对唯一的
- 将ID + 指纹码拼接好的值作为数据库主键,在消费消息前,先从数据库查询这条消息的指纹码标识是否存在,没有就执行
insert
操作,如果有就代表已经被消费
-
利用 Redis 的原子性
- 在接收到消息后,将消息ID作为 key 执行
setnx
命令 - 如果执行成功就表示没有处理过这条消息,执行失败表示消息已经被消费
- 在接收到消息后,将消息ID作为 key 执行
(类似于分布式锁的实现)
四 交换机
- 生产者只能将消息发送到交换机,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息
- 交换机类型:直接(direct),主题(topic),标题(headers) ,扇出(fanout)
- 通过绑定(binding),实现交换机和队列关系的建立
1 Fanout
- 将接收到的所有消息广播到它绑定的所有队列中
2 Direct
-
Direct 交换机只把消息发送到具有一致 routingKey 队列中去,要求生产者在向 MQ 发送消息时指定 routingKey
-
如果发送的消息的 routingKey 不对应任何一个队列,则丢弃该消息
-
一个 Direct 绑定的多个队列可以指定相同的 routingKey(类似 Fanout)
-
一个队列可以指定多个 routingKey
3 Topic
- 通俗来讲,在 Direct 模式上,队列指定的 routingKey 增加了通配符
- 队列指定的 routingKey 必须是一个单词列表,以点号分隔开
- *可以代替一个单词,#可以替代零个或多个单词