DataX源码分析 reader

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel


文章目录

  • 系列文章目录
  • 前言
  • Reader组件如何处理各类数据源
  • 源码


前言

DataX的Reader组件负责从数据源中读取数据,并将这些数据转换成DataX框架可以处理的数据格式。DataX的Reader组件采用了插件化的设计,使得添加新的数据源类型变得相对容易。只需要实现相应的Reader接口或抽象类,并提供必要的配置参数,就可以将新的数据源集成到DataX框架中。这种可扩展性使得DataX能够适应不断变化的数据环境。Reader通常与特定的数据源绑定,每种数据源类型可能都需要一个独立的Reader实现。

以下是一个简化的源码分析步骤,以DataX的MySQLReader为例:

初始化:
在DataX的任务配置文件中,会指定使用哪种Reader,并配置相应的参数,如MySQL的连接信息、查询SQL等。这些信息会被解析并传递给Reader。

构建Reader:
根据配置文件中指定的Reader类型,DataX会动态地创建相应的Reader实例。对于MySQLReader,它会调用MysqlReader.Builder来构建Reader对象。

任务准备:
Reader会执行一些准备工作,如建立与数据源的连接、准备查询语句等。对于MySQLReader,这通常包括调用openConnection方法建立数据库连接,以及调用prepare方法准备SQL查询。

读取数据:
Reader的核心功能是从数据源中读取数据。对于MySQLReader,这通常涉及到执行SQL查询,并遍历查询结果集。Reader可能会使用多线程或分批处理的方式来提高读取效率。

数据转换:
读取到的原始数据可能需要进行一些转换,以满足DataX框架或目标Writer的要求。这可能包括数据类型转换、数据清洗等。

发送数据:
读取并转换后的数据会发送给DataX的Framework,由Framework负责将数据写入目标Writer。

关闭资源:
在读取任务完成后,Reader会负责关闭与数据源相关的资源,如数据库连接等。


Reader组件如何处理各类数据源

DataX的Reader组件处理不同的数据源类型主要是通过抽象和扩展的机制来实现的。具体来说,DataX框架为每种数据源类型定义了一个Reader接口或抽象类,并为每种具体的数据源实现了相应的Reader类。

以下是DataX的Reader组件如何处理不同数据源类型的基本步骤:

抽象定义:
DataX首先定义了一个抽象的Reader接口或抽象类,该接口或抽象类定义了一组通用的方法,如init(初始化)、prepare(准备)、post(读取数据)和close(关闭资源)等。这些方法为Reader提供了统一的生命周期和数据处理流程。

具体实现:
对于每种数据源类型,DataX会创建一个具体的Reader类来实现上述接口或抽象类。例如,对于MySQL数据源,会有一个MysqlReader类;对于Oracle数据源,会有一个OracleReader类。这些具体的Reader类会根据数据源的特性来实现接口中定义的方法。

配置文件解析:
当DataX启动一个数据同步任务时,它会首先解析任务配置文件(通常是JSON格式)。配置文件中包含了任务的各种参数,包括数据源类型、Reader类型、Writer类型以及各自的配置参数。

动态加载:
DataX框架会根据配置文件中的Reader类型动态加载相应的Reader实现类。这通常是通过反射机制实现的,即根据Reader类型的字符串名称,在运行时动态加载并实例化对应的Reader类。

调用Reader方法:
一旦Reader类被加载并实例化,DataX框架会按照定义的生命周期方法调用Reader的相应方法。例如,首先调用init方法进行初始化,然后调用prepare方法准备数据源连接和查询,接着调用post方法读取数据,并在任务完成后调用close方法关闭资源。

数据转换:
在读取数据的过程中,Reader可能需要对数据进行一些转换或适配,以便与DataX框架的数据处理流程兼容。这可能包括数据类型转换、字段重命名、数据清洗等。

错误处理与日志记录:
Reader实现类还需要处理可能出现的错误和异常,并记录必要的日志信息。这有助于在数据同步过程中出现问题时进行故障排查和问题定位。

通过以上步骤,DataX的Reader组件能够灵活处理不同类型的数据源,并实现了数据从数据源到DataX框架的顺畅传输。同时,这种抽象和扩展的机制也使得DataX框架易于扩展,可以方便地添加对新数据源类型的支持。

源码


/*** 每个Reader插件在其内部内部实现Job、Task两个内部类。* * * */
public abstract class Reader extends BaseObject {/*** 每个Reader插件必须实现Job内部类。* * */public static abstract class Job extends AbstractJobPlugin {/*** 切分任务* * @param adviceNumber* *            着重说明下,adviceNumber是框架建议插件切分的任务数,插件开发人员最好切分出来的任务数>=*            adviceNumber。<br>* <br>*            之所以采取这个建议是为了给用户最好的实现,例如框架根据计算认为用户数据存储可以支持100个并发连接,*            并且用户认为需要100个并发。 此时,插件开发人员如果能够根据上述切分规则进行切分并做到>=100连接信息,*            DataX就可以同时启动100个Channel,这样给用户最好的吞吐量 <br>*            例如用户同步一张Mysql单表,但是认为可以到10并发吞吐量,插件开发人员最好对该表进行切分,比如使用主键范围切分,*            并且如果最终切分任务数到>=10,我们就可以提供给用户最大的吞吐量。 <br>* <br>*            当然,我们这里只是提供一个建议值,Reader插件可以按照自己规则切分。但是我们更建议按照框架提供的建议值来切分。 <br>* <br>*            对于ODPS写入OTS而言,如果存在预排序预切分问题,这样就可能只能按照分区信息切分,无法更细粒度切分,*            这类情况只能按照源头物理信息切分规则切分。 <br>* <br>* * * */public abstract List<Configuration> split(int adviceNumber);}public static abstract class Task extends AbstractTaskPlugin {public abstract void startRead(RecordSender recordSender);}
}

public class MysqlReader extends Reader {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Reader.Job {private static final Logger LOG = LoggerFactory.getLogger(Job.class);private Configuration originalConfig = null;private CommonRdbmsReader.Job commonRdbmsReaderJob;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);if (userConfigedFetchSize != null) {LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");}this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);this.commonRdbmsReaderJob.init(this.originalConfig);}@Overridepublic void preCheck(){init();this.commonRdbmsReaderJob.preCheck(this.originalConfig,DATABASE_TYPE);}@Overridepublic List<Configuration> split(int adviceNumber) {return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);}@Overridepublic void post() {this.commonRdbmsReaderJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderJob.destroy(this.originalConfig);}}public static class Task extends Reader.Task {private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderTask;@Overridepublic void init() {this.readerSliceConfig = super.getPluginJobConf();this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE,super.getTaskGroupId(), super.getTaskId());this.commonRdbmsReaderTask.init(this.readerSliceConfig);}@Overridepublic void startRead(RecordSender recordSender) {int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,super.getTaskPluginCollector(), fetchSize);}@Overridepublic void post() {this.commonRdbmsReaderTask.post(this.readerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);}}}

public class RdbmsReader extends Reader {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("reader", "rdbms");}public static class Job extends Reader.Job {private Configuration originalConfig;private CommonRdbmsReader.Job commonRdbmsReaderMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();int fetchSize = this.originalConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,Constant.DEFAULT_FETCH_SIZE);if (fetchSize < 1) {throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,String.format("您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.",fetchSize));}this.originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,fetchSize);this.commonRdbmsReaderMaster = new SubCommonRdbmsReader.Job(DATABASE_TYPE);this.commonRdbmsReaderMaster.init(this.originalConfig);}@Overridepublic List<Configuration> split(int adviceNumber) {return this.commonRdbmsReaderMaster.split(this.originalConfig,adviceNumber);}@Overridepublic void post() {this.commonRdbmsReaderMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderMaster.destroy(this.originalConfig);}}public static class Task extends Reader.Task {private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderSlave;@Overridepublic void init() {this.readerSliceConfig = super.getPluginJobConf();this.commonRdbmsReaderSlave = new SubCommonRdbmsReader.Task(DATABASE_TYPE);this.commonRdbmsReaderSlave.init(this.readerSliceConfig);}@Overridepublic void startRead(RecordSender recordSender) {int fetchSize = this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig,recordSender, super.getTaskPluginCollector(), fetchSize);}@Overridepublic void post() {this.commonRdbmsReaderSlave.post(this.readerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderSlave.destroy(this.readerSliceConfig);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/678454.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARP欺骗攻击利用之抓取https协议的用户名与密码

1.首先安装sslstrip 命令执行&#xff1a;apt-get install sslstrip 2.启动arp欺骗 arpspoof -i ech0 -t 192.168.159.148 192.168.159.2 arpspoof -i ech0(网卡) -t 目标机ip 本地局域网关 3.命令行输入: vim /etc/ettercap/etter.conf进入配置文件 找到下红框的内容&a…

【Linux】学习-深入了解文件的读与写

深入了解语言级别(C语言)文件操作的"读"与"写" 在学习前&#xff0c;我们先要知道在Linux下的一个原则&#xff1a;一切皆是文件 如何理解呢&#xff1f;举个外设的例子&#xff0c;比如键盘和显示器&#xff0c;这两个外设也可以其实本质上也是文件&…

Qt Windows和Android使用MuPDF预览PDF文件

文章目录 1. Windows MuPDF编译2. Android MuPDF编译3. 引用 MuPDF 库4. 解析本地PDF文件 1. Windows MuPDF编译 使用如下命令将MuPDF的源码克隆到本地 git clone --recursive git://git.ghostscript.com/mupdf.git直接用VS&#xff0c;打开 mupdf/platform/win32/mupdf.sln …

pandas 按相同站号重新整合出一个dataframe

情况1&#xff1a; 如果两个DataFrame都有一个共同的列&#xff08;不是索引&#xff09;&#xff0c;你可以使用merge或join来整合它们。 import pandas as pd # 创建两个示例DataFrame df1 pd.DataFrame({ ID: [001, 002, 003], A: [foo, bar, baz] }) df2 pd.Dat…

docker 部署 mongodb 集群【建议收藏】

一、简洁搭建mognodb副本集 环境说明 我都是在云服务器上搭建的&#xff0c;CentOS7&#xff0c;Docker环境&#xff0c;版本忘记了。我就直接在同一台服务器上搭建三个mongodb即可。 1、基本信息如下 服务器地址 www.it307.top 副本集名称 rs 容器节点及端口映射 ​ m0…

数据结构——6.1 图的基本概念

第六章 图 6.1 图的基本概念 概念 图的概念&#xff1a;G由点集V和边集E构成&#xff0c;记为G(V,E)&#xff0c;边集可以为空&#xff0c;但是点集不能为空 注意&#xff1a;线性表可以是空表&#xff0c;树可以是空树&#xff0c;但图不可以是空&#xff0c;即V一定是非空集…

leetcode:63.不同路径二

dp数组含义&#xff1a;由初始位置到最终位置路径个数 递推公式&#xff1a;如果没有障碍再进行递推公式 初始化&#xff1a;1.若起始位置和终止位置有障碍路径个数为0 2.dp[i][0] 1和dp[0][j] 1的for循环条件都需要加上一个and dp[i][0] 0和and dp[0][j] 0. 3.遍历顺序…

三维形体投影面积(c++题解)

题目描述 在 n x n 的网格 grid 中&#xff0c;我们放置了一些与 x&#xff0c;y&#xff0c;z 三轴对齐的 1 x 1 x 1 立方体。 每个值 v grid[i][j] 表示 v 个正方体叠放在单元格 (i, j) 上。 现在&#xff0c;我们查看这些立方体在 xy 、yz 和 zx 平面上的投影。 投影 就…

案例:三台主机实现 级联复制

介绍&#xff1a;级联复制架构 级联复制架构 是一种特殊的主从结构&#xff0c;之前聊到的几种主从结构都只有两层&#xff0c;但级联复制架构中会有三层&#xff0c;关系如下&#xff1a; 也就是在级联复制架构中&#xff0c;存在两层从库&#xff0c;这实际上属于一主多从架…

Deepin基本环境查看(九)【被封印的创世神】

文章目录 - 相关文章目录1、概述2、Deepin中的创世神和管理员1&#xff09;创世神root2&#xff09;root被封印原因3&#xff09;其他的神灵【管理员】 3、神殿管理【su与sudo】1&#xff09;su&#xff08;Switch User&#xff09;2&#xff09;sudo&#xff08;Superuser Do&…

Open CASCADE学习|环形弹簧建模

目录 Draw Test Harness&#xff1a; C&#xff1a; 环形弹簧&#xff0c;也称为弓簧&#xff0c;是由拉伸弹簧和连接弹簧构成的。在结构上&#xff0c;环形弹簧通常包括端环、外环和内环&#xff0c;其主要参数包括弹簧的内径、外径和自由高度。环形弹簧的一个显著特点是&am…

计算机毕业设计SSM基于的冷链食品物流信息管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; vue mybatis Maven mysql5.7或8.0等等组成&#xff0c;B…

prometheus之redis_exporter部署

下载解压压缩包 mkdir /opt/redis_exporter/ cd /opt/redis_exporter/ wget http://soft.download/soft/linux/prometheus/redis_exporter/redis_exporter-v1.50.0.linux-amd64.tar.gz tar -zxvf redis_exporter-v1.50.0.linux-amd64.tar.gz ln -s /opt/redis_exporter/redis_…

网络原理(一)

&#x1f495;"Echo"&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;网络原理(一) 一. 应用层 应用层是和程序员联系最密切的一层,对于应用层来说,程序员可以自定义应用层协议,应用层的协议一般要约定好以下两部分内容: 根据需求,明确要传输哪些信…

[算法学习]

矩阵乘法 只有当左矩阵列数等于右矩阵行数&#xff0c;才能相乘N*M的矩阵和M*K的矩阵做乘法后矩阵大小为N*k矩阵乘法规则&#xff1a;第一个矩阵A的第 i 行与第二个矩阵的第 j 列的各M个元素对应相乘再相加得到新矩阵C[i][j]的值 整除 同余 同余的性质 线性运算&#xff0c;…

服务器渲染(SSR)-前端框架

Next.js、Nuxt.js和Remix都是基于Node.js比较流行三个前端框架&#xff0c;它们都基于JavaScript和React/Vue构建&#xff0c;并提供了一些额外的功能和工具来简化开发过程。 1. Next.js是一个基于React的前端框架&#xff0c;它提供了服务器渲染&#xff08;SSR&#xff09;和…

Android:Cordova,JavaScript操作设备功能

Cordova学习 Cordova提供了一组设备相关的API,通过这组API,移动应用能够以JavaScript访问原生的设备功能,如摄像头、麦克风等。 Cordova还提供了一组统一的JavaScript类库,以及为这些类库所用的设备相关的原生后台代码。 Cordova是PhoneGap贡献给Apache后的开源项目,是从…

最适合新手的SpringBoot+SSM项目《苍穹外卖》实战—(一)项目概述

黑马程序员最新Java项目实战《苍穹外卖》&#xff0c;最适合新手的SpringBootSSM的企业级Java项目实战。 项目简介 《苍穹外卖》项目的定位是一款为餐饮企业&#xff08;餐厅、饭店&#xff09;定制的软件产品。该项目是一个在线外卖订购系统&#xff0c;顾客可以通过网站或者…

Java安全 CC链1分析(Lazymap类)

Java安全 CC链1分析 前言CC链分析CC链1核心LazyMap类AnnotationInvocationHandler类 完整exp&#xff1a; 前言 在看这篇文章前&#xff0c;可以看下我的上一篇文章&#xff0c;了解下cc链1的核心与环境配置 Java安全 CC链1分析 前面我们已经讲过了CC链1的核心ChainedTransf…

TestNG基础教程

TestNG基础教程 一、常用断言二、执行顺序三、依赖测试四、参数化测试1、通过dataProvider实现2、通过xml配置&#xff08;这里是直接跑xml&#xff09; 五、testng.xml常用配置方式1、分组维度控制2、类维度配置3、包维度配置 六、TestNG并发测试1、通过注解来实现2、通过xml来…