Doris-Routine Load(二十七)

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。

适用场景

当前仅支持从 Kafka 系统进行例行导入,使用限制:

(1)支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。

(2)支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。

(3)默认支持 Kafka 0.10.0.0(含)以上版本。如果要使用 Kafka 0.10.0.0 以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 be 的配置,将 kafka_broker_version_fallback 的值设置为要兼容的旧版本,或者在创建 routine load 的时候直接设置property.broker.version.fallback的值为要兼容的旧版本,使用旧版本的代价是 routine load 的部分新特性可能无法使用,如根据时间设置 kafka 分区的 offset。

基本原理

如上图,Client 向 FE 提交一个例行导入作业。

(1)FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

(2)在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

(3)FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的Task 进行重试。

(4)整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。

基本语法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

执行 HELP ROUTINE LOAD 可以查看语法帮助,下面是参数说明

1)[db.]job_name

导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。

2)tbl_name

指定需要导入的表的名称。

3)merge_type

数据的合并类型,一共支持三种类型 APPEND、DELETE、MERGE 其中,APPEND 是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据 key 相同的所有行,MERGE 语义 需要与 delete on 条件联合使用,表示满足 delete 条件的数据按照 DELETE 语义处理其余的按照 APPEND 语义处理 , 语法为

[WITHMERGE|APPEND|DELETE]
4)load_properties

用于描述导入数据。语法:

[column_separator], [columns_mapping], [where_predicates], [delete_on_predicates], [source_sequence], [partitions], [preceding_predicates]

(1)column_separator:

指定列分隔符,如: COLUMNS TERMINATED BY "," 这个只在文本数据导入的时候需要指定,JSON 格式的数据导入不需要指定这个参数。

默认为:\t

(2)columns_mapping:

指定源数据中列的映射关系,以及定义衍生列的生成方式。

映射列:

按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。假设目的表有三列 k1, k2, v1。源数据有 4 列,其中第 1、2、4 列分别对应 k2, k1, v1。则书写如下:

COLUMNS (k2, k1, xxx, v1)

其中 xxx 为不存在的一列,用于跳过源数据中的第三列。

衍生列:

以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。 衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。 接上一个示例,假设目的表还有第 4 列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:

COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

再举例,假设用户需要导入只包含 k1 一列的表,列类型为 int。并且需要将源文件中的对应列进行处理:将负数转换为正数,而将正数乘以 100。这个功能可以通过 case when 函数实现,正确写法应如下:

COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)

(3)where_predicates

用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:

WHERE k1 > 100 and k2 = 1000

(4)partitions

指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。

示例:

PARTITION(p1, p2, p3)

(5)delete_on_predicates

表示删除条件,仅在 merge type 为 MERGE 时有意义,语法与 where 相同

(6)source_sequence:

只适用于 UNIQUE_KEYS,相同 key 列下,保证 value 列按照 source_sequence 列进行REPLACE, source_sequence 可以是数据源中的列,也可以是表结构中的一列。

(7)preceding_predicates

PRECEDING FILTER predicate用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。

5)job_properties

用于指定例行导入作业的通用参数。 语法:

PROPERTIES ("key1" = "val1","key2" = "val2"
)

目前支持以下参数:

(1)desired_concurrent_number

期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于 0。默认为 3。 这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。

一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下:

Min(partition num, desired_concurrent_number, alive_backend_num, 
Config.max_routine_load_task_concurrrent_num)

其中 Config.max_routine_load_task_concurrrent_num 是系统的一个默认的最大并发数限制。这是一个 FE 配置,可以通过改配置调整。默认为 5。

其中 partition num 指订阅的 Kafka topic 的 partition 数量。alive_backend_num 是当前正常的 BE 节点数。

(2)max_batch_interval/max_batch_rows/max_batch_size这三个参数分别表示:

  • ① 每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为 10。

  • ② 每个子任务最多读取的行数。必须大于等于 200000。默认是 200000。

  • ③ 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是100MB。

这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。 例如:

"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"

(3)max_error_number

采样窗口内,允许的最大错误行数。必须大于等于 0。默认是 0,即不允许有错误行。

采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。 被 where 条件过滤掉的行不算错误行

(4)strict_mode

是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为NULL,则会被过滤。指定方式为 "strict_mode" = "true"

(5)timezone

指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果

(6)format

指定导入数据格式,默认是 csv,支持 json 格式

(7)jsonpaths

jsonpaths: 导入 json 方式分为:简单模式和匹配模式。如果设置了jsonpath 则为匹配模式导入,否则为简单模式导入,具体可参考示例

(8)strip_outer_array

布尔类型,为 true 表示 json 数据以数组对象开始且将数组对象中进行展平,默认值是false

(9)json_root

json_root 为合法的 jsonpath 字符串,用于指定 json document 的根节点,默认值为""

(10)send_batch_parallelism

整型,用于设置发送批处理数据的并行度,如果并行度的值超过BE配置中的max_send_batch_parallelism_per_job,那么作为协调点的BE将使用max_send_batch_parallelism_per_job 的值

6)data_source_properties

数据源的类型。当前支持:Kafka

("key1" = "val1","key2" = "val2"
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/177952.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jks、cer、p12生成

//jks生成 keytool -genkeypair -alias "tomcat" -validity 3650 -keyalg "RSA" -keystore "test.jks" //cer生成 keytool -export -alias "tomcat" -keystore "test.jks" -storepass 123456 -file test.cer //p12生成 …

Attempted to serialize java.lang.Class. Forgot to register a type adapter?

问题&#xff1a;Attempted to serialize java.lang.Class. Forgot to register a type adapter? 原因&#xff1a;Gson不支持类信息&#xff0c;序列化和返序列化 解决方案&#xff1a; 使用Gson的 excluedFieldsWithoutExposeAnnotation() 选项忽略这些字段 Gson gson new …

VMware系列:VMware官网注册账号之验证码问题以及获取ESXi安装程序的方法

VMware官网注册账号之验证码问题以及获取ESXi安装程序的方法 一. VMware官网注册账号之验证码问题1. VMware官网基础账号注册2. VMware官网exsi试用注册二. 获取ESXi安装程序的方法原生态版本下载过程OEM版本下载VMware官网下载过程服务器厂家官网下载过程附录:一. VMware官网…

ArkTS框架深度解析:@Prop、@Link、@ObjectLink装饰器的应用与同步机制【鸿蒙专栏-09】

文章目录 ArkTS框架深度解析:@Prop、@Link、@ObjectLink装饰器的应用与同步机制【HarmonyOS开发】@Prop装饰器概述限制条件使用规则说明使用场景@Link装饰器概述限制条件使用规则说明使用场景@Link装饰器的高级用法动态创建链接自定义同步逻辑最佳实践和注意事项结语ArkTS框架…

小新 Air-14 2021 Intel处理器ITL版(82FF)原厂Win11系统

链接&#xff1a;https://pan.baidu.com/s/1EkqpdGcixCNER5uP5yIc4Q?pwddm1d 提取码&#xff1a;dm1d lenovo联想小新Air14笔记本2021款【82FF】原装出厂Windows11系统镜像ISO文件 系统自带所有驱动、出厂主题壁纸、系统属性专属LOGO标志、Office办公软件、联想电脑管家等…

手机如何去图片水印?试试这三种方法

手机如何去图片水印&#xff1f;去水印已然成为了自媒体从业者必备技能之一&#xff0c;无论是工作或生活中经常遇到图片/视频上带有水印&#xff0c;非常影响整体观感&#xff0c;网上去水印方法又很多&#xff0c;如果你是小白&#xff0c;这篇文章将提供给你三个实用去水印的…

草图大师sketchup道路怎么快速种树?

草图大师sketchup道路怎么快速种树&#xff1f;草图大师中的道路图纸想要在道路两旁种树&#xff0c;该怎么快速给道路种树呢&#xff1f;下面我们就来看看详细的教程&#xff0c;需要的朋友可以参考下 草图大师sketchup中想要快速种树&#xff0c;该怎么种多棵树呢&#xff1…

数据结构校招知识点总结

文章目录 前言1. 数据结构概论、算法设计与分析1.1 数据结构三要素&#xff1f;1.2 算法的基本概念&#xff1f;1.3 什么是时间复杂度&#xff1f; 2. 线性表2.1 链表结构和顺序存储结构的区别&#xff1f;2.2 单链表和双链表的区别&#xff1f;2.3 头指针和头结点的区别&#…

基于Python的网络爬虫设计与实现

基于Python的网络爬虫设计与实现 摘要&#xff1a;从互联网时代开始&#xff0c;网络搜索引擎就变得越发重要。大数据时代&#xff0c;一般的网络搜索引擎不能满足用户的具体需求&#xff0c;人们更加注重特定信息的搜索效率&#xff0c;网络爬虫技术应运而生。本设计先对指定…

上海数交所与合合信息发布产业数据行业创新中心,政产学研合力为“数据航母”加速

大数据产业是数字经济创新发展、加速发展的重要方向。11月25日&#xff0c;2023全球数商大会在上海盛大开幕。大会以“数联全球、商通未来”为主题&#xff0c;聚焦数字经济时代下&#xff0c;数据要素推动实体经济发展的规划与成果&#xff0c;是数据交易领域的行业级峰会和数…

Kafka 如何保证消息消费的全局顺序性

哈喽大家好&#xff0c;我是咸鱼 今天我们继续来讲一讲 Kafka 当有消息被生产出来的时候&#xff0c;如果没有指定分区或者指定 key &#xff0c;那么消费会按照【轮询】的方式均匀地分配到所有可用分区中&#xff0c;但不一定按照分区顺序来分配 我们知道&#xff0c;在 Kaf…

动态:class和:style绑定

1. 在应用界面中, 某个(些)元素的样式是变化的 class/style绑定就是专门用来实现动态样式效果的技术 2. 动态class绑定 :class等号后的变量值 可以是字符串 :class等号后 可以是对象 :class等号后 可以是数组 3. 动态style绑定 :style"{ color: myPinkColor, fontS…

医疗级超声波雾化器方案

药用雾化器是一种将药物液体转化为微小颗粒状物质并通过呼吸道输送到呼吸系统的医疗器械。常用于治疗呼吸道疾病&#xff0c;如哮喘、气管炎、肺炎、鼻窦炎等。且被广泛应用于医疗、美容、家居等领域。现代化的药用雾化器通常采用单片机方案控制&#xff0c;能够更加稳定地实现…

【软件测试】银行测试项目VS常规项目有什么区别?怎么做?

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 银行测试项目跟常…

ESP32-Web-Server编程- JS 基础5

ESP32-Web-Server编程- JS 基础5 概述 JS 编程内容颇多&#xff0c;我们提供一些简单的示例&#xff0c;先玩再学&#xff0c;边玩边学。 示例1-演示通过 JS 进行温度转换 资源链接 对应示例的 code 链接 &#xff08;点击直达代码仓库&#xff09; 示例2-增加网页弹窗 演…

文献速递:使用人工智能进行超声检查的文章:读者指南(超声影像人工智能专题文献分享)

文献速递&#xff1a;使用人工智能进行超声检查的文章&#xff1a;读者指南&#xff08;超声影像人工智能专题文献分享&#xff09; 01 文献速递介绍 本文讨论了人工智能&#xff08;AI&#xff09;如何将医学影像转化为可挖掘的高通量数据&#xff0c;并强调了机器学习算法…

【攻防世界-misc】reverseMe

1.下载后&#xff0c;得到这样一张图片 2.利用在线翻转网站获取值&#xff0c;在线旋转图片工具|在线翻转照片|调整照片方向|生成镜像图片 - 改图宝 反转后的图片&#xff0c;将值提取并上传。

Java零基础——Nginx篇

1.【熟悉】服务器概述 1.1 目前常见的web服务器 1&#xff0c;Apache(http://httpd.apache.org) 它是世界上用的最多的web服务器&#xff0c;市场占有率达60%左右&#xff0c;模块非常丰富&#xff0c;系统非常稳定&#xff0c;可移植性好&#xff0c;但是比较消耗资源 2&…

数据采集静态存储SRAM芯片EMI7064

数据采集是利用一种装置&#xff0c;从系统外部采集数据并输入到系统内部的一个接口。数据采集技术广泛应用在各个领域。比如摄像头&#xff0c;麦克风&#xff0c;都是数据采集工具。 ram工作时可以随时从任何一个指定的地址写入(存入&#xff09;或读出(取出)信息。RAM在计算…

开源与闭源:技术创新的两难选择

在当前数字化时代&#xff0c;技术界一直存在着关于开源与闭源软件的激烈辩论。最近&#xff0c;特斯拉CEO马斯克的公开表示引发了广泛的关注&#xff0c;他认为OpenAI不应该闭源&#xff0c;并宣布将首款聊天机器人开源。这一决定再次引发了人们对于开源与闭源的讨论。 开源的…