之前写了一篇关于消息队列的文章:《消息队列介绍与对比》,本文主要介绍消息队列在实际工作中的使用情况(截止到2023年,因为我2023年离职了,后续的情况不了解了,哈哈)。
市面上的多种消息队列都有在小米应用,如Notify,Kafka,EMQ,RabbitMQ,Talos,RocketMQ,MQTT等。我们并不是为了使用而使用,主要还是因为部门较多,不同部门业务不同,侧重点不同,因此在选型消息队列时就会根据实际业务需求去选择更合适的消息队列。
在目前的单位主要还是RabbitMQ,RocketMQ,Kafka主要用于日志采集种。
本文主要介绍RabbitMQ,RocketMQ,Talos和Notify等等。
1、RabbitMQ的使用
在小米使用RabbitMQ最典型的场景是邮件中继。在2021年,公司要求所有部门自动发送邮件时不可直连邮件服务器,而是要经过一个邮件中继Guard,其最主要的目的是让Guard做一层治理,比如流量管控,避免大量请求都打到邮件服务器。如果某个大神程序没写好,每个订单都发送一封邮件,一天几十万订单,可是要了命了。Guard内部就使用了RabbitMQ,业务系统将邮件发送到Guard,Guard会将所有邮件作为消息头递到消息队列,随后RabbitMQ负责将消息push到消费者(邮件服务器),在上面有过介绍,push消息的速率完全由broker掌握,我们可以控制Push消息的频率,类似大家经常说的限流,消峰,这就可以有效避免邮件服务器被瞬时大量邮件打垮。
不过也正是因为这点,经常会出现消息堆积,邮件发不出去的场景。如果某个业务发送了大量邮件,可能会影响到其他业务的邮件发送。如何做到相互不影响,也是Guard团队重点要解决的问题。
在我目前的单位,RabbitMQ是主推,这主要是领导层决定的,当然我不知道选择RabbitMQ的具体原因,我来得比较晚。我只知道去年发生过的一个问题是,因为Server重启,导致队列丢失,数据丢失,出现事故。这个我在消息队列介绍中已经说过了,RabbitMQ的队列默认是auto-delete的,重启后队列数据就会全部丢失。而且RabbitMQ的限制还包括它是阅后即焚,非常不方便追溯;最重要的是没法实现消费分组,这在实际业务中是非常不方便的。所以,我认为RabbitMQ并不是业务中较好的选择。
2、RocketMQ
RocketMQ也是这几年才在小米大规模使用的,尤其是电商系统,像有品和小米商城,目前只用RocketMQ作为实际业务的消息队列,日志会用到talos。之前的有品使用过原生的Kafka,也使用过公司自研的Notify。Notify是我个人非常不认同的一个中间件,其内部使用了Mysql和Redis,作为消息队列,非常不成熟,其架构类似如下:
可以看到,其消息队列依赖于Redis和数据库,实现的性能以及功能都有待商榷。后来它自己也基于RocketMQ了。额,我能直接用RocketMQ,为什么还要用你啊?
后期经过调研,有品最终都换成了RocketMQ,主要是考虑到RocketMQ的几个特性:
- 事务写入
- Key级别顺序消息,可以用订单号作为hashkey
- 重试 ,支持不同模式的重试(顺序消费时默认无限重试,并发消费可间隔重试16次数)
- 死信队列 当达到一定消费次数之后,就直接进入死信队列,方便后续手动触发。
小米RocketMQ发展轨迹:
我们使用的是开源的RocketMQ,众所周知,其只支持固定级别的延迟,为了实现任意时间的延迟,小米云团队参考了DDMQ的经验借助RocksDB和时间轮以插件的形式无侵入地支持了任意时间的延迟。
此外,针对于Pull模式(RocketMQ 4.x)的缺点,也做了改进。由于RocketMQ要求一个分区只能同时被一个消费者消费(同组),因此当消费者数量大于分区数量时,多出的消费者是不能进行消费的,这无疑是一种浪费。因此针对这点,小米进行了优化。优化如下:
POP模式不会绑定某个实例,弥补了Pull模式的不足,不会出现数据倾斜、消息堆积的问题。实际上,RocketMQ5.X官方已经解决了,官方实现了消费的负载均衡,消息会同时分配给消费者分组中的多个消费者一起分担,功能要比小米基于4.X版本开发的更加强大。具体可看官网:消费者负载均衡
在我走之前,RocketMQ一直都是小米主推的消息队列,主要还是因为特别适用于我们的业务场景。这也是我自己最喜欢用的消息队列。虽然RocketMQ是基于Kafka思想开发的,但站在巨人的肩膀上并超越它,不是什么坏事。
3、Talos
Talos是小米自研的一个消息队列,已经比较早了,设计它的初衷是因为当时使用的是Kafka 0.8 版,当时版本的Kafka自身存在很多缺点,比如集群扩容和故障恢复时非常麻烦。
Talos实际上也是参考Kafka进行开发的,其主要变化:
1、将存储和结算相分离。存储采用HDFS,TalosServer只负责调度;
2、采用一致性hash实现负载均衡。
据Talos团队介绍,目前Talos实现了:
- 日处理消息数超过 2 万亿条,日消息峰值 4 千万条/秒,日处理数据量 1.3PB;
- Topic 总数 13000+,下游的作业数 15000+,接入业务数量 350+ ;
其实,Talos还实现了Exactly Once,这是我觉得比较好的一点(当然,新版本的kafka也已经实现了)。基本思想是在生产者生产一个序列号,同样的序列号,使得broker不会再继续处理;在消费端,先查redis缓存,看是否已经处理同样的序列号,如果已处理,就不再继续处理。
关于Talos这部分,我建议大家看下面参考资料中的小米消息队列的实践,里面详解介绍了talos的由来,架构和特点,非常赞。
参考资料:
千与千寻-浅谈Kafka以及Rocketmq的高性能
万亿级消息背后: 小米消息队列的实践_云计算_勇幸_InfoQ精选文章