消息队列
消息对列是一个存放消息的容器,当我们需要消息的时候就从消息队列中取出消息使用。消息队列是分布式系统中重要的组件,使用消息队列的目的是为了通过异步处理提高系统的性能和削峰值,降低系统的耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。
1.消息模型
点对点
消息生产者向消息队列中发送一个消息之后,只能被一个消费者消费一次。
发布订阅
消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。
发布订阅模式和观察者模式有以下不同:
- 观察者模式中,观察者和主题都知道对方的存在;而在发布者订阅者模式中,生产者与消费者不知道对方的存在,他们之间通过频道进行通信。
- 观察者模式是同步的,当事件触发时,主题会调用观察者的方法,然后等待方法的返回;而发布者订阅者模式是异步的,生产者向频道发送一条消息之后,就不需要关心消费者什么时候来订阅这个消息,可以立即返回。
2.使用场景
异步处理
生产者将消息发送给消息队列之后,不需要同步等待消息接收者处理完毕,而是立即返回进行其它操作。消息接收者从消息队列中订阅消息后进行异步处理。
例如在注册流程中通常需要发送验证邮件来确保注册用户的合法性,可以使用消息队列使发送验证邮件的操作异步处理,用户在填完注册信息之后就可以完成注册,而将发送邮件这一消息发送到消息队列。
只有在业务流程允许异步处理的情况下才能这么做,例如上面的注册流程中,如果要求用户对验证邮件进行点击后才能完成注册的话,就不能使用消息队列。
流量削峰
我们先来看下传统的服务器接受处理请求的流程
如上图,在不使用消息队列服务器的时候,用户的请求都直接访问数据库,在高并发的情况下数据库的压力剧增,不仅使得响应很慢,还可能因此挂掉数据库,导致用户页面直接报错。
我们看加入消息队列后的接受处理请求流程有什么变化
如上图,在使用消息队列后,即使在高并发的情况下用户的请求数据发送给消息队列之后立即返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理消息速度比数据库要快很多,因此响应速度得到大幅改善。
因此我们可以得到消息队列有很好的削峰作用,通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削去高峰期的并发事务。如在某些电商平台的一些秒杀活动中,合理使用消息队列可以抵御活动刚开始大量请求涌入对系统的冲击。
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票等。
系统解耦
传统的系统数据传输模式
如上图,主系统和其他系统的耦合性太强,都是直接调用,稍微有一点改动或者新增模块,双方都得改代码,过于麻烦。
加入消息队列后的变化
如上图,如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好。
消息队列是利用发布者订阅模式工作,消息发送者(生产者)发布消息,一个或多个消费者订阅消息。从上图可以看出消息发送者和消息接收者之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接收者从分布式消息队列中获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,就可以订阅该消息,对原有的系统和业务没有任何的影响。从而实现网站业务的可扩展性设计。
另外为了避免消息队列服务器宕机造成消息的丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才会删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列集群中的其他服务器发布消息。
使用消息队列带来的问题
- 可用性降低:在加入MQ之前,不用考虑MQ服务器挂掉的情况,引入后就需要去考虑,所以可用性降低。
- 复杂性提高:加入MQ之后,你需要保证消息没有被重复消费,处理消息丢失的情况,保证消息传递的顺序性等问题。
- 数据一致性:消息队列带来的异步确实可以提高系统的响应速度,但是,如果消息的真正消费者没有正确消费消息,这样就会导致数据不一致的问题。
解决方案
1.对于可用性问题
实际项目中发送MQ消息,如果不使用集群,其中MQ机器出现故障宕机,那么MQ消息就不能发送,系统就会崩溃,所以我们需要集群MQ。各种消息中间间的集群方式不同。下面以ActiveMQ的集群为例(Zookeeper+ActiveMQ)如图:
服务器向Zookeeper注册时,Zookeeper会分配序列号,我们认为序列号小的那个就是主服务器,序列号大的那个就是备用服务器。
当我们的客户端(Web server)需要访问服务时,需要连接Zookeeper,获得指定目录下的临时节点列表,也就是已经注册的服务器信息,获得序列号小的那台主服务器的地址,进行后续的访问操作,以达到总是访问主服务器的目的。当主服务器发生故障,Zookeeper从指定目录下删除对应的临时节点,同时通知关心这一变化的所有客户端,高效的传播这一信息,当下一个请求来的时候,还是连接Zookeeper,但是此时其实是访问备用MQ服务器。
2.对于复杂性问题
(1)如何保证消息不被重复消费
要回答这个问题, 必须要知道为什么会出现消息被重复消费,大多都是因为网络不通导致,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。解决该问题有下面三种思路:
- 如果消息是做数据库的插入操作,给这个消息做一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
- 如果这个消息是做redis的set操作,那么不用解决,因为无论set几次结果都是一样的。set操作本来就是幂等操作。
- 如果上面两种情况都不行,那么准备一个第三方来做消费记录,以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以k-v的形式写入redis。那么消费者开始消费前,先去redis中查看有没有消费记录即可。
(2)如何保证消息的可靠传输
保证消息的可靠传输就是防止生产者弄丢数据,消息队列弄丢数据,消费者弄丢数据而已。
消息队列一般都会持久化到磁盘,生产者数据丢失的话MQ事务会进行回滚,可以尝试重新发送。消费者丢失的话一般都是采用了自动确认消息模式导致消费信息被删,只要改为手动的就好,也就是说消费者消费完之后,调用一个MQ的确认方法就可以。
(3)如何保证从消息队列里拿到的数据按顺序执行
通过算法,将需要保持先后顺序的消息放到同一个消息队列中,然后只用一个消费者去消费该队列。
- rabbitMQ:拆分多个queue,每个queue一个consumer,就是多一些queue而已。或者就是一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
- kafka:一个topic,一个partition,一个consumer,内部单线程消费,写n个内存queue,然后N个线程分别消费一个内存queue即可。
(4)数据是通过push还是pull方式给到消费段,各自有什么利弊
push模型实时性能好,但是因为状态维护等问题,难以应用到消息中间件的实践中,因为
在Broker端需要维护consumer的状态,不好适用Broker去支持大量的consumer的场景。
consumer的消费速度是不一致的,Broker进行推送难以处理不同consumer的状况
Broker无法处理consumer无法读取消息的情况,因为不知道consumer的宕机是短暂的还是永久的
另外推送消息(量可能会很大)也会加重consumer的负载或者压垮consumer
pull模式实现起来相对简单一点,但是实时性取决于轮训的频率,在对实时性要求高的场景不适合使用。