Spark 的 Skew Join 详解

    Skew Join 是 Spark 中为了解决数据倾斜问题而设计的一种优化机制。数据倾斜是指在分布式计算中,由于某些 key 具有大量数据,而其他 key 数据较少,导致某些分区的数据量特别大,造成计算负载不均衡。数据倾斜会导致个别节点出现性能瓶颈,影响整个任务的完成时间。

    Skew Join 的优化机制在 Spark 中主要解决了 JOIN 操作中的数据倾斜问题。为了更好地理解 Skew Join 的原理和实现,我们需要从数据倾斜产生的原因、Spark 如何识别数据倾斜、以及 Skew Join 的优化策略和底层实现等方面来进行详细解释。

一、什么是数据倾斜

        数据倾斜指的是当某些 key 关联了异常大量的数据,而其他 key 关联的数据量较少时,数据分布的不均衡会导致计算瓶颈。例如,在 JOIN 操作中,如果表 A 中某个 key 具有大量的数据,而表 B 中同样的 key 也有大量数据,当这两个表基于这个 key 进行 JOIN 时,由于该 key 被分配到一个或少数几个分区,相关的任务会处理大量的数据,而其他分区的任务数据量却较少。这会导致部分任务比其他任务运行时间长,从而影响整个任务的执行时间。

二、Spark 中如何识别数据倾斜

        在执行 JOIN 操作时,Spark 会通过数据采样和统计信息来检测是否存在数据倾斜。Spark SQL 可以通过分析数据分布,计算每个 key 的数据量,当发现某些 key 占据了大量的行时,Spark 会将其标记为 "倾斜的 key"。对于这些倾斜的 key,Spark 会进行特殊处理,避免过度集中在某些分区中。

Spark 的 Skew Join 优化主要依赖于配置参数和数据采样来检测并处理这些倾斜的 key

检测数据倾斜的主要参数:
  • spark.sql.autoSkewJoin.enabled: 默认是 false,如果设置为 true,Spark 会自动检测和处理数据倾斜的 JOIN 操作。
  • spark.sql.skewJoin.threshold: 用来设定 Spark 如何判断某个分区是否倾斜。该参数设置的值是数据倾斜的阈值,通常是一个比例值,如果某个分区的数据量超过该比例值,则会被视为倾斜的分区。

三、Skew Join 的底层原理

        当 Spark 识别出 JOIN 中存在数据倾斜时,Skew Join 会将倾斜的 key 拆分成多个子任务分别处理。具体而言,Skew Join 的主要思想是将倾斜的 key 拆分到多个不同的分区,从而将任务的计算负载均匀分布,避免单个分区处理过多数据。

以下是 Skew Join 的执行流程:

  1. 普通的非倾斜 key 处理

    对于普通的非倾斜 keySkew Join 没有特别的处理方式,Spark 直接按照 key 进行 Shuffle,将数据发送到相应的分区,并进行 JOIN 操作。
  2. 倾斜的 key 处理

        对于检测到的倾斜 key,Spark 会进行特殊处理,具体步骤如下:

  • Spark 会将倾斜的 key 的数据进行重新分片,将大数据量的倾斜 key 拆分成多个子分区。
  • 然后对于每一个子分区,分别与另一个表中的对应数据进行 JOIN
  • 通过多次 JOIN 操作,将这些子分区结果合并为最终的 JOIN 输出结果。

     3. Hash Salt(哈希加盐)

        为了避免倾斜的 key 被集中到同一个分区,Spark 会通过对倾斜的 key 添加一个随机的 salt(盐值)来打散数据。具体来说,Spark 会将倾斜的 key 拆分成多个子 key,通过附加随机数(salt),使得这些子 key 被分布到不同的分区。

伪代码展示:
// 倾斜 key 的原始 join
tableA.join(tableB, "key")// Skew Join 处理
val skewKeys = getSkewKeys()
for (skewKey <- skewKeys) {val saltedTableA = tableA.filter($"key" === skewKey).withColumn("salt", rand())val saltedTableB = tableB.filter($"key" === skewKey).withColumn("salt", rand())saltedTableA.join(saltedTableB, Seq("key", "salt"))
}

通过引入 salt,可以有效地将数据均匀分布到不同的分区,减少单个分区处理的数据量。

四、Skew Join 的源代码实现

        在 Spark SQL 中,Skew Join 是作为 PhysicalPlan 中 Join 的一个优化执行计划。关键类为 EnsureRequirements,其主要职责是对 Join 的物理计划执行前进行必要的调整,包括处理数据倾斜的 Skew Join 优化。

以下是 EnsureRequirements 中处理数据倾斜的相关部分源代码:

private def applySkewJoin(plan: SparkPlan): SparkPlan = plan match {case join @ ShuffledHashJoinExec(_, _, _, _, left, right) =>// 检查是否有数据倾斜if (isSkewed(join)) {// 处理 skew join,使用 hash salt 拆分倾斜的 keyval skewJoin = handleSkewJoin(join)skewJoin} else {join}case other => other
}

        在 EnsureRequirements 中,applySkewJoin 函数会检测当前的 JOIN 是否存在数据倾斜问题。如果检测到数据倾斜,handleSkewJoin 函数会对数据进行处理,创建一个带有 salt 的 Skew Join 执行计划。

具体实现步骤:
  1. 检测数据倾斜isSkewed(join) 函数负责检测 JOIN 中的分区是否有数据倾斜。通常,通过采样和统计每个分区的数据量,来判断某个分区的数据量是否超出设定的阈值(spark.sql.skewJoin.threshold)。

  2. 处理倾斜数据handleSkewJoin(join) 函数是 Skew Join 的核心实现。它会通过对倾斜的 key 添加 salt 进行打散,使得数据均匀分布到多个子分区。

private def handleSkewJoin(join: ShuffledHashJoinExec): SparkPlan = {val skewKeys = getSkewKeys(join)val saltedLeft = splitAndSalt(join.left, skewKeys)val saltedRight = splitAndSalt(join.right, skewKeys)saltedLeft.join(saltedRight)
}private def splitAndSalt(plan: SparkPlan, skewKeys: Seq[KeyType]): SparkPlan = {// 对每个倾斜 key 进行拆分并添加 saltplan.transform {case rdd: RDD[_] => rdd.mapPartitionsInternal { iter =>iter.flatMap { row =>val key = getJoinKey(row)if (skewKeys.contains(key)) {val salt = Random.nextInt(numSplits) // 随机生成 saltSome((key, salt, row))} else {Some((key, row))}}}}
}

        在上面的代码中,splitAndSalt 函数将每个倾斜的 key 拆分成多个子 key,并为它们添加随机 salt,从而打散数据,均匀分布到不同的分区。

五、Skew Join 的优化策略

Spark 中 Skew Join 的优化需要考虑以下几个方面:

  1. 自动启用 Skew Join:通过设置 spark.sql.autoSkewJoin.enabled 为 true,Spark 会自动检测并处理倾斜的 JOIN 操作。对于那些倾斜的分区,Spark 会自动进行 Skew Join 优化。

  2. 调优 salt 值salt 的值影响了倾斜数据被打散的粒度。通过调节 salt 的随机范围,可以控制数据的打散程度。如果 salt 的范围太小,数据可能仍然集中在某些分区;如果范围太大,则可能会产生过多的小分区,导致计算开销增加。

  3. 采样优化:通过调整采样参数,Spark 可以更好地识别出数据倾斜的 key,从而提高 Skew Join 的处理效率。spark.sql.skewJoin.threshold 参数允许用户设定数据倾斜的阈值。

  4. 数据预处理:在某些场景中,用户可以通过在数据加载和预处理阶段手动解决数据倾斜问题。例如,用户可以通过聚合或者过滤数据的方式,减少倾斜 key 的数据量。

六、总结

    Skew Join 是 Spark 中为了解决数据倾斜问题而提供的一种重要优化机制。其核心思想是通过检测数据倾斜的 key,并对这些 key 进行分片和哈希加盐处理,使得倾斜的数据被均匀分布到不同的分区,从而避免计算负载的不均衡。通过 Skew Join,Spark 可以显著提高 JOIN 操作的性能,尤其是在数据倾斜严重的场景下。

合理的参数调优和数据预处理是确保 Skew Join 有效的关键。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/880496.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

韦东山FreeRTOS笔记

介绍 这篇文章是我学习FreeRTOS的笔记 学的是哔哩哔哩韦东山老师的课程 在学习FreeRTOS之前已经学习过江协的标准库和一丢丢的超子说物联网的HAL了。他们讲的都很不错 正在更新&#xff0c; 大家可以在我的Gitee仓库中下载笔记源文件、项目资料等 笔记源文件可以在Notion…

idea.vmoptions 最佳配置

1. 推荐的 idea64.exe.vmoptions 配置&#xff1a; -Xms1024m -Xmx4096m -XX:ReservedCodeCacheSize512m -XX:UseG1GC -XX:SoftRefLRUPolicyMSPerMB50 -XX:CICompilerCount4 -XX:HeapDumpOnOutOfMemoryError -XX:-OmitStackTraceInFastThrow -Dsun.io.useCanonCachesfalse -Dj…

微服务JSR303解析部署使用全流程

目录 1、什么是JSR303校验 2、小试牛刀 【2.1】添加依赖 【2.2】添加application.yml配置文件修改端口 【2.3】创建实体类User 【2.4】创建控制器 【2.5】创建启动类 【注意】不必创建前端页面 3、规范返回值格式&#xff1a; 3.1添加ResultCode工具类 3.2添加Resul…

NASA数据集:ATLAS/ICESat-2 L3B 南极和北极网格陆地冰高,第 3 版

目录 简介 摘要 代码 引用 网址推荐 0代码在线构建地图应用 机器学习 ATLAS/ICESat-2 L3B Gridded Antarctic and Arctic Land Ice Height V003 简介 ATLAS/ICESat-2 L3B 南极和北极网格陆地冰高&#xff0c;第 3 版 ATL14 和 ATL15 将 ATLAS/ICESat-2 L3B 年度陆地冰…

【蓝桥杯省赛真题55】Scratch找不同游戏 蓝桥杯scratch图形化编程 中小学生蓝桥杯省赛真题讲解

scratch找不同游戏 第十五届青少年蓝桥杯scratch编程选拔赛真题解析 PS&#xff1a;其实这题在选拔赛里面就出现过类似的题目&#xff0c;只是难度提升了一点&#xff0c;具体可以见【蓝桥杯选拔赛真题84】Scratch找不同游戏 第十五届蓝桥杯scratch图形化编程 少儿编程创意编…

java日志门面之JCL和SLF4J

文章目录 前言一、JCL1、JCL简介2、快速入门3、 JCL原理 二、SLF4J1、SLF4J简介2、快速入门2.1、输出动态信息2.2、异常信息的处理 3、绑定日志的实现3.1、slf4j实现slf4j-simple和logback3.2、slf4j绑定适配器实现log4j3.2、Slf4j注解 4、桥接旧的日志框架4.1、log4j日志重构为…

通过队列实现栈

请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0c;并支持普通栈的全部四种操作&#xff08;push、top、pop 和 empty&#xff09;。 实现 MyStack 类&#xff1a; void push(int x) 将元素 x 压入栈顶。int pop() 移除并返回栈顶元素。int to…

Android源码管理

文章目录 需求及场景需求困难疑惑点 源码管理方式及过程基本仓库管理方式 常用源码git 命令git init添加.gitignoregit add allgit add 文件名称git commit -a -m "提交内容说明"git statusgit loggit reset --hardgit clean -fd实际场景&#xff0c;从一个项目切换到…

大屏走马灯与echarts图表柱状图饼图开发小结

一、使用ant-design-vue的走马灯(a-carousel)注意事项 <!-- 左边的轮播图片 --><a-carousel :after-change"handleCarouselChange" autoplay class"carousel" :transition"transitionName"><div v-for"(item, index) in it…

论文阅读【时间序列】ModerTCN (ICLR2024)

【时间序列】ModerTCN (ICLR2024) 原文链接&#xff1a;ModernTCN: A Modern Pure Convolution Structure for General Time Series Analysis 代码仓库&#xff1a;ModerTCN 简易版本实现代码可以参考&#xff1a;&#xff08;2024 ICLR&#xff09;ModernTCN&#xff1a;A Mod…

解决hbase和hadoop的log4j依赖冲突的警告

一、运行hbase的发现依赖冲突的警告 这警告不影响使用 二、重命名log4j文件 进入HBase的lib包下&#xff0c;将HBase的log4j文件重命名&#xff0c;改成备份&#xff0c;这样再次运行hbase的时候&#xff0c;就没有依赖冲突了。 三、冲突成功解决

C++模版类实现栈

text.h #ifndef TEXT_H #define TEXT_H#include <stdexcept> // 用于 std::out_of_rangetemplate <typename T> class MyStack { private:T* data; // 指向底层数组的指针int capacity; // 容量int top; // 栈顶索引int size; // 当前元…

基于Hive和Hadoop的图书分析系统

本项目是一个基于大数据技术的图书分析系统&#xff0c;旨在为用户提供全面的图书信息和深入的图书销售及阅读行为分析。系统采用 Hadoop 平台进行大规模数据存储和处理&#xff0c;利用 MapReduce 进行数据分析和处理&#xff0c;通过 Sqoop 实现数据的导入导出&#xff0c;以…

光耦合器在信号传输和隔离中的作用

光耦合器&#xff0c;也称为光隔离器&#xff0c;是电子电路中的关键元件&#xff0c;它结合了两个基本功能&#xff1a;信号传输和电气隔离。它们允许信号在电路的不同部分之间传递&#xff0c;同时保持它们彼此电气隔离。此功能对于保护敏感的低压控制电路免受更高电压、噪声…

群晖套娃:群晖+飞牛fnOS二合一,群晖nas安装飞牛fnOS系统实录(飞牛fnOS初体验,如何挂载网盘视频,轻松实现影视刮削)

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 飞牛fnOS 📒📝 什么是飞牛fnOS?📝 准备工作📝 安装飞牛fnOS📝 影视刮削⚓️ 相关链接 ⚓️📖 介绍 📖 最近有一款很火的国产NAS系统吸引了不少用户的注意。你是否曾想过,将这种新兴系统安装到你的群晖设备上,实…

“数字武当”项目荣获2024年“数据要素×”大赛湖北分赛文化旅游赛道一等奖

9月26日&#xff0c;由国家数据局、湖北省人民政府指导的首届湖北省数据要素创新大会暨2024年“数据要素”大赛湖北分赛颁奖仪式在湖北武汉举行。由大势智慧联合武当山文化旅游发展集团有限公司参报的武当山“数字武当”项目&#xff0c;荣获文化旅游赛道一等奖。 据悉&#x…

一次阿里云ECS免费试用实践

必坑指南 域名注册了&#xff0c;但是试用版无法完成ICP认证的流程 外网不能访问&#xff0c;推荐使用香港地区–自己就是坑在了杭州 阿里云的网站界面有点太复杂了&#xff0c;经常找不到自己想要的界面 为什么使用ECS ECS 一个在云端的弹性计算服务器。 可以支持对外公网映…

VBA技术资料MF205:移动工作表时名称重复的处理

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…

AIGC: 10 AI转文服务器的搭建过程记录

上图是台风席卷城市&#xff0c;现在企业的服务基本都是混合部署&#xff0c;云计算厂商的机房往往可以提供比较好的保护&#xff0c;一般在地下&#xff0c;扛多少级地震&#xff0c;扛多少级台风&#xff0c;而自建机房&#xff0c;往往写字楼经常停电&#xff0c;网络运营上…

Spring:强制登陆与拦截器

1.只使用session验证 &#xff08;1&#xff09;第一步&#xff1a;用户登陆时存储session ApiOperation("用户登陆") PostMapping("/login") public AppResult login(HttpServletRequest request,RequestParam("username") ApiParam("用…