1、RocketMQ有什么作用?
- 异步:数据的产生方不需要关心谁来使用数据,只需要将数据发送到broker,后续需要管消费流程,Rocket也有保证消息可靠性的方案
- 消峰:正常业务系统当流量激增时,有可能会将系统压垮,有了消息队列可以将大量请求缓存起来,慢慢进行消费处理
- 解耦:系统的耦合性越高,容错性就越低,以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用。这时可以用MQ进行解耦,生产者向broker投递消息,即使子系统出现故障也会先保存在broker中,当故障恢复时,消费者会通过消费队列获取消息
2.RoctetMQ的架构
- NameServer:类似于注册中心,管理组件的注册实例信息,不参与消息的传输
- produce:生产者,消息的发送方
- Consumer:消费者,消息消费方
- broker:暂存和传输消息(包含:commitLog、ConsumeQueue、IndexFile)
- Topic:区分消息的种类,一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
- commitLog:CommitLog是存储消息的主体;borker接收到生产者投递来的消息,会存储到commitLog文件中
- ConsumeQueue:逻辑消费队列;可以看成基于topic的commitLog的索引文件因为CommitLog是按照顺序写入的,不同的topic消息都会混淆在一起,而Consumer又是按照topic来消费消息的,这样的话势必会去遍历commitLog文件来过滤topic,这样性能肯定会非常差,所以rocketMq采用ConsumeQueue来提高消费性能。即每个Topic下的每个queueId对应一个Consumequeue
- IndexFile:IndexFile提供了一种可以通过key(topic+msgId)或时间区间来查询消息的方法
3、 RoctetMQ的优缺点
3.1消息堆积的问题:
如果生产者大量发送消息,消费者消费能力不够,会造成broker消息堆积,需要根据实际场景制定解决方案。
3.2系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
3.3一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?
4.RoctetMQ顺序消息,如何保证顺序
1)全局有序:可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,多个消息队列同时消费是无法绝对保证消息的有序性的。
2)局部有序:可以定义一个特定的字段通过运算取模进行投递特定的队列中,消费者端使用MessageListenerOrderly
处理有序消息。
5. 消息过滤,如何实现
两种方案:1)一种是在broker端按照Consumer的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到Consumer端,缺点是加重了Broker的负担;2)在comsumer端进行过滤,无用的消息不进行处理
6.消息的幂等性问题
幂等性原则:就是用户对于同一种操作发起的多次请求的结果是一样的,不会因为操作了多次就产生不一样的结果
解决方案:1)在数据库中做好唯一约束,2)通过redis保存每条消息唯一的key,消费重复消息时先去redis判断key是否已存在
7.RocketMQ是怎么实现分布式事务消息的?
正常事务提交:
1)消费者向borker投递消息时,不是Commit/
Rollback
状态所以还是半消息,
2)当borker接收到半消息时(此时的消息消费者不可见)也并不会提供给消费者;
3)当生产者知道向broker投送消息成功执行本地事务
4)生产者本地事务完成后向borker发送Commit/
Rollback
,borker确认状态后根据状态进行投递或删除
补偿流程:
- broker对没有
Commit/Rollback
的事务状态消息,从服务端发起一次“回查” - Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新
Commit
/Rollback
事务有三个状态
public enum LocalTransactionState {//本地事务执行成功,给broker发送一个commit的标识COMMIT_MESSAGE,ROLLBACK_MESSAGE,//这个状态将会引起回查UNKNOW,
}
8.什么是分布式消息中的半消息?
半消息是只生产者向broker发送消息时,还没经过两次确认的消息,此时的状态标记为“不可消费”状态,所以暂时对消费者不可见
9.消息的可用性,RocketMQ如何能保证消息的可用性/可靠性?
需要从Producer,Consumer和Broker三个方面来回答:
9.1.producer方面:
- 同步发送,即发送一条数据等到接受者返回响应之后再发送下一个数据包
- 异步发送,生产者发送消息时在数据库记录投递状态表,broker接收到进行持久化后才会回调生产者。这时生产者确认投递状态,如果在异步发送后生产者宕机就接收不到回调了,这时可以另外用一个定时任务去查询投递状态表是否有消息长时间未投递成功。
- 发送分布式事务消息的投递方式
9.2broker方面:
消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费
9.3Consumer方面
Consumer自身维护了个持久化的offset(对应Message Queue里的min offset),用来标记已经成功消费且已经成功发回Broker的消息下标。如果Consumer消费失败,它会向Broker发回消费失败的状态,发回成功才会更新自己的offset。如果发回给broker时broker挂掉了,Consumer会定时重试,如果Consumer和Broker一起挂掉了,消息还在Broker端存储着,Consumer端的offset也是持久化的,重启之后继续拉取offset之前的消息进行消费。
10.RocketMQ的刷盘机制是什么?
RocketMQ的存储与读写是基于JDK NIO的内存映射机制,消息存储时首先将消息追加到内存中。通过配置刷盘策略将内存消息数据保存到磁盘文件中。
RocketMQ提供了两种刷盘策略:同步刷盘和异步刷盘
- 同步刷盘
在消息达到Broker的内存之后,必须刷到commitLog日志文件中才算成功,然后返回Producer数据已经发送成功。
- 异步刷盘(默认)
异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中。
优缺点分析
同步刷盘能保证消息抵达broker后不丢失,如果是异步刷盘当producer发来消息后响应成功,另外开启一个线程刷盘时,此时刚好borker出现问题无法正常执行。这是消息可能就无法刷到commitLog中。
异步刷盘吞吐量比同步刷盘高,响应速度快
11.RocketMQ怎么实现负载均衡
11.1Producer负载均衡:
在给borker发送消息时,会根据从NameServer获取到的broker集群实例列表,通过轮训的策略进行投递。
11.2Consumer负载均衡
集群模式:
首先理解Broker是如何与Consumer进行消息交互的,是Consumer主动向Borker的Consumer queue拉取消息的;而一个queue只能对应一个Consumer实例,而Consumer实例可以同时消费多个queue,处于多对一的关系。那么如果在系统运行过程中我又启动了多个Consumer,这时会有一个重平衡机制,让新启动的Consumer分配到queue。
问题:如果只有3个queue和3个Consumer实例,刚好是一 一对应的,这时新增consumer实例,queue数量不够的话,新增的实例就分配不到queue
广播模式:
广播模式会让一条消息让所有订阅的消费者组的实例都进行消费,会让所有consumer都分到所有的queue
12.什么是死信?
当一条消息无法正常消费的时会被投递到死信队列,比如一条消息初次消费失败或超时,消息队列会自动进行消费重试累计默认16次;达到最大重试次数后,消息队列不会立刻将消息丢弃而是暂时存放到死信队列中。
死信队列的特征
- 死信消息会保存三天后(CommitLog的过期时间)被删除
- RocketMQ会为每个消费组都设置一个Topic名称为 %DLQ%+consumerGroup 的死信队列
13.什么是推拉消息模式?
PULL:拉取模式为消费者主动从broker中拉取消息消费,只要拉取到消息,就会启动消费过程
PUSH:推模式为消费者就是要注册消息的监听器,监听器是要用户自行实现的。当消息达到broker服务器后,消费者端会触发监听器拉取消息进行消费。但是从实际上看还是从broker中拉取消息
14.RocketMQ Broker中的消息被消费后会立即删除吗?
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息(offset),当有消息消费后只是当前Consumer的消费进度(offset)更新了。
15.RocketMQ的消息存储结构是怎么样的?
消息会通过messageStore消息存储管理器将消息储存在broker的commitLog中,而CommitLog结构包含
- ConsumeQueue:逻辑消费队列;可以看成基于topic的commitLog的索引文件因为CommitLog是按照顺序写入的,不同的topic消息都会混淆在一起,而Consumer又是按照topic来消费消息的,这样的话势必会去遍历commitLog文件来过滤topic,这样性能肯定会非常差,所以rocketMq采用ConsumeQueue来提高消费性能。即每个Topic下的每个queueId对应一个Consumequeue
- IndexFile:IndexFile提供了一种可以通过key(topic+msgId)或时间区间来查询消息的方法
MessageStoreConfig:消息存储配置对象
文件目录:storePathRootDir\store下的commitLog、Consume Queue、index等
另外borker持久化文件中还会储存一些其他数据,比如offset、topic等
文件目录:storePathRootDir\store\config下的json文件数据(consumerFilter.json,consumerOffset.json,subscriptionGroup.json,topics.json)