Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>tryOptimizeSkewedPartitions(stage)}}

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

在这里插入图片描述
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

在这里插入图片描述
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {// When we are going to start a new partition, it's possible that the current partition or// the previous partition is very small and it's better to merge the current partition into// the previous partition.val shouldMergePartitions = lastPartitionSize > -1 &&((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize < targetSize * smallPartitionFactor ||lastPartitionSize < targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize += currentPartitionSize} else {lastPartitionSize = currentPartitionSize}}。。。while (i < sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {tryMergePartitions()partitionStartIndices += icurrentPartitionSize = sizes(i)} else {currentPartitionSize += sizes(i)}i += 1}tryMergePartitions()partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100val smallPartitionFactor2 = 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 = Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq ==Seq(0))val sizeList6 = Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq ==Seq(0))

这种情况下,就会只有一个reduce任务运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/761792.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sentinel熔断规则详解

1、慢调用降级熔断 1.1、参数详解 最大RT&#xff1a;调用接口的最大时间。 比例阈值&#xff1a;超过了最大RT调用时间的请求的比例。 熔断时长&#xff1a;触发熔断后&#xff0c;熔断的时间 最小请求数据&#xff1a;每秒最少的请求数量&#xff0c;只有大于等于这个数…

SQLiteC/C++接口详细介绍sqlite3_stmt类(九)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;六&#xff09; 下一篇&#xff1a; 无 33、sqlite3_column_table_name 函数 sqlite3_column_table_name 用于返回结果集中指定列所属的表的名称。如果查询中列使…

Android中的前台Service

文章目录 概念介绍使用场景启动方式结束方式概念介绍 前台服务可以说是除了绑定式Service和非绑定式Service之外,又一种Service类型。 顾名思义,它是运行在前台可以和用户打交道的Service。也因此它的优先级相比另外两个运行在后台的Service要高,几乎不会被系统回收。 使…

前端案例:产品模块

文章目录 产品模块效果结构布局分析父级盒子布局图片和段落评价和详情 产品模块效果 结构布局分析 1、大的父级盒子包含全部的内容 2、内容装入 图片&#xff08;img标签&#xff09;&#xff1b;分别三个子盒子装入两段评价以及商品信息。 父级盒子布局 div {width: 300px…

网络通信——IP地址、端口号、协议(TCP、UDP)

通信架构 网络通信三要素 IP地址 IPv4地址 IPv6地址 IP域名 IP常识 端口号 概念 协议 开放式网络互联标准&#xff1a;OSI、TCP/IP 传输层的2个通信协议——UDP、TCP TCP协议&#xff1a;三次握手建立建立可靠连接 进行三次握手的原因&#xff1a;为了确保客户端和服务端…

安卓面试题多线程 146-152

146. 简述AQS 支持两种同步方式 ?1、独占式 2、共享式 这样方便使用者实现不同类型的同步组件,独占式如 ReentrantLock,共享式如Semaphore,CountDownLatch,组合式的如 ReentrantReadWriteLock。总之,AQS 为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。147. 简…

cad vba 打开excel并弹窗打开指定文件

CAD vba 代码实现打开excel,并通过对话框选择xls文件&#xff0c;并打开此文件进行下一步操作。代码如下: excel.activeworkbook.sheets(1) excel对象下activeworkbook,再往下是sheets对象&#xff0c;(1)为第一个表&#xff0c; thisworkbook是vba代码所在的工作簿。 Opti…

实时数仓之实时数仓架构(Doris)

目前比较流行的实时数仓架构有两类,其中一类是以Flink+Doris为核心的实时数仓架构方案;另一类是以湖仓一体架构为核心的实时数仓架构方案。本文针对Flink+Doris架构进行介绍,这套架构的特点是组件涉及相对较少,架构简单,实时性更高,且易于Lambda架构实现,Doris本身可以支…

R语言Meta分析核心技术:回归诊断与模型验证

R语言作为一种强大的统计分析和绘图语言&#xff0c;在科研领域发挥着日益重要的作用。其中&#xff0c;Meta分析作为一种整合多个独立研究结果的统计方法&#xff0c;在R语言中得到了广泛的应用。通过R语言进行Meta分析&#xff0c;研究者能够更为准确、全面地评估某一研究问题…

在OAK-D S2相机上应用ORB_SLAM3

文章目录 ROS1 noetic + depthai_rosORB_SLAM3什么是ORB_SLAM3怎么安装运行ROS1 noetic + depthai_ros 目前X86和arch64平台测试安装包没有问题。 树莓派上安装ROS需要自己编译安装,时间比较长,需要测试的可以到 官网 查看,替换下面安装ROS步骤就可以了。 ubuntu20.04推荐…

突破界面开发的边界:使用Fizzgui将Go语言和HTML/CSS相结合

简洁与高效&#xff1a;使用Go-qt和Go-walk开发跨平台GUI应用程序的最佳选择 前言 在当今软件开发领域&#xff0c;图形用户界面&#xff08;GUI&#xff09;已经成为了几乎所有应用程序的标配。Go语言作为一门强大而灵活的编程语言&#xff0c;也提供了多种选择来开发图形界…

安卓studio连接手机之后,一两秒之后就自动断开了。问题解决。

太坑了&#xff0c;安卓studio链接手机之后。几秒之后就断开了。我以为是adb的问题&#xff0c;就重新安装了一下adb。并且在环境变量中配置了Path的路径。然而并没有什么用啊。 后来查看是wps的服务和ADB有冲突。直接把WPS卸载掉之后就没有出现链接手机闪现的的问题。

基于python+vue研究生志愿填报辅助系统flask-django-php-nodejs

二十一世纪我们的社会进入了信息时代&#xff0c;信息管理系统的建立&#xff0c;大大提高了人们信息化水平。传统的管理方式对时间、地点的限制太多&#xff0c;而在线管理系统刚好能满足这些需求&#xff0c;在线管理系统突破了传统管理方式的局限性。于是本文针对这一需求设…

golang通过参数控制HTTP server是否使用基本认证

之前写的《golang实现一个BasicAuth的HTTP server》一定会做基本认证。 本例给出了如何通过启动时候指定的参数来控制是否做基本认证 代码对比和解释 给出与上一篇中源码的diff adminhpc-1:~/go/auth_http$ diff -ruN http_rpc_server.go_bak http_rpc_server.go --- http_rp…

阿里云国际该如何设置DDoS高防防护策略?

DDoS高防提供针对网络四层DDoS攻击的防护策略设置功能&#xff0c;例如虚假源和空连接检测、源限速、目的限速&#xff0c;适用于优化调整非网站业务的DDoS防护策略。在DDoS高防实例下添加端口转发规则&#xff0c;接入非网站业务后&#xff0c;您可以单独设置某个端口的DDoS防…

Hive SQL必刷练习题:留存率问题(*****)

留存率&#xff1a; 首次登录算作当天新增&#xff0c;第二天也登录了算作一日留存。可以理解为&#xff0c;在10月1号登陆了。在10月2号也登陆了&#xff0c;那这个人就可以算是在1号留存 今日留存率 &#xff08;今日登录且明天也登录的用户数&#xff09; / 今日登录的总…

[Java安全入门]六.CC2+CC4+CC5+CC7

一.前言 与前面几条cc不同的是&#xff0c;cc2的依赖是4.0版本&#xff0c;并且解决了高版本无法使用AnnotationInvocationHandler类的弊端。cc2使用javassist和PriorityQueue来构造链。 二.添加依赖 <dependencies><!-- https://mvnrepository.com/artifact/common…

读书笔记--阅读华为数据治理之旅有感

通过阅读华为的数据治理之旅,了解到华为公司作为高科技企业的引领者,在数据治理工作、数字化智能化转型方面的确有许许多多值得大家学习的地方,华为公司的业务范围广泛,市场竞争压力大,迫切需要用一些高效的手段来减轻员工的工作量,让员工各司其职,在各自承担的主营业务…

蓝桥杯STM32 G431 hal库开发速成——输入捕获

蓝桥杯的输入捕获较为简单&#xff0c;基本不涉及溢出的问题。所以这里就不介绍溢出了。文末有源码。 一、Cubemx配置 二、代码编写 1.在捕获回调函数中 void HAL_TIM_IC_CaptureCallback(TIM_HandleTypeDef *htim) {if(htim->InstanceTIM3){switch(count){case 1:{jishu1…

数据分析-概率分布

概率分布 概率分布(Probability Distributions)离散概率分布伯努利分布(Bernoulli Distribution)二项分布(The Binomial distribution)泊松分布(Poisson Distribution) 连续概率分布均匀分布(Uniform Distribution)正态分布(Normal Distribution)指数分布&#xff08;Exponenti…