Spark中写parquet文件是怎么实现的

背景

本文基于 Spark 3.5.0
写本篇文章的目的是在于能够配合spark.sql.maxConcurrentOutputFileWriters参数来加速写parquet文件的速度,为此研究一下Spark写parquet的时候会占用内存的大小,便于配置spark.sql.maxConcurrentOutputFileWriters的值,从而保证任务的稳定性

结论

一个spark parquet writer可能会占用128MB的内存(也就是parquet.block.size的大小)。 所有在调整spark.sql.maxConcurrentOutputFileWriters的时候得注意不能调整过大,否则会导致OOM,但是如果在最后写文件的时候加入合并小文件的功能(AQE+Rebalance的方式),也可以适当的调整大一点,因为这个时候的Task 不像没有shuffle一样,可能还会涉及到sort以及aggregate等消耗内存的操作,(这个时候就是一个task纯写parquet文件)
大家也可以参考Parquet文件是怎么被写入的-Row Groups,Pages,需要的内存,以及flush操作

分析

还是得从InsertIntoHadoopFsRelationCommand类中说起,涉及到写parquet的数据流如下:

InsertIntoHadoopFsRelationCommand.run||\/
FileFormatWriter.write||\/
fileFormat.prepareWrite||\/
executeWrite => planForWrites.executeWrite ||\/WriteFilesExec.doExecuteWrite||\/FileFormatWriter.executeTask||\/dataWriter.writeWithIterator||\/dataWriter.writeWithMetrics||\/DynamicPartitionDataConcurrentWriter.write||\/writeRecord||\/ParquetOutputWriter.write||\/recordWriter.write
  • 其中fileFormat.prepareWrite 涉及到 spark这一层级有关parquet的设置,并返回一个生成ParquetOutputWriter实例的工厂类实例OutputWriterFactory
    主要设置如 parquet.compression 压缩格式,一般是 zstd ,也可以通过 spark.sql.parquet.compression.codec设置
    parquet.write.support.classParquetWriteSupport,该类的作用为Spark把内部IternalRow转为parquet message

  • DynamicPartitionDataConcurrentWriter.write 涉及到了InternalRowUnsafeRow代码生成
    这里不讨论这部分的细节,只说一下getPartitionValuesrenewCurrentWriter 方法中的 getPartitionPath这两部分

    • getPartitionValues
      这个是InternalRow => UnsafeRow转换,为什么这么做,是因为对于UnsafeRow这种数据结构来说,能够很好管理内存和避免GC问题

          val proj = UnsafeProjection.create(description.partitionColumns, description.allColumns)row => proj(row)
      

      我们以UnsafeProjection的子类InterpretedUnsafeProjection,该类不是代码生成的类(这样便于分析),

        override def apply(row: InternalRow): UnsafeRow = {if (subExprEliminationEnabled) {runtime.setInput(row)}// Put the expression results in the intermediate row.var i = 0while (i < numFields) {values(i) = exprs(i).eval(row)i += 1}// Write the intermediate row to an unsafe row.rowWriter.reset()writer(intermediate)rowWriter.getRow()}
      
      • 首先是消除公共子表达式
      • 用values数组保存每个表达式计算出来的结果
      • rowWriter.reset() 用来对齐cursor,便于对于String类型的写入,这可以参考UnsafeRow内存布局和代码优化
      • unsafeWriter按照不同的类型写入到unsaferow不同的位置上去,这里的offset在cursor的内部的,也就是说cursor的值要大于offset
      • 返回UnsafeRow类型
        通过这种方式完成了InternalRow => UnsafeRow转换
    • getPartitionPath
      这个是通过表达式的方式获取partition的函数,从而完成InternalRow => String的转换,涉及的代码如下:

        private lazy val partitionPathExpression: Expression = Concat(description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>val partitionName = ScalaUDF(ExternalCatalogUtils.getPartitionPathString _,StringType,Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)})private lazy val getPartitionPath: InternalRow => String = {val proj = UnsafeProjection.create(Seq(partitionPathExpression), description.partitionColumns)row => proj(row).getString(0)}
      

      UnsafeProjection.create 上面已经说了怎么实现的了,重点说partitionPathExpression 生成partition的表达式,
      该表达式主要通过UDF中getPartitionPathString来生成,关键的一点是,传入的参数:Literal(c.name)和Cast(c, StringType, Option(description.timeZoneId))))
      这里的Literal(c.name)表示的是partition名字的常量
      Cast(c, StringType, Option(description.timeZoneId)))表示的是c这个变量所代表的值,
      为什么这么说,因为在ScalaUDF的内部计算方法中有:

        override def eval(input: InternalRow): Any = {val result = try {f(input)} catch {case e: Exception =>throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(functionName, inputTypesString, outputType, e)}resultConverter(result)}

      这里的f中会对传入的每个参数都会调用eval(InernalRow),对于Literal来说就是常亮,而对于Cast(Attribute)来说就是属性的值(通过BindReferences.bindReference方法)。

  • recordWriter.write涉及到 ParquetOutputFormat.getRecordWriter方法,该方法中涉及到parquet中的一些原生参数设置:

public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec, Mode mode)throws IOException, InterruptedException {final WriteSupport<T> writeSupport = getWriteSupport(conf);ParquetProperties.Builder propsBuilder = ParquetProperties.builder().withPageSize(getPageSize(conf)).withDictionaryPageSize(getDictionaryPageSize(conf)).withDictionaryEncoding(getEnableDictionary(conf)).withWriterVersion(getWriterVersion(conf)).estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)).withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)).withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)).withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf)).withStatisticsTruncateLength(getStatisticsTruncateLength(conf)).withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf)).withBloomFilterEnabled(getBloomFilterEnabled(conf)).withPageRowCountLimit(getPageRowCountLimit(conf)).withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));new ColumnConfigParser().withColumnConfig(ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding).withColumnConfig(BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false),propsBuilder::withBloomFilterEnabled).withColumnConfig(BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV).withColumnConfig(BLOOM_FILTER_FPP, key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),propsBuilder::withBloomFilterFPP).parseConfig(conf);ParquetProperties props = propsBuilder.build();long blockSize = getLongBlockSize(conf);int maxPaddingSize = getMaxPaddingSize(conf);boolean validating = getValidation(conf);...WriteContext fileWriteContext = writeSupport.init(conf);FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),fileWriteContext.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled(), encryptionProperties);w.start();...return new ParquetRecordWriter<T>(w,writeSupport,fileWriteContext.getSchema(),fileWriteContext.getExtraMetaData(),blockSize,codec,validating,props,memoryManager,conf);}

这里涉及到的关键的几个参数是:

   parquet.page.size                   1*1024*1024         -- page的大小 默认是 1MBparquet.block.size                  128*1024*1024       -- rowgroup的大小 默认是 128MBparquet.page.size.row.check.min     100                 -- page检查是否达到page size的最小行数parquet.page.size.row.check.max     10000               -- page检查是否达到page size的最大行数parquet.page.row.count.limit        20_000              -- page检查是否达到page size的行数极限行数

parquet.page.size.row.check.min parquet.page.size.row.check.max parquet.page.row.count.limit 这三个配置项存在着相互制约的关系,总的目标就是检查当行数达到了一定的阈值以后,来检查是否能够flush到内存page中,具体的可以查看ColumnWriteStoreBase类中的方法

接下来就是真正写操作了,调用的是InternalParquetRecordWriter.write方法,如下:

 private void initStore() {ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled(),fileEncryptor, rowGroupOrdinal);pageStore = columnChunkPageWriteStore;bloomFilterWriteStore = columnChunkPageWriteStore;columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);this.recordConsumer = columnIO.getRecordWriter(columnStore);writeSupport.prepareForWrite(recordConsumer);}public void write(T value) throws IOException, InterruptedException {writeSupport.write(value);++ recordCount;checkBlockSizeReached();}

initStore主要是初始化 pageStorecolumnStore
具体的spark interalRow怎么转换为parquet message,主要在writeSupport.write中的rootFieldWriters
接下来就是checkBlockSizeReached,这里主要就是flush rowgroup到磁盘了,
具体的读者可以看代码:
对于flush到page可以看checkBlockSizeReached中columnStore.flush()
对于flush rowroup到磁盘可以看checkBlockSizeReached中pageStore.flushToFileWriter(parquetFileWriter)
总结出来就是 一个spark parquet writer可能会占用128MB的内存(也就是parquet.block.size的大小),
因为只有在满足了rowgroup的大小以后,才会真正的flush到磁盘。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696350.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Javascript怎么输出内容?两种常见方式以及控制台介绍

javascript是一种非常重要的编程语言&#xff0c;在许多网页中它被广泛使用&#xff0c;可以实现许多交互效果和动态效果。输出是javascript中最基本的操作之一&#xff0c;下面将介绍两种常见的输出方式。 一、使用console.log()函数输出 console.log()函数是常用的输出函数…

Jmeter实现阶梯式线程增加的压测

安装相应jmeter 插件 1&#xff1a;安装jmeter 管理插件&#xff1a; 下载地址&#xff1a;https://jmeter-plugins.org/install/Install/&#xff0c;将下载下来的jar包放到jmeter文件夹下的lib/ext路径下&#xff0c;然后重启jmeter。 2&#xff1a;接着打开 选项-Plugins Ma…

在Linux上安装Docker: 一站式指南

Docker 是一款强大的容器化平台&#xff0c;为开发者提供了一种轻松打包、发布和运行应用的方式。在本文中&#xff0c;我们将探讨如何在Linux操作系统上安装Docker&#xff0c;为你提供一站式指南。 步骤1: 卸载旧版本 在安装新版Docker之前&#xff0c;建议先卸载旧版本&am…

三十年一个大轮回!日股突破“泡沫时期”历史高点

2月22日周四&#xff0c;英伟达四季报业绩超预期&#xff0c;而且本季度业绩指引非常乐观&#xff0c;提振美股股指期货并成为芯片股和AI概念股情绪的重要催化剂。今日亚洲芯片股和AI股起飞&#xff0c;日本在芯片股的带动下突破1989年泡沫时期以来的历史最高收盘价。 美股方面…

我之前炒股亏麻了,找百融云AI Agent谈了谈心

春节之前&#xff0c;A股和H股都跌麻了&#xff0c;但是机构的路演和调研反而多了。因为&#xff1a;写不完的安抚、说不完的陪伴、听不完的客户指责、以及捡不完的AH股便宜货。 有一位血液里流淌着美式咖啡的职场白领&#xff0c;虽然这些年在股市过得很不如意&#xff0c;但…

C语言---链表

一.定义 链表是由一系列节点组成&#xff0c;每个结点包含两个域&#xff0c;一个是数据域&#xff0c;数据域用来保存用户数据&#xff0c;另一个是指针域&#xff0c;保存下一个节点的地址。链表在内存中是非连续的。 二.分类 静态链表 动态链表 单向链表 双向链表 循环链…

maven使用问题及解决办法汇总

文章目录 1、maven clean后打包出现Cannot create resource output directory2、把已有jar包打包进本地maven仓库 1、maven clean后打包出现Cannot create resource output directory 主要原因是target目录被别的程序占用了&#xff0c;最笨的办法是重启电脑&#xff0c;当然也…

C++跨模块释放内存

linux一个进程只有一个堆&#xff0c;不要考虑这些问题&#xff0c;但是windows一个进程可能有多个堆&#xff0c;要在对应的堆上释放。 一&#xff0c; MT改MD 一个进程的地址空间是由一个可执行模块和多个DLL模块构成的&#xff0c;这些模块中&#xff0c;有些可能会链接到…

代码随想录训练营第29天| 491.递增子序列、46.全排列、47.全排列 II

491.递增子序列 题目链接&#xff1a;491. 非递减子序列 - 力扣&#xff08;LeetCode&#xff09; class Solution {List<List<Integer>> ans new ArrayList<>();public List<List<Integer>> findSubsequences(int[] nums) {backtrack(nums, …

(十三)【Jmeter】线程(Threads(Users))之tearDown 线程组

简述 操作路径如下: 作用:在正式测试结束后执行清理操作,如关闭连接、释放资源等。配置:设置清理操作的采样器、执行顺序等参数。使用场景:确保在测试结束后应用程序恢复到正常状态,避免资源泄漏或对其他测试的影响。优点:提供清理操作,确保测试环境的整洁和可重复性…

租用海外服务器,自己部署ChatGPT-Next-Web,实现ChatGPT聊天自由,还可以分享给朋友用

前言 如果有好几个人需要使用ChatGPT&#xff0c;又没有魔法上网环境&#xff0c;最好就是自己搭建一个海外的服务器环境&#xff0c;然后很多人就可以同时直接用了。 大概是情况是要花80元租一个一年的海外服务器&#xff0c;花15元租一个一年的域名&#xff0c;然后openai 的…

centos安装扩展

centos下安装php扩展时遇到的问题php 1.imapgit cd /root/php-5.6.27/ext/imap /usr/local/php/bin/phpize ./configure --prefix/usr/local/imap 错误1github configure: error: utf8_mime2text() has new signature, but U8T_CANONICAL is missing. This should not happe…

一 些有代表性的相位解包裹算法

Itoh首先给出了传统解包裹算法的数学描述!。传统的相位解包裹操作是通过对空间相邻点相位值的比较来完成的。根据抽样定理&#xff0c;如果相邻采样点的相位差不超过z&#xff0c;则对应的相位解包裹处理是非常简单的&#xff0c;理论上以某点为起始点沿某一路径对包裹相位的差…

中科院计算所:什么情况下,大模型才需要检索增强?

ChatGPT等大型语言模型在自然语言处理领域表现出色。但有时候会表现得过于自信&#xff0c;对于无法回答的事实问题&#xff0c;也能编出一个像样的答案来。 这类胡说乱说的答案对于医疗等安全关键的领域来说&#xff0c;是致命的。 为了弥补这一缺陷&#xff0c;研究者们提出…

ios抓包Tunnel to......443

fiddler官网下载“CertMaker for iOS and Android”插件&#xff0c;官网插件&#xff1a;https://www.telerik.com/fiddler/add-ons 双击运行插件后&#xff0c;重启fiddler&#xff0c;ios重新安装证书即可

猫头虎分享已解决Bug || 系统更新失败(System Update Failure):UpdateError, UpgradeFailure

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

Java并发编程面试题53道-JUC

Java中的JUC是"Java Concurrency Utilities"的缩写&#xff0c;它是指Java平台从Java 5版本开始引入的一系列用于处理多线程并发编程的工具类和框架。这个包(java.util.concurrent)极大地增强了Java在并发编程领域的支持&#xff0c;提供了一系列高级抽象如线程池&am…

Sora:视频生成模型作为世界模拟器

我们探索了视频数据上生成模型的大规模训练。具体来说&#xff0c;我们在可变持续时间、分辨率和长宽比的视频和图像上联合训练文本条件扩散模型。我们利用了一个在视频和图像潜在码的时空块上操作的变压器架构。我们规模最大的模型 Sora 能够生成一分钟的高保真视频。我们的结…

一周学会Django5 Python Web开发-Django5路由重定向

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计25条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

代码随想录算法训练营第21天—回溯算法01 | ● 理论基础 ● *77. 组合

理论基础 回溯是一种纯暴力搜索的方法&#xff0c;它和递归相辅相成&#xff0c;通常是执行完递归之后紧接着执行回溯相较于以往使用的for循环暴力搜索&#xff0c;回溯能解决更为复杂的问题&#xff0c;如以下的应用场景应用场景 组合问题 如一个集合{1,2,3,4}&#xff0c;找…