高效改进!防止DataX从HDFS导入关系型数据库丢数据

在这里插入图片描述

高效改进!防止DataX从HDFS导入关系型数据库丢数据

针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题,优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片,确保了每个分片数据都得到全面读取和传输,有效提升了数据导入的可靠性和效率。这些改动不仅解决了丢数据的问题,还显著提高了处理多分片数据的性能。

背景

我们数据中台设计,数据同步功能是datax完成,在orc格式时datax从hdfs导数据到关系型数据库数据丢失,而在textfile格式时丢失数据,当文件超过250M多时会丢数据。因想使用orc格式节省数据空间,提高spark运行效率,需要解决这个问题。

问题

在这里插入图片描述
在这里插入图片描述

只读取了256M 左右的数据,数据条数对不上,导致hdfs,orc格式导入数据到pg,mysql等关系型数据库,数据丢失。

解决

修改hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java

问题代码

 InputSplit[] splits = in.getSplits(conf, 1);RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();// 获取列信息List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);

修改后

 // OrcInputFormat getSplits params numSplits not used, splits size = block numbersInputSplit[] splits = in.getSplits(conf, -1);for (InputSplit split : splits) {{RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();// 获取列信息List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close();

点击参考查看

重新打包替换hdfsreader.jar即可

解析

  1. 新增循环处理所有分片的逻辑: 之前的代码只处理了第一个分片(splits[0]),现在改为了处理所有的分片。新增的部分如下:

    java
    InputSplit[] splits = in.getSplits(conf, -1);
    for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();
    

    旧的逻辑是:

    java
    InputSplit[] splits = in.getSplits(conf, 1);
    RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
    Object key = reader.createKey();
    Object value = reader.createValue();
    

    这样改动的目的是,同时处理多个分片,从而提升数据读取的效率。

  2. 移除了重复的分片处理逻辑: 不使用重复的分片处理逻辑:

    java
    // OrcInputFormat getSplits params numSplits not used, splits size = block numbers
    InputSplit[] splits = in.getSplits(conf, -1);
    
  3. 代码块的重构: 将读取分片、解析记录以及处理记录的逻辑放入一个循环中,使代码更简洁、更易读:

    改之前:

    java
    InputSplit[] splits = in.getSplits(conf, 1);
    RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
    Object key = reader.createKey();
    Object value = reader.createValue();
    

    改后使用循环:

    java
    InputSplit[] splits = in.getSplits(conf, -1);
    for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();
    
  4. 处理每个记录字段并传输记录: 保持对每条记录的字段读取并将其传输转移到了新的循环处理逻辑中:

    改之前:

    while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);
    }
    reader.close();
    

    改后:

    for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close();
    }
    
  5. 为什么是256M没有更改前他是按每个文件进行分割,而在datax的配置中Java heap size 即默认xmx设置时256M,所以当单个文件超过256M时,超过的部分就被丢掉了,造成数据缺失,而更改后的是按hdfs block size 块的大小进行分割,循环遍历,所以直接修改xmx也能解决问题,但是你要想万一文件超过128G那,你不可能一直调大Java heap size,所以按hdfs block size分割是合理的解决方案

reader单个分片(InputSplit)的大小

在DataX的数据读取过程中,reader单个分片(InputSplit)的大小通常取决于底层存储系统和具体的配置参数。对于HDFS(Hadoop Distributed File System)를的读取,分片大小主要由以下几个因素决定:

  1. HDFS块大小(Block Size): HDFS将文件分为多个块,每个块通常是64MB、128MB或256MB大小,具体大小可以通过HDFS的配置参数dfs.blocksize进行设置。DataX会根据这些块来创建分片,也就是一个分片通常对应一个或多个HDFS块。
  2. 文件本身的大小: 如果文件比HDFS块小,或者没有跨越多个块,则一个文件可能只对应一个分片。
  3. DataX的任务配置: DataX允许在其配置文件中指定一些与分片相关的参数,类似于Hadoop的mapreduce.input.fileinputformat.split.maxsizemapreduce.input.fileinputformat.split.minsize,这些参数可以影响分片的逻辑。
  4. InputFormat: DataX使用的Hadoop的InputFormat也能控制分片的逻辑,比如FileInputFormatTextInputFormatOrcInputFormat等。这些格式定义了如何分割输入数据,结合文件大小和块大小来决定分片。

总结

  • 主要改动是将之前只处理单个分片的逻辑重构为一个循环,处理所有分片。这使代码更具扩展性和效率,也适应不同的输入数据量。
  • 移除了无用且重复的注释和代码行,以保持代码清晰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882965.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 实现 excel 数据过滤

一、场景分析 假设有如下一份 excel 数据 shop.xlsx, 写一段 python 程序&#xff0c;实现对于车牌的分组数据过滤。 并以车牌为文件名&#xff0c;把店名输出到 车牌.txt 文件中。 比如 闽A.txt 文件内容为&#xff1a; 小林书店福州店1 小林书店福州店2 二、依赖安装 程序依…

单片机通过AT指令控制ESP8266+TCP 实现收发数据

在嵌入式系统设计中&#xff0c;经常需要通过无线模块进行数据通信。ESP8266是一款流行的Wi-Fi模块&#xff0c;它支持AT指令集&#xff0c;可以方便地与各种微控制器进行通信。本文将详细介绍如何使用STM32单片机通过AT指令控制ESP8266模块实现TCP协议的收发数据。 ESP8266模…

【AI创新】优化ChatGPT提示词Prompt设计:释放AI的无限潜能

【AI创新】优化ChatGPT提示词Prompt设计&#xff1a;释放AI的无限潜能 文章目录 &#x1f31f; 引言&#x1f31f; 第一性原理在Prompt设计中的应用系统与用户信息的深度融合实际应用案例分析结论 &#x1f31f; 系统信息与用户信息的协同作用系统信息&#xff08;SYSTEM Infor…

TBWeb正式稳定版V3.4.0+AI+MJ绘画+免授权无后门+详细安装教程

TBWeb正式稳定版V3.4.0AIMJ绘画免授权无后门详细安装教程&#xff1b; 运行环境 Nginx1.22 PHP5.7 MySQL7.4 Redis7.0 Node.js&#xff08;16.19.1&#xff09; PM2管理器5.6 TBWeb系统是基于 NineAI 二开的可商业化 TB Web 应用&#xff08;免授权&#xff0c;无后门&a…

【隐私计算】隐语HEU同态加密算法解读

HEU: 一个高性能的同态加密算法库&#xff0c;提供了多种 PHE 算法&#xff0c; 包括ZPaillier、FPaillier、IPCL、Damgard Jurik、DGK、OU、EC ElGamal 以及基于FPGA和GPU硬件加速版本的Paillier版本。 本文我们会基于GPU运行HEU Docker容器&#xff0c;编译打包GPaillier并测…

探索卷积层参数量与计算量

1 问题 了解VGG网络并利用PyTorch实现VGG探索1x1卷积的作用探索卷积层参数量、计算量的计算方法 2 方法 了解VGG网络并利用PyTorch实现VGG1、VGG是Oxford的Visual Geometry Group的组提出的&#xff0c;VGG的缩写也来自于这个组的名字。VGG网络探索了提升网络的深度对最终的图像…

重构复杂简单变量之状态与策略模式

状态与策略模式 主要用于消除复杂的类型代码&#xff0c;并将其替换为更清晰、可维护的状态或策略对象。这个方法通常用于以下情况&#xff1a; 类型代码问题&#xff1a;当我们在类中使用整数或字符串来表示对象的状态或行为时&#xff0c;这可能会导致代码变得难以理解和维护…

算法的学习笔记—两个链表的第一个公共结点(牛客JZ52)

&#x1f600;前言 在链表问题中&#xff0c;寻找两个链表的第一个公共结点是一个经典问题。这个问题的本质是在两个单链表中找到它们的相交点&#xff0c;或者说它们开始共享相同节点的地方。本文将详细讲解这个问题的解题思路&#xff0c;并提供一种高效的解决方法。 &#x…

LeetCode Hot 100:二叉树

LeetCode Hot 100&#xff1a;二叉树 94. 二叉树的中序遍历 思路 1&#xff1a;递归 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}…

蓝牙资讯|iOS 18.1 正式版下周推送,AirPods Pro 2耳机将带来助听器功能

苹果公司宣布将在下周发布 iOS 18.1 正式版&#xff0c;同时确认该更新将为 AirPods Pro 2 耳机带来新增“临床级”助听器功能。在启用功能后&#xff0c;用户首先需要使用 AirPods 和 iPhone 进行简短的听力测试&#xff0c;如果检测到听力损失&#xff0c;系统将创建一项“个…

docker run 命令解析

docker run 命令解析 docker run 命令用于从给定的镜像启动一个新的容器。这个命令可以包含许多选项&#xff0c;下面是一些常用的选项&#xff1a; -d&#xff1a;后台运行容器&#xff0c;并返回容器ID&#xff1b;-i&#xff1a;以交互模式运行容器&#xff0c;通常与 -t …

【C++】string类 (模拟实现详解 下)

我们接着上一篇【C】string类 &#xff08;模拟实现详解 上&#xff09;-CSDN博客继续对string模拟实现。从这篇内容开始&#xff0c;string相关函数的实现就要声明和定义分离了。 1.reserve、push_back和append 在string.h的string类里进行函数的声明。 void reserve(size_…

qt获取本地语言

获取本地语言 #define QSTRING_TO_UTF8(str) std::string(str.toUtf8()) enum LanguageType {kLanguageTypeChinese,kLanguageTypeTradition,kLanguageTypeEnglish };QLocale qlLanguage;QString qstrLangCode qlLanguage.languageToString(qlLanguage.language());LOG(INFO)…

Python包——Matplotlib

Matplotlib 是 Python 中一个广泛使用的绘图库&#xff0c;它能够生成高质量的图表和图形。它提供了一个类似于 MATLAB 的绘图框架&#xff0c;使得数据可视化变得简单和直观。下面是一些关于如何使用 Matplotlib 的基础知识和示例。 1.常用API 1.1 绘图类型 函数名称描述Bar…

JVM(HotSpot):GC之垃圾回收器的分类

文章目录 前言一、串行二、吞吐量优先三、响应时间优先四、常见垃圾回收器使用组合 前言 上一篇&#xff0c;我们学习了分代回收机制 它的主要内容是对JVM内存的一个划分&#xff0c;以及垃圾回收器工作时&#xff0c;区域运作顺序的一个规定。 所以&#xff0c;它是一个规范。…

Debian会取代CentOS成为更主流的操作系统吗?

我们知道&#xff0c;其实之前的话&#xff0c;国内用户对centos几乎是情有独钟的偏爱&#xff0c;很多人都喜欢选择centos系统&#xff0c;可能是受到一些原因的影响导致的吧&#xff0c;比如他相当于免费的红帽子系统&#xff0c;或者一些教程和网上的资料都推荐这个系统&…

idea历史版本下载

idea下载 idea官网默认最新版下载&#xff0c; https://www.jetbrains.com.cn/idea/ 历史版本下载入口&#xff1a; https://www.jetbrains.com/idea/download/other.html

Spring Boot论坛网站:开发、部署与管理

3系统分析 3.1可行性分析 通过对本论坛网站实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本论坛网站采用SSM框架&#xff0c;JAVA作为开发语言&#xff0c;是…

HTTP 与 HTTPS 的区别:原理、安全性与应用场景

一、引言 在互联网的世界里&#xff0c;信息的传输离不开协议的支持。HTTP 和 HTTPS 是我们在浏览网页、使用网络应用等场景中经常接触到的协议。随着网络安全意识的不断提高&#xff0c;了解 HTTP 和 HTTPS 的区别对于保障网络通信安全和理解网络应用的运行机制变得至关重要。…

针对 el-date picker pickerOptions 快捷选项的超级方法

提供快捷的配置&#xff0c;支持原子组合&#xff0c;高级用法支持用户自定义配置项 demo import { generateShortCuts } from ./date-shortcuts.js ... pickerOptions: {shortcuts: generateShortCuts({type: day}) } ...date-shortcuts 文件 import moment from moment // …