什么是数据同步利器DataX,如何使用?

什么是 Datax?

DataX 是阿里云 DataWorks数据集成 的开源版本,使用Java 语言编写,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

应用场景有那些?

  1. 数据仓库同步:DataX 可以帮助将数据从一个数据仓库(如关系型数据库、大数据存储系统等)同步到另一个数据仓库,实现数据的迁移、备份或复制。

  2. 数据库迁移:当我们需要将数据从一个数据库平台迁移到另一个数据库平台时,DataX 可以帮助完成数据的转移和转换工作

  3. 数据集成与同步:DataX 可以用作数据集成工具,用于将多个数据源的数据进行整合和同步。它支持多种数据源,包括关系型数据库、NoSQL 数据库、文件系统等,可以将这些数据源的数据整合到一个目标数据源中。

  4. 数据清洗与转换:DataX 提供了丰富的数据转换能力,可以对数据进行清洗、过滤、映射、格式转换等操作。这对于数据仓库、数据湖和数据集市等数据存储和分析平台非常有用,可以帮助提高数据质量和一致性。

  5. 数据备份与恢复:DataX 可以用于定期备份和恢复数据。通过配置定时任务,可以将数据从源端备份到目标端,并在需要时进行数据恢复。

DataX支持那些数据源?

架构设计

DataX作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX 开源版本支持单机多线程模式完成同步作业运行,如下图

  1. DataX完成单个数据同步的作业,称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张表的mysql数据同步到odps里面。 DataX的调度决策是:

  1. Job根据分表切分成了100个Task。

  2. 根据20个并发,DataX计算需要分配4个TaskGroup。

  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责5个并发共计运行25个Task。

如何使用 Datax?

点击datax 下载,下载后解压至本地某个目录,如下图

image-20240203222845753

用例说明

这里为了方便演示,我们同步MySQL的user_info表至MySQL的ods_test_mysql_user_info_m,同步条件为更新时间字段,如下

在实际工作中你可以选择不同类型的数据源测试


drop table ods_test_mysql_user_info_mCREATE TABLE `user_info` (`id` int NOT NULL COMMENT 'ID',`name` varchar(50) NOT NULL COMMENT '名称',`sex` tinyint NOT NULL COMMENT '性别 1男 2女',`phone` varchar(11) COMMENT '手机',`address` varchar(1000)  COMMENT '地址',`age` int  COMMENT '年龄',`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '创建时间',`update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户信息表';CREATE TABLE `ods_test_mysql_user_info_m` (`id` int NOT NULL COMMENT 'ID',`name` varchar(50) NOT NULL COMMENT '名称',`sex` tinyint NOT NULL COMMENT '性别 1男 2女',`phone` varchar(11) COMMENT '手机',`address` varchar(1000)  COMMENT '地址',`age` int  COMMENT '年龄',`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '创建时间',`update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户信息数仓表';

在user_info表中插入数据如下

创建作业的配置文件(json格式)

在 datax 的 script 目录,创建ods_test_mysql_user_info_m.json文件,配置如下,mysqlreader表示读取端,mysqlwriter表示写入端

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","sex","phone","address","age","create_time","update_time"],"splitPk": "id","connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"],"table": ["user_info"]}],"password": "root","username": "root","where": "update_time > '${updateTime}' "}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "replace","column": ["id","name","sex","phone","address","age","create_time","update_time"],"connection": [{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false","table": ["ods_test_mysql_user_info_m"]}],"username": "root","password": "root","preSql": [],"session": ["set session sql_mode='ANSI'"]}}}],"setting": {"speed": {"channel": "5"}}}
}

创建执行脚本

为了更贴合实际,写一个调度脚本sync.sh支持动态参数来执行任务

#!/bin/bash
## 执行示例 sh /Users/weizhao.dong/Documents/soft/datax/datax-script/call.sh /Users/weizhao.dong/Documents/soft/datax/datax-script/dwd_g2park_inout_report_s.json 1
jsonScript=$1
echo '执行脚本:'$jsonScript
interval=$2
echo "时间间隔(分钟):"$interval
now_time=$(date '+%Y-%m-%d %H:%M:%S')
echo "当前时间:"$now_time
update_time=$(date -v -${interval}M  '+%Y-%m-%d %H:%M:%S')
#linux 更新时间获取
#update_time=$(date -d "${now_time} $interval minute ago" +"%Y-%m-%d %H:%M:%S")
echo "更新时间:"$update_time
#执行
python3 /Users/weizhao.dong/Documents/soft/datax/bin/datax.py $jsonScript -p "-DupdateTime='${update_time}'"

假设我们要执以上ods_test_mysql_user_info_m.json脚本,并且同步十分钟之前的数据,如下

./sync.sh ods_test_mysql_user_info_m.json 10
测试

执行./sync.sh ods_test_mysql_user_info_m.json 10进行同步

以上结果可能有些人有疑问,就三条数据执行时间为 10s,其实这个 10s主要是初始化时间,耗时过长,同步的数据量多了优势就体现出来了,以下为实际生产同步数据结果,可以看到同步63102条耗时22s

推荐用法

以上我们只是通过一个简单的示例来演示了dataX如何使用,如果只是一次性同步,没问题,但是如果是周期性进行同步,有以下几种方式推荐

crontab调度

这种方式是最简单的,可以使用操作系统中的crontab定时调度,通过crontab -e编辑corn 任务,添加对应脚本即可

海豚调度器

在种方式在大数据领域用的比较多,典型场景就是 mysql 同步到数仓,海豚调度器内置了 datax 并且提供了图形化配置界面,配置起来非常方便

同时每次执行都有记录,并且都有对应的日志

定时任务框架(elasticjob/xxl-job)

在我们实际使用的业务系统定时调度框架都支持调度 shell 脚本,通过传入对应参数也可执行

文章转载自:架构成长指南

原文链接:https://www.cnblogs.com/waldron/p/18034189

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/710123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python笔记_数据类型

定义:python的变量在使用前必须赋值,数据类型指的是变量指定的内存数据的类型 string字符串类型使用引号int整型整数float浮点型小数bool布尔值(逻辑)输出true/false A,整数类型 整型字节 1,python的整数有十六进制,十进制&#…

面试数据库篇(mysql)- 11主从同步

原理 MySQL主从复制的核心就是二进制日志 二进制日志(BINLOG)记录了所有的 DDL(数据定义语言)语句和 DML(数据操纵语言)语句,但不包括数据查询(SELECT、SHOW)语句。 复…

Java 小项目开发日记 03(文章分类接口的开发)

Java 小项目开发日记 03&#xff08;文章分类接口的开发&#xff09; 项目目录 配置文件&#xff08;pom.xml&#xff09; <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocat…

计算机网络:数据链路层知识点汇总

文章目录 一、数据链路层功能概述二、封装成帧和透明传输三、差错控制&#xff08;检错编码&#xff09;四、差错控制&#xff08;纠错编码&#xff09;五、流量控制与可靠传输机制六、停止-等待协议七、后退N帧协议&#xff08;GBN&#xff09;八、选择重传协议&#xff08;SR…

用户增长6步法

什么是用户增长&#xff1f; 通过痛点、产品、渠道、内容、技术、数据等要素实现用户的获取、激活、留存、变现、推荐&#xff0c;用户增长包含了产品出现前的用户增长、产品生产周期内的用户增长、产品生命周期外的用户增长三个阶段。 用户增长6步法&#xff1a;方法、模型和…

YOLOv8-TensorRT on Jetson

YOLOv8-TensorRT Jetson 项目地址&#xff1a;https://github.com/triple-Mu/YOLOv8-TensorRT/blob/main/docs/Jetson.md 文档地址&#xff1a;https://github.com/triple-Mu/YOLOv8-TensorRT/blob/main/docs/Jetson.md 注意 engine 文件不跨平台&#xff0c;只能在对应的平台…

docker mysql主从复制

新建主服务器容器实例3301 mysql 主 3301 docker run -p 3301:3306 --name mysql-master \ -v /mydata/mysql-master/log:/var/log/mysql \ -v /mydata/mysql-master/data:/var/lib/mysql \ -v /mydata/mysql-master/conf:/etc/mysql \ -v /home/mysql/mysql-files:/var/lib/…

MATLAB环境下使用相关图可视化相关矩阵

为了处理各行各业中出现的高维数据&#xff0c;迫切需要寻找适用的统计学方法。大维随机矩阵理论是处理高维数据的理论工具之一&#xff0c;在高维统计分析中&#xff0c;表现出良好的性能并有着广泛的应用。 二十世纪四十年代和五十年代初期&#xff0c;大维随机矩阵理论起源…

AI大模型 拍照搜题

最近&#xff0c;发现一款小程序【问智通】&#xff0c;实现了拍照搜题结合AI大模型&#xff0c;省去了打字和敲数学公式向AI提问&#xff0c;完美的补充了其它拍照搜题平台拍不到&#xff0c;没解析等不足&#xff01;&#xff01;&#xff01; 小程序码&#xff1a; APP下载…

【多模态】28、LLaVA 第一版 | Visual Instruction Tuning 多模态模型的指令微调

论文&#xff1a;Visual Instruction Tuning 代码&#xff1a;https://llava-vl.github.io/ 出处&#xff1a;NeurIPS 2023 Oral 系列工作&#xff1a;LLaVA-1.5、LLaVA-PLUS、LLaVA-Interactive、Video-LLaVA、LLaVA-Med 等&#xff0c;LLaVA 也是首次将指令学习引入多模态…

西门子WinCC冗余项目使用

1 如果需要使用WinCC冗余系统时&#xff0c;请仔细阅读下面的文档&#xff0c;它将解决以下几个问题&#xff1a; &#xff08;1&#xff09;WinCC冗余有什么样的功能&#xff1f; &#xff08;2&#xff09;需要购买什么样的授权&#xff1f; &#xff08;3&#xff09;应…

TypeScript 中命名空间与模块的区别

&#x1f469; 个人主页&#xff1a;不爱吃糖的程序媛 &#x1f64b;‍♂️ 作者简介&#xff1a;前端领域新星创作者、CSDN内容合伙人&#xff0c;专注于前端各领域技术&#xff0c;成长的路上共同学习共同进步&#xff0c;一起加油呀&#xff01; ✨系列专栏&#xff1a;前端…

H3C OSPF 外部路由引入实验

H3C OSPF 外部路由引入实验 实验拓扑 实验需求 按照图示配置 IP 地址R1&#xff0c;R2&#xff0c;R3 运行 OSPF 使内网互通&#xff0c;所有接口&#xff08;公网接口除外&#xff09;全部宣告进 Area 0&#xff1b;要求使用环回口作为 Router-id业务网段不允许出现协议报文…

请立刻停止编写 Dockerfiles 并使用 docker init

您是那种觉得编写 Dockerfile 和 docker-compose.yml 文件很痛苦的人之一吗&#xff1f; 我承认&#xff0c;我就是其中之一。 我总是想知道我是否遵循了 Dockerfile、 docker-compose 文件的最佳编写实践&#xff0c;我害怕在不知不觉中引入了安全漏洞。 但是现在&#xff0c…

【数据结构和算法初阶(C语言)】时间复杂度(衡量算法快慢的高端玩家,搭配例题详细剖析)

目录 1.算法效率 1.1如何衡量一个算法的好坏 1.2 算法的复杂度 2.主菜-时间复杂度 2.1 时间复杂度的概念 2.2 大O的渐进表示法 2.2.1算法的最好&#xff0c;最坏和平均的情况 3.经典时间复杂度计算举例 3.1计算冒泡排序的时间复杂度 3.2计算折半查找的时间复杂度 3.…

Vue3 学习笔记(Day5)

「写在前面」 本文为尚硅谷禹神 Vue3 教程的学习笔记。本着自己学习、分享他人的态度&#xff0c;分享学习笔记&#xff0c;希望能对大家有所帮助。推荐先按顺序阅读往期内容&#xff1a; 1. Vue3 学习笔记&#xff08;Day1&#xff09; 2. Vue3 学习笔记&#xff08;Day2&…

提升培训考试效率的系统设计策略

随着培训的重要性日益凸显&#xff0c;如何提升培训考试系统的效率成为了许多组织和机构关注的焦点。 一、设计自适应的考试界面 培训考试系统的界面应该能够自适应不同的屏幕尺寸和设备类型&#xff0c;如电脑、平板电脑和手机。采用响应式设计技术&#xff0c;确保考生在不同…

Leetcode115. 不同的子序列 -代码随想录

题目&#xff1a; 代码(首刷看解析 2024年2月29日&#xff09;&#xff1a; 不晓得这种超过int和long的测试案例是用来恶心谁的&#xff0c;用DP都没机会取模 class Solution { public:// 动态规划const int MOD 1000000007;int numDistinct(string s, string t) {long n s.…

市场复盘总结 20240229

仅用于记录当天的市场情况&#xff0c;用于统计交易策略的适用情况&#xff0c;以便程序回测 短线核心&#xff1a;不参与任何级别的调整&#xff0c;采用龙空龙模式 一支股票 10%的时候可以操作&#xff0c; 90%的时间适合空仓等待 二进三&#xff1a; 进级率中 60% 最常用…

06|Mysql内部组件结构

1. 连接器 客户端要向mysql发起通信都必须先跟Server端建立通信连接&#xff0c;而建立连接的工作就是由连接器完成的 mysql -h host[数据库地址] -u root[用户] -p root[密码] -P 3306连接步骤: 1、如果用户名或密码不对&#xff0c;你就会收到一个"Access denied for us…