前言
今天是2024-2-21,农历正月十二,相信今天开始是新的阶段,尽管它不是新的周一、某月一日、某年第一天,尽管我是一个很讲究仪式感的人。新年刚过去 12 天,再过 3 天就开学咯,开学之后我的大学时光就进入了冲刺阶段,之前没完成的目标和习惯务必严格要求自己执行,我也慢慢悟出了解决各种 "病症" 的办法了~
这里推荐我喜欢的几本书:《黄金时代》、《一直特立独行的猪》、《沉默的大多数》,都是王小波的,对我收益颇深。尽管这博客是写给我自己看的 hahaha
言归正传,今天学习 DataX,这也是一个大数据工具,和 Maxwell 差不多,它是用来做全量数据同步的,前者主要是做增量数据同步的。
1、概述
1.1、什么是 DataX
DataX 是阿里巴巴开源的一个异构数据源离线同步工具(区别于 Maxwell、Cannal,这俩是主要是做增量同步的),致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。
源码地址:https://github.com/alibaba/DataX
1.2、DataX 的设计
为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路, DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。
1.3、支持的数据源
类型 | 数据源 | Reader(读) | Writer(写) |
RDBMS 关系型数据库 | MySQL | √ | √ |
Oracle | √ | √ | |
OceanBase | √ | √ | |
SQLServer | √ | √ | |
PostgreSQL | √ | √ | |
DRDS | √ | √ | |
通用RDBMS | √ | √ | |
阿里云数仓数据存储 | ODPS | √ | √ |
ADS | √ | ||
OSS | √ | √ | |
OCS | √ | √ | |
NoSQL数据存储 | OTS | √ | √ |
Hbase0.94 | √ | √ | |
Hbase1.1 | √ | √ | |
Phoenix4.x | √ | √ | |
Phoenix5.x | √ | √ | |
MongoDB | √ | √ | |
Hive | √ | √ | |
Cassandra | √ | √ | |
无结构化数据存储 | TxtFile | √ | √ |
FTP | √ | √ | |
HDFS | √ | √ | |
Elasticsearch | √ | ||
时间序列数据库 | OpenTSDB | √ | |
TSDB | √ | √ |
1.4、框架设计
- Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲, 流控,并发,数据转换等核心技术问题。
1.5、运行原理
- Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。一个 Job 启动一个进程。
- Task:根据不同数据源的切分策略,一个 Job 会被切分为多个 Task(由 Split 模块完成),Task 是 DataX 作业的最小单元,每个 Task 负责一部分数据的同步工作。
- TaskGroup:Scheduler 调度模块会对 Task 进行分组,每个 TaskGroup 负责启动 Task,单个 TaskGroup 的并发数量为 5(最多同时执行 5 个Task,一个 Task 执行完就会释放掉,再进来一个 Task 继续执行)。
- Reader -> Channel -> Writer :每个 Task 启动后,都会固定启动 Reader -> Channel -> Writer 来完成同步工作。
举例来说,用户提交了一个 DataX 作业,并且配置了 20 个并发,目的是将一个 100 张分表的 mysql 数据同步到 odps 里面。 DataX 的调度决策思路是:
- DataXJob 根据分库分表切分成了 100 个 Task。
- 根据 20 个并发,DataX 计算共需要分配 4 个 TaskGroup。
- 4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以 5 个并发共计运行 25 个 Task。
1.6、与 Sqoop 对比
2、DataX3.0 部署
傻瓜式安装解压,然后执行下面的脚本
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
运行结果:
当出现上面的结果说明安装成功,这里我们用的是 DataX 自带的一个测试作业,它是一个 json 格式的文件,之后我们的 DataX 作业也是通过自己 编写 json 文件来实现。
3、DataX 的使用
3.1、DataX 任务提交命令
DataX的使用十分简单,用户只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可,就像我们安装时测试执行 DataX 任务的操作一样:
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
3.2、DataX 配置文件格式
可以通过下面这个命令来查看 DataX 配置文件模板:
# -r 代表 reader -w 代表 writer
python bin/datax.py -r mysqlreader -w hdfswriter
配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。
Reader和Writer的具体参数可参考官方文档,地址:
https://github.com/alibaba/DataX/blob/master/README.md
所以,如果我们需要自定义 DataX 任务的时候,就需要打开官网的 reader 和 writer 文档,查看需要配置哪些参数,接下来我们就来练习一下:
4、使用案例
4.1、MySQL -> HDFS
从 MySQL 写入到 HDFS ,我们就需要去官网查看 MySQLReader 和 HDFSWriter 的内容:
简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。
对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。
案例要求:同步 gmall 数据库中 base_province 表数据到 HDFS 的 /base_province 目录
需求分析:要实现该功能,需选用 MySQLReader 和 HDFSWriter,MySQLReader 具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。
下面分别使用两种模式进行演示:
4.1.1、MySQLReader & TableMode
1)编写配置文件
vim /opt/module/datax/job/base_province.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"where": "id>=3","connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"table": ["base_province"]}],"password": "123456","splitPk": "","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}
2)配置说明
1. Reader 参数说明
注意:这里的 splitPk 参数是数据分片字段,一般是主键,仅支持整型 ,而且只有 TableMode 模式下才有效。
2. Writer 参数说明
我们的 hdfswriter 中有一个 column 参数,但是我们知道 HDFS 是没有列的这个概念的。其实,这里代表的是我们Hive表中数据的字段类型,这个配置参数是给 Hive 看的。之后我们在使用 hdfsreader 的时候 依然要配置这个参数,这个参数的意义仍然是 hive 的数据字段。
注意:这里的 fileName 参数指的是 HDFS 前缀名而并不是完整文件名!
3)提交任务
使用DataX向HDFS同步数据时,必须确保目标路径已存在!
hadoop fs -mkdir /base_province
python bin/datax.py job/base_province.json
执行结果:
可以看到,我们的文件名是由我们 hdfswriter 中指定的前缀 fileName + uuid 组成的。
查看HDFS 中的文件内容(因为我们的文件是经过 gzip 压缩的,所以网页端查看不了):
hadoop fs -cat /base_province/* | zcat
可以看到,MySQL 中 34 条数据一共写入了 32 条,这是因为我们设置了 where 参数的值为 id>=3
4.1.2、MySQLReader & QuerySQLMode
1)配置文件
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}
可以看到,TableMode 的 mysqlreader 中是通过在 connection 参数设置 table 参数的值来指定我们的表,而这里 QuerySQLMode 模式是通过 querySql 参数来指定 SQL ,从SQL中可以得到表名。此外,QuerySQLMode 模式没有 columns 和 where 参数,因为这些都可以在 SQL 中指定。
那 TableMode 和 QuerySQLMode 有什么区别呢?其实 QuerySQLMode 正因为它可以指定 SQL ,所以就更加灵活,我们可以使用复杂的 join 和聚合函数,这一点是 TableMode 所实现不了的。反过来,我们上面知道 TableMode 的配置文件中可以在 mysqlreader 中指定一个参数 splitPk 来开启多个 Task 去读取一张表,这一点同样是 QuerySQLMode 所不具备的,QuerySQLMode 只支持单个 Task。
QuerySQLMode 这种模式用的还是比较少的,毕竟我们的 Hive 也可以完成数据的聚合和联结。
此外,关于 hdfswriter 还有一些注意事项:
注意事项:
HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
解决该问题的方案有两个:
一是修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑(也就是如果读取到 null 就把它替换为 "\\N",双斜杠是因为 DataX 是 Java 写的),可参考这里。
二是在Hive中建表时指定null值存储格式为空字符串(''")
2)配置文件说明
mysqlreader:
hdfswriter 和上面的是一样的。
3)提交任务
结果和上面是一样的,这里不再演示。
4.1.3、DataX 传参
通常情况下,离线数据同步任务需要每日定时重复执行(就像我们之前 flume 上传到 HDFS 也指定过),故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中 HDFS Writer 的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
1)修改配置文件
DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,我们只需要修改 hdfswriter 下 parameter 参数下的 path 为:
"path": "/base_province/${dt}"
2)创建 hdfs 路径
hadoop fs -mkdir /base_province/2020-06-14
3)提交任务
python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json
4.2、HDFS -> MySQL
案例要求:同步HDFS上的/base_province目录下的数据到 MySQL gmall 数据库下的 test_province表。
需求分析:要实现该功能,需选用 HDFSReader 和 MySQLWriter。
1)编写配置文件
vim test_province.json
{"job": {"content": [{"reader": {"name": "hdfsreader","parameter": {"defaultFS": "hdfs://hadoop102:8020","path": "/base_province","column": ["*"],"fileType": "text","compress": "gzip","encoding": "UTF-8","nullFormat": "\\N","fieldDelimiter": "\t",}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"table": ["test_province"],"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"}],"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"writeMode": "replace"}}}],"setting": {"speed": {"channel": 1}}}
}
2)配置说明
hdfsreader:
mysqlwriter:
其中 writeMode 的三种不同取值代表三种不同的 SQL 语句,其中 replace 和 on duplicate key update 都要求我们的 MySQL 表是有主键的。我们经常使用的 insert 语句是不需要主键的,所以当有主键重复的时候会直接报错。而 replace 语句如果遇到表中已经存在该主键的数据会直接替换掉, on duplicate key update 语句的话如果遇到表中已经存在该主键的数据会更新不同值的字段。
3)提交任务
创建 HDFS 输出端 MySQL 的表:
DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province` (`id` bigint(20) NOT NULL,`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
提交任务:
bin/datax.py job/test_province.json
4)查看结果
5、DataX 优化
上面 DataX 的任务配置文件中,job 下有两个参数 content 和 setting,content 是配置 reader 和 writer 的,而 setting 其实就是来给 DataX 优化用的(通过控制流量、并发)。
5.1、速度控制
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
关键优化参数如下:
参数 | 说明 |
job.setting.speed.channel | 并发数 |
job.setting.speed.record | 总 record 限速(tps:条数/s) |
job.setting.speed.byte | 总 byte 限速(bps:字节数/s) |
core.transport.channel.speed.record | 单个 channel 的record限速,默认值为10000(10000条/s) |
core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
1. 若配置了总 record 限速,则必须配置单个 channel 的 record 限速
2. 若配置了总 byte 限速,则必须配置单个 channe 的 byte 限速
3. 若配置了总 record 限速和总 byte 限速,channel 并发数参数就会失效。因为配置了总record限速和总 byte 限速之后,实际 channel 并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个 channel 的record限速)
配置示例:
{"core": {"transport": {"channel": {"speed": {"byte": 1048576 //单个channel byte限速1M/s}}}},"job": {"setting": {"speed": {"byte" : 5242880 //总byte限速5M/s}},...}
}
5.2、内存调整
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
总结
DataX 这个工具的学习就结束了,比我想象的要简单多,但是也需要好好熟悉练习一下。目前只学习了 DataX 在 HDFS 和 MySQL 之间的相互数据传递,以后用到其它框架的时候还需要精进一下。