Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>tryOptimizeSkewedPartitions(stage)}}

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

在这里插入图片描述
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

在这里插入图片描述
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {// When we are going to start a new partition, it's possible that the current partition or// the previous partition is very small and it's better to merge the current partition into// the previous partition.val shouldMergePartitions = lastPartitionSize > -1 &&((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize < targetSize * smallPartitionFactor ||lastPartitionSize < targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize += currentPartitionSize} else {lastPartitionSize = currentPartitionSize}}。。。while (i < sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {tryMergePartitions()partitionStartIndices += icurrentPartitionSize = sizes(i)} else {currentPartitionSize += sizes(i)}i += 1}tryMergePartitions()partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100val smallPartitionFactor2 = 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 = Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq ==Seq(0))val sizeList6 = Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq ==Seq(0))

这种情况下,就会只有一个reduce任务运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/761792.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sentinel熔断规则详解

1、慢调用降级熔断 1.1、参数详解 最大RT&#xff1a;调用接口的最大时间。 比例阈值&#xff1a;超过了最大RT调用时间的请求的比例。 熔断时长&#xff1a;触发熔断后&#xff0c;熔断的时间 最小请求数据&#xff1a;每秒最少的请求数量&#xff0c;只有大于等于这个数…

SQLiteC/C++接口详细介绍sqlite3_stmt类(九)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;六&#xff09; 下一篇&#xff1a; 无 33、sqlite3_column_table_name 函数 sqlite3_column_table_name 用于返回结果集中指定列所属的表的名称。如果查询中列使…

前端案例:产品模块

文章目录 产品模块效果结构布局分析父级盒子布局图片和段落评价和详情 产品模块效果 结构布局分析 1、大的父级盒子包含全部的内容 2、内容装入 图片&#xff08;img标签&#xff09;&#xff1b;分别三个子盒子装入两段评价以及商品信息。 父级盒子布局 div {width: 300px…

网络通信——IP地址、端口号、协议(TCP、UDP)

通信架构 网络通信三要素 IP地址 IPv4地址 IPv6地址 IP域名 IP常识 端口号 概念 协议 开放式网络互联标准&#xff1a;OSI、TCP/IP 传输层的2个通信协议——UDP、TCP TCP协议&#xff1a;三次握手建立建立可靠连接 进行三次握手的原因&#xff1a;为了确保客户端和服务端…

实时数仓之实时数仓架构(Doris)

目前比较流行的实时数仓架构有两类,其中一类是以Flink+Doris为核心的实时数仓架构方案;另一类是以湖仓一体架构为核心的实时数仓架构方案。本文针对Flink+Doris架构进行介绍,这套架构的特点是组件涉及相对较少,架构简单,实时性更高,且易于Lambda架构实现,Doris本身可以支…

R语言Meta分析核心技术:回归诊断与模型验证

R语言作为一种强大的统计分析和绘图语言&#xff0c;在科研领域发挥着日益重要的作用。其中&#xff0c;Meta分析作为一种整合多个独立研究结果的统计方法&#xff0c;在R语言中得到了广泛的应用。通过R语言进行Meta分析&#xff0c;研究者能够更为准确、全面地评估某一研究问题…

安卓studio连接手机之后,一两秒之后就自动断开了。问题解决。

太坑了&#xff0c;安卓studio链接手机之后。几秒之后就断开了。我以为是adb的问题&#xff0c;就重新安装了一下adb。并且在环境变量中配置了Path的路径。然而并没有什么用啊。 后来查看是wps的服务和ADB有冲突。直接把WPS卸载掉之后就没有出现链接手机闪现的的问题。

基于python+vue研究生志愿填报辅助系统flask-django-php-nodejs

二十一世纪我们的社会进入了信息时代&#xff0c;信息管理系统的建立&#xff0c;大大提高了人们信息化水平。传统的管理方式对时间、地点的限制太多&#xff0c;而在线管理系统刚好能满足这些需求&#xff0c;在线管理系统突破了传统管理方式的局限性。于是本文针对这一需求设…

阿里云国际该如何设置DDoS高防防护策略?

DDoS高防提供针对网络四层DDoS攻击的防护策略设置功能&#xff0c;例如虚假源和空连接检测、源限速、目的限速&#xff0c;适用于优化调整非网站业务的DDoS防护策略。在DDoS高防实例下添加端口转发规则&#xff0c;接入非网站业务后&#xff0c;您可以单独设置某个端口的DDoS防…

Hive SQL必刷练习题:留存率问题(*****)

留存率&#xff1a; 首次登录算作当天新增&#xff0c;第二天也登录了算作一日留存。可以理解为&#xff0c;在10月1号登陆了。在10月2号也登陆了&#xff0c;那这个人就可以算是在1号留存 今日留存率 &#xff08;今日登录且明天也登录的用户数&#xff09; / 今日登录的总…

蓝桥杯STM32 G431 hal库开发速成——输入捕获

蓝桥杯的输入捕获较为简单&#xff0c;基本不涉及溢出的问题。所以这里就不介绍溢出了。文末有源码。 一、Cubemx配置 二、代码编写 1.在捕获回调函数中 void HAL_TIM_IC_CaptureCallback(TIM_HandleTypeDef *htim) {if(htim->InstanceTIM3){switch(count){case 1:{jishu1…

如何让uni-app开发的H5页面顶部原生标题和小程序的顶部标题不一致?

如何让标题1和标题2不一样&#xff1f; 修改根目录下的App.vue&#xff08;核心代码如下&#xff09; <script>export default {onLaunch() {// 监听各种跳转----------------------------------------[navigateTo, redirectTo, reLaunch, switchTab, navigateBack, ].…

【JSON2WEB】10 基于 Amis 做个登录页面login.html

【JSON2WEB】01 WEB管理信息系统架构设计 【JSON2WEB】02 JSON2WEB初步UI设计 【JSON2WEB】03 go的模板包html/template的使用 【JSON2WEB】04 amis低代码前端框架介绍 【JSON2WEB】05 前端开发三件套 HTML CSS JavaScript 速成 【JSON2WEB】06 JSON2WEB前端框架搭建 【J…

《云计算:数字时代的引擎》

在数字化时代&#xff0c;云计算技术以其强大的计算能力和灵活的应用方式&#xff0c;成为推动各行各业发展的引擎。本文将围绕云计算的技术进展、技术原理、行业应用案例、面临的挑战与机遇以及未来趋势进行详细探讨。 云计算的技术进展 云计算的技术进展涵盖了多个方面&…

python(django)之产品后台管理功能实现

1、添加新项目 在命令行输入以下代码 python manage.py startapp prroduct 2、添加路径和代码结构 在新项目目录下admin.py中加入以代码 from .models import Product class ProductAdmin(admin.ModelAdmin):list_display [product_name, product_desc,producter,created_…

基于Springboot的闲置图书分享(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的闲置图书分享&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&…

Linux服务器导出CPU和内存使用情况

Linux服务器默认存储一个月的CPU和内存记录&#xff0c;所在目录&#xff1a;/var/log/sa/&#xff0c;如下图所示 在此用sar命令来执行 sar是一个比较全面的性能监控工具&#xff0c;包括cpu、内存、磁盘和网络等信息&#xff0c;并且该命令会每10分钟自动保存一次硬件资源使用…

odoo扩展导出pdf功能

1. 说明: odoo原生导出功能扩展导出pdf文件功能, 如有额外需求请联系博主 2. 版本说明: odoo版本: odoo15 其他odoo版本未进行测试,如有需要自行测试 3. 地址: 该补丁代码放在github仓库, 地址: https://github.com/YSL-Alpaca/odoo_export_pdf 4. 改补丁依赖于第三方软件wkh…

ubuntu20.04搭建nginx rtmp视频服务到指定位置解决权限不足

1.安装依赖 apt-get install build-essential libpcre3 libpcre3-dev libssl-dev2.建一个目录 mldir rtmp_nginx 3.源码下载 wget http://nginx.org/download/nginx-1.21.6.tar.gz wget https://github.com/arut/nginx-rtmp-module/archive/master.zip4.解压缩 tar -xf ng…

IBM SPSS Statistics for Mac v27.0.1中文激活版

IBM SPSS Statistics for Mac是一款功能强大的统计分析软件&#xff0c;专为Mac用户设计&#xff0c;用于数据分析和决策支持。该软件拥有直观易用的界面和丰富多样的统计工具&#xff0c;使得用户可以轻松进行数据处理、分析和解释。 软件下载&#xff1a;IBM SPSS Statistics…