DataX 数据库同步部分源码解析

        在工作中遇到异构数据库同步的问题,从Oracle数据库同步数据到Postgres,其中的很多数据库表超过百万,并且包含空间字段。经过筛选,选择了开源的DataX+DataX Web作为基础框架。DataX 是阿里云的开源产品,大厂的产品值得信赖,而且,DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久,每天完成同步8w多道作业,每日传输数据量超过300TB,经过了时间、实践的检验。这里顺便分析一下源码,看看大厂的程序员是怎么实现数据库的快速全表查询、写入操作,怎么进行多线程管理的。

部分内容参见:        

        GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。        

         DataX/introduction.md at master · alibaba/DataX · GitHub

        DataX/dataxPluginDev.md at master · alibaba/DataX · GitHub

一、DataX介绍

        DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

​        DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

datax_why_new

       为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

二、源码解析(基于DataX v202309版本)

datax_framework_new

        DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

2.1 Reader 源码解析

        oraclereader插件包括Constant.java、OracleReader.java和OracleReaderErrorCode.java三个Java类。先关注一下OracleReader,OracleReader继承Reader基类,在其中,通过内部类Task实现读取数据库操作,将读取的数据交由框架处理。具体为CommonRdbmsReader.Task来实现。在代码中包含了commonRdbmsReaderTask的初始化及读取数据操作等内容。核心为this.commonRdbmsReaderTask.startRead。

	public static class Task extends Reader.Task {private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderTask;@Overridepublic void init() {this.readerSliceConfig = super.getPluginJobConf();this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId());this.commonRdbmsReaderTask.init(this.readerSliceConfig);}@Overridepublic void startRead(RecordSender recordSender) {int fetchSize = this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);this.commonRdbmsReaderTask.startRead(this.readerSliceConfig,recordSender, super.getTaskPluginCollector(), fetchSize);}@Overridepublic void post() {this.commonRdbmsReaderTask.post(this.readerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);}}

        CommonRdbmsReader.Task的startRead方法如下:

public void startRead(Configuration readerSliceConfig,RecordSender recordSender,TaskPluginCollector taskPluginCollector, int fetchSize) {String querySql = readerSliceConfig.getString(Key.QUERY_SQL);String table = readerSliceConfig.getString(Key.TABLE);PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);LOG.info("Begin to read record by Sql: [{}\n] {}.",querySql, basicMsg);PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);queryPerfRecord.start();Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,username, password);// session config .etc relatedDBUtil.dealWithSessionConfig(conn, readerSliceConfig,this.dataBaseType, basicMsg);int columnNumber = 0;ResultSet rs = null;try {rs = DBUtil.query(conn, querySql, fetchSize);queryPerfRecord.end();ResultSetMetaData metaData = rs.getMetaData();columnNumber = metaData.getColumnCount();//这个统计干净的result_Next时间PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);allResultPerfRecord.start();long rsNextUsedTime = 0;long lastTime = System.nanoTime();while (rs.next()) {rsNextUsedTime += (System.nanoTime() - lastTime);this.transportOneRecord(recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector);lastTime = System.nanoTime();}allResultPerfRecord.end(rsNextUsedTime);//目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间LOG.info("Finished read record by Sql: [{}\n] {}.",querySql, basicMsg);} catch (Exception e) {throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);} finally {DBUtil.closeDBResources(null, conn);}}

        上述代码可见查询数据库的常规步骤。

1.建立数据库链接

Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,username, password);

        DBUtil内部通过原生Jdbc实现,代码如下:

private static synchronized Connection connect(DataBaseType dataBaseType,String url, Properties prop) {try {Class.forName(dataBaseType.getDriverClassName());DriverManager.setLoginTimeout(Constant.TIMEOUT_SECONDS);return DriverManager.getConnection(url, prop);} catch (Exception e) {throw RdbmsException.asConnException(dataBaseType, e, prop.getProperty("user"), null);}}

2.执行查询操作,返回ResultSet

ResultSet rs = null;
try {rs = DBUtil.query(conn, querySql, fetchSize);
}catch (Exception e) {throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {DBUtil.closeDBResources(null, conn);
} 

        DBUtil内部查询实现代码如下。

    /*** a wrapped method to execute select-like sql statement .** @param conn Database connection .* @param sql  sql statement to be executed* @return a {@link ResultSet}* @throws SQLException if occurs SQLException.*/public static ResultSet query(Connection conn, String sql, int fetchSize)throws SQLException {// 默认3600 s 的query Timeoutreturn query(conn, sql, fetchSize, Constant.SOCKET_TIMEOUT_INSECOND);}/*** a wrapped method to execute select-like sql statement .** @param conn         Database connection .* @param sql          sql statement to be executed* @param fetchSize* @param queryTimeout unit:second* @return* @throws SQLException*/public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)throws SQLException {// make sure autocommit is offconn.setAutoCommit(false);Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(fetchSize);stmt.setQueryTimeout(queryTimeout);return query(stmt, sql);}/*** a wrapped method to execute select-like sql statement .** @param stmt {@link Statement}* @param sql  sql statement to be executed* @return a {@link ResultSet}* @throws SQLException if occurs SQLException.*/public static ResultSet query(Statement stmt, String sql)throws SQLException {return stmt.executeQuery(sql);}

3.获取数据元数据信息

ResultSetMetaData metaData = rs.getMetaData();

4.遍历数据,对数据进行转换并传递给框架

while (rs.next()) {rsNextUsedTime += (System.nanoTime() - lastTime);this.transportOneRecord(recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector);lastTime = System.nanoTime();
}

5.在finally块中关闭数据链接

finally {DBUtil.closeDBResources(null, conn);
} 

        DBUtil内部关闭链接代码如下。很明显上述代码调用时只传入了connection,会造成只关闭链接,未关闭ResultSet和Statement,有瑕疵。

    public static void closeDBResources(ResultSet rs, Statement stmt,Connection conn) {if (null != rs) {try {rs.close();} catch (SQLException unused) {}}if (null != stmt) {try {stmt.close();} catch (SQLException unused) {}}if (null != conn) {try {conn.close();} catch (SQLException unused) {}}}public static void closeDBResources(Statement stmt, Connection conn) {closeDBResources(null, stmt, conn);}

        在第2步DBUtil内部的查询代码部分,指定了fetchSize参数。

stmt.setFetchSize(fetchSize);

        fetchSize是实现读取数据源表的关键点之一。简单理解,fetchSize定义了本地缓存大小,例如,fetchSize=1000即可简单理解为本地缓存区大小为1000条数据大小,当执行ResultSet.next取数据时,如果本地缓存中没有数据,会从数据库中取出1000条(剩余数据大于1000时为1000,小于1000时为剩余数据)数据放到缓存中,接下来的rs.next操作就是从本地缓存中读取数据,直至缓存区为空才再次请求数据库。通过减少与数据库的交互次数,提升性能。

        如果 fetchsize 设置的太小,会导致程序频繁地访问数据库,影响性能;如果 fetchsize 设置的太大,则可能会导致内存不足。在oraclereader插件的代码Constant.java中定义了fetchSize的默认值。

package com.alibaba.datax.plugin.reader.oraclereader;public class Constant {public static final int DEFAULT_FETCH_SIZE = 1024;}

 接下来我们看一下transportOneRecord的代码,该代码将一条数据进行转换后传递给Writer。

protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector) {Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); recordSender.sendToWriter(record);return record;
}

        buildRecord方法将一条数据的各个字段按照类型转换为标准数据,方便后续各类数据库写入插件使用实现数据插入。如果数据中包含了不支持的其他字段类型,需要在SQL中通过转换函数进行转换,否则对于不支持的其他字段类型,或在转换过程中出现其他错误,这条数据将被作为脏数据扔掉。当然,也可以修改buildRecord方法代码,让DataX支持更多数据类型的查询和写入。代码如下:

protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,TaskPluginCollector taskPluginCollector) {Record record = recordSender.createRecord();try {for (int i = 1; i <= columnNumber; i++) {switch (metaData.getColumnType(i)) {case Types.CHAR:case Types.NCHAR:case Types.VARCHAR:case Types.LONGVARCHAR:case Types.NVARCHAR:case Types.LONGNVARCHAR:String rawData;if (StringUtils.isBlank(mandatoryEncoding)) {rawData = rs.getString(i);} else {rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY :rs.getBytes(i)), mandatoryEncoding);}record.addColumn(new StringColumn(rawData));break;case Types.CLOB:case Types.NCLOB:record.addColumn(new StringColumn(rs.getString(i)));break;case Types.SMALLINT:case Types.TINYINT:case Types.INTEGER:case Types.BIGINT:record.addColumn(new LongColumn(rs.getString(i)));break;case Types.NUMERIC:case Types.DECIMAL:record.addColumn(new DoubleColumn(rs.getString(i)));break;case Types.FLOAT:case Types.REAL:case Types.DOUBLE:record.addColumn(new DoubleColumn(rs.getString(i)));break;case Types.TIME:record.addColumn(new DateColumn(rs.getTime(i)));break;// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115case Types.DATE:if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {record.addColumn(new LongColumn(rs.getInt(i)));} else {record.addColumn(new DateColumn(rs.getDate(i)));}break;case Types.TIMESTAMP:record.addColumn(new DateColumn(rs.getTimestamp(i)));break;case Types.BINARY:case Types.VARBINARY:case Types.BLOB:case Types.LONGVARBINARY:record.addColumn(new BytesColumn(rs.getBytes(i)));break;// warn: bit(1) -> Types.BIT 可使用BoolColumn// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumncase Types.BOOLEAN:case Types.BIT:record.addColumn(new BoolColumn(rs.getBoolean(i)));break;case Types.NULL:String stringData = null;if (rs.getObject(i) != null) {stringData = rs.getObject(i).toString();}record.addColumn(new StringColumn(stringData));break;case Types.OTHER:if (dataBaseType == DataBaseType.PostgreSQL) {record.addColumn(new StringColumn(rs.getString(i)));break;}default:throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE,String.format("您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",metaData.getColumnName(i),metaData.getColumnType(i),metaData.getColumnClassName(i)));}}} catch (Exception e) {if (IS_DEBUG) {LOG.debug("read data " + record.toString()+ " occur exception:", e);}//TODO 这里识别为脏数据靠谱吗?taskPluginCollector.collectDirtyRecord(record, e);if (e instanceof DataXException) {throw (DataXException) e;}}return record;}
}

未完待续...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/802886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue.js组件精讲 组件的通信2:派发与广播——自行实现dispatch和broadcast方法

上一讲的 provide / inject API 主要解决了跨级组件间的通信问题&#xff0c;不过它的使用场景&#xff0c;主要是子组件获取上级组件的状态&#xff0c;跨级组件间建立了一种主动提供与依赖注入的关系。然后有两种场景它不能很好的解决&#xff1a; 父组件向子组件&#xff0…

transformer上手(2) —— 注意力机制

自从 2017 年 Google 发布《Attention is All You Need》之后&#xff0c;各种基于 Transformer 的模型和方法层出不穷。尤其是 2018 年&#xff0c;OpenAI 发布的 GPT 和 Google 发布的 BERT 模型在几乎所有 NLP 任务上都取得了远超先前最强基准的性能&#xff0c;将 Transfor…

Java多路查找树(含面试大厂题和源码)

多路查找树&#xff08;Multiway Search Tree&#xff09;&#xff0c;也称为B树或B树&#xff0c;是一种自平衡的树形数据结构&#xff0c;用于存储大量数据&#xff0c;通常用于数据库和文件系统中。它允许在查找、插入和删除操作中保持数据的有序性&#xff0c;同时优化了磁…

【蓝桥杯每日一题】4.9网络分析(代码详解版)

终于把清明节假期时自己挖的坑给补上了 题目来源&#xff1a; 2069. 网络分析 - AcWing题库 参考&#xff1a; Bear_king的图解 y总代码解读 思路1&#xff1a; 思考&#xff1a; 题目看着&#xff0c;看到“发送信息后&#xff0c;又会发送到相邻的结点上面”这句话&am…

js通过Object.defineProperty实现数据响应式

目录 数据响应式属性描述符propertyResponsive 依赖收集依赖队列寻找依赖 观察器 派发更新Observer完整代码关于数据响应式关于Object.defineProperty的限制 数据响应式 假设我们现在有这么一个页面 <!DOCTYPE html> <html lang"en"><head><m…

Oracle表空间满清理方案汇总分享

目录 前言思考 一、第一种增加表空间的数据文件数量达到总容量的提升 二、第二种解决方案针对system和sysaux的操作 2.1SYSTEM表空间优化 2.2sysaux表空间回收 2.2.1针对sysaux的表空间爆满还有第二套方案维护 三、第三种解决方案使用alter tablespace resize更改表空间的…

深入浅出 -- 系统架构之微服务架构的新挑战

尽管微服务架构有着高度独立的软件模块、单一的业务职责、可灵活调整的技术栈等优势&#xff0c;但也不能忽略它所带来的弊端。本篇文章&#xff0c;我们从网络、性能、运维、组织架构和集成测试五个方面来聊一下设计微服务架构需要考虑哪些问题&#xff0c;对设计有哪些挑战呢…

Webots常用的执行器(Python版)

文章目录 1. RotationalMotor2. LinearMotor3. Brake4. Propeller5. Pen6. LED 1. RotationalMotor # -*- coding: utf-8 -*- """motor_controller controller."""from controller import Robot# 实例化机器人 robot Robot()# 获取基本仿真步长…

ChatGPT/GPT4科研应用与绘图技术及论文写作

2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车…

2024年第十七届“认证杯”数学中国数学建模网络挑战赛思路

2024年第十七届“认证杯”数学中国数学建模网络挑战赛将于2024年4月举行。 比赛两个阶段统一报名&#xff0c;参赛费为每队100元人民币&#xff08;两个阶段总共&#xff09;。如果需要组委会提供详细的论文评价&#xff0c;需要再支付100元人民币的论文点评费(即每个参赛队支…

c++的学习之路:19、模板

摘要 本章主要是说了一些模板&#xff0c;如非类型模板参数、类模板的特化等等&#xff0c;文章末附上测试代码与导图 目录 摘要 一、非类型模板参数 二、类模板的特化 1、概念 2、函数模板特化 3、类模板特化 三、模板的分离编译 1、什么是分离编译 2、模板的分离编…

2024.4.8力扣每日一题——使数组连续的最少操作数

2024.4.8 题目来源我的题解方法一 去重排序滑动窗口 题目来源 力扣每日一题&#xff1b;题序&#xff1a;2009 我的题解 方法一 去重排序滑动窗口 参考官方题解。 记数组 nums的长度为 n。经过若干次操作后&#xff0c;若数组变为连续的&#xff0c;那么数组的长度不会改变&…

ip地址切换器安卓版,保护隐私,自由上网

在移动互联网时代&#xff0c;随着智能手机和平板电脑的普及&#xff0c;移动设备的网络连接变得愈发重要。为了满足用户在不同网络环境下的需求&#xff0c;IP地址切换器安卓版应运而生。本文将以虎观代理为例&#xff0c;为您详细解析IP地址切换器安卓版的功能、应用以及其所…

UVA1596 Bug Hunt 找Bug 解题报告

题目链接 https://vjudge.net/problem/UVA-1596 题目大意 输入并模拟执行一段程序&#xff0c;输出第一个bug所在的行。每行程序有两种可能&#xff1a; 数组定义&#xff0c;格式为arr[size]。例如a[10]或者b[5]&#xff0c;可用下标分别是0&#xff5e;9和0&#xff5e;4…

Linux压缩打包

压缩文件有时候也叫归档文件&#xff0c;但是归档是将多个文件捆绑成一个文件&#xff0c;并没有压缩&#xff0c;压缩才是将大小压缩的更小。 tar 压缩 tar -zcf 压缩后文件名.tar.gz 需要压缩的文件 [rootlocalhost ~]# tar -zcf ser.tar.gz services压缩多个文件 [rootloca…

克服与新一代人工智能部署相关的数据挑战

随着商界领袖逐渐了解该技术的力量和潜力&#xff0c;人们对 ChatGPT 等生成式人工智能工具的潜力的兴趣正在迅速上升。 这些工具能够创建以前属于人类创造力和智力领域的输出&#xff0c;有潜力改变许多业务流程&#xff0c;并成为每个人&#xff08;从作家和创作者到程序员和…

题目:学习使用按位异或 ^

题目&#xff1a;学习使用按位异或 ^ There is no nutrition in the blog content. After reading it, you will not only suffer from malnutrition, but also impotence. The blog content is all parallel goods. Those who are worried about being cheated should leave q…

蓝桥杯加训

1.两只塔姆沃斯牛&#xff08;模拟&#xff09; 思路&#xff1a;人和牛都记录三个数据&#xff0c;当前坐标和走的方向&#xff0c;如果人和牛的坐标和方向走重复了&#xff0c;那就说明一直在绕圈圈&#xff0c;无解 #include<iostream> using namespace std; const i…

openstack-认证服务

整个OpenStack是由控制节点&#xff0c;计算节点&#xff0c;网络节点&#xff0c;存储节点四大部分组成。 openstack重要集成组件: Nova-计算服务&#xff1b;Neutron-网络服务&#xff1b;Swift-对象存储服务&#xff1b;Cinder-块存储服务&#xff1b;Glance-镜像服务Keys…

LeetCode-118. 杨辉三角【数组 动态规划】

LeetCode-118. 杨辉三角【数组 动态规划】 题目描述&#xff1a;解题思路一&#xff1a;Python 动态规划解题思路二&#xff1a;解题思路三&#xff1a;0 题目描述&#xff1a; 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&…