DataX-Oracle新增writeMode支持update

目录

前言

第一步下载源码

第二步修改源码

1、Oraclewriter

2、WriterUtil

 2.1、修改getWriteTemplate方法

 2.2、新增onMergeIntoDoString与getStrings方法

3、CommonRdbmsWriter

 3.1、修改startWriteWithConnection

 3.2、修改doBatchInsert

 3.3、修改fillPreparedStatement

第三步打包

第四步脚本修改

修改后jar包地址 




前言

目前 DataX更新到datax_v202309版本还不能支持Oracle写入的update,只通过DataX只能修改源码。

原理:oracle 不支持类似 MySQL的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN[UPDATE sql] 
WHEN NOT MATCHED THEN [INSERT sql]

第一步下载源码

 地址:datax_v202309。

第二步修改源码

一共修改3个文件

1、Oraclewriter

 

找到该代码直接注释掉就行。 

2、WriterUtil
 2.1、修改getWriteTemplate方法
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean update = writeMode.trim().toLowerCase().startsWith("update");boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| update;if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate || update) {//update只在mysql下使用if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();}//update在Oracle下使用else if (dataBaseType == DataBaseType.Oracle) {writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +StringUtils.join(columnHolders, ",") +") VALUES(" + StringUtils.join(valueHolders, ",") +")";}else {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));}} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (update) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}return writeDataSqlTemplate;}
 2.2、新增onMergeIntoDoString与getStrings方法

代码作用:对Oracle进行update的MERGE拼接

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {String[] sArray = getStrings(merge);StringBuilder sb = new StringBuilder();sb.append("MERGE INTO %s A USING ( SELECT ");boolean first = true;boolean first1 = true;StringBuilder str = new StringBuilder();StringBuilder update = new StringBuilder();for (String columnHolder : columnHolders) {if (Arrays.asList(sArray).contains(columnHolder)) {if (!first) {sb.append(",");str.append(" AND ");} else {first = false;}str.append("TMP.").append(columnHolder);sb.append("?");str.append(" = ");sb.append(" AS ");str.append("A.").append(columnHolder);sb.append(columnHolder);}}for (String columnHolder : columnHolders) {if (!Arrays.asList(sArray).contains(columnHolder)) {if (!first1) {update.append(",");} else {first1 = false;}update.append(columnHolder);update.append(" = ");update.append("?");}}sb.append(" FROM DUAL ) TMP ON (");sb.append(str);sb.append(" ) WHEN MATCHED THEN UPDATE SET ");sb.append(update);sb.append(" WHEN NOT MATCHED THEN ");return sb.toString();}public static String[] getStrings(String merge) {merge = merge.replace("update", "");merge = merge.replace("(", "");merge = merge.replace(")", "");merge = merge.replace(" ", "");return merge.split(",");}
3、CommonRdbmsWriter
 3.1、修改startWriteWithConnection
        // 替换原先的代码块public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {this.taskPluginCollector = taskPluginCollector;List<String> columns = new LinkedList<>();if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);this.columns.forEach(column->{if (Arrays.asList(sArray).contains(column)) {columns.add(column);}});this.columns.forEach(column->{if (!Arrays.asList(sArray).contains(column)) {columns.add(column);}});}columns.addAll(this.columns);// 用于写入数据的时候的类型根据目的表字段类型转换this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ","));// 写数据库的SQL语句calcWriteRecordSql();List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);int bufferBytes = 0;try {Record record;while ((record = recordReceiver.getFromReader()) != null) {if (record.getColumnNumber() != this.columnNumber) {// 源头读取字段列数与目的表字段写入列数不相等,直接报错throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",record.getColumnNumber(),this.columnNumber));}writeBuffer.add(record);bufferBytes += record.getMemorySize();if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}}if (!writeBuffer.isEmpty()) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}} catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);} finally {writeBuffer.clear();bufferBytes = 0;DBUtil.closeDBResources(null, null, connection);}}
 3.2、修改doBatchInsert
 protected void doBatchInsert(Connection connection, List<Record> buffer)throws SQLException{PreparedStatement preparedStatement = null;try {connection.setAutoCommit(false);preparedStatement = connection.prepareStatement(this.writeRecordSql);if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);for (Record record : buffer) {List<Column> recordOne = new ArrayList<>();for (int j = 0; j < this.columns.size(); j++) {if (Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {if (!Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {recordOne.add(record.getColumn(j));}for (int j = 0; j < recordOne.size(); j++) {record.setColumn(j, recordOne.get(j));}preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}else {for (Record record : buffer) {preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}preparedStatement.executeBatch();connection.commit();}catch (SQLException e) {LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());connection.rollback();doOneInsert(connection, buffer);}catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);}finally {DBUtil.closeDBResources(preparedStatement, null);}}
 3.3、修改fillPreparedStatement
  protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)throws SQLException{for (int i = 0; i < record.getColumnNumber(); i++) {int columnSqltype = this.resultSetMetaData.getMiddle().get(i);String typeName = this.resultSetMetaData.getRight().get(i);preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,columnSqltype, typeName,record.getColumn(i));}return preparedStatement;}

第三步打包

1、只需要在idea里面打包修改的两个程序就可以

 2、打包成功后获取两个jar包

 3、将包替换到datax的插件里面

 将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter

 将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs

第四步脚本修改

{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "${r_username}","password": "${r_password}","connection": [{	   "querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],"jdbcUrl": ["${r_jdbcUrl}"]}]}},"writer": {"name": "oraclewriter","parameter": {"writeMode": "update(f_year,f_code)","username": "${w_username}","password": "${w_password}","column": ["f_year","f_code","f_name","f_order"],"session": [],"preSql": [],"connection": [{"jdbcUrl": "${w_jdbcUrl}","table": ["tableName"]}]}}		   }]}
}

参数 "writeMode": "update(f_year,f_code)" 里面f_year,f_code就是主键, 参数上不要加/"

update(\"f_year\",\"f_code\")这样是拼不上sql的,这个问题调试了好久才解决。

这时候运行就成功了

参考文章DataX 二次开发支持 Oracle 更新数据icon-default.png?t=N7T8https://blog.csdn.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3Datax oracle 支持增量并且支持全量更新icon-default.png?t=N7T8https://blog.csdn.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7

修改后jar包地址 

懒得修改可以直接下载两个jar替换到你们的datax对应目录。

https://download.csdn.net/download/qq_36802726/89046154icon-default.png?t=N7T8https://download.csdn.net/download/qq_36802726/89046154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/778353.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web Components使用(一)

在使用Web Components之前&#xff0c;我们先看看上一篇文章Web Components简介&#xff0c;其中提到了相关的接口、属性和方法。 正是这些接口、属性和方法才实现了Web Components的主要技术&#xff1a;Custom elements&#xff08;自定义元素&#xff09;、Shadow DOM&#…

轻松解决Composer常见错误

引言&#xff1a; Composer作为PHP的一个重要组成部分&#xff0c;为PHP开发者提供了非常方便的依赖管理工具。但是&#xff0c;难免会在使用中遇到一些常见的错误&#xff0c;如连接错误、内存限制错误、版本冲突错误、锁定文件错误和类未找到错误等。这些错误可能会影响我们…

vue3 项目中引入tailwindcss

Tailwind CSS 的工作原理是扫描所有 HTML 文件、JavaScript 组件以及任何 模板中的 CSS 类&#xff08;class&#xff09;名&#xff0c;然后生成相应的样式代码并写入 到一个静态 CSS 文件中。 官网&#xff1a;https://www.tailwindcss.cn/docs/installation 在项目中使用过…

C#手术麻醉信息系统全套商业源码,自主版权,支持二次开发 医院手麻系统源码

手术麻醉信息系统是HIS产品的中的一个组成部分&#xff0c;主要应用于医院的麻醉科&#xff0c;属于电子病历类产品。医院麻醉监护的功能覆盖整个手术与麻醉的全过程&#xff0c;包括手术申请与排班、审批、安排、术前、术中和术后的信息管理提供支持。 手术麻醉信息系统可与EM…

MySQL 数据库的日志管理、备份与恢复

一. 数据库备份 1.数据备份的重要性 备份的主要目的是灾难恢复。 在生产环境中&#xff0c;数据的安全性至关重要。 任何数据的丢失都可能产生严重的后果。 造成数据丢失的原因&#xff1a; 程序错误人为,操作错误,运算错误,磁盘故障灾难&#xff08;如火灾、地震&#xff0…

LeetCode 20.有效的括号

给定一个只包括 ‘(’&#xff0c;‘)’&#xff0c;‘{’&#xff0c;‘}’&#xff0c;‘[’&#xff0c;‘]’ 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。 左括号必须以正确的顺序闭合。 每个右括号都…

ModuleNotFoundError: No module named ‘langchain.schema‘

自定义langchain文档分割器时报错找不到 langchain.schema 模块 langchain0.0.20 from langchain.schema import messages_to_dict ----> 1 from langchain.schema import DocumentModuleNotFoundError: No module named langchain.schema解决办法 pip install -U langcha…

Android的硬件接口HAL-2 HIDL

没写完哈。 不说废话&#xff0c;直接上干活。 1 创建HAL 2 生成桩代码 PACKAGEvendor.xxx.hardware.logiuvc21.0 LOCvendor/xxx/hardware/xxx2/1.0/default hidl-gen -o $LOC -Lc-impl -rvendor.xxx.hardware:vendor/xxx/hardware -randroid.hidl:system/libhidl/transport …

prometheus配置文件详解

以下是prometheus配置文件详解&#xff0c;重点地方有中文简介 # my global config global:scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is e…

Docker部署MinIO对象存储服务

1. 拉取MinIO镜像 # 下载镜像 docker pull minio/minio#查看镜像 docker images2. 创建目录 # 文件存储目录 mkdir -p /opt/minio/data# 配置文件 mkdir -p /opt/minio/config# 日志文件 mkdir -p /opt/minio/logs3. 创建Minio容器并运行 docker run \ -p 9000:9000 \ -p 90…

CSS(一)---【CSS简介、导入方式、八种选择器、优先级】

零.前言 本系列适用于零基础小白&#xff0c;亦或是初级前端工程师提升使用。 知识点较为详细&#xff0c;如果追求非常详细&#xff0c;请移步官方网站或搬运网站。 1.CSS简介 CSS全称&#xff1a;“Cascading Style Sheets”&#xff0c;中文名&#xff1a;“层叠样式表”…

Codeforces Round 937 (Div. 4)(D~G)

D - Product of Binary Decimals 题意&#xff1a; 思路&#xff1a;观察到n的范围很小&#xff0c;先求出所有可能的二进制十位数&#xff0c;然后dp把所有可能的值求出来。注意不能用求因子的方法来求解&#xff0c;因为这些二进制十位数不一定是素数&#xff0c;先除某个数…

车道线检测项目 | 基于lanenet实现的实时车道线检测

项目应用场景 面向自动驾驶场景的车道线检测场景&#xff0c;项目的特点是能够达到实时的车道线检测 项目效果&#xff1a; 项目细节 > 具体参见项目 README.md (1) 安装依赖 pip3 install -r requirements.txt (2) 测试图片 python tools/test_lanenet.py --weights_pat…

|行业洞察·香氛|《小红书2023香水香氛营销宝典-71页》

报告内容的详细解读&#xff1a; 行业格局 预计到2025年&#xff0c;香水市场规模将超过300亿&#xff0c;小红书成为香水种草的重要平台。从2018年到2025年&#xff0c;市场规模持续增长&#xff0c;年增速保持在20%左右。香水市场的热度在节日节点尤为明显&#xff0c;如情…

以XX医院为例的医疗建筑能效管理系统【建筑能耗 供电可靠 】

一、行业背景 二、行业特点 1.供电可靠性要求高&#xff1a;医院配电系统复杂&#xff0c;门诊、急救、手术室、ICU/CCU、血液透析等场合特一级和一级负荷比较多&#xff0c;一旦发生故障会造成严重影响&#xff0c;对配电可靠性要求极高。 2.能耗水平高&#xff1a;医院能耗…

排序大乱炖

目录 一&#xff1a;插入排序 1.1直接插入排序 1.2希尔排序 二&#xff1a;选择排序 2.1选择排序 2.2堆排序 三&#xff1a;交换排序 3.1冒泡排序 3.2快速排序 3.2.1Hoare版本 3.2.2双指针法 3.2.3非递归 一&#xff1a;插入排序 1.1直接插入排序 直接插入排序…

自动化测试 —— Pytest fixture及conftest详解

前言 fixture是在测试函数运行前后&#xff0c;由pytest执行的外壳函数。fixture中的代码可以定制&#xff0c;满足多变的测试需求&#xff0c;包括定义传入测试中的数据集、配置测试前系统的初始状态、为批量测试提供数据源等等。fixture是pytest的精髓所在&#xff0c;类似u…

IAR率先支持瑞萨首款通用RISC-V MCU,树立行业新标准

瑞典乌普萨拉&#xff0c;2024年3月27日 – 全球领先的嵌入式系统开发软件解决方案供应商IAR自豪地宣布&#xff1a;公司备受全球数百万开发者青睐的开发环境再次升级&#xff0c;已率先支持瑞萨首款通用32位RISC-V MCU&#xff0c;该 MCU 搭载了瑞萨自研的 CPU 内核。此次功能…

使用OpenCV在Qt C++环境中实现车牌号码的识别

要使用OpenCV在Qt C环境中实现车牌号码的识别&#xff0c;您可以按照以下步骤编写代码。这里提供一个简化的示例流程&#xff0c;涵盖车牌定位、提取和字符识别等关键步骤。请注意&#xff0c;实际应用可能需要根据具体场景进行调整和优化。 准备环境 确保您的项目已经正确集…

多线程(17)如何检测和预防死锁

检测和预防死锁是并发控制和操作系统设计的两个重要方面。了解如何检测和预防死锁不仅对于操作系统开发者重要&#xff0c;对于任何涉及并发和资源管理的应用程序开发者也同样重要。 死锁检测 1. 资源分配图 在操作系统中&#xff0c;死锁检测通常通过维护一个资源分配图来实…