Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>val rdd1DataMap = rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) => (x._1, (x._2, v))case None => (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2).reduceByKey(_ + _).sortBy(_._2, false).take(10).map(x => x._1).toSet// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>val prefix = scala.util.Random.nextInt(10).toString(s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1)).flatMap { x =>val list = 0 until 10list.map(i => (s"${i}_${x._1}", x._2))}val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (${query})
SELECT k,s
FROM (SELECT ${key} AS k,count(*) AS sFROM aGROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/38170.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【电路笔记】-放大器类型

放大器类型 文章目录 放大器类型1、概述2、关于偏置的注意事项3、A类(Class A)放大器4、B类(Class B)放大器5、AB类(Class AB)放大器6、C类(Class C)放大器7、总结1、概述 放大器通常根据输出级的结构进行分类。 事实上,功率放大确实发生在该阶段,因此输出信号的质量和…

Java SE入门及基础(59) 线程的实现(上) 线程的创建方式 线程内存模型 线程安全

目录 线程(上) 1. 线程的创建方式 Thread类常用构造方法 Thread类常用成员方法 Thread类常用静态方法 示例 总结 2. 线程内存模型 3.线程安全 案例 代码实现 执行结果 线程(上) 1. 线程的创建方式 An application t…

利用 Docker 简化 Nacos 部署:快速搭建 Nacos 服务

利用 Docker 简化 Nacos 部署:快速搭建 Nacos 服务 引言 在微服务架构中,服务注册与发现是确保服务间通信顺畅的关键组件。Nacos(Dynamic Naming and Configuration Service)作为阿里巴巴开源的一个服务发现和配置管理平台&…

任务调度器——任务切换

一、开启任务调度器 函数原型: void vTaskStartScheduler( void ) 作用:用于启动任务调度器,任务调度器启动后, FreeRTOS 便会开始进行任务调度 内部实现机制(以动态创建为例): &#xff0…

Linux 安装、配置Tomcat 的HTTPS

Linux 安装 、配置Tomcat的HTTPS 安装Tomcat 这里选择的是 tomcat 10.X ,需要Java 11及更高版本 下载页 ->Binary Distributions ->Core->选择 tar.gz包 下载、上传到内网服务器 /opt 目录tar -xzf 解压将解压的根目录改名为 tomat-10 并移动到 /opt 下, 形成个人…

测评推荐:企业管理u盘的软件有哪些?

U盘作为一种便携的存储设备,方便易用,被广泛应用于企业办公、个人学习及日常工作中。然而,U盘的使用也带来了数据泄露、病毒传播等安全隐患。为了解决这些问题,企业管理U盘的软件应运而生。 本文将对市面上流行的几款U盘管理软件…

Hadoop3:Yarn容量调度器配置多队列案例

一、情景描述 需求1: default队列占总内存的40%,最大资源容量占总资源60%,hive队列占总内存的60%,最大资源容量占总资源80%。 二、多队列优点 (1)因为担心员工不小心,写递归死循环代码&#…

电路笔记(电源模块): 基于FT2232HL实现的jtag下载器硬件+jtag的通信引脚说明

JTAG接口说明 JTAG 接口根据需求可以选择20针或14针的配置,具体选择取决于应用场景和需要连接的功能。比如之前的可编程逻辑器件XC9572XL使用JTAG引脚(TCK、TDI、TDO、TMS、VREF、GND)用于与器件进行调试和编程通信。更详细的内容可以阅读11…

51单片机STC8H8K64U通过RA8889/RA8876如何控制彩屏(SPI源码下载)

【硬件部份】 一、硬件连接实物: STC8H系列单片机不需要外部晶振和外部复位,在相同的工作频率下,速度比传统的8051单片机要快12倍,具有高可靠抗干扰的优秀特性,与瑞佑的RA8889/RA8876控制芯片刚好可以完美搭配用于工…

redis实战-缓存雪崩问题及解决方案

定义理解 缓存雪崩是指在同一时间段,大量缓存的key同时失效,或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力 和缓存击穿的区别: 缓存雪崩是由于缓存中的大量数据同时失效或缓存服务器故障引起的&#xff1b…

机器学习周记(第四十五周:Graphformer)2024.6.24~2024.6.30

目录 摘要ABSTRACT1 论文信息1.1 论文标题1.2 论文摘要1.3 论文引言1.4 论文贡献 2 论文模型2.1 问题定义2.2 模型架构2.2.1 自注意下采样模块(Self-attention down-sampling module)2.2.2 稀疏图自注意力机制(Sparse graph self-attention m…

【C++】using namespace std 到底什么意思

📢博客主页:https://blog.csdn.net/2301_779549673 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📢本文作为 JohnKi 的学习笔记,引用了部分大佬的案例 📢未来很长&a…

新手练习项目 7:猜数字游戏

名人说:莫听穿林打叶声,何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder) 目录 一、项目描述二、项目实现三、项目步骤四、项目扩展方向 更多项目内容,请关注我、订…

打靶记录——靶机medium_socnet

靶机下载地址 https://www.vulnhub.com/entry/boredhackerblog-social-network,454/ 打靶过程 由于靶机和我的Kali都处于同一个网段,所以使用arpscan二次发现技术来识别目标主机的IP地址 arpscan -l除了192.168.174.133,其他IP都是我VMware虚拟机正…

【Spring Boot】认识 JPA 的接口

认识 JPA 的接口 1.JPA 接口 JpaRepository2.分页排序接口 PagingAndSortingRepository3.数据操作接口 CrudRepository4.分页接口 Pageable 和 Page5.排序类 Sort JPA 提供了操作数据库的接口。在开发过程中继承和使用这些接口,可简化现有的持久化开发工作。可以使 …

springboot学习,如何用redission实现分布式锁

目录 一、springboot框架介绍二、redission是什么三、什么是分布式锁四、如何用redission实现分布式锁 一、springboot框架介绍 Spring Boot是一个开源的Java框架,由Pivotal团队(现为VMware的一部分)于2013年推出。它旨在简化Spring应用程序…

大数据面试题之Spark(1)

目录 Spark的任务执行流程 Spark的运行流程 Spark的作业运行流程是怎么样的? Spark的特点 Spark源码中的任务调度 Spark作业调度 Spark的架构 Spark的使用场景 Spark on standalone模型、YARN架构模型(画架构图) Spark的yarn-cluster涉及的参数有哪些? Spark提交jo…

编码大模型系列:Meta创新的“代码编译优化”的LLM

鲁班号导读正式上线。移步“鲁班秘笈”,查阅更多内容。 大型语言模型 (LLM) 已在各种软件工程和编码任务中展现出卓越的能力。然而,它们在代码和编译器优化领域的应用仍未得到充分探索。训练LLM需要大量资源,需要大量的 GPU时间和大量的数据…

一个合理的前端应用文件结构

在大型应用中,最关键且最具挑战性的方面之一就是拥有一个良好且合理的文件结构。在考虑通过微前端将代码库拆分成多个应用之前,可以遵循一些步骤来改善项目级别的架构,并在您考虑这一路径时使过渡更容易。 我们的目标是应用某种模块化方法&am…

MSPM0G3507——定时器例程讲解4——timx_timer_mode_periodic

以下示例以周期模式配置TimerG并切换LED。周期从500ms开始,每次切换减少50ms,直到周期为100ms,然后重复。设备在等待中断时保持待机模式 #include "ti_msp_dl_config.h"/* ((32KHz / (321)) * 0.5s) 45 - 1 495 due to N1 ticks …