Spark 如何对源端数据做切分?

引言

典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。

分析

数据源读取对应的物理执行节点为FileSourceScanExec,读取数据代码块如下

lazy val inputRDD: RDD[InternalRow] = {val readFile: (PartitionedFile) => Iterator[InternalRow] =relation.fileFormat.buildReaderWithPartitionValues(sparkSession = relation.sparkSession,dataSchema = relation.dataSchema,partitionSchema = relation.partitionSchema,requiredSchema = requiredSchema,filters = pushedDownFilters,options = relation.options,hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))val readRDD = if (bucketedScan) {createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,relation)} else {createReadRDD(readFile, dynamicallySelectedPartitions, relation)}sendDriverMetrics()readRDD}

主要关注非bucket的处理,对于非bucket的扫描调用createReadRDD方法定义如下

/*** Create an RDD for non-bucketed reads.* The bucketed variant of this function is [[createBucketedReadRDD]].** @param readFile a function to read each (part of a) file.* @param selectedPartitions Hive-style partition that are part of the read.* @param fsRelation [[HadoopFsRelation]] associated with the read.*/private def createReadRDD(readFile: (PartitionedFile) => Iterator[InternalRow],selectedPartitions: Array[PartitionDirectory],fsRelation: HadoopFsRelation): RDD[InternalRow] = {// 文件打开开销,每次打开文件最少需要读取的字节    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes// 最大切分分片大小val maxSplitBytes =FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +s"open cost is considered as scanning $openCostInBytes bytes.")// Filter files with bucket pruning if possibleval bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabledval shouldProcess: Path => Boolean = optionalBucketSet match {case Some(bucketSet) if bucketingEnabled =>// Do not prune the file if bucket file name is invalidfilePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)case _ =>_ => true}// 对分区下文件进行切分并按照从大到小进行排序val splitFiles = selectedPartitions.flatMap { partition =>partition.files.flatMap { file =>// getPath() is very expensive so we only want to call it once in this block:val filePath = file.getPathif (shouldProcess(filePath)) {// 文件是否可split,parquet/orc/avro均可被splitval isSplitable = relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)// 切分文件PartitionedFileUtil.splitFiles(sparkSession = relation.sparkSession,file = file,filePath = filePath,isSplitable = isSplitable,maxSplitBytes = maxSplitBytes,partitionValues = partition.values)} else {Seq.empty}}}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)val partitions =FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)new FileScanRDD(fsRelation.sparkSession, readFile, partitions)}

可以看到确定最大切分分片大小maxSplitBytes对于后续切分为多少个文件非常重要,其核心逻辑如下

def maxSplitBytes(sparkSession: SparkSession,selectedPartitions: Seq[PartitionDirectory]): Long = {// 读取文件时打包成最大的partition大小,默认为128MB,对应一个block大小val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes// 打开每个文件的开销,默认为4MBval openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes// 建议的(不保证)最小分割文件分区数,默认未设置,从leafNodeDefaultParallelism获取// 代码逻辑调用链 SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism// -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism// -> 总共多少核max(executor core总和, 2),最少为2val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum.getOrElse(sparkSession.leafNodeDefaultParallelism)// 总共读取的大小val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum// 单core读取的大小val bytesPerCore = totalBytes / minPartitionNum// 计算大小,不会超过设置的128MBMath.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))}

对于PartitionedFileUtil#splitFiles,其核心逻辑如下,较为简单,直接按照最大切分大小切分大文件来进行分片

def splitFiles(sparkSession: SparkSession,file: FileStatus,filePath: Path,isSplitable: Boolean,maxSplitBytes: Long,partitionValues: InternalRow): Seq[PartitionedFile] = {if (isSplitable) {// 切分为多个分片(0L until file.getLen by maxSplitBytes).map { offset =>val remaining = file.getLen - offsetval size = if (remaining > maxSplitBytes) maxSplitBytes else remainingval hosts = getBlockHosts(getBlockLocations(file), offset, size)PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)}} else {Seq(getPartitionedFile(file, filePath, partitionValues))}}

在获取到Seq[PartitionedFile]列表后,还并没有完成对文件的切分,还需要调用FilePartition#getFilePartitions做最后的处理,方法核心逻辑如下

def getFilePartitions(sparkSession: SparkSession,partitionedFiles: Seq[PartitionedFile],maxSplitBytes: Long): Seq[FilePartition] = {val partitions = new ArrayBuffer[FilePartition]val currentFiles = new ArrayBuffer[PartitionedFile]var currentSize = 0L/** Close the current partition and move to the next. */def closePartition(): Unit = {if (currentFiles.nonEmpty) {// Copy to a new Array.// 重新生成一个新的PartitionFileval newPartition = FilePartition(partitions.size, currentFiles.toArray)partitions += newPartition}currentFiles.clear()currentSize = 0}// 打开文件开销,默认为4MBval openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes// Assign files to partitions using "Next Fit Decreasing"partitionedFiles.foreach { file =>if (currentSize + file.length > maxSplitBytes) {// 如果累加的文件大小大于的最大切分大小,则关闭该分区,表示完成一个Task读取的数据切分closePartition()}// Add the given file to the current partition.currentSize += file.length + openCostInBytescurrentFiles += file}// 最后关闭一次分区,文件可能较小closePartition()partitions.toSeq}

可以看到经过这一步后,会把一些小文件做合并,生成maxSplitBytes大小的PartitionFile,这样可以避免拉起太多task读取太多小的文件。

生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的并发度为partitions的长度,也即最后Spark生成的Task个数

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

整体流程图如下图所示

拆分、合并过程如下图所示

实战

对于TPCH 10G生成的customer parquet表

https://oss.console.aliyun.com/bucket/oss-cn-hangzhou/fengzetest/object?path=rt_spark_test%2Fcustomer-parquet%2F

共8个Parquet文件,总文件大小为113.918MB

Spark作业配置如下,executor只有1core

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=1;
conf spark.executor.resourceSpec=small;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

根据前面的公式计算

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(1, 2) = 2
totalBytes = 113.918 + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 2 = 72.959MB
maxSplitBytes = 72.959MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

得到maxSplitBytes为72.959MB,从日志中也可看到对应大小

经过排序后的文件顺序为(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007),再次经过合并后得到3个FilePartitioned,分别对应

  • FilePartitioned 1: 00000, 00001, 00002
  • FilePartitioned 2: 00003, 00004, 00006
  • FilePartitioned 3: 00005, 00007

即总共会生成3个Task

从Spark UI查看确实生成3个Task

从日志查看也是生成3个Task

变更Spark作业配置,5个executor共10core

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=5;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

根据前面的公式计算

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(10, 2) = 10
totalBytes = 113.918 + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 10 = 14.5918MB
maxSplitBytes = 14.5918MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

查看日志

此时可以看到14.5918MB会对源文件进行切分,会对00001, 00002,00003,00004,00005,00006进行切分,切分成两份,00007由于小于14.5918MB,因此不会进行切分,经过PartitionedFileUtil#splitFiles后,总共存在7 * 2 + 1 = 15个PartitionedFile

  • 00000(0 -> 14.5918MB), 00000(14.5918MB -> 15.698MB)
  • 00001(0 -> 14.5918MB), 00001(14.5918MB -> 15.632MB)
  • 00002(0 -> 14.5918MB), 00002(14.5918MB -> 15.629MB)
  • 00003(0 -> 14.5918MB), 00003(14.5918MB -> 15.624MB)
  • 00004(0 -> 14.5918MB), 00004(14.5918MB -> 15.617MB)
  • 00005(0 -> 14.5918MB), 00005(14.5918MB -> 15.536MB)
  • 00006(0 -> 14.5918MB), 00006(14.5918MB -> 15.539MB)
  • 00007(0 -> 4.634MB)

经过排序后得到如下以及合并后得到10个FilePartitioned,分别对应

  • FilePartitioned 1: 00000(0 -> 14.5918MB)
  • FilePartitioned 2: 00001(0 -> 14.5918MB)
  • FilePartitioned 3: 00002(0 -> 14.5918MB)
  • FilePartitioned 4: 00003(0 -> 14.5918MB)
  • FilePartitioned 5: 00004(0 -> 14.5918MB)
  • FilePartitioned 6: 00005(0 -> 14.5918MB)
  • FilePartitioned 7: 00006(0 -> 14.5918MB)
  • FilePartitioned 8: 00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)
  • FilePartitioned 9: 00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)
  • FilePartitioned 10: 00004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)

即总共会生成10个Task

通过Spark UI也可查看到生成了10个Task

查看日志,000004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)在同一个Task中

00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)

00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)在同一个Task中

总结

通过源码可知Spark对于源端Partition切分,会考虑到分区下所有文件大小以及打开每个文件的开销,同时会涉及对大文件的切分以及小文件的合并,最后得到一个相对合理的Partition。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510622.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库事务隔离发展历史

事务隔离是数据库系统设计中根本的组成部分,本文主要从标准层面来讨论隔离级别的发展历史,首先明确隔离级别划分的目标;之后概述其否定之否定的发展历程;进而引出 Adya给出的比较合理的隔离级别定义,最终总结隔离标准一…

为什么游戏行业喜欢用PolarDB

为什么游戏行业喜欢用PolarDB 游戏行业痛点 在我看来, 不同行业对数据库使用有巨大的差别. 比如游戏行业没有复杂的事务交易场景, 他有一个非常大的blob 字段用于存储角色的装备信息, 那么大Blob 字段的更新就会成为数据库的瓶颈, 比如在线教育行业需要有抢课的需求, 因此会有…

从业务开发中学习和理解架构设计

前言 在软件开发领域经常会接触到架构这个词汇,在我最初的印象中,架构是一个很高级的词汇。它似乎代表了复杂的工程结构、高层次的抽象设计、最新的开发语言特性等等。对于当时只专注于写业务逻辑的我来说,不免心生对架构的敬畏。工作中对架…

腾讯云联合中国联通打造下一代IDC数字化运营应用

近日,腾讯云联合中国联通智网创新中心打造的下一代IDC数字化运营应用,正式亮相2022中国联通合作伙伴大会。立足双方资源和经验优势,该数字化应用可实现对数据中心电力系统、制冷环境、安全防范等全方位智慧化运营,助力中国联通推进…

研发效能的思考总结

前言 在谈效能之前,我想先谈谈作为一个技术人或者技术TL,研发的核心价值是什么? 之前看了一篇文章,比较有意思,分享一下观念: T外包公司:最核心的竞争力不是技术,而是快速响应、资…

以“升舱”之名,谈谈云原生数据仓库 AnalyticDB 的核心技术

背景 说到升舱,我们首先想到的是飞机经济舱升级到商务舱、头等舱。阿里云企业级云原生数据仓库AnalyticDB(以下简称ADB)[1]在帮助以金融机构为主的行业数字化转型和传统数仓升级项目中,也引用了“升舱(仓)…

阿里云联合平行云推出云XR平台,支持沉浸式体验应用快速落地

近日,阿里云与平行云联合发布云XR平台,降低云端视觉计算应用的开发门槛,加速数字孪生、虚拟人、虚拟现实、沉浸式体验与虚拟仿真平台等XR应用落地,帮助互联网、新零售、社交、工业、交通、城市管理等行业探索创新业务形态。 阿里…

放弃笨重的 IDE,转而尝试 Emacs

【编者按】IDE对于开发者而言,重要性不言而喻,但随着功能愈发强大,它们对硬件的要求也日益提高,甚至越好用的 IDE 就越笨重。链接:https://renato.athaydes.com/posts/switching-from-heavyweight-ides-to-emacs.html声…

“穿越”到虚拟世界笑风生,网易瑶台沉浸式活动平台创新云端活动体验

2020年,第二届分布式人工智能国际会议(DAI 2020)因疫情的到来险些无法举办,南京大学人工智能学院的俞扬老师通过与网易伏羲团队的合作,最终在网易旗舰级武侠端游《逆水寒》中顺利举行了本次会议。300余位全球人工智能领…

云上解锁Web3.0 阿里云XR平台助力彼真科技呈现沉浸式演唱会

摘要:通过将沉浸式演唱会应用托管到云XR平台上,彼真科技也无需像传统应用一样做大量的终端适配工作,通过Web协议即可将虚拟演唱会《故障四方》的国风科幻世界呈现在不同的终端上。云XR平台完成了算力调度、渲染、推流、编码和用户管理等工作&…

这种精度高,消耗资源少的大模型稀疏训练方法被阿里云科学家找到了!

近日,阿里云机器学习PAI关于大模型稀疏训练的论文《Parameter-Efficient Sparsity for Large Language Models Fine-Tuning》被人工智能顶会IJCAI 2022接收。 论文提出了一种参数高效的稀疏训练算法PST,通过分析权重的重要性指标,得出了其拥有…

兼顾时尚与商务? ThinkPad Z 重新定义“商务本”

因为疫情等不确定因素,在过去三年中,混合办公模式已经成为职场的新常态。新一代工作族的商务办公笔记本不仅要随身携带完成日常工作,一些人还可能“身兼数职”,在工作之余也需要笔记本的配合。办公室里严谨、内敛商务本设计&#…

Go原生插件使用问题全解析

导言 本人在设计和落地基于Go原生插件机制的扩展开发产品时踩到了很多坑,由于这方面相关资料很少,因而借此机会做一个非常粗浅的总结,希望能对大家有所帮助。 本文只说问题和解决方案,不读代码。 一些背景知识 2.1 运行时 通…

从云计算到函数计算

从云计算到函数计算 函数计算,你的名字 云计算,是一种基于互联网的计算方式,通过这种方式,共享的软硬件资源和信息可以按需求提供给计算机各种终端和其他设备,使用服务商提供的电脑基建作计算资源,因此用…

基于 OPLG 从 0 到 1 构建统一可观测平台实践

应用架构与可观测技术演进历程 在软件开发早期,单体应用架构因其结构简单,便于测试和部署,得到了广泛的应用,对应的监控诊断技术主要是基于日志和日志关键词的指标监控。随着软件复杂度的不断提升,单体应用架构逐步向分…

从运维到运维大神,只需要一个正确的选择

马上就是7月24日了,听群里的朋友说,7和24这两个数字是运维工作的最佳体现——7X24小时待命,所以咱们IT人将这一天自定义为“运维日”。 对于运维工作来说,想要在黑天鹅横飞,灰犀牛直撞的当下,既能独善其身…

主流定时任务解决方案全横评

定时任务作为一种按照约定时间执行预期逻辑的通用模式,在企业级开发中承载着丰富的业务场景,诸如后台定时同步数据生成报表,定时清理磁盘日志文件,定时扫描超时订单进行补偿回调等。 程序开发人员在定时任务领域有着诸多框架和方…

基于阿里云 Serverless 函数计算开发的疫情数据统计推送机器人

一、Serverless函数计算 什么是Serverless? 在《Serverless Architectures》中对 Serverless 是这样子定义的: Serverless was first used to describe applications that significantly or fully incorporate third-party, cloud-hosted applications…

看 Serverless Task 如何解决任务调度可观测性中的问题

在上篇文章《解密函数计算异步任务能力之「任务的状态及生命周期管理」》中,我们介绍了任务系统的状态管理,并介绍了用户应如何根据需求,对任务状态信息进行实时的查询等操作。在本篇中我们将会进一步走进函数计算异步任务,介绍异…

B站每日自动签到传统单节点网站的 Serverless 上云

什么是函数?刚刚考完数学没多久的我,脑力里立马想到的是自变量、因变量、函数值,也就是yf(x)。当然,在计算机里,函数function往往指的是一段被定义好的代码程序,我们可以通过传参调用这个定义好的函数&…