Apache Doris 基于 Job Scheduler 实现秒级触发任务调度能力

作者|SelectDB 技术团队

在数据管理愈加精细化的需求背景下,定时调度在其中扮演着重要的角色。它通常被应用于以下场景:

  • 定期数据更新,如周期性数据导入和 ETL 操作,减少人工干预,提高数据处理的效率和准确性。
  • 结合 Catalog 实现外部数据源数据定期同步,确保多源数据高效、准确的整合到目标系统中,满足复杂的业务分析需求。
  • 定期清理过期/无效数据,释放存储空间,避免过多过期/无效数据对系统性能产生影响。

在 Apache Doris 之前版本中,通常需要依赖于外部调度系统,如通过业务代码定时调度或者引入第三方调度工具、分布式调度平台来满足上述需求。然而,因受限于外部系统自身能力,可能无法满足 Doris 对调度策略及资源管理灵活性的要求。此外,如果外部调度系统出现故障,这不仅会增加业务风险,还需投入额外的运维时间和人力来应对。

引入 Job Scheduler

为解决上述问题,Apache Doris 在 2.1 版本中引入了 Job Scheduler 功能,实现了自主任务调度能力,调度的精准度可达到秒级。该功能的推出不仅保障了数据导入的完整性和一致性,更让用户能够灵活、便捷调整调度策略。同时,因减少了对外部系统的依赖,也降低了系统故障的风险和运维成本,为社区用户带来更加统一、可靠的使用体验。

Doris Job Scheduler 是一种基于预设计划运行的任务管理系统,能够在特定时间点或按照指定时间间隔触发预定义操作,实现任务的自动化执行。Job Scheduler 具备以下特点:

  • 高效调度:Job Scheduler 可以在指定的时间间隔内安排任务和事件,确保数据处理的高效性。采用时间轮算法保证事件能够精准做到秒级触发。
  • 灵活调度:Job Scheduler 提供了多种调度选项,如按 分、小时、天或周的间隔进行调度,同时支持一次性调度以及循环(周期)事件调度,并且周期调度也可以指定开始时间、结束时间。
  • 事件池和高性能处理队列:Job Scheduler 采用 Disruptor 实现高性能的生产消费者模型,最大可能的避免任务执行过载。
  • 调度记录可追溯:Job Scheduler 会存储最新的 Task 执行记录(可配置),通过简单的命令即可查看任务执行记录,确保过程可追溯。
  • 高可用:依托于 Doris 自身的高可用机制,Job Schedule 可以很轻松的做到自恢复、高可用。

(具体实现原理可参考本文“设计与实现”章节介绍)

语法及示例

01 语法说明

一条有效的 Job 语句需包含以下内容:

  • 关键字 CREATE JOB 需加作业名称,它在数据库中标识唯一事件。
  • ON SCHEDULE 子句用于指定 Job 作业的类型、触发时间和频率。
    • AT timestamp 用于一次性事件。它指定 JOB 仅在给定的日期和时间执行一次,AT CURRENT_TIMESTAMP 指定当前日期和时间。因 JOB 一旦创建则会立即运行,也可用于异步任务创建。
    • EVERY:用于周期性作业,可指定作业的执行频率,关键字后需指定时间间隔(周、天、小时、分钟)。
      • Interval:用于指定作业执行频率。1 DAY 表示每天执行一次,1 HOUR表示每小时执行一次,1 MINUTE 表示每分钟执行一次,1 WEEK 表示每周执行一次。
      • 子句EVERY包含可选 STARTS子句。STARTS后面为 timestamp 值,该值用于定义开始重复的时间,CURRENT_TIMESTAMP 用于指定当前日期和时间。JOB 一旦创建则会立即运行。
      • 子句EVERY包含可选 ENDS子句。ENDS 关键字后面为***timestamp*** 值,该值定义 JOB 事件停止运行的时间。
  • DO 子句用于指定 Job 作业触发时所需执行的操作,目前仅支持 Insert 语句。
CREATEJOBjob_nameON SCHEDULE schedule[COMMENT 'string']DO execute_sql;schedule: {AT timestamp | EVERY interval[STARTS timestamp ][ENDS timestamp ]
}interval:quantity { WEEK |DAY | HOUR | MINUTE}

下方为简单的示例:

CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

该语句表示创建一个名为 my_job的作业,每分钟执行一次,执行的操作是将 db2.tbl2 中的数据导入到 db1.tbl1中。

02 举例说明

创建一次性的 Job: 在 2025-01-01 00:00:00 时执行一次,将 db2.tbl2中数据导入到 db1.tbl1 中。

CREATE JOB my_job ON SCHEDULE AT '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

创建周期性的 Job,未指定结束时间: 在 22025-01-01 00:00:00 时开始每天执行 1 次,将 db2.tbl2 中数据导入到 db1.tbl1 中。

CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE  create_time >=  days_add(now(),-1);

创建周期性的 Job,指定结束时间: 在 2025-01-01 00:00:00 时开始每天执行 1 次,将 db2.tbl2 中的数据导入到 db1.tbl1 中,在 2026-01-01 00:10:00 时结束。

CREATE JOB my_job ON SCHEDULER EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 create_time >=  days_add(now(),-1);

借助 Job 实现异步执行: 由于 Job 在 Doris 中是以同步任务的形式创建的,但其执行过程却是异步进行的,这一特性使得 Job 非常适合用于实现异步任务,例如常见的 insert into select 任务。

假设需要将db2.tbl2 中的数据导入到 db1.tbl1 中,这里只需要指定 JOB 为一次性任务,且开始时间设置为当前时间即可。

CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

基于 Catalog 与 Job Scheduler 的数据自动同步

以某电商场景为例,用户常常需要从 MySQL 中提取业务数据,并将这些数据同步到 Doris 中进行数据分析,从而支持精准的营销活动。而 Job Scheduler 可与数据湖能力 Multi Catalog 配合,高效完成跨数据源的定期数据同步

基于 Catalog 与 Job Scheduler 的数据自动同步.png

以上表为例,用户希望查询符合_总消费金额、最后一次访问时间、性别、所在城市_这几个数值条件的用户,并将满足条件的用户信息导入到 Doris 中,以便后续的定向推送。

1. 首先,创建一张 Doris 表

CREATE TABLE IF NOT EXISTS user_activity
(`user_id` LARGEINT NOT NULL COMMENT "用户id",`date` DATE NOT NULL COMMENT "数据灌入日期时间",`city` VARCHAR(20) COMMENT "用户所在城市",`age` SMALLINT COMMENT "用户年龄",`sex` TINYINT COMMENT "用户性别",`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

2. 其次,创建对应 MySQL 库的 Catalog

CREATE CATALOG activity PROPERTIES ("type"="jdbc","user"="root","jdbc_url" = "jdbc:mysql://127.0.0.1:9734/user?useSSL=false","driver_url" = "mysql-connector-java-5.1.49.jar","driver_class" = "com.mysql.jdbc.Driver"
);

3. 最后,将 MySQL 数据导入到 Doris 中。采用 Catalog + Insert Into 的方式来导入全量数据,由于全量导入操作可能会引发系统服务波动,通常选择在业务闲暇时进行操作。

  • 一次性调度: 如下方代码所示,使用一次性任务来定时触发全量导入任务,触发时间为凌晨 3:00。
CREATE JOB one_time_load_job
ON SCHEDULE 
AT '2024-8-10 03:00:00'
DOINSERT INTO user_activity FROM SELECT * FROM activity.user.activity 
  • 周期调度: 用户也可以创建一个周期性的调度任务,定期更新最新的数据。
CREATE JOB schedule_load
ON SCHEDULE EVERY 1 DAY
DOINSERT INTO user_activity FROM SELECT * FROM activity.user.activity where create_time >=  days_add(now(),-1)

设计与实现

高效的调度通常伴随着大量的资源消耗,高精度的调度更是如此。传统的实现方式是直接使用 Java 内置的定时调度能力——定时调度线程周期访问,或采用一些定时调度的工具类库,但其在精度以及内存占用上存在较大的问题。为更好保障性能的前提下降低资源的占用,我们选择 TimingWheel 算法与 Disruptor 结合,实现秒级别的任务调度。

设计与实现

具体来说,利用 Netty 的 HashedWheelTimer 实现时间轮算法,Job Manager 会周期性(默认十分钟)地将未来事件放入时间轮中调度。为了保证任务高效触发并避免资源过度占用,采用 Disruptor 构建单生产者多消费者模型。时间轮仅负责触发,并不直接执行任务。对于到期需触发的任务时,会将其放入 Diapatch 线程,由其负责将任务分发至相应的执行线程池,对于需立即执行的任务,则直接将其投递至相应的任务执行线程池中。

对于单次执行事件,将在调度完成后删除事件定义;对于周期性事件,时间轮中的系统事件将定期拉取下一个周期的执行任务。这样可以避免大量任务集中在一个 Bucket 中,减少无意义的遍历、提高处理效率。

而对于事务型任务,Job Scheduler 能够通过与事务的强关联以及事务回调机制,确保事务型任务的执行结果与预期一致,从而保证数据的完整性和一致性。

结束语

Doris Job Scheduler 是一款强大且灵活的任务调度工具,是数据处理中必不可少的功能之一。除了在数据湖分析、内部 ETL 等常见场景的应用外,Job Scheduler 对于异步物化视图的实现也起到关键的作用。异步物化视图是一个预先计算并存储的结果集,其数据更新的频率与源表的变动紧密相关。当源表数据更新频繁时,为确保物化视图中数据保持最新状态,就需要对物化视图定期刷新。因此在 2.1 版本中,我们巧妙地利用 JOB 定时调度功能,保障了物化视图与源表数据的一致性,大幅降低了人工干预的成本。

未来,Doris Job Scheduler 还会支持以下特性:

  • 支持通过 UI 界面查看不同时段执行的任务分布情况。
  • 支持 JOB 流程编排,即 DAG JOB。这意味着我们可以在内部实现数仓任务编排,与 Catalog 功能叠加将会更高效地完成数据处理和分析工作。
  • 支持对导入任务、UPDATE、DELETE 操作进行定时调度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/814699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【王道数据结构笔记】顺序表的静态分配代码分析

🎈个人主页:豌豆射手^ 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:数据结构 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进…

2024-04-08

作业要求: 1> 思维导图 2>使用手动连接,将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中,在自定义的槽函数中调用关闭函数 将登录按钮使用qt4版本的连接到自定义的槽函数中,在槽函数中判断ui界面上输入的账号是否…

攻防世界05view_source

鼠标右键无法查看源代码,但是可以直接F12进行查看 但是没有办法复制,可以在地址前面加上view-source 这时候就可以复制了 知识点1:view-source协议-查看源码 view-source是一种协议,早期基本上每个浏览器都支持这个协议。后来Mi…

C语言基础(四)

C语言基础 一维数组数组初始化全部初始化部分初始化数组的默认值冒泡排序 字符数组 二维数组初始化行数是否可省略列数是否可以省略部分初始化 访问二维字符数组 函数分类库函数自定义函数调用自定义函数函数声明 一维数组 概念&#xff1a;一组数据类型相同的元素的集合 <…

无人机巡检技术革命性变革光伏电站运维管理

在中国广袤的大地上&#xff0c;光伏电站如雨后春笋般崛起&#xff0c;晶体硅组件板在阳光下熠熠生辉&#xff0c;为人们带来了源源不断的绿色能源。然而&#xff0c;随着光伏产业的迅猛发展&#xff0c;电站运维管理面临着前所未有的挑战。而无人机巡检技术的引入&#xff0c;…

【Unity+Python】如何通过Socket进行通信

1、Unity端创建名为UnityClient.cs脚本代码(客户端)&#xff1a; 注意&#xff1a;unity的规则中类&#xff0c;名和脚本文件名需要相同。 using System.Net.Sockets; using System.Text; using UnityEngine;public class UnityClient : MonoBehaviour {TcpClient client;Netw…

TypeScript 忽略红色波浪线

&#x1f468;&#x1f3fb;‍&#x1f4bb; 热爱摄影的程序员 &#x1f468;&#x1f3fb;‍&#x1f3a8; 喜欢编码的设计师 &#x1f9d5;&#x1f3fb; 擅长设计的剪辑师 &#x1f9d1;&#x1f3fb;‍&#x1f3eb; 一位高冷无情的全栈工程师 欢迎分享 / 收藏 / 赞 / 在看…

CNN-Transformer时间序列预测

部分代码&#xff1a; # CNN-Transformer class CNNTransformerEncoder(nn.Module):def __init__(self, input_features, transformer_encoder_heads,embedding_features, cnn_kernel_size, dim_feedforward_enc, n_encoder_layer):super(CNNTransformerEncoder, self).__init…

数模 初见数建

文章目录 初见数学建模1.1 数学建模是什么1.2 数学建模的概述1.3 如何学习数学建模---分模块化1.4 数学建模前提了解1.5 数学建模的六个步骤1.6 如何备战建模比赛1.7 数学建模赛题类型1.8 数学建模算法体系概述 初见数学建模 1.1 数学建模是什么 1.原型与模型 原型&#xff…

ssm048电子竞技管理平台的设计与实现+jsp

电子竞技管理平台设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本电子竞技管理平台就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短…

Docker之自定义镜像上传至阿里云

一、Alpine介绍 Alpine Linux是一个轻量级的Linux发行版&#xff0c;专注于安全、简单和高效。它采用了一个小巧的内核和基于musl libc的C库&#xff0c;使得它具有出色的性能和资源利用率。 Alpine Linux的主要特点包括&#xff1a; 小巧轻量&#xff1a;Alpine Linux的安装…

【新版】系统架构设计师 - 知识点 - 面向对象开发方法

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 知识点 - 面向对象开发方法面向对象开发方法面向对象的分析需求模型分析模型 面向对象的设计 用例模型关系、UML事务关系、类的关系 架构 - 知识点 - 面向对象开发方法 面向对象开发方法 分析阶段…

<-泛型->

1.泛型的概念 所谓泛型&#xff0c;就是允许在定义类, 接口 时通过一个"标识"表示类中某个属性的类型或者某个方法的返回值或形参类型.这个类型参数将在使用时确定. 2.举例 (1). 集合类在设计阶段/声明阶段不能确定这个容器到底存的是什么对象&#xff0c;所以在JDK…

微信小程序制作圆形进度条

微信小程序制作圆形进度条 1. 建立文件夹 选择一个目录建立一个文件夹&#xff0c;比如 mycircle 吧&#xff0c;另外把对应 page 的相关文件都建立出来&#xff0c;包括 js&#xff0c;json&#xff0c;wxml 和 wxcc。 2. 开启元件属性 在 mycircle.json中开启 component 属…

51蓝桥杯之DS18B20

DS18B20 基础知识 代码流程实现 将官方提供例程文件添加到工程中 添加onewire.c文件到keil4里面 一些代码补充知识 代码 #include "reg52.h" #include "onewire.h" #include "absacc.h" unsigned char num[10]{0xc0,0xf9,0xa4,0xb0,0x99,…

随机链表的复制 - LeetCode 热题 32

大家好&#xff01;我是曾续缘&#x1f4a4; 今天是《LeetCode 热题 100》系列 发车第 32 天 链表第 11 题 ❤️点赞 &#x1f44d; 收藏 ⭐再看&#xff0c;养成习惯 随机链表的复制 给你一个长度为 n 的链表&#xff0c;每个节点包含一个额外增加的随机指针 random &#xff…

【leetcode面试经典150题】28.盛最多水的容器(C++)

【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主&#xff0c;题解使用C语言。&#xff08;若有使用其他语言的同学也可了解题解思路&#xff0c;本质上语法内容一致&…

漫步密度森林:借助HDBSCAN实现高效数据聚类

文章来源&#xff1a;navigating-the-density-forest-harnessing-hdbscan-for-advanced-data-clustering 2024 年 4 月 9 日 介绍 在数据科学中&#xff0c;聚类算法是揭示数据集内在结构的重要工具。在这些工具中&#xff0c;基于分层密度的噪声应用空间聚类 (HDBSCAN) 作为…

【C++学习】C++11新特性(第一节)

文章目录 ♫一.文章前言♫二.C11新特性♫一.统一的列表初始化♫二.std::initializer_list♫三.声明♫四.decltype关键字♫五.nullptr♫六.新增加容器---静态数组array、forward_list以及unordered系列♫6.1unordered_map与unoredered_set♫6.2array♫6.3 forward_list&#xff…

【Altium Designer 20 笔记】隐藏PCB上的信号线(连接线)

使用网络类隐藏特定类型的信号线 如果你想要隐藏特定类型的信号线&#xff08;例如电源类&#xff09;&#xff0c;你可以首先创建一个网络类。使用快捷键DC调出对象类浏览器&#xff0c;在Net Classes中右击添加类&#xff0c;并重命名&#xff08;例如为“Power”&#xff0…