1. Introduction
DataX is an offline synchronization tool for heterogeneous data sources. It aims to provide stable and efficient data synchronization between a wide range of sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.
GitHub repository
Detailed documentation
Operation manual
DataX supports a wide range of data sources; the full support matrix is listed in the detailed documentation.
Architecture:
Reader: the data collection module; it reads data from the source and hands it to the Framework.
Writer: the data writing module; it continuously pulls data from the Framework and writes it to the destination.
Framework: connects the Reader and the Writer, serves as the data transfer channel between them, and handles core concerns such as buffering, flow control, concurrency, and data conversion.
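Every DataX job is described in a single JSON file that mirrors this architecture: each content entry pairs a Reader with a Writer, while setting configures the Framework (concurrency, rate limits, and so on). The following is only an orienting sketch; the plugin names somereader and somewriter are placeholders, not a runnable job:

```json
{
  "job": {
    "setting": {
      "speed": { "channel": 1 }
    },
    "content": [
      {
        "reader": { "name": "somereader", "parameter": {} },
        "writer": { "name": "somewriter", "parameter": {} }
      }
    ]
  }
}
```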
2. Usage
- Download
Download URL: https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
- Extract
```shell
# extract the archive
tar -zxvf datax.tar.gz -C /opt/module/
```
- Create a data synchronization job file (saved below as /opt/module/datax/job/stream_to_stream.json, matching the launch command)
{"job": {"content": [{"reader": {"name": "streamreader","parameter": {"sliceRecordCount": 10,"column": [{"type": "long","value": "10"},{"type": "string","value": "hello,datax"}]}},"writer": {"name": "streamwriter","parameter": {"encoding": "UTF-8","print": true}}}],"setting": {"speed": {"channel": 5}}}
}
- Launch the job
```shell
python /opt/module/datax/bin/datax.py /opt/module/datax/job/stream_to_stream.json
```
- Execution result: on completion DataX prints a job summary, including start and end time, total elapsed time, average traffic, total records read, and the number of read/write failures
- Configuration reference
Parameter | Description |
---|---|
job.setting | Global job settings (see the example after this table) |
job.setting.speed | Rate-control settings; throttling can be applied by channel (concurrency), by record count, or by byte count |
job.setting.speed.channel | Number of concurrent channels |
job.setting.speed.record | Record-count throttle |
job.setting.speed.byte | Byte-count throttle |
job.setting.errorLimit | Error-limit settings |
job.setting.errorLimit.record | Maximum number of error records allowed |
job.setting.errorLimit.percentage | Maximum percentage of error records allowed |
job.setting.dirtyDataPath | Dirty-data output settings |
job.setting.dirtyDataPath.path | Directory where dirty (error) records are saved |
job.setting.log | Log settings |
job.setting.log.level | Log level |
job.setting.log.dir | Log output directory |
content | Task configuration |
reader | Reader configuration |
name | Reader type |
parameter | Reader-specific settings (see the documentation for the specific Reader) |
writer | Writer configuration |
name | Writer type |
parameter | Writer-specific settings (see the documentation for the specific Writer) |
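As a sketch of how the setting keys above combine, the following caps a job at 3 concurrent channels and roughly 1 MiB/s, and fails it once more than 10 error records or 2% dirty data accumulate. The numbers are arbitrary examples and the content array is elided:

```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 10,
        "percentage": 0.02
      }
    },
    "content": []
  }
}
```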
3. Common Configurations
3.1 MysqlReader
{"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["id","name"],"splitPk": "db_id","connection": [{"table": ["table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}},"writer": {"name": "streamwriter","parameter": {"print":true}}}]}
}
Configuration notes:
jdbcUrl: JDBC connection URL
username: MySQL username
password: MySQL password
table: name(s) of the table(s) to synchronize
column: the columns of the configured table to synchronize; * can be used to select all columns
splitPk: field used to shard the data; when set, DataX splits the data on this key and starts concurrent tasks, which can greatly improve synchronization throughput
where: filter condition
querySql: a custom SQL query that can be used in place of the column and where settings (see the sketch after this list)
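When querySql is used, it goes inside the connection block. A minimal sketch, with a made-up query and the placeholder jdbcUrl carried over from the example above:

```json
{
  "reader": {
    "name": "mysqlreader",
    "parameter": {
      "username": "root",
      "password": "root",
      "connection": [
        {
          "querySql": [
            "select id, name from table_a where id < 100"
          ],
          "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
        }
      ]
    }
  }
}
```

Note that when querySql is present, MysqlReader ignores the table, column, and where settings.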
3.2 MysqlWriter
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 19880808,"type": "long"},{"value": "1988-08-08 08:08:08","type": "date"},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1000}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "insert","username": "root","password": "root","column": ["id","name"],"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from test"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk","table": ["test"]}]}}}]}
}
Configuration notes:
jdbcUrl: JDBC connection URL
username: MySQL username
password: MySQL password
table: name(s) of the target table(s)
column: the columns of the target table to write; * can be used to select all columns
preSql: SQL statements executed before data is written to the target table
postSql: SQL statements executed after data has been written to the target table
writeMode: controls whether data is written with insert into, replace into, or ON DUPLICATE KEY UPDATE statements
batchSize: number of records committed in a single batch (see the sketch after this list)
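To make the last two settings concrete, here is a minimal sketch of a mysqlwriter block that writes with replace into and commits 1024 records per batch; the credentials, jdbcUrl, and table name are placeholders carried over from the example above:

```json
{
  "writer": {
    "name": "mysqlwriter",
    "parameter": {
      "writeMode": "replace",
      "batchSize": 1024,
      "username": "root",
      "password": "root",
      "column": ["id", "name"],
      "connection": [
        {
          "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax",
          "table": ["test"]
        }
      ]
    }
  }
}
```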
3.3 HdfsReader
{"job": {"setting": {"speed": {"channel": 3}},"content": [{"reader": {"name": "hdfsreader","parameter": {"path": "/user/hive/warehouse/mytable01/*","defaultFS": "hdfs://xxx:port","column": [{"index": 0,"type": "long"},{"index": 1,"type": "boolean"},{"type": "string","value": "hello"},{"index": 2,"type": "double"}],"fileType": "orc","encoding": "UTF-8","fieldDelimiter": ","}},"writer": {"name": "streamwriter","parameter": {"print": true}}}]}
}
Configuration notes:
path: path(s) of the files to read
defaultFS: NameNode address
fileType: file format; currently "text", "orc", "rc", "seq", and "csv" are supported
column: list of columns to read
fieldDelimiter: field delimiter used when reading
encoding: encoding of the files being read
nullFormat: plain-text files have no standard string for null, so nullFormat defines which string DataX should interpret as null
haveKerberos: whether Kerberos authentication is enabled; defaults to false (when true, the two settings below are required; see the sketch after this list)
kerberosKeytabFilePath: absolute path to the Kerberos keytab file
kerberosPrincipal: Kerberos principal name
hadoopConfig: advanced Hadoop-related settings
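A minimal sketch of an hdfsreader parameter block with Kerberos enabled; the keytab path, principal, and defaultFS below are placeholders, not values from the original example:

```json
{
  "reader": {
    "name": "hdfsreader",
    "parameter": {
      "path": "/user/hive/warehouse/mytable01/*",
      "defaultFS": "hdfs://xxx:port",
      "fileType": "orc",
      "column": ["*"],
      "haveKerberos": true,
      "kerberosKeytabFilePath": "/etc/security/keytabs/datax.keytab",
      "kerberosPrincipal": "datax@EXAMPLE.COM"
    }
  }
}
```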
3.4 HdfsWriter
{"setting": {},"job": {"setting": {"speed": {"channel": 2}},"content": [{"reader": {"name": "txtfilereader","parameter": {"path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],"encoding": "UTF-8","column": [{"index": 0,"type": "long"},{"index": 1,"type": "long"},{"index": 2,"type": "long"},{"index": 3,"type": "long"},{"index": 4,"type": "DOUBLE"},{"index": 5,"type": "DOUBLE"},{"index": 6,"type": "STRING"},{"index": 7,"type": "STRING"},{"index": 8,"type": "STRING"},{"index": 9,"type": "BOOLEAN"},{"index": 10,"type": "date"},{"index": 11,"type": "date"}],"fieldDelimiter": "\t"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://xxx:port","fileType": "orc","path": "/user/hive/warehouse/writerorc.db/orcfull","fileName": "xxxx","column": [{"name": "col1","type": "TINYINT"},{"name": "col2","type": "SMALLINT"},{"name": "col3","type": "INT"},{"name": "col4","type": "BIGINT"},{"name": "col5","type": "FLOAT"},{"name": "col6","type": "DOUBLE"},{"name": "col7","type": "STRING"},{"name": "col8","type": "VARCHAR"},{"name": "col9","type": "CHAR"},{"name": "col10","type": "BOOLEAN"},{"name": "col11","type": "date"},{"name": "col12","type": "TIMESTAMP"}],"writeMode": "append","fieldDelimiter": "\t","compress":"NONE"}}}]}
}
Configuration notes:
path: target path on HDFS
defaultFS: NameNode address
fileType: file format; currently "text" and "orc" are supported
fileName: name of the written files (DataX appends a random suffix so each writer thread gets its own file)
column: list of columns to write
fieldDelimiter: field delimiter used when writing
writeMode: behavior when files matching fileName already exist under path, e.g. append writes new files alongside existing ones, while nonConflict makes the job fail
compress: compression codec; text files support gzip and bzip2, while orc files support NONE and SNAPPY (a text/gzip sketch follows this list)
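For a plain-text target instead of ORC, fileType and compress change together. A minimal sketch of the writer side only; the path, fileName, column list, and NameNode address are placeholders:

```json
{
  "writer": {
    "name": "hdfswriter",
    "parameter": {
      "defaultFS": "hdfs://xxx:port",
      "fileType": "text",
      "path": "/user/hive/warehouse/writertext.db/textfull",
      "fileName": "xxxx",
      "column": [
        { "name": "col1", "type": "INT" },
        { "name": "col2", "type": "STRING" }
      ],
      "writeMode": "append",
      "fieldDelimiter": "\t",
      "compress": "gzip"
    }
  }
}
```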
3.5 FtpReader
{"setting": {},"job": {"setting": {"speed": {"channel": 2}},"content": [{"reader": {"name": "ftpreader","parameter": {"protocol": "sftp","host": "127.0.0.1","port": 22,"username": "xx","password": "xxx","path": ["/home/hanfa.shf/ftpReaderTest/data"],"column": [{"index": 0,"type": "long"},{"index": 1,"type": "boolean"},{"index": 2,"type": "double"},{"index": 3,"type": "string"},{"index": 4,"type": "date","format": "yyyy.MM.dd"}],"encoding": "UTF-8","fieldDelimiter": ","}},"writer": {"name": "ftpWriter","parameter": {"path": "/home/hanfa.shf/ftpReaderTest/result","fileName": "shihf","writeMode": "truncate","format": "yyyy-MM-dd"}}}]}
}
Configuration notes:
protocol: transfer protocol; currently ftp and sftp are supported
host: FTP server address
port: FTP server port
timeout: connection timeout when connecting to the server, in milliseconds; default 60000
connectPattern: connection mode, active (PORT) or passive (PASV); only relevant for the ftp protocol (see the sketch after this list)
username: username
password: password
path: path(s) to read
column: list of columns to read
fieldDelimiter: field delimiter used when reading
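A minimal sketch of the reader side using the plain ftp protocol, where connectPattern and timeout apply; the host, port, credentials, paths, and single-column layout are placeholders:

```json
{
  "reader": {
    "name": "ftpreader",
    "parameter": {
      "protocol": "ftp",
      "host": "127.0.0.1",
      "port": 21,
      "timeout": 60000,
      "connectPattern": "PASV",
      "username": "xx",
      "password": "xxx",
      "path": ["/home/xx/data"],
      "column": [
        { "index": 0, "type": "string" }
      ],
      "fieldDelimiter": ","
    }
  }
}
```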
3.6 FtpWriter
{"setting": {},"job": {"setting": {"speed": {"channel": 2}},"content": [{"reader": {},"writer": {"name": "ftpwriter","parameter": {"protocol": "sftp","host": "***","port": 22,"username": "xxx","password": "xxx","timeout": "60000","connectPattern": "PASV","path": "/tmp/data/","fileName": "yixiao","writeMode": "truncate|append|nonConflict","fieldDelimiter": ",","encoding": "UTF-8","nullFormat": "null","dateFormat": "yyyy-MM-dd","fileFormat": "csv","suffix": ".csv","header": []}}}]}
}
Configuration notes:
protocol: transfer protocol; currently ftp and sftp are supported
host: FTP server address
port: FTP server port
timeout: connection timeout when connecting to the server, in milliseconds; default 60000
connectPattern: connection mode, active (PORT) or passive (PASV); only relevant for the ftp protocol
username: username
password: password
path: target path to write to
fileName: name of the written files
fieldDelimiter: field delimiter used when writing
writeMode: write behavior when target files already exist; the sample above lists the three options (truncate, append, nonConflict) in one string, but a real job must pick exactly one, as in the sketch below
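A minimal sketch of the same writer with one concrete writeMode (truncate) chosen; the host, credentials, and paths are placeholders:

```json
{
  "writer": {
    "name": "ftpwriter",
    "parameter": {
      "protocol": "sftp",
      "host": "127.0.0.1",
      "port": 22,
      "username": "xxx",
      "password": "xxx",
      "path": "/tmp/data/",
      "fileName": "yixiao",
      "writeMode": "truncate",
      "fieldDelimiter": ",",
      "encoding": "UTF-8",
      "fileFormat": "csv",
      "suffix": ".csv"
    }
  }
}
```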