使用DataX实现mysql与hive数据互相导入导出

一、概论

1.1 什么是DataX

         DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

1.2 DataX 的设计

         为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步
在这里插入图片描述

1.3 框架设计

在这里插入图片描述

  • Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。
  • Wiriter: 数据写入模块,负责不断向Framwork取数据,并将数据写入到目的端。
  • Framework:用于连接read和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
    运行原理
    在这里插入图片描述
  • Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
  • Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
  • Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。
  • TaskGroup:负责启动Task。

1.4 Datax所支持的渠道

类型数据源读者作家(写)文件
RDBMS关系型数据库MySQL读,写
           甲骨文        √        √    读,写
SQL服务器读,写
PostgreSQL的读,写
DRDS读,写
通用RDBMS(支持所有关系型数据库)读,写
阿里云数仓数据存储ODPS读,写
美国存托凭证
开源软件读,写
OCS读,写
NoSQL数据存储OTS读,写
Hbase0.94读,写
Hbase1.1读,写
凤凰4.x读,写
凤凰5.x读,写
MongoDB读,写
蜂巢读,写
卡桑德拉读,写
无结构化数据存储文本文件读,写
的FTP读,写
HDFS读,写
弹性搜索
时间序列数据库OpenTSDB
技术开发局读,写

二、快速入门

2.1 环境搭建

下载地址: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
源码地址: https://github.com/alibaba/DataX

配置要求:

  • Linux
  • JDK(1.8以上 建议1.8) 下载
  • Python(推荐 Python2.6.X)下载
    安装:

1) 将下载好的datax.tar.gz上传到服务器的任意节点,我这里上传到node01上的/exprot/soft
2)解压到/export/servers/

[root@node01 soft]# tar -zxvf datax.tar.gz  -C ../servers/

3)运行自检脚本

出现以下结果说明你得环境没有问题

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.
在这里插入图片描述

2.2搭建环境注意事项

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.

参考:

find ./* -type f -name ".*er"  | xargs rm -rf
find: paths must precede expression: |
Usage: find [-H] [-L] [-P] [-Olevel] [-D help|tree|search|stat|rates|opt|exec] [path...] [expression]find /datax/plugin/reader/ -type f -name "._*er" | xargs rm -rf
find /datax/plugin/writer/ -type f -name "._*er" | xargs rm -rf这里的/datax/plugin/writer/要改为你自己的目录

原文链接:https://blog.csdn.net/dz77dz/article/details/127055299

2.3读取Mysql中的数据写入到HDFS

准备
创建数据库和表并加载测试数据

create database test;
use test;
create table c_s(id   varchar(100) null,c_id int          null,s_id varchar(20)  null
);
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 1, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 2, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 3, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 5, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 6, '201967');

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r mysqlreader -w hdfswriterDataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.Please refer to the mysqlreader document:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.mdPlease refer to the hdfswriter document:https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.mdPlease save the following configuration as a json file and  usepython {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": [],"connection": [{"jdbcUrl": [],"table": []}],"password": "","username": "","where": ""}},"writer": {"name": "hdfswriter","parameter": {"column": [],"compress": "","defaultFS": "","fieldDelimiter": "","fileName": "","fileType": "","path": "","writeMode": ""}}}],"setting": {"speed": {"channel": ""}}}
}

根据官网模板进行修改

[root@node01 datax]# vim job/mysqlToHDFS.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","c_id","s_id"],"connection": [{"jdbcUrl": ["jdbc:mysql://node02:3306/test"],"table": ["c_s"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "string"},{"name": "c_id","type": "int"},{"name": "s_id","type": "string"}],"defaultFS": "hdfs://node01:8020","fieldDelimiter": "\t","fileName": "c_s.txt","fileType": "text","path": "/","writeMode": "append"}}}],"setting": {"speed": {"channel": "1"}}}
}

HDFS的端口号注意版本,2.7.4 是9000;hdfs://node01:9000

MySQL的参数介绍
在这里插入图片描述
HDFS参数介绍
在这里插入图片描述
运行脚本

[root@node01 datax]# bin/datax.py  job/mysqlToHDFS.json
2020-10-02 16:12:16.358 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /export/servers/datax/hook
2020-10-02 16:12:16.359 [job-0] INFO  JobContainer -[total cpu info] =>averageCpu                     | maxDeltaCpu                    | minDeltaCpu-1.00%                         | -1.00%                         | -1.00%[total gc info] =>NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTimePS MarkSweep         | 1                  | 1                  | 1                  | 0.245s             | 0.245s             | 0.245sPS Scavenge          | 1                  | 1                  | 1                  | 0.155s             | 0.155s             | 0.155s2020-10-02 16:12:16.359 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-10-02 16:12:16.359 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-10-02 16:12:16.360 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-10-02 16:12:04
任务结束时刻                    : 2020-10-02 16:12:16
任务总计耗时                    :                 12s
任务平均流量                    :                5B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

2.4 读取HDFS中的数据写入到Mysql

准备工作

create database test;
use test;
create table c_s2(id   varchar(100) null,c_id int          null,s_id varchar(20)  null
);

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r hdfsreader -w mysqlwriterDataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.Please refer to the hdfsreader document:https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.mdPlease refer to the mysqlwriter document:https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.mdPlease save the following configuration as a json file and  usepython {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.{"job": {"content": [{"reader": {"name": "hdfsreader","parameter": {"column": [],"defaultFS": "","encoding": "UTF-8","fieldDelimiter": ",","fileType": "orc","path": ""}},"writer": {"name": "mysqlwriter","parameter": {"column": [],"connection": [{"jdbcUrl": "","table": []}],"password": "","preSql": [],"session": [],"username": "","writeMode": ""}}}],"setting": {"speed": {"channel": ""}}}
}

根据官方提供模板进行修改

[root@node01 datax]# vim job/hdfsTomysql.json
{"job": {"content": [{"reader": {"name": "hdfsreader","parameter": {"column": ["*"],"defaultFS": "hdfs://node01:8020","encoding": "UTF-8","fieldDelimiter": "\t","fileType": "text","path": "/c_s.txt"}},"writer": {"name": "mysqlwriter","parameter": {"column": ["id","c_id","s_id"],"connection": [{"jdbcUrl": "jdbc:mysql://node02:3306/test","table": ["c_s2"]}],"password": "123456","username": "root","writeMode": "replace"}}}],"setting": {"speed": {"channel": "1"}}}
}

脚本运行

[root@node01 datax]# bin/datax.py job/hdfsTomysql.json[total cpu info] =>averageCpu                     | maxDeltaCpu                    | minDeltaCpu-1.00%                         | -1.00%                         | -1.00%[total gc info] =>NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTimePS MarkSweep         | 1                  | 1                  | 1                  | 0.026s             | 0.026s             | 0.026sPS Scavenge          | 1                  | 1                  | 1                  | 0.015s             | 0.015s             | 0.015s2020-10-02 16:57:13.152 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-10-02 16:57:13.152 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.033s | Percentage 100.00%
2020-10-02 16:57:13.153 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-10-02 16:57:02
任务结束时刻                    : 2020-10-02 16:57:13
任务总计耗时                    :                 11s
任务平均流量                    :                5B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

2.5将Mysql表导入Hive

1.在hive中建表

-- hive建表
CREATE TABLE student2 (classNo string,stuNo string,score int) 
row format delimited fields terminated by ',';-- 构造点mysql数据
create table if not exists student2(classNo varchar ( 50 ),stuNo   varchar ( 50 ),score    int 
)
insert into student2 values('1001','1012ww10087',63);
insert into student2 values('1002','1012aa10087',63);
insert into student2 values('1003','1012bb10087',63);
insert into student2 values('1004','1012cc10087',63);
insert into student2 values('1005','1012dd10087',63);
insert into student2 values('1006','1012ee10087',63);

2.编写mysql2hive.json配置文件

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","connection": [{"table": ["student2"],"jdbcUrl": ["jdbc:mysql://192.168.43.10:3306/mytestmysql"]}],"column": ["classNo","stuNo","score"]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://192.168.43.10:9000","path": "/hive/warehouse/home/myhive.db/student2","fileName": "myhive","writeMode": "append","fieldDelimiter": ",","fileType": "text","column": [{"name": "classNo","type": "string"},{"name": "stuNo","type": "string"},{"name": "score","type": "int"}]}}}]}
}

3.运行脚本

bin/datax.py job/mysql2hive.json 

4.查看hive表是否有数据

2.6将Hive表数据导入Mysql

1.要先在mysql建好表

create table if not exists student(classNo varchar ( 50 ),stuNo   varchar ( 50 ),score    int 
)

2.hive2mysql.json配置文件

{"job": {"setting": {"speed": {"channel": 3}},"content": [{"reader": {"name": "hdfsreader","parameter": {"path": "/hive/warehouse/home/myhive.db/student/*","defaultFS": "hdfs://192.168.43.10:9000","column": [{"index": 0,"type": "string"},{"index": 1,"type": "string"},{"index": 2,"type": "Long"}],"fileType": "text","encoding": "UTF-8","fieldDelimiter": ","}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "insert","username": "root","password": "root","column": ["classNo","stuNo","score"],"preSql": ["delete from student"],"connection": [{"jdbcUrl": "jdbc:mysql://192.168.43.10:3306/mytestmysql?useUnicode=true&characterEncoding=utf8","table": ["student"]}]}}}]}
}

注意事项:

在Hive的ODS层建表语句中,以“,”为分隔符;
fields terminated by ','
在DataX的json文件中,也以“,”为分隔符。
"fieldDelimiter": "," 与hive表里面的分隔符保持一致即可

由于DataX不能完全支持所有Hive表的数据类型,应将DataX启动文件中的hdfsreader中的column字段的类型改成DataX支持的类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/20524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue2 第十三节 使用Vue脚手架 (二)

1. ref属性 2. props配置项 3.mixin混入 4.plugin插件 一. ref属性 ① 作用:用于给节点打标识(给元素或者组件注册引用信息,id的替代者) ② 语法: 应用在html标签上获取的是真实的DOM元素,应用在组件…

JsonPath使用和示例

JsonPath使用和示例 1 简介2 官方实例3 JsonPath与XPath语法对比4 实例说明JsonPath与XPath语法5 Python中JsonPath模块6 Python中JsonPath使用7 结合接口测试的实例 1 简介 官网:https://goessner.net/articles/JsonPath/;JsonPath 是一种简单的方法来…

vite babel 获取组件的 children 代码, 填写到 jsxCode 属性中

最终效果 <DocsModule title"类型"><Button>默认按钮</Button><Button type"primary">主要按钮</Button><Button type"success">成功按钮</Button><Button type"danger">危险按钮&l…

2023年中职组“网络安全”赛项吉安市竞赛任务书

2023年中职组“网络安全”赛项 吉安市竞赛任务书 一、竞赛时间 总计&#xff1a;360分钟 竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A模块 A-1 登录安全加固 180分钟 200分 A-2 本地安全策略配置 A-3 流量完整性保护 A-4 事件监控 A-5 服务加固…

docker logs 使用说明

docker logs 可以查看某个容器内的日志情况。 前置参数说明 c_name容器名称 / 容器ID logs 获取容器的日志 , 命令如下&#xff1a; docker logs [options] c_name option参数&#xff1a; -n 查看最近多少条记录&#xff1a;docker logs -n 5 c_name--tail与-n 一样 &#…

【iOS】锁

线程安全 当一个线程访问数据的时候&#xff0c;其他的线程不能对其进行访问&#xff0c;直到该线程访问完毕。简单来讲就是在同一时刻&#xff0c;对同一个数据操作的线程只有一个。而线程不安全&#xff0c;则是在同一时刻可以有多个线程对该数据进行访问&#xff0c;从而得…

openCV C++环境配置

文章目录 一、openCV 安装二、新建项目三、配置环境变量四、测试使用 编译器:vs2017 OpenCV:4.5.4 一、openCV 安装 将openCV安装到一个路径下&#xff0c;我安装到了D盘根目录下 二、新建项目 在vs2017新建控制台空项目&#xff0c;打开项目属性 在VC目录 -> 包含目录下…

智慧林业~经典开源项目数字孪生智慧林业——开源工程及源码

东北林业局的工程和源码免费赠送&#xff0c;帮您实现深林防火的智慧林业。 项目介绍 东北林业局作为东北地区林业管理的重要机构&#xff0c;致力于森林资源保护和防火工作。他们的项目通过先进的技术手段&#xff0c;为林业管理提供可靠的解决方案。 本项目使用数字孪生技术&…

【实操教程】如何开始用Qt Widgets编程?(一)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 在本文中&#xff0…

轮足机器人硬件总结

简介 本文主要根据“轮腿机器人Hyun”总结的硬件部分。 轮腿机器人Hyun开源地址&#xff1a;https://github.com/HuGuoXuang/Hyun 1 电源部分 1.1 78M05 78M05是一款三端稳压器芯片&#xff0c;它可以将输入电压稳定输出为5V直流电压. 1.2 AMS1117-3.3 AMS1117-3.3是一种输…

Vue实现leafletMap自定义绘制线段 并且删除指定的已绘制的点位

效果&#xff1a;点击表格可实现选中地图点位&#xff0c;删除按钮点击可删除对应点位并且重新绘制线段&#xff0c;点击确定按钮 保存已经绘制的点位信息传给父组件 并且该组件已实现回显 完整的组件代码如下 文件名称为&#xff1a; leafletMakePointYt <!--* Descripti…

FPGA优质开源模块 - SRIO

本文介绍一个FPGA常用模块&#xff1a;SRIO&#xff08;Serial RapidIO&#xff09;。SRIO协议是一种高速串行通信协议&#xff0c;在我参与的项目中主要是用于FPGA和DSP之间的高速通信。有关SRIO协议的详细介绍网上有很多&#xff0c;本文主要简单介绍一下SRIO IP核的使用和本…

SIFT算法原理:SIFT算法详细介绍

前面我们介绍了Harris和Shi-Tomasi角点检测算法&#xff0c;这两种算法具有旋转不变性&#xff0c;但不具有尺度不变性&#xff0c;以下图为例&#xff0c;在左侧小图中可以检测到角点&#xff0c;但是图像被放大后&#xff0c;在使用同样的窗口&#xff0c;就检测不到角点了。…

Excel技巧 - 管理规则设置一行变色

如何设置某一列单元格的值大于一个值时&#xff0c;该单元格所在的一整行都变色呢&#xff1f; 1、先框选内容区域&#xff0c;点击开始&#xff0c;条件格式&#xff0c;新建规则 2、如果销量大于20&#xff0c;则该行都变为绿色 编辑格式选择&#xff1a;使用公式确定要设置…

【MySQL】存储过程(十一)

🚗MySQL学习第十一站~ 🚩本文已收录至专栏:MySQL通关路 ❤️文末附全文思维导图,感谢各位点赞收藏支持~ 一.引入 存储过程是事先经过编译并存储在数据库中的一段 SQL 语句的集合,调用存储过程可以简化应用开发人员的工作,可以减少数据在数据库和应用服务器之间的传输,…

大数据技术之Clickhouse---入门篇---SQL操作、副本

星光下的赶路人star的个人主页 积一勺以成江河&#xff0c;累微尘以崇峻极 文章目录 1、SQL操作1.1 Insert1.2 Update 和 Delete1.3 查询操作1.4 alter操作1.5 导出数据 2、副本2.1 副本写入流程2.2 配置步骤 1、SQL操作 基本上来说传统关系型数据库&#xff08;以 MySQL 为例…

【雕爷学编程】MicroPython动手做(30)——物联网之Blynk 4

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

phpstudy 进行 composer 全局配置

背景 因为注意到&#xff0c;使用 phpStudy 进行环境搭建时&#xff0c;有时需要使用 composer 每次都需要查找资料进行配置&#xff0c; 在此进行记录笔记&#xff0c;方便有需要的道友借鉴 配置 版本&#xff1a;composer1.8.5&#xff08;phpStudy8 当前只能安装这一个版本…

Flink作业调度的9种状态

1.什么是作业调度 Flink 通过 Task Slots 来定义执行资源。每个 TaskManager 有一到多个 task slot&#xff0c;每个 task slot 可以运行一条由多个并行 task 组成的流水线。 这样一条流水线由多个连续的 task 组成&#xff0c;比如并行度为 n 的 MapFunction 和 并行度为 n 的…

省份数量(力扣)深度优先 JAVA

有 n 个城市&#xff0c;其中一些彼此相连&#xff0c;另一些没有相连。如果城市 a 与城市 b 直接相连&#xff0c;且城市 b 与城市 c 直接相连&#xff0c;那么城市 a 与城市c 间接相连。 省份 是一组直接或间接相连的城市&#xff0c;组内不含其他没有相连的城市。 给你一个 …