前言
上一节给大家讲了Rocket的顺序消息,这一节和大家聊一下延迟消息,关于顺序消息大家可以点下面这个链接直接看
RocketMQ的延迟消息
延迟消息
延迟消息就是指生产者发送消息之后,消息不会立马被消费,而是等待一定的时间之后再被消息
RocketMQ的延迟消息用起来非常简单,只需要在创建消息的时候指定延迟级别,之后这条消息就成为延迟消息了
Message message = new Message("sanyouTopic", "java日记 0".getBytes());
//延迟级别
message.setDelayTimeLevel(1);
虽然用起来简单,但是背后的实现原理还是有点意思,我们接着往下看
RocketMQ延迟消息的延迟时间默认有18个级别,不同的延迟级别对应的延迟时间不同
RocketMQ内部有一个Topic,专门用来表示是延迟消息的,叫SCHEDULE_TOPIC_XXXX
,XXXX不是占位符,就是XXXX
RocketMQ会根据延迟级别的个数为SCHEDULE_TOPIC_XXXX
这个Topic创建相对应数量的队列
比如默认延迟级别是18,那么SCHEDULE_TOPIC_XXXX
就有18个队列,队列的id从0开始,所以延迟级别为1时,对应的队列id就是0,为2时对应的就是1,依次类推
那SCHEDULE_TOPIC_XXXX
这个Topic有什么作用呢?
这就得从消息存储时的一波偷梁换柱的骚操作了说起了
当服务端接收到消息的时候,判断延迟级别大于0的时候,说明是延迟消息,此时会干下面三件事:
-
将消息的Topic改成
SCHEDULE_TOPIC_XXXX
-
将消息的队列id设置为延迟级别对应的队列id
-
将消息真正的Topic和队列id存到前面提到的消息存储时的额外信息中
之后消息就按照正常存储的步骤存到CommitLog文件中
由于消息存到的是SCHEDULE_TOPIC_XXXX
这个Topic中,而不是消息真正的目标Topic中,所以消费者此时是消费不到消息的
举个例子,比如有条消息,Topic为sanyou,所在的队列id = 1,延迟级别 = 1,那么偷梁换柱之后的结果如下图所示
代码如下
所以从上分析可以得出一个结论
所有RocketMQ的延迟消息,最终都会存储到
SCHEDULE_TOPIC_XXXX
这个Topic中,并且同一个延迟级别的消息在同一个队列中
在存消息偷梁换柱之后,实现延迟消费的最关键的一个步骤来了
BocketMQ在启动的时候,除了为每个延迟级别创建一个队列之后,还会为每个延迟级别创建一个延迟任务,也就相当于一个定时任务,每隔100ms执行一次
这个延迟任务会去检查这个队列中的消息有没有到达延迟时间,也就是不是可以消费了
前面的结论,每个队列都有一个ConsumeQueue文件,可以通过ConsumeQueue找到这个队列中的消息
一旦发现到达延迟时间,可以消费了,此时就会从这条消息额外存储的消息中拿到真正的Topic和队列id,重新构建一条新的消息,将新的消息的Topic和队列id设置成真正的Topic和队列id,内容还是原来消息的内容
之后再一次将新构建的消息存储到CommitLog中
由于新消息的Topic变成消息真正的Topic了,所以之后消费者就能够消费到这条消息了
所以,从整体来说,RocketMQ延迟消息的实现本质上就是最开始消息是存在SCHEDULE_TOPIC_XXXX
这个中转的Topic中
然后会有一个类似定时任务的东西,不停地去找到这个Topic中的消息
一旦发现这个消息达到了延迟任务,说明可以消费了,那么就重新构建一条消息,这条消息的Topic和队列id都是实际上的Topic和队列id,然后存到CommitLog
之后消费者就能够在目标的Topic获取到消息了
总结
RocketMQ的延迟消息是一种特殊的消息类型,当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理。这种消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。RocketMQ默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的特定变量中。
在实际应用中,不使用定时器,利用RocketMQ的延迟消息可以实现定时任务的功能,适用于一些特定的场景,如电商交易系统的订单超时未支付自动取消订单等。
其实现原理主要是消息在RocketMQ Broker端的流转过程中,对延迟消息进行特殊处理,计算这条延迟消息需要在什么时候进行投递。投递时间等于消息存储时间加上延迟级别对应的时间。
联系方式
关于文章中大家有任何疑问可以通过关注公众号《编程乐学》进行留言,同时,公众号还有更多有趣的项目以及关于学习编程的笔记资料大家可以看看,欢迎大家进行留言。