这篇文章我们来聊聊消息队列。我一直在想,能不能用一篇文章就把消息队列的所有内容给串联起来。然后,之后每次看到这篇文章的时候,我就能够立马回忆起来这个大知识的所有知识点。所以我想尝试一下用这种长文的方式,将我自己对于消息队列的理解,用一篇长文章串联起来所有知识点。
消息队列的思维导图如下:
消息队列的优点
我们先从消息队列的优点讲起:解耦、异步、削峰。我们想想假如没有消息队列会怎么样?当生产者生产一条消息时,生产者会与消费者建立直接的联系,这会导致生产者和消费者进行强耦合。假如现在我想加一个消费者,那我不仅要写增加消费者的代码,还要对生产者的代码进行修改,这是很糟糕的。所以我们就引入消息队列,放在生产者和消费者中间,让生产者和消费者彻底解耦开来。从这个角度来理解,其实会发现消息队列和 SpringIOC 有点像。回到刚才说的,消息队列除了有解耦的作用,还有异步的作用。假如没有消息队列,生产者在生产一条消息给消费者,要等消费者消费完,生产者才可以继续生产。但是引入了消息队列,生产者只需要将消息放到消息队列里就可以继续转头生产了,消费者想什么时候拿就什么时候拿,这样子就实现了异步。消息队列的优点还有削峰。假如某一段时间有几千万条消息涌入进来,如果没有消息队列的话,消费者根本消费不了几千万条消息,会很容易死的。但是假如引入了消息队列,就可以把几千万条消息放到消息队列里存着,消费者从消息队列里慢慢拉出来消费,有点像“漏斗”的感觉。不过话说回来,消息队列也不全是优点,它还是有缺点的。因为生产者和消费者现在都直接与消息队列挂钩了,一旦消息队列挂了,那整个系统就废了。
消息队列各种名词含义
之前我一直搞不懂activemq、rabbitmq、rocketmq、kafka、MQ、消息队列、中间件是什么意思。也不是搞不清楚咯,只是对这些名词有点搞混乱,我没有真正的去区分它们是什么意思。后来我查阅了很多资料,然后结合自己的理解,总结成以下文字:MQ就是消息队列,它是消息队列的英文表达,我们说 MQ 其实就是说消息队列,说消息队列其实就是 MQ 。而 activemq、rabbitmq、rocketmq、kafka这几个东西我理解成实现消息队列的系统。消息队列就是一种中间件,中间件还有Nginx,redis,Tomcat 这些。
各种消息队列的优缺点
activemq、rabbitmq、rocketmq、kafka 这几个实现消息队列的系统,其实都有各自的优缺点,但是这里我先不展开说明,因为有点啰嗦。
kafka 消息队列的内部架构
因为我做项目用的都是 kafka,对 kafka 我是最熟悉的,所以我就通过 kafka 来讲一下消息队列吧。
我先从 kafka 的内部架构讲起。你想想,最原始的消息队列是如何做的?是不是就是一条队列。假如有多个消费者,你一条队列的话包含着不同类型的数据,从消费者的角度考虑,其实他可能只需要一种类型的消息,但是由于只有一条队列,迫于无奈,他只好硬着头皮拿到了很多它根本不想要的消息,所以有人就对这个原始的消息队列进行改进,改进成多条队列,每条队列叫一个 topic,不同 topic 之间都是存放不同类型的数据,同一 topic 的数据类型都是相同的,这样,消费者需要哪种数据类型就订阅哪一个 topic 既可。其实这种思路已经很好了,但是后来,人们又发现这样子其实并不快,因为假如有很多个消费者同时订阅一个 topic 的话,是串行拉消息的,这样子就不太快。所以他们就把 topic 分成很多个分区(partition),不同的消费者可以同时从不同的 partition 中拉消息,这就实现了并行,这样,消费者消费的速度就大大增加。消费同一个 topic 的消费者我们叫他们为消费者组。举个例子来说明以上我提到的这些名词:假如现在有一个 topic,这个 topic 有三个 partition,分别是 partition1、partition2、partition3。那么假如有2个消费者,消费者1和消费者2,他们都订阅了这个 topic,那我们就叫消费者1和消费者2为一个消费组。消费者1消费 partition1和 partition3,消费者2消费 partition2。
kafka 消息队列的特点
再来讲讲 kafka 的特点。其实最显而易见的特点就3个:保证数据不丢失、保证数据不被重复消费、保证数据消费的有序性。
kafka 怎么保证数据不丢失?那我们应该先想想 kafka 数据丢失有可能在哪里发生?无非就是生产者丢失了数据,或者消息队列丢失了数据,或者消费者丢失了数据。解决生产者丢失数据的方法是:假如生产者往 topic 中的 partition 写数据时,leader 接受到消息后,必须要等待它所有的 follower 都同步到这条消息,生产者才认为发送成功了,否则就一直重发,重发无限次。假如消息队列丢失了数据怎么办?解决办法是用 leader-follower。消费者丢失数据怎么办?消费者丢失数据情况就一个,就是消费者刚拿到消息后,消费者自己自动提交了 offset,让消息队列误以为消费者消费完数据,其实消费者刚准备消费,突然挂了,这条消息就没了。解决办法是你把那个自动提交 offset 的那个东西给关掉,改成自己手动提交既可。
kafka 怎么保证数据不被重复消费?这个很简单,具体做法是你每消费完一条数据,就往内存set中写一条日志,记录了你消费了这条数据,等下次你又消费到这条数据时,就可以用内存set去一下重,假如发现这条数据被消费过,那就直接扔掉。
kafka 还可以用来保证消息队列的有序性。其实有序性要靠生产者和消费者来共同完成。生产者怎么保证消息队列的有序性呢?比如说你有很多张订单表,就可以用订单表的id作为 key,这样就能保证同一个订单的所有数据都被放入一个 topic 中。等消费者从topic中取出数据后,假如这个消费者是单线程的,那数据肯定被有序消费。但是假如消费者是多线程的,就有可能导致数据不一致的问题,解决的办法是每个线程都搞一个内存队列,相同 key 的就被放在同一个内存队列中,那这样就可以保证消息有序被消费了。
假如几千万条消息堆积在消息队列中怎么办?
讲完 kafka 的特点后,我们最后来讲讲假如 kafka 消息队列积压了几千万条数据,那我们应该如何处理?比如你有一个 topic,积压了很多数据,一种有效的方法是将数据全部转移到一个新建的topic 中,然后把这个 topic 分成几十个 partition,每个 partition 安排一个消费者,这个消费者唯一的任务就是消费你这个 partition 的数据,这样三两下数据就被消费完了。