1、datax:
是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步
2、参考网址文献:
https://github.com/alibaba/DataX/blob/master/introduction.md
3、Datax的框架设计:
Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。
1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework
2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端
3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。
4、Datax的核心架构:
Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:
模块的核心介绍:
1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。
2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。
3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。
4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。
5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.
5、Datax的核心优势:
1、可靠的数据质量监控
2、丰富的数据转换功能
3、精准的控制速度
4、容错机制:
1、线程内部重试
DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。
2、线程级别重试
目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
5、极简的体验
6、Datax与Sqoop的区别:
功能 | Datax | sqoop |
运行模式 | 单进程多线程 | MR |
分布式 | 是不支持分布式 | 支持 |
流控 | 有 | 需要定制 |
统计信息 | 支持 | 不支持,分布式的数据收集不方便 |
数据校验 | 只有core部分有校验功能 | 不支持,分布式的数据收集不方便 |
监控 | 需要定制 | 需要定制 |
7、Datax部署:
1、下载jar包:
下载路径:https://github.com/alibaba/DataX
2、解压文件,配置环境变量:
#解压jar包
tar -zxvf datax.tar.gz#配置环境变量:
vim /etc/profileexport DATAX_HOME=/user/loacl/soft/datax
export PATH=.:$PATH:$DATAX_HOME/bin#配置好环境变量,让配置文件生效
source /etc/profile
3、使执行文件拥有执行权:
添加执行权:chmod +x data.py
8、Datax的使用:
在datax中会自动的生成模板的命令:datax.py -r streamreader -w streamwriter
1、streamreader to streamwriter,数据打印在控制台上面
参数说明:
"sliceRecordCount": 100 #指定打印的个数"channel": 1 #指定并发度
#创建json文件:vim streamreadertostreamwriter.json{"job": {"content": [{"reader": {"name": "streamreader", "parameter": {"column": [{"type":"string","value":"wyz"},{"type":"int","value":18}], "sliceRecordCount": 100 #指定打印的个数}}, "writer": {"name": "streamwriter", "parameter": {"encoding": "", "print": true}}}], "setting": {"speed": {"channel": 1 #指定并发度}}}
}#脚本执行命令:
datax.py streamreadertostreamwriter.json
2、mysql to mysql
1、可以通过命令获取模板:
datax.py -r mysqlreader -w mysqlwriter 2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题例如:表中的某个字段是主键,主键唯一
vim mysqlreaderTomysqlwriter.json{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name","age","clazz","gender"]}], "connection": [{"jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"], "table": ["stu"]}], "password": "123456", "username": "root", "where": "" #不是必须要写的,作用是可以在读数据时进行一次过滤}}, "writer": {"name": "mysqlwriter", "parameter": {"column": ["id","name","age","clazz","gender"], "connection": [{"jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25", "table": ["data_test"]}], "password": "123456", "preSql": [], #不是必须写的,作用是再写入数据前可以执行该sql"session": [], "username": "root", "writeMode": "insert" #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update}}}], "setting": {"speed": {"channel": 1}}}
}#执行脚本:
datax.py mysqlreaderTomysqlwriter.json
将数据写入到mysql时,写入的表是需要提前创建的。