文章目录
- RabbitMQ是什么?有什么特点?
- RabbitMQ架构
- RabbitMQ消息消费过程
- 如何保证消息不丢失?可靠性传输?
- 生产者丢失了数据
- RabbitMQ(broker)丢失了数据
- 消费端丢失数据
- 顺序消息
- 错乱场景
- 解决方案
- 高可用
- 普通集群模式(非高可用)
- 在镜像集群模式(高可用)
- 总结
RabbitMQ是什么?有什么特点?
- RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
- 可靠性(Reliability)
- 灵活的路由(Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
- 消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
- 高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
- 多种协议(Multi-protocol)RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
- 多语言客户端(Many Clients)RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等
- 管理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
- 跟踪机制(Tracing)如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
- 插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ架构
- RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。从计算机术语层面来说, RabbitMQ 模型更像是一种交换机模型。 RabbitMQ主要包含下面几个部分:
- Producer投递消息
- Consumer接收消息
- QueueRabbitMQ不支持队列层面广播消费
- Exchange生产者将消息发到交换器,交换器再将数据路由到队列。如果路由不到,或许会返回给生产者,或许直接丢弃。包括fanout、direct、topic、headers类型
- Broker一个Broker可以看做一个RabbitMQ服务节点或者服务实例
- RoutingKey路由键,指定这个消息的路由规则
- BindingKey绑定键,关联交换器与队列
- payload消息体
- 标签(Label)用来表述消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMO, RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者(Consumer)在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
RabbitMQ消息消费过程
- 生产者发送消息
- 生产者连接到 RabbitMO Broker,建立一个连接(Connection),开启一个信道(Channel)
- 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
- 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至 RabbitMO Broker,其中包含路由键、交换器等信息
- 相应的交换器根据接收到的路由键查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
- 消费者接收消息
- 消费者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
- 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
- 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
- 消费者确认(ack)接收到的消息。
- RabbitMQ 从队列中删除相应已经被确认的消息。
- 关闭信道。
如何保证消息不丢失?可靠性传输?
生产者丢失了数据
- RabbitMQ 提供事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
- RabbitMQ 提供 confirm 模式,每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,首先会发送到Exchange,无论成功与否都回调函数confirm()返回ack消息。第二步从Exchange路由分配到Queue中,如果失败则回调函数returnedMessage()。
- 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
RabbitMQ(broker)丢失了数据
- 创建 queue 的时候将其设置为持久化保证 RabbitMQ 持久化 queue 的元数据, durable 设置为 true.创建Exchange时设置为持久化
- 发送消息的时候将消息的 deliveryMode 设置为 2。就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
消费端丢失数据
- RabbitMQ 提供的自动 ack 机制,将acknowledge-mode改为手动模式,业务处理成功后通过一个 api 来调用关闭 RabbitMQ 的自动ack ,如果RabbitMQ认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
顺序消息
错乱场景
- 一个queue对应多个consumer,consumer从MQ里面读取数据是有序的,但是无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
- 一个queue对应一个consumer,但是consumer里面进行了多线程消费
解决方案
- 根据业务拆分成不同queue,比如同一个订单号用同一个queue,同一个queue使用同一个consumer去消费
- 一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,分发给不同线程去处理
高可用
普通集群模式(非高可用)
- 在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。queue只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(queue 的一些配置信息包括 queue 所在实例)。消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。此时要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作
在镜像集群模式(高可用)
- 每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据(元数据和消息)。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上
- 在RabbitMQ管理控制台新增一个镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了
- 优点,高可用,某个节点宕机不影响集群,别的 consumer 都可以到其它节点上去消费数据
- 缺点,性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重;扩展性可言太差,加的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue
- 另外,由于每个节点都保存了副本,所以我们还可以通过HAProxy实现负载均衡
总结
本文介绍了的使用,如有问题欢迎私信和评论