Spark中多分区写文件前可以不排序么

背景

Spark 3.5.0
目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。

分析

这部分主要分为三个部分:
一个是V1Writes规则的重改;
另一个是FileFormatWriter中的dataWriter的选择;
还有一个是Spark中为什么会加上Sort
这三部分是需要结合在一起分析讨论的

V1Writes规则的重改

直接转到代码部分:

object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan = {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>val newQuery = prepareQuery(write, write.query)val attrMap = AttributeMap(write.query.output.zip(newQuery.output))val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild = writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}

其中 prepareQuery是对满足条件的计划前加上Sort逻辑排序,其中prepareQuery关键的代码如下:

    val requiredOrdering = write.requiredOrdering.map(_.transform {case a: Attribute => attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering = empty2NullPlan.outputOrderingval orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global = false, empty2NullPlan)}

write.requiredOrdering中涉及到的类为InsertIntoHadoopFsRelationCommandInsertIntoHiveTable,且这两个物理计划中的requiredOrdering实现都是:

V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)

getSortOrder方法关键代码如下:

    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns).map(SortOrder(_, Ascending))}

所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则会加上Sort逻辑计划,具体的实现可以参考SPARK-37287
如果spark.sql.maxConcurrentOutputFileWriters为0(默认值为0)且 sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则不会加上Sort逻辑计划

FileFormatWriter 中的dataWriter的选择

InsertIntoHadoopFsRelationCommandInsertIntoHiveTable 这两个物理计划中,最终写入文件/数据的时候,会调用到FileFormatWriter.write方法,这里有个concurrentOutputWriterSpecFunc函数变量的设置:

      val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec = WriteFilesSpec(description = description,committer = committer,concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)

设置concurrentOutputWriterSpecFunc的代码如下:

  private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))} else {None}}

如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则ConcurrentOutputWriterSpec为None
如果 spark.sql.maxConcurrentOutputFileWriters大于0sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则为Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())

其中executeWrite会调用WriteFilesExec.doExecuteWrite方法,从而调用FileFormatWriter.executeTask,这里就涉及到dataWriter选择:

    val dataWriter =if (sparkPartitionId != 0 && !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) =>new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ =>new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}

这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter,默认情况下为DynamicPartitionDataSingleWriter
否则就会为DynamicPartitionDataConcurrentWriter
这两者的区别,见下文

Spark中为什么会加上Sort

至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是DynamicPartitionDataSingleWriterDynamicPartitionDataConcurrentWriter的区别:

  • DynamicPartitionDataSingleWriter 在任何时刻,只有一个writer在写文件,这能保证写入的稳定性,不会在写入文件的时候消耗大量的内存,但是速度会慢
  • DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件,能加快写入文件的速度,但是因为多个文件的同时写入,可能会导致OOM

对于DynamicPartitionDataSingleWriter 会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致writer频繁的切换,这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。

参考

  1. [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
  2. [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/683759.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(10)Hive的相关概念——文件格式和数据压缩

目录 一、文件格式 1.1 列式存储和行式存储 1.1.1 行存储的特点 1.1.2 列存储的特点 1.2 TextFile 1.3 SequenceFile 1.4 Parquet 1.5 ORC 二、数据压缩 2.1 数据压缩-概述 2.1.1 压缩的优点 2.1.2 压缩的缺点 2.2 Hive中压缩配置 2.2.1 开启Map输出阶段压缩&…

MySQL篇之回表查询

一、聚集索引 将数据存储与索引放到了一块,索引结构的叶子节点保存了行数据。特点:必须有,而且只有一个。 聚集索引选取规则: 1. 如果存在主键,主键索引就是聚集索引。 2. 如果不存在主键,将使用第一个唯一(UNIQUE&am…

Linux环境中的git

目录 1.要使用git,首先要安装git 2.首次使用git需要做的操作 3.git操作 1.要使用git,首先要安装git 指令:sudo yum install -y git 2.首次使用git需要做的操作 在gitee网页,在你的仓库中找到: 先将下面两行代码分别…

51单片机 温度传感器得数据,传到上位机

#include <reg52.h> #include <intrins.h> #define MAIN_Fosc 11059200UL //宏定义主时钟HZ #define jingzhen 11059200UL /*使用22.1184M晶体*/ // #define botelv 9600UL /*波特率定义为9600*/ unsigned char zifua; //待显示字符。volatile …

高中生护眼台灯怎么选?教育部认可护眼灯品牌

随着孩子步入更高的年级&#xff0c;学业压力也会越来越繁重&#xff0c;随之带来的也是更长时间的用眼和高近视率。众所周知&#xff0c;高中是孩子学业最繁忙的一段时期&#xff0c;同时也是青少年近视的重灾区&#xff0c;不少学生因为每天过度用眼&#xff0c;再加上缺少户…

【Cocos入门】物理系统(物理碰撞)

物理碰撞 物理引擎默认是关闭状态以节省资源开销。开启方法和之前的普通碰撞类似&#xff1a;cc.director.getPhysicsManager().enabled true但有一个区别&#xff0c;物理引擎的开启必须放在onLoad函数内运行&#xff0c;否则不生效。 物理碰撞组件也同样具有碰撞回调函数。…

day14笔记(多态)

多态 自己写多态演示遇见两个问题 1.在类里写show方法时犹豫要不要写参数,其实不用写参数也可以获取到类的信息 public void show(){//括号里面写参数吗System.out.println(getName()", "getAge());} 2.在测试类里面写方法时,写了一个show方法带参数,其实这里应该是…

sqlserver2012 解决日志大的问题 bat脚本

要解决SQL Server 2012中事务日志过大的问题&#xff0c;你可以创建一个批处理脚本&#xff08;.bat&#xff09;来定期备份事务日志。下面是一个示例批处理脚本&#xff0c;该脚本使用SQLCMD工具来执行事务日志备份&#xff1a; echo off set "DBNAMEYourDatabaseName&qu…

LeetCode474. Ones and Zeroes——动态规划

文章目录 一、题目二、题解 一、题目 You are given an array of binary strings strs and two integers m and n. Return the size of the largest subset of strs such that there are at most m 0’s and n 1’s in the subset. A set x is a subset of a set y if all e…

【网站项目】229企业员工薪酬关系系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

基于微信江苏南京某健身房私教预约小程序系统设计与实现 研究背景和意义、国内外现状

博主介绍&#xff1a;黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育和辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff…

【计算机是如何工作的】编程语言是如何与CPU指令对应起来

编程语⾔&#xff08;Program Language&#xff09; 程序&#xff08;Program&#xff09;早期编程编程语⾔发展操作系统&#xff08;Operating System&#xff09;什么是进程/任务&#xff08;Process/Task&#xff09;进程控制块抽象(PCB Process Control Block)CPU 分配⸺进…

[OPEN SQL] 新增数据

INSERT语句用于数据的新增操作 本次操作使用的数据库表为SCUSTOM&#xff0c;其字段内容如下所示 航班用户(SCUSTOM) 该数据库表中的部分值如下所示 1.插入单条数据 语法格式 INSERT <dbtab> FROM <wa>. INSERT INTO <dbtab> VALUES <wa>. INSERT &…

【Cocos入门】物理检测

目录 一、物理检测的概念二、点测试三、矩形测试四、射线测试 一、物理检测的概念 CoCos中&#xff0c;物理检测也是物理系统的一部分&#xff0c;它不是用于检测物体的物理特性的&#xff0c;而是用来查询物体的(比如某个地方是否存在物理碰撞体)。其又分成&#xff1a;点检测…

编程和计算机基础

编程 编程&#xff1a;就是让计算机为解决某个问题而使用某种程序设计语言编写程序代码&#xff0c;并最终得到结果的过程。 计算机程序&#xff1a;是计算机所执行的一系列的指令集合&#xff0c;而程序全部都是用我们所掌握的语言来编写的&#xff0c;所以人们控制计算机一…

云备份项目:在云端保护您的数据【一、初识】

桃李不言&#xff0c;下自成蹊 文章目录 项目简介项目设计方案服务端功能划分客户端功能划分 项目环境搭建环境准备第三方库JsonCppbundle数据压缩库httplib 总结 ☘️项目源代码&#xff1a;云备份 ☘️云备份专栏&#xff1a;云备份 项目简介 云备份系统是一个自动化的备份解…

Spring 事务原理总结四

作为一名认知有限的中国人&#xff0c;我对年的喜爱&#xff0c;胜过其他一切&#xff0c;因为它给了我拒绝一切的合理理由。每到这个时候&#xff0c;我都会用各种理由来为自己的不作为开脱&#xff0c;今年亦是如此。看着频频发出警报的假期余额&#xff0c;我内心的焦躁变得…

【优化数学模型】1. 基于Python的线性规划问题求解

【优化数学模型】1. 基于Python的线性规划问题求解 一、线性规划问题1.概述2.三要素 二、示例&#xff1a;药厂生产问题三、使用 Python 绘图求解线性规划问题1.绘制约束条件2.绘制可行域3.绘制目标函数4.绘制最优解 四、使用 scipy.optimize 软件包求解线性规划问题1.导入库2.…

011axios

实际开发中&#xff0c;前端页面所需要的数据往往从服务器获取&#xff0c;这就涉及服务器的通信 axios是一个基于promise网络请求库&#xff0c;作用于node.js和浏览器 在浏览器端使用xmlhttprequests发送网络请求.自动完成json转换 axios.get(/url?id12345) .then(function…

寒假作业2月13号

数组练习 1、选择题 1.1、若有定义语句&#xff1a;int a[3][6]; &#xff0c;按在内存中的存放顺序&#xff0c;a 数组的第10个元素是 D A&#xff09;a[0][4] B) a[1][3] C)a[0][3] D)a[1][4] 1.2、有数组 int a[…