第3.6章:StarRocks数据导入——DataX StarRocksWriter

一、Datax

1.1 DataX 3.0概述

 DataX3.0是一个异构数据源离线同步工具,可以方便的对各种异构数据源进行高效的数据同步。   其github地址为:

https://github.com/alibaba/DataX/blob/master/introduction.mdicon-default.png?t=N7T8https://github.com/alibaba/DataX/blob/master/introduction.md

GitCode - 开发者的代码家园icon-default.png?t=N7T8https://gitcode.com/alibaba/datax/overview

1.2 DataX3.0框架设计

DataX将复杂的网状的同步链路变成了星型数据链路,DataX自身作为中间传输载体负责连接各种数据源,解决了异构数据源同步问题。Datax采用的是

   DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中:

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

1.3 DataX3.0核心架构

    DataX 3.0 开源版本支持单机多线程模式完成同步作业运行。基于DataX作业生命周期的时序图,从整体架构设计角度来阐述DataX各个模块相互关系。

1.3.1 核心模块介绍

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

 1.3.2 DataX调度流程

     用户提交了一个DataX作业,并且配置了DataX Channel并发数为20个,需求是将一个100张分表的mysql数据同步到starrocks里面, 则DataX的调度决策思路是:

  • DataXJob根据分库分表切分成了100个Task。
  • 根据20个并发,DataX计算共需要分配4个TaskGroup。
  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

二、StarRocksWriter

   DataX基于StarRocks开发的StarRocksWriter插件支持MySQL、Oracle等数据库中的数据导入至 StarRocks。在底层实现上,StarRocksWriter内部将各种reader读取的数据进行缓存攒批(以csv或 json格式),之后采用Stream Load 方式批量导入至 StarRocks。总体的数据流是Source -->Reader -->DataX channel --> Writer ---> StarRocks

 官网文章地址:

使用 DataX 导入 | StarRocks

三、创建配置文件

 为导入作业创建 JSON 格式配置文件, 这里列举几种Datax同步脚本。

(1)同步oracle数据至starrocks:oracle2starrocks.json

{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"record": 0,"percentage": 0}},"content": [{"reader": {"name": "oraclereader","parameter": {"username": "root","password": "root","connection": [{"querySql": ["select fid,f_diccode,concat(substr(qhcode,1,2),'0000') as partition_no from nannd.test1"],"jdbcUrl": ["jdbc:oracle:thin:@192.168.22.115:1521/init"]}]}},"writer": {"name": "starrockswriter","parameter": {"username": "root","password": "root","database": "","table": "test2","column": ["fid","f_diccode","partition_no"],"preSql": ["truncate table des.test2"],"postSql": [],"jdbcUrl": "jdbc:mysql://192.168.10.103:9030","loadUrl": ["192.168.10.101:8030","192.168.10.102:8030","192.168.10.103:8030"],"loadProps": {"format": "json","strip_outer_array": true}}}}]}
}
  • OracleReader的配置说明见:

 https://github.com/alibaba/DataX/blob/master/introduction.md

https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md

  • StarRocksWriter的配置说明见:官网

使用 DataX 导入 | StarRocks

(2)同步mysql库的数据至starrocks:mysql2starrocks.json

{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"record": 0,"percentage": 0}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["OBJECTID","xmmc","shengmc","shimc","xianmc",],"connection": [{"table": ["init2.test6"],"jdbcUrl": ["jdbc:mysql://192.168.22.156:3306/init2"]}]}},"writer": {"name": "starrockswriter","parameter": {"username": "root","password": "root","database": "des3","table": "test7","column": ["OBJECTID","shengmc","shimc","xianmc",],"preSql": [],"postSql": [],"jdbcUrl": "","loadUrl": ["192.168.10.101:8030","192.168.10.102:8030","192.168.10.103:8030"],"loadProps": {"format": "json","strip_outer_array": true}}}}]}
}
  • MysqlReader的配置说明见:

https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

  • StarRocksWriter的配置说明见:官网

(3)同步tidb库的数据至starrocks:tidb2starrocks.json

{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"record": 0,"percentage": 0}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root@sq2023","connection": [{"querySql": ["select id,member_id,create_time,update_time,now() as run_dt from test2"],"jdbcUrl": ["jdbc:mysql://192.168.22.143:4000/init1"]}]}},"writer": {"name": "starrockswriter","parameter": {"username": "root","password": "root","database": "des1","table": "test3","column": ["id","member_id","create_time","update_time","run_dt"],"preSql": [],"postSql": [],"jdbcUrl": "","loadUrl": ["192.168.10.101:8030","192.168.10.102:8030","192.168.10.103:8030"],"loadProps": {"format": "json","strip_outer_array": true}}}}]}
}

 ps:从tidb数据读取数据,采用的read插件还是MysqlReder,不赘述。

四、常见问题记录

4.1 常规排查方案

   例如:针对配置文件job.json启动导入任务,设置JVM 调优参数(--jvm="-Xms6G -Xmx6G")以及日志等级(--loglevel=debug),日志等级用来任务失败时打印更详细的作业执行信息

python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json

4.2 时区问题

    如果源数据库与目标数据库时区不同,需要命令行中添加 -Duser.timezone=GMTxxx 选项设置源数据库的时区信息。例如,源库使用 UTC 时区,则启动任务时需添加参数 -Duser.timezone=GMT+0

4.3 性能调优

4.3.1 合理拆分任务

    合理配置任务参数,让DataX任务拆分为多个Task,同时,提升DataX Channel并发数。以mysqlreader为例,就要合理配置splitPk参数,如果splitPk不填写(包括不提供splitPk或者splitPk值为空),DataX会视作使用单通道同步该表数据。

4.3.2 配置堆内存

   当提升DataX Job内Channel并发数时,内存的占用也会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,调大JVM的堆内存。调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,在命令行添加对应的参数,如下:(xms:初始化堆内存; xmx:堆最大内存)

python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json

ps:建议将初始化堆内存与堆最大内存配置一致,这样可以让同步数据处理起来更快,也可以避免内存的抖动。

4.3.3 任务限速

  使用DataX进行数据同步的另一个优势是可以限速,进而降低同步过程中对业务库的压力影响。DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以方便的控制同步作业速度,让同步作业在库可以承受的范围内达到最佳的同步速度。以最常用的字节流限速为例:

  • 修改datax/conf/core.json,限制单个chanel的速度为2M (2*1024*1024= 2097152 byte):

"speed": {"byte": 2097152,},
  • 同时修改作业json部分的速度限制,例如限制为4M(这样任务会用4/2=2个channel并发进行任务),修改:
    "job": {"setting": {"speed": {"byte" : 4194304}},...}
  • 以及:
"speed": {"channel": 5,"byte": 1048576,"record": 10000}

4.3.4 读取StarRocks数据

   StarRocks兼容MySQL协议,当我们需要将StarRocks中的数据同步至其他数据库时,可以使用mysqlreader来直接读取,但这种JDBC的方式性能可能不是很好,推荐Flink Connector或者Spark Connector来进行处理。

参考文章:

第3.5章:StarRocks数据导入--DataX StarRocksWriter_datax-starrockswriter-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/695512.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

盘点那些世界名校计算机专业采用的教材

清华、北大、MIT、CMU、斯坦福的学霸们在新学期里要学什么?今天我们来盘点一下那些世界名校计算机专业采用的教材。 书单目录 1.《深入理解计算机系统》(原书第3版)2. 《算法导论》(原书第3版)3. 《计算机程序的构造和…

el-table增加/编辑打开el-dialog内嵌套el-form,解决编辑重置表单不成功等问题

需求:在做表格的增删改查,其中新增和编辑弹窗都是同一个弹窗,之后有个重置按钮,需要用户输入的时候可以重置清空等。本文章解决如下问题 1.就是在编辑数据回填后点击重置表单没有清空也没有报错 2.解决清空表单和表格数据相互影响…

10.docker exec -it /bin/bash报错解决、sh与bash区别

报错 进入容器时,报如下错误 dockeruserdell-PowerEdge-R740:~$ docker exec -it daf2 /bin/bash OCI runtime exec failed: exec failed: unable to start container process: exec: "/bin/bash": stat /bin/bash: no such file or directory: unknown…

常用的Web应用程序的自动测试工具有哪些

在Web应用程序的自动化测试领域,有许多流行的工具可供选择。以下是一些常用的Web自动化测试工具: 1. Selenium - Selenium是最流行的开源Web应用程序自动化测试套件之一。 - 它支持多种编程语言,如Java、C#、Python、Ruby等。 …

Python——multiprocessing报错:TypeError: cannot pickle ‘_thread.lock‘ object

多进程报错 Traceback (most recent call last):File "C:\Users\miaochangbin\PycharmProjects\eduCrawler\main.py", line 138, in <module>p.start()File "D:\Program Files\python\lib\multiprocessing\process.py", line 121, in startself._po…

C-MAPSS涡扇发动机仿真数据(PHM2008)

数据集介绍 在开始介绍数据集之前&#xff0c;先帮大家理清一下涡扇发动机的数据&#xff08;NASA提供&#xff0c;本文中称为数据集A&#xff09;和PHM2008竞赛数据&#xff08;本文称为数据集B&#xff09;的关系。之所以将数据集A和数据集B放在一篇文章中&#xff0c;是因为…

【论文解读】transformer小目标检测综述

目录 一、简要介绍 二、研究背景 三、用于小目标检测的transformer 3.1 Object Representation 3.2 Fast Attention for High-Resolution or Multi-Scale Feature Maps 3.3 Fully Transformer-Based Detectors 3.4 Architecture and Block Modifications 3.6 Improved …

MT8788|MTK8788安卓核心板参数_4G联发科MTK模块

MT8788核心板是一款功能强大的4G全网通安卓智能模块。该模块采用了联发科AIOT芯片平台&#xff0c;具有长达8年的生命周期。MT8788模块内置了12nm制程的八核处理器&#xff0c;包括4个Cortex A73和4个Coretex A53&#xff0c;主频最高可达2.0GHZ。标配内存为4GB64GB&#xff0c…

下载图片到本地,多个图片压缩后下载到本地

单个图片的下载&#xff1a; 第一种 async downLoadImage() {let response await fetch(https://img-home.csdnimg.cn/images/20231127111739.png);let data await response.blob();let url window.URL.createObjectURL(data);let link document.createElement(a);link.hr…

01 Linux简介

Linux背景 发展史 linux从哪来的&#xff1f;怎么发展的&#xff1f;得从UNIX说起 1968年&#xff0c;一些来自通用电气公司、贝尔实验室和麻省理工学院的研究人员开发了一个名叫Multics的特殊操作系统。Multics在多任务文件管理和用户连接中综合了许多新概念1969-1970年&am…

Json的简介与基本使用

JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;易于人阅读和编写&#xff0c;同时也易于机器解析和生成。它基于JavaScript编程语言的一个子集&#xff0c;但JSON是独立于语言的文本格式&#xff0c;代码中可以使用各种语言来…

智能运维服务指的是哪些?智能运维阶段有哪些

智能运维服务通常包含哪些关键组成部分&#xff1f;它们在IT管理中的作用和重要性&#xff1f;智能运维的发展可以分为哪些主要阶段&#xff1f;每个阶段的核心技术或实践有哪些&#xff0c;它们是如何推动运维工作向更高水平的自动化和智能化发展的&#xff1f; 智能运维服务…

Linux离线安装插件

当公司Linux环境无外网情况下&#xff0c;需要先下载好离线安装包&#xff0c;然后上传到服务器&#xff0c;进行安装。 这里介绍一个下载插件安装包的网站&#xff0c;可以搜索到lrzsz、lsof、telnet、unzip、zip等安装包 搜索到想要的插件安装包后&#xff0c;下载并上传到服…

Ubuntu20.04安装Carla0.9.15

文章目录 环境要求下载Carla解压Carla运行Carla测试官方用例创建python环境安装依赖包案例&#xff1a;生成车辆案例&#xff1a;测试自动驾驶 参考链接 环境要求 系统配置要求&#xff1a; 至少3G显存的GPU&#xff0c;推荐3060及以上的显卡进行Carla拟真。预留足够的硬盘空…

Tomcat线程池原理(上篇:初始化原理)

文章目录 前言正文一、从启动脚本开始分析二、ProtocolHandler 的启动原理三、AbstractEndPoint 的启动原理四、创建默认线程池五、参数配置原理5.1 常规的参数配置5.2 自定义线程池5.3 测试自定义线程 前言 在Java Web的开发过程中&#xff0c;Tomcat常用的web容器。SpringBo…

C#知识点-13(进程、多线程、使用Socket实现服务器与客户端通信)

进程 定义&#xff1a;每一个正在运行的应用程序&#xff0c;都是一个进程 进程不等于正在运行的应用程序。而是为应用程序的运行构建一个运行环境 Process[] pros Process.GetProcesses();//获取电脑中所有正在运行的进程//通过进程&#xff0c;直接打开文件//告诉进程&…

cmd命令开启windows桌面远程控制并设置防火墙允许远程

cmd命令开启桌面远程控制 1、开启之前&#xff1a; 2、使用管理员身份运行cmd 3、执行cmd命令 reg add "HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlset\Control\Terminal server" /v fDenyTSConnections /t REG_DWORD /d 0 /f4、如果这台电脑的防火墙打开&#xff…

Android14 InputManager-InputManagerService环境的构造

IMS分为Java层与Native层两个部分&#xff0c;其启动过程是从Java部分的初始化开始&#xff0c;进而完成Native部分的初始化。 □创建新的IMS对象。 □调用IMS对象的start&#xff08;&#xff09;函数完成启动 同其他系统服务一样&#xff0c;IMS在SystemServer中的ServerT…

股票K线认知从形态到逻辑,仓位管理与交易体系实战

一、教程描述 本套教程内容分为三个部分&#xff0c;1、基础篇&#xff1a;讲的都是干货基础&#xff0c;有些是书本上没有的&#xff0c;通过对基础知识的掌握&#xff0c;对技术形态会有更深的理解&#xff0c;比如集合竞价、K线指标、盘中看盘技巧等等。2、交易篇&#xff…

boolean在Java中占几个字节?(企业真题)

boolean 占几个字节 编译时不谈占几个字节。 但是JVM在给boolean类型分配内存空间时&#xff0c;boolean类型的变量占据一个槽位(slot(狭槽、窄口、扁口)&#xff0c;等于4个字节)。 细节&#xff1a;true:1 false:0 拓展&#xff1a;在内存中&#xff0c;byte\short\char\boo…