RocketMQ 消息集成:多类型业务消息 - 定时消息

引言

Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了 100% 阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。

本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。

点击链接,查看直播讲解:https://yqh.aliyun.com/live/detail/29063

概念:什么是定时消息

在业务消息集成场景中,定时消息是,生产者将一条消息发送到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间,消费者才可以消费到。

相似地,延迟消息其实是对于定时消息的另外一种解释,指的是生产者期望消息延迟一定时间,消费者才可以消费到。可以理解为定时到当前时间加上一定的延迟时间。

对比一下定时消息和普通消息的流程。普通消息,可以粗略的分为消息发送,消息存储和消息消费三个过程。当一条消息发送到 Topic 之后,那么这条消息就可以马上处于等待消费者消费的状态了。

而对于定时/延时消息来说,其可以理解为在普通消息的基础上叠加了定时投递到消费者的特性。生产者发送了一条定时消息之后,消息并不会马上进入用户真正的Topic里面,而是会被 RocketMQ 暂存到一个系统 Topic 里面,当到了设定的时间之后,RocketMQ 才会将这条消息投递到真正的 Topic 里面,让消费者可以消费到。

场景:为什么需要使用定时消息

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。往往这类定时事件触发都会存在以下诉求:

  • 高性能吞吐:需要大量事件触发,不能有性能瓶颈。
  • 高可靠可重试:不能丢失事件触发。
  • 分布式可扩展:定时调度不能是单机系统,需要能够均衡的调度到多个服务负载。

传统的定时调度方案,往往基于数据库的任务表扫描机制来实现。大概的思路就是将需要定时触发的任务放到数据库,然后微服务应用定时触发扫描数据库的操作,实现任务捞取处理。

这类方案虽然可以实现定时调度,但往往存在很多不足之处:

  • 重复扫描:在分布式微服务架构下,每个微服务节点都需要去扫描数据库,带来大量冗余的任务处理,需要做去重处理。
  • 定时间隔不准确:基于定时扫描的机制无法实现任意时间精度的延时调度。
  • 横向扩展性差:为规避重复扫描的问题,数据库扫表的方案里往往会按照服务节点拆分表,但每个数据表只能被单节点处理,这样会产生性能瓶颈。

在这类定时调度类场景中,使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

  • 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。消息队列 RocketMQ 版的定时消息具有高并发和水平扩展的能力。

案例:使用定时消息实现金融支付超时需求

利用定时消息可以实现在一定的时间之后才进行某些操作而业务系统不用管理定时的状态。下面介绍一个典型的案例场景:金融支付超时。现在有一个订单系统,希望在用户下单 30 分钟后检查用户的订单状态,如果用户还没有支付,那么就自动取消这笔订单。

基于 RocketMQ 定时消息,我们可以在用户下单之后发送一条定时到 30 分钟之后的定时消息。同时,我们可以使用将订单 ID 设置为 MessageKey。当 30 分钟之后,订单系统收到消息之后,就可以通过订单 ID 检查订单的状态。如果用户超时未支付,那么就自动的将这笔订单关闭。

原理:RocketMQ 定时消息如何实现

固定间隔定时消息

如前文介绍,定时消息的核心是如何在特定的时间把处于系统定时 Topic 里面的消息转移到用户的 Topic 里面去。

Apache RocketMQ 4.x 的版本的定时消息是先将定时消息放到按照 DelayLevel 放到 SCHEDULE_TOPIC_XXXX 这个系统的不同 Queue 里面,然后为每一个 Queue 启动一个定时任务,定时的拉取消息并将到了时间的消息转投到用户的 Topic 里面去。这样虽然实现简单,但也导致只能支持特定 DelayLevel 的定时消息。

当下,支持定时到任意秒级时间的定时消息的实现的 pr 提出到了社区,下面简单的介绍一下其基本的实现原理。

时间轮算法

在介绍具体的实现原理之前,先介绍一下经典的时间轮算法,这是定时消息实现的核心算法。

如上所示,这是一个一圈定时为 7 秒的时间轮,定时的最小精度的为秒。同时,时间轮上面会有一个指向当前时间的指针,其会定时的移向下一个刻度。

现在我们想定时到 1 秒以后,那么就将数据放到 “1” 这个刻度里面,同时如果有多个数据需要定时到同一个时间,

那么会以链表的方式添加到后面。当时间轮转到 “1” 这个刻度之后,就会将其读取并从链表出队。那如果想定到超过时间轮一圈的时间怎么处理呢?例如我们想定时到 14 秒,由于一圈的时间是 7 秒,那么我们将其放在“6”这个刻度里面。当第一次时间轮转到“6” 时,发现当前时间小于期望的时间,那么忽略这条数据。当第二次时间轮转到“6”时,这个时候就会发现已经到了我们期望的 14 秒了。

任意秒级定时消息

在 RocketMQ 中,使用 TimerWheel 对于时间轮进行描述和存储,同时使用一个 AppendOnly 的 TimerLog 记录时间轮上面每一个刻度所对应的所有的消息。

TimerLog 记录了一条定时消息的一些重要的元数据,用于后面定时的时间到了之后,将消息转移到用户的 Topic 里面去。其中几个重要的属性如下:

对于 TimerWheel 来说,可以抽象的认为是一个定长的数组,数组中的每一格代表时间轮上面的一个“刻度”。TimerWheel 的一个“刻度”拥有以下属性。

TimerWheel 和 TimerLog 直接的关系如下图所示:

TimerWheel 中的每一格代表着一个时间刻度,同时会有一个 firstPos 指向这个刻度下所有定时消息的首条 TimerLog 记录的地址,一个 lastPos 指向这个刻度下所有定时消息最后一条 TimerLog 的记录的地址。并且,对于所处于同一个刻度的的消息,其 TimerLog 会通过 prevPos 串联成一个链表。

当需要新增一条记录的时候,例如现在我们要新增一个 “1-4”。那么就将新记录的 prevPos 指向当前的 lastPos,即 “1-3”,然后修改 lastPos 指向 “1-4”。这样就将同一个刻度上面的 TimerLog 记录全都串起来了。

有了 TimerWheel 和 TimerLog 之后,我们再来看一下一条定时消息从发送到 RocketMQ 之后是怎么最终投递给用户的。

首先,当发现用户发送的是一个定时消息过后,RocketMQ 实际上会将这条消息发送到一个专门用于处理定时消息的系统 Topic 里面去

然后在 TimerMessageStore 中会有五个 Service 进行分工合作,但整体可以分为两个阶段:入时间轮和出时间轮

对于入时间轮:

  • TimerEnqueueGetService 负责从系统定时 Topic 里面拉取消息放入 enqueuePutQueue 等待 TimerEnqueuePutService 的处理
  • TimerEnqueuePutService 负责构建 TimerLog 记录,并将其放入时间轮的对应的刻度中

对于出时间轮:

  • TimerDequeueGetService 负责转动时间轮,并取出当前时间刻度的所有 TimerLog 记录放入 dequeueGetQueue
  • TimerDequeueGetMessageService 负责根据 TimerLog 记录,从 CommitLog 中读取消息
  • TimerDequeuePutMessageService 负责判断队列中的消息是否已经到期,如果已经到期了,那么将其投入用户的 Topic 中,等待消费消费;如果还没有到期,那么重新投入系统定时 Topic,等待重新进入时间轮。

实战:使用定时消息

了解了 RocketMQ 秒级定时消息的原理后,我们看下如何使用定时消息。首先,我们需要创建一个 “定时/延时消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。

从前面可以看出,对于定时消息来说,是在发送消息的时候 “做文章”。所以,对于生产者,相对于发送普通消息,我们可以在发送的时候设置期望的投递时间。

当定时的时间到了之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。

注意:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大。所以一般建议尽量不要设置大量相同触发时刻的消息。

作者:凯易、明锻

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510556.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文读懂 BizDevOps:数字化转型下的技术破局

我们正迈向数字经济时代,数字化转型成为普遍行动。未来绝大多数业务都将运行在数字基座之上,软件系统成为业务创新和发展的核心引擎。在这一趋势下,产品研发的交付能力面临巨大挑战,产品研发的交付实践和方法亟待变革。 BizDevOp…

栈c++代码实现

//实在不想写数组法了&#xff0c;写个常用的STL的吧 #include "iostream" #include "algorithm" #include "stack" using namespace std; void Init(stack <int> s) { while(!s.empty()) { s.pop(); } } int main() { stack …

地址标准化服务AI深度学习模型推理优化实践

导读 深度学习已在面向自然语言处理等领域的实际业务场景中广泛落地&#xff0c;对它的推理性能优化成为了部署环节中重要的一环。推理性能的提升&#xff1a;一方面&#xff0c;可以充分发挥部署硬件的能力&#xff0c;降低用户响应时间&#xff0c;同时节省成本&#xff1b;…

PS里建立工作路径对话框中的“容差”是干什么的?

这里的容差是指&#xff1a;将选区转换为路径时的平滑程度&#xff0c;容差越大&#xff0c;平滑越重&#xff1b;容差越小&#xff0c;越精确&#xff08;与原选区对照&#xff09;&#xff0c;越接近你画的选区。PS的容差用在不同的地方&#xff0c;有不同的用法&#xff0c;…

淘系数据模型治理最佳实践

导读&#xff1a;本次分享题目为淘系数据模型治理&#xff0c;主要介绍过去一年淘系数据治理工作的一些总结。 具体将围绕以下4部分展开 模型背景&问题2问题分析3治理方案4未来规划 模型背景&问题 1.整体情况 首先介绍一下淘系的整体数据背景。 淘系的数据中台成立…

【走进RDS】之SQL Server性能诊断案例分析

客户的困扰 前几天某程序员小王向阿里云咨询他的SQL Server数据库整体负载较高&#xff0c;是否有优化的方法&#xff1f;前几天另外一个工单则是需要阿里云工程师帮忙定位某一个时刻的数据库性能尖刺的问题。 这些都是常见的性能诊断工单&#xff0c;其实数据库性能诊断不仅…

题目1335:闯迷宫( BFS在求解最短路径或者最短步数上有很多的应用)

题目描述&#xff1a;sun所在学校每年都要举行电脑节&#xff0c;今年电脑节有一个新的趣味比赛项目叫做闯迷宫。 sun的室友在帮电脑节设计迷宫&#xff0c;所以室友就请sun帮忙计算下走出迷宫的最少步数。 知道了最少步数就可以辅助控制比赛难度以及去掉一些没有路径到达终点的…

用了那么久的 Lombok,你知道它的原理么?

序言 在写Java代码的时候&#xff0c;最烦写setter/getter方法&#xff0c;自从有了Lombok插件不用再写那些方法之后&#xff0c;感觉再也回不去了&#xff0c;那你们是否好奇过Lombok是怎么把setter/getter方法给你加上去的呢&#xff1f;有的同学说我们Java引入Lombok之后会…

Fury:一个基于JIT动态编译的高性能多语言原生序列化框架

Fury是一个基于JIT动态编译的多语言原生序列化框架&#xff0c;支持Java/Python/Golang/C等语言&#xff0c;提供全自动的对象多语言/跨语言序列化能力&#xff0c;以及相比于别的框架最高20~200倍的性能。 引言 过去十多年大数据和分布式系统蓬勃发展&#xff0c;序列化是其…

HDU1181:变形课(DFS)

Description 呃......变形课上Harry碰到了一点小麻烦,因为他并不像Hermione那样能够记住所有的咒语而随意的将一个棒球变成刺猬什么的,但是他发现了变形咒语的一个统一规律:如果咒语是以a开头b结尾的一个单词,那么它的作用就恰好是使A物体变成B物体. Harry已经将他所会的所有咒…

阿里云丁宇:以领先的云原生技术,激活应用构建新范式

8 月 11 日&#xff0c;2022 阿里云飞天技术峰会在深圳举行&#xff0c;会上阿里云提出云原生激活应用构建三大范式&#xff0c;并发布最新的产品与解决方案。基于分布式云容器平台 ACK One&#xff0c;实现多地域分布式系统一致管理&#xff1b;发布 ACK FinOps 解决方案&…

inline函数和一般的函数有什么不同

比如 int g(int x) { return x x; } int f() { return g(); } 这样f会调用g&#xff0c;然后g返回x x给f&#xff0c;然后f继续把那个值返回给调用者。 如果g是inline的话。f会被直接编译成。 int f() { return x x; } 相当于把g执行的操作直接融合到f里。这样减少…

操作系统的“冷板凳”要坐多久?万字长文解读16年开源老兵的坚持

想知道内核研发是怎样的体验&#xff1f;操作系统的“冷板凳”得坐多久才有春天&#xff1f;本文对话龙蜥社区理事长马涛&#xff0c;畅所欲言聊开源&#xff0c;一起来看看那些开源润物细无声背后的故事以及龙蜥社区运营的道法术。 高门槛的 Linux 内核研发&#xff0c;如何支…

在阿里做前端程序员,我是这样规划的

前端程序员常问的几个问题 此文来自一次团队内的分享。我是来自大淘宝技术内容前端团队的胤涧&#xff0c;负责内容中台技术。我的习惯是每个新财年初都会进行一次分享《HOW TO BE AN EMINENT ENGINEER》&#xff0c;聊聊目前团队阵型、OKR、业务和技术大图&#xff0c;聊聊我作…

c++堆栈中 top() pop()的具体作用是什么

top()是取栈顶元素pop()是弹出栈顶元素stack<int> a;a.push(1); // 1a.push(2); // 1 2a.push(3); // 1 2 3int c a.top(); // c 3a.pop(); // 1 2a.push(4); // 1 2 4c a.top(); // c 4

如何可视化编写和编排你的 K8s 任务

简介 K8s Job 是 Kubernetes 中的一种资源&#xff0c;用来处理短周期的 Pod&#xff0c;相当于一次性任务&#xff0c;跑完就会把 Pod 销毁&#xff0c;不会一直占用资源&#xff0c;可以节省成本&#xff0c;提高资源利用率。 阿里任务调度 SchedulerX 和云原生结合&#x…

前端智能化实践——可微编程

什么是可微编程 通过动画、动效增加 UI 表现力&#xff0c;作为前端或多或少都做过。这里以弹性阻尼动画的函数为例&#xff1a; 函数在 时是效果最好的。最终&#xff0c;实现成 JavaScript 代码&#xff1a; function damping(x, max) {let y Math.abs(x);// 下面的参数都是…

HDU 1495(非常可乐)

Problem Description 大家一定觉的运动以后喝可乐是一件很惬意的事情&#xff0c;但是seeyou却不这么认为。因为每次当seeyou买了可乐以后&#xff0c;阿牛就要求和seeyou一起分享这一瓶可乐&#xff0c;而且一定要喝的和seeyou一样多。但seeyou的手中只有两个杯子&#xff0c…

解析 RocketMQ 业务消息——“事务消息”

引言&#xff1a;在分布式系统调用场景中存在这样一个通用问题&#xff0c;即在执行一个核心业务逻辑的同时&#xff0c;还需要调用多个下游做业务处理&#xff0c;而且要求多个下游业务和当前核心业务必须同时成功或者同时失败&#xff0c;进而避免部分成功和失败的不一致情况…

模型代码联动难? BizWorks 来助力

业务模型设计和沉淀是企业数字化转型过程中非常重要的一个环节, 日趋复杂的业务场景和协作模式给建模的有效性以及模型作为业务资产如何持续发挥价值带来了新的挑战: 设计完成的业务模型是否被合理实现了?经过数月、半年、1年迭代后&#xff0c;模型设计还能否对业务系统的演…