Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>val rdd1DataMap = rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) => (x._1, (x._2, v))case None => (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2).reduceByKey(_ + _).sortBy(_._2, false).take(10).map(x => x._1).toSet// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>val prefix = scala.util.Random.nextInt(10).toString(s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1)).flatMap { x =>val list = 0 until 10list.map(i => (s"${i}_${x._1}", x._2))}val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (${query})
SELECT k,s
FROM (SELECT ${key} AS k,count(*) AS sFROM aGROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/38170.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【电路笔记】-放大器类型

放大器类型 文章目录 放大器类型1、概述2、关于偏置的注意事项3、A类(Class A)放大器4、B类(Class B)放大器5、AB类(Class AB)放大器6、C类(Class C)放大器7、总结1、概述 放大器通常根据输出级的结构进行分类。 事实上,功率放大确实发生在该阶段,因此输出信号的质量和…

Arduino (esp ) 下String的内存释放

在个人的开源项目 GitHub - StarCompute/tftziku: 这是一个通过单片机在各种屏幕上显示中文的解决方案 中为了方便快速检索使用了string,于是这个string在esp8266中占了40多k,原本以为当string设置为""的时候这个40k就可以回收,结果发觉不行…

【JS异步编程】async/await——用同步代码写异步

历史小剧场 懂得暴力的人,是强壮的;懂得克制暴力的人,才是强大的。----《明朝那些事儿》 什么是 async/await async: 声明一个异步函数 自动将常规函数转换成Promise,返回值也是一个Promise对象;只有async函数内部的异…

Java SE入门及基础(59) 线程的实现(上) 线程的创建方式 线程内存模型 线程安全

目录 线程(上) 1. 线程的创建方式 Thread类常用构造方法 Thread类常用成员方法 Thread类常用静态方法 示例 总结 2. 线程内存模型 3.线程安全 案例 代码实现 执行结果 线程(上) 1. 线程的创建方式 An application t…

利用 Docker 简化 Nacos 部署:快速搭建 Nacos 服务

利用 Docker 简化 Nacos 部署:快速搭建 Nacos 服务 引言 在微服务架构中,服务注册与发现是确保服务间通信顺畅的关键组件。Nacos(Dynamic Naming and Configuration Service)作为阿里巴巴开源的一个服务发现和配置管理平台&…

任务调度器——任务切换

一、开启任务调度器 函数原型: void vTaskStartScheduler( void ) 作用:用于启动任务调度器,任务调度器启动后, FreeRTOS 便会开始进行任务调度 内部实现机制(以动态创建为例): &#xff0…

Linux 安装、配置Tomcat 的HTTPS

Linux 安装 、配置Tomcat的HTTPS 安装Tomcat 这里选择的是 tomcat 10.X ,需要Java 11及更高版本 下载页 ->Binary Distributions ->Core->选择 tar.gz包 下载、上传到内网服务器 /opt 目录tar -xzf 解压将解压的根目录改名为 tomat-10 并移动到 /opt 下, 形成个人…

测评推荐:企业管理u盘的软件有哪些?

U盘作为一种便携的存储设备,方便易用,被广泛应用于企业办公、个人学习及日常工作中。然而,U盘的使用也带来了数据泄露、病毒传播等安全隐患。为了解决这些问题,企业管理U盘的软件应运而生。 本文将对市面上流行的几款U盘管理软件…

Hadoop3:Yarn容量调度器配置多队列案例

一、情景描述 需求1: default队列占总内存的40%,最大资源容量占总资源60%,hive队列占总内存的60%,最大资源容量占总资源80%。 二、多队列优点 (1)因为担心员工不小心,写递归死循环代码&#…

数据处理:四选一、四关联

今天去面试,面试官们给我一个‘选择’,有四个选项:‘展示你的才华’、‘展示你的美貌’、‘展示你的才华与美貌’、‘都不展示’ {label: “选择”,children: [{label: “展示你的才华”,children: [],isShow: talentModal,click: () > {i…

电路笔记(电源模块): 基于FT2232HL实现的jtag下载器硬件+jtag的通信引脚说明

JTAG接口说明 JTAG 接口根据需求可以选择20针或14针的配置,具体选择取决于应用场景和需要连接的功能。比如之前的可编程逻辑器件XC9572XL使用JTAG引脚(TCK、TDI、TDO、TMS、VREF、GND)用于与器件进行调试和编程通信。更详细的内容可以阅读11…

51单片机STC8H8K64U通过RA8889/RA8876如何控制彩屏(SPI源码下载)

【硬件部份】 一、硬件连接实物: STC8H系列单片机不需要外部晶振和外部复位,在相同的工作频率下,速度比传统的8051单片机要快12倍,具有高可靠抗干扰的优秀特性,与瑞佑的RA8889/RA8876控制芯片刚好可以完美搭配用于工…

redis实战-缓存雪崩问题及解决方案

定义理解 缓存雪崩是指在同一时间段,大量缓存的key同时失效,或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力 和缓存击穿的区别: 缓存雪崩是由于缓存中的大量数据同时失效或缓存服务器故障引起的&#xff1b…

(漏洞检查项) | 服务端请求伪造 SSRF

(漏洞检查项)|服务端请求伪造 SSRF 漏洞场景 服务端请求伪造(SSRF,Server-Side Request Forgery)漏洞发生在应用程序允许攻击者通过构造恶意请求,利用服务器端发起HTTP请求,并访问内部资源或进行其他未授权操作。 漏…

css_20_定位

相对定位 设置相对定位 给元素设置 position: relative 即可实现相对定位。 可以使用 left、right、top 、 bottom 四个属性调整位置。 相对定位的参考点是相对自己原来的位置相对定位的特点: 1.不会脱离文档流,元素位置的变化,只…

机器学习周记(第四十五周:Graphformer)2024.6.24~2024.6.30

目录 摘要ABSTRACT1 论文信息1.1 论文标题1.2 论文摘要1.3 论文引言1.4 论文贡献 2 论文模型2.1 问题定义2.2 模型架构2.2.1 自注意下采样模块(Self-attention down-sampling module)2.2.2 稀疏图自注意力机制(Sparse graph self-attention m…

python自动移除excel文件密码(小工具)

安装 msoffcrypto-tool 使用pip命令安装: 打开命令行工具(如终端、命令提示符或Powershell),然后输入以下命令来安装msoffcrypto-tool: pip install msoffcrypto-tool库,进行自动移除excel文件密码 import msoffcrypt…

【C++】using namespace std 到底什么意思

📢博客主页:https://blog.csdn.net/2301_779549673 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📢本文作为 JohnKi 的学习笔记,引用了部分大佬的案例 📢未来很长&a…

新手练习项目 7:猜数字游戏

名人说:莫听穿林打叶声,何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder) 目录 一、项目描述二、项目实现三、项目步骤四、项目扩展方向 更多项目内容,请关注我、订…

comsol学习笔记

comsol岩土力学与流固耦合的学习 comsol的相关视频教程 https://www.bilibili.com/video/BV1Cu4y1r7Gn/?spm_id_from333.337.search-card.all.click&vd_source02b2bad477a153eaeb9c48cbbedaf8df [这里面有讲解地应力平衡技术] https://www.bilibili.com/video/BV17C4y1j…