DataX 数据库同步部分源码解析

        在工作中遇到异构数据库同步的问题,从Oracle数据库同步数据到Postgres,其中的很多数据库表超过百万,并且包含空间字段。经过筛选,选择了开源的DataX+DataX Web作为基础框架。DataX 是阿里云的开源产品,大厂的产品值得信赖,而且,DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久,每天完成同步8w多道作业,每日传输数据量超过300TB,经过了时间、实践的检验。这里顺便分析一下源码,看看大厂的程序员是怎么实现数据库的快速全表查询、写入操作,怎么进行多线程管理的。

部分内容参见:        

        GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。        

         DataX/introduction.md at master · alibaba/DataX · GitHub

        DataX/dataxPluginDev.md at master · alibaba/DataX · GitHub

一、DataX介绍

        DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

​        DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

datax_why_new

       为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

二、源码解析(基于DataX v202309版本)

datax_framework_new

        DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

2.1 Reader 源码解析

        oraclereader插件包括Constant.java、OracleReader.java和OracleReaderErrorCode.java三个Java类。先关注一下OracleReader,OracleReader继承Reader基类,在其中,通过内部类Task实现读取数据库操作,将读取的数据交由框架处理。具体为CommonRdbmsReader.Task来实现。在代码中包含了commonRdbmsReaderTask的初始化及读取数据操作等内容。核心为this.commonRdbmsReaderTask.startRead。

	public static class Task extends Reader.Task {private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderTask;@Overridepublic void init() {this.readerSliceConfig = super.getPluginJobConf();this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId());this.commonRdbmsReaderTask.init(this.readerSliceConfig);}@Overridepublic void startRead(RecordSender recordSender) {int fetchSize = this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);this.commonRdbmsReaderTask.startRead(this.readerSliceConfig,recordSender, super.getTaskPluginCollector(), fetchSize);}@Overridepublic void post() {this.commonRdbmsReaderTask.post(this.readerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);}}

        CommonRdbmsReader.Task的startRead方法如下:

public void startRead(Configuration readerSliceConfig,RecordSender recordSender,TaskPluginCollector taskPluginCollector, int fetchSize) {String querySql = readerSliceConfig.getString(Key.QUERY_SQL);String table = readerSliceConfig.getString(Key.TABLE);PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);LOG.info("Begin to read record by Sql: [{}\n] {}.",querySql, basicMsg);PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);queryPerfRecord.start();Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,username, password);// session config .etc relatedDBUtil.dealWithSessionConfig(conn, readerSliceConfig,this.dataBaseType, basicMsg);int columnNumber = 0;ResultSet rs = null;try {rs = DBUtil.query(conn, querySql, fetchSize);queryPerfRecord.end();ResultSetMetaData metaData = rs.getMetaData();columnNumber = metaData.getColumnCount();//这个统计干净的result_Next时间PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);allResultPerfRecord.start();long rsNextUsedTime = 0;long lastTime = System.nanoTime();while (rs.next()) {rsNextUsedTime += (System.nanoTime() - lastTime);this.transportOneRecord(recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector);lastTime = System.nanoTime();}allResultPerfRecord.end(rsNextUsedTime);//目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间LOG.info("Finished read record by Sql: [{}\n] {}.",querySql, basicMsg);} catch (Exception e) {throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);} finally {DBUtil.closeDBResources(null, conn);}}

        上述代码可见查询数据库的常规步骤。

1.建立数据库链接

Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,username, password);

        DBUtil内部通过原生Jdbc实现,代码如下:

private static synchronized Connection connect(DataBaseType dataBaseType,String url, Properties prop) {try {Class.forName(dataBaseType.getDriverClassName());DriverManager.setLoginTimeout(Constant.TIMEOUT_SECONDS);return DriverManager.getConnection(url, prop);} catch (Exception e) {throw RdbmsException.asConnException(dataBaseType, e, prop.getProperty("user"), null);}}

2.执行查询操作,返回ResultSet

ResultSet rs = null;
try {rs = DBUtil.query(conn, querySql, fetchSize);
}catch (Exception e) {throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {DBUtil.closeDBResources(null, conn);
} 

        DBUtil内部查询实现代码如下。

    /*** a wrapped method to execute select-like sql statement .** @param conn Database connection .* @param sql  sql statement to be executed* @return a {@link ResultSet}* @throws SQLException if occurs SQLException.*/public static ResultSet query(Connection conn, String sql, int fetchSize)throws SQLException {// 默认3600 s 的query Timeoutreturn query(conn, sql, fetchSize, Constant.SOCKET_TIMEOUT_INSECOND);}/*** a wrapped method to execute select-like sql statement .** @param conn         Database connection .* @param sql          sql statement to be executed* @param fetchSize* @param queryTimeout unit:second* @return* @throws SQLException*/public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)throws SQLException {// make sure autocommit is offconn.setAutoCommit(false);Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(fetchSize);stmt.setQueryTimeout(queryTimeout);return query(stmt, sql);}/*** a wrapped method to execute select-like sql statement .** @param stmt {@link Statement}* @param sql  sql statement to be executed* @return a {@link ResultSet}* @throws SQLException if occurs SQLException.*/public static ResultSet query(Statement stmt, String sql)throws SQLException {return stmt.executeQuery(sql);}

3.获取数据元数据信息

ResultSetMetaData metaData = rs.getMetaData();

4.遍历数据,对数据进行转换并传递给框架

while (rs.next()) {rsNextUsedTime += (System.nanoTime() - lastTime);this.transportOneRecord(recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector);lastTime = System.nanoTime();
}

5.在finally块中关闭数据链接

finally {DBUtil.closeDBResources(null, conn);
} 

        DBUtil内部关闭链接代码如下。很明显上述代码调用时只传入了connection,会造成只关闭链接,未关闭ResultSet和Statement,有瑕疵。

    public static void closeDBResources(ResultSet rs, Statement stmt,Connection conn) {if (null != rs) {try {rs.close();} catch (SQLException unused) {}}if (null != stmt) {try {stmt.close();} catch (SQLException unused) {}}if (null != conn) {try {conn.close();} catch (SQLException unused) {}}}public static void closeDBResources(Statement stmt, Connection conn) {closeDBResources(null, stmt, conn);}

        在第2步DBUtil内部的查询代码部分,指定了fetchSize参数。

stmt.setFetchSize(fetchSize);

        fetchSize是实现读取数据源表的关键点之一。简单理解,fetchSize定义了本地缓存大小,例如,fetchSize=1000即可简单理解为本地缓存区大小为1000条数据大小,当执行ResultSet.next取数据时,如果本地缓存中没有数据,会从数据库中取出1000条(剩余数据大于1000时为1000,小于1000时为剩余数据)数据放到缓存中,接下来的rs.next操作就是从本地缓存中读取数据,直至缓存区为空才再次请求数据库。通过减少与数据库的交互次数,提升性能。

        如果 fetchsize 设置的太小,会导致程序频繁地访问数据库,影响性能;如果 fetchsize 设置的太大,则可能会导致内存不足。在oraclereader插件的代码Constant.java中定义了fetchSize的默认值。

package com.alibaba.datax.plugin.reader.oraclereader;public class Constant {public static final int DEFAULT_FETCH_SIZE = 1024;}

 接下来我们看一下transportOneRecord的代码,该代码将一条数据进行转换后传递给Writer。

protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector) {Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); recordSender.sendToWriter(record);return record;
}

        buildRecord方法将一条数据的各个字段按照类型转换为标准数据,方便后续各类数据库写入插件使用实现数据插入。如果数据中包含了不支持的其他字段类型,需要在SQL中通过转换函数进行转换,否则对于不支持的其他字段类型,或在转换过程中出现其他错误,这条数据将被作为脏数据扔掉。当然,也可以修改buildRecord方法代码,让DataX支持更多数据类型的查询和写入。代码如下:

protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,TaskPluginCollector taskPluginCollector) {Record record = recordSender.createRecord();try {for (int i = 1; i <= columnNumber; i++) {switch (metaData.getColumnType(i)) {case Types.CHAR:case Types.NCHAR:case Types.VARCHAR:case Types.LONGVARCHAR:case Types.NVARCHAR:case Types.LONGNVARCHAR:String rawData;if (StringUtils.isBlank(mandatoryEncoding)) {rawData = rs.getString(i);} else {rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY :rs.getBytes(i)), mandatoryEncoding);}record.addColumn(new StringColumn(rawData));break;case Types.CLOB:case Types.NCLOB:record.addColumn(new StringColumn(rs.getString(i)));break;case Types.SMALLINT:case Types.TINYINT:case Types.INTEGER:case Types.BIGINT:record.addColumn(new LongColumn(rs.getString(i)));break;case Types.NUMERIC:case Types.DECIMAL:record.addColumn(new DoubleColumn(rs.getString(i)));break;case Types.FLOAT:case Types.REAL:case Types.DOUBLE:record.addColumn(new DoubleColumn(rs.getString(i)));break;case Types.TIME:record.addColumn(new DateColumn(rs.getTime(i)));break;// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115case Types.DATE:if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {record.addColumn(new LongColumn(rs.getInt(i)));} else {record.addColumn(new DateColumn(rs.getDate(i)));}break;case Types.TIMESTAMP:record.addColumn(new DateColumn(rs.getTimestamp(i)));break;case Types.BINARY:case Types.VARBINARY:case Types.BLOB:case Types.LONGVARBINARY:record.addColumn(new BytesColumn(rs.getBytes(i)));break;// warn: bit(1) -> Types.BIT 可使用BoolColumn// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumncase Types.BOOLEAN:case Types.BIT:record.addColumn(new BoolColumn(rs.getBoolean(i)));break;case Types.NULL:String stringData = null;if (rs.getObject(i) != null) {stringData = rs.getObject(i).toString();}record.addColumn(new StringColumn(stringData));break;case Types.OTHER:if (dataBaseType == DataBaseType.PostgreSQL) {record.addColumn(new StringColumn(rs.getString(i)));break;}default:throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE,String.format("您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",metaData.getColumnName(i),metaData.getColumnType(i),metaData.getColumnClassName(i)));}}} catch (Exception e) {if (IS_DEBUG) {LOG.debug("read data " + record.toString()+ " occur exception:", e);}//TODO 这里识别为脏数据靠谱吗?taskPluginCollector.collectDirtyRecord(record, e);if (e instanceof DataXException) {throw (DataXException) e;}}return record;}
}

未完待续...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/802886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

transformer上手(2) —— 注意力机制

自从 2017 年 Google 发布《Attention is All You Need》之后&#xff0c;各种基于 Transformer 的模型和方法层出不穷。尤其是 2018 年&#xff0c;OpenAI 发布的 GPT 和 Google 发布的 BERT 模型在几乎所有 NLP 任务上都取得了远超先前最强基准的性能&#xff0c;将 Transfor…

js通过Object.defineProperty实现数据响应式

目录 数据响应式属性描述符propertyResponsive 依赖收集依赖队列寻找依赖 观察器 派发更新Observer完整代码关于数据响应式关于Object.defineProperty的限制 数据响应式 假设我们现在有这么一个页面 <!DOCTYPE html> <html lang"en"><head><m…

Oracle表空间满清理方案汇总分享

目录 前言思考 一、第一种增加表空间的数据文件数量达到总容量的提升 二、第二种解决方案针对system和sysaux的操作 2.1SYSTEM表空间优化 2.2sysaux表空间回收 2.2.1针对sysaux的表空间爆满还有第二套方案维护 三、第三种解决方案使用alter tablespace resize更改表空间的…

深入浅出 -- 系统架构之微服务架构的新挑战

尽管微服务架构有着高度独立的软件模块、单一的业务职责、可灵活调整的技术栈等优势&#xff0c;但也不能忽略它所带来的弊端。本篇文章&#xff0c;我们从网络、性能、运维、组织架构和集成测试五个方面来聊一下设计微服务架构需要考虑哪些问题&#xff0c;对设计有哪些挑战呢…

Webots常用的执行器(Python版)

文章目录 1. RotationalMotor2. LinearMotor3. Brake4. Propeller5. Pen6. LED 1. RotationalMotor # -*- coding: utf-8 -*- """motor_controller controller."""from controller import Robot# 实例化机器人 robot Robot()# 获取基本仿真步长…

ChatGPT/GPT4科研应用与绘图技术及论文写作

2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车…

c++的学习之路:19、模板

摘要 本章主要是说了一些模板&#xff0c;如非类型模板参数、类模板的特化等等&#xff0c;文章末附上测试代码与导图 目录 摘要 一、非类型模板参数 二、类模板的特化 1、概念 2、函数模板特化 3、类模板特化 三、模板的分离编译 1、什么是分离编译 2、模板的分离编…

ip地址切换器安卓版,保护隐私,自由上网

在移动互联网时代&#xff0c;随着智能手机和平板电脑的普及&#xff0c;移动设备的网络连接变得愈发重要。为了满足用户在不同网络环境下的需求&#xff0c;IP地址切换器安卓版应运而生。本文将以虎观代理为例&#xff0c;为您详细解析IP地址切换器安卓版的功能、应用以及其所…

克服与新一代人工智能部署相关的数据挑战

随着商界领袖逐渐了解该技术的力量和潜力&#xff0c;人们对 ChatGPT 等生成式人工智能工具的潜力的兴趣正在迅速上升。 这些工具能够创建以前属于人类创造力和智力领域的输出&#xff0c;有潜力改变许多业务流程&#xff0c;并成为每个人&#xff08;从作家和创作者到程序员和…

蓝桥杯加训

1.两只塔姆沃斯牛&#xff08;模拟&#xff09; 思路&#xff1a;人和牛都记录三个数据&#xff0c;当前坐标和走的方向&#xff0c;如果人和牛的坐标和方向走重复了&#xff0c;那就说明一直在绕圈圈&#xff0c;无解 #include<iostream> using namespace std; const i…

openstack-认证服务

整个OpenStack是由控制节点&#xff0c;计算节点&#xff0c;网络节点&#xff0c;存储节点四大部分组成。 openstack重要集成组件: Nova-计算服务&#xff1b;Neutron-网络服务&#xff1b;Swift-对象存储服务&#xff1b;Cinder-块存储服务&#xff1b;Glance-镜像服务Keys…

LeetCode-118. 杨辉三角【数组 动态规划】

LeetCode-118. 杨辉三角【数组 动态规划】 题目描述&#xff1a;解题思路一&#xff1a;Python 动态规划解题思路二&#xff1a;解题思路三&#xff1a;0 题目描述&#xff1a; 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&…

C语言进阶课程学习记录-第27课 - 数组的本质分析

C语言进阶课程学习记录-第27课 - 数组的本质分析 数组实验-数组元素个数的指定实验-数组地址与数组首元素地址实验-指针与数组地址的区别小结 本文学习自狄泰软件学院 唐佐林老师的 C语言进阶课程&#xff0c;图片全部来源于课程PPT&#xff0c;仅用于个人学习记录 数组 实验-数…

Hot100【十一】:编辑距离

// 定义dp[i][j]: 表示word1前i个字符转换到word2前j个字符最小操作数 // 初始化dp[m1][n1] class Solution {public int minDistance(String word1, String word2) {int m word1.length();int n word2.length();// 1. dp数组int[][] dp new int[m 1][n 1];// 2. dp数组初…

IO流:将文件从A复制到B,并实现复制过程进度条的实现

private static boolean copyFile(String strFileA, String strFileB) {// 使用try资源块 ,其中创建的流对象可以自动关闭try (FileInputStream inputStream new FileInputStream(strFileA); // 输入流FileOutputStream outputStream new FileOutputStream(strFileB) // 输…

【Linux】进程的状态(运行、阻塞、挂起)详解,揭开孤儿进程和僵尸进程的面纱,一篇文章万字讲透!!!!进程的学习②

目录 1.进程排队 时间片 时间片的分配 结构体内存对齐 偏移量补充 对齐规则 为什么会有对齐 2.操作系统学科层面对进程状态的理解 2.1进程的状态理解 ①我们说所谓的状态就是一个整型变量&#xff0c;是task_struct中的一个整型变量 ②.状态决定了接下来的动作 2.2运行状态 2.…

【闲聊】-网页划词翻译插件

英文之痛 作为程序猿&#xff0c;常常需要接触外文网站&#xff0c;以前很痛苦&#xff0c;现在大模型时代有很多智能工具可以直接翻译&#xff0c;翻译的虽然越来越好&#xff0c;但是还是不如直接看英文能理解本义&#xff0c;相信我&#xff0c;看翻译的理解和看原文的理解…

龙迅LT2611UXC 2 PORT LVDS桥接到HDMI 2.0,内置MCU,颗自行操作

龙迅LT2611UXC描述&#xff1a; LT2611UXC是一个高性能的LVDS到HDMI2.0的转换器&#xff0c;用于STB&#xff0c;DVD应用程序。LVDS输入可以配置为单端口或双端口&#xff0c;有1个高速时钟通道&#xff0c;3~4个高速数据通道&#xff0c;最大运行1.2Gbps/通道&#xff0c;可支…

gpu模拟器总体流程

1、开显存空间&#xff0c;初始化 这里显存就是运行模拟器的机器 2、创建页表&#xff0c;开设备端空间并复制数据 虚拟地址 3、划分形状&#xff0c;传入内核函数&#xff0c;形状参数和设备端数据地址、执行计算 4、复制数据回主机端&#xff0c;释放gpu资源

手写简易操作系统(二十五)--文件系统第三部分

前情提要 一、文件写入 1.1、file的写入 文件写入比较复杂&#xff0c;函数行数相当多 /*** description: 把buf中的count个字节写入file,成功则返回写入的字节数,失败则返回-1 * param {file*} file 文件* param {void*} buf 缓存* param {uint32_t} count 写入的字节数…