深入理解 Spark(三)SparkTask 执行与 shuffle 详解

SparkTask 的分发部署与启动流程分析

Spark Action 算子触发 job 提交

Spark 当中 Stage 切分源码详解

在这里插入图片描述

Task 的提交与执行

在这里插入图片描述

SparkShuffle 机制详解

MapReduceShuffle 全流程深度剖析

在这里插入图片描述
MapReduce 全流程执行过程中参与工作的组件以及他们的执行先后顺序:InputFormat => RecordReader => Mapper => Partitioner => Sorter => Combiner => Fetch => Group => Reducer => OutputFormat => RecordWriter

Partitioner 的常见规则

在这里插入图片描述

MapReduce Shuffle 为什么要排序?

  1. ReduceTask 的输入数据来自于上游 MapTask 的输出,并且普遍情况下,每个 MapTask 都会为每个 ReduceTask 提供部分输入。
  2. ReduceTask 的执行逻辑是:将 ReduceTask 的输入数据,按照标记分组之后,每次针对一组数据执行一次逻辑计算。
  3. 那么如果不排序,则每次为了得到待计算的一组数据,都需要完整扫描这个输入数据集一次,如果该 ReduceTask 的输入数据集的标记特比多,则需要扫描这个数据集成千上万次,这效率是非常低下的。
  4. 所以解决方案,就是给 ReduceTask 的输入数据集进行按照标记排序,然后 ReduceTask 在执行逻辑计算的时候只需要按照顺序扫描一次就能完成所有数据组的逻辑计算
  5. 既然 ReduceTask 的输入数据是需要排序的,而且 ReduceTask 的输入数据是来自于上游的MapTask 的,那为什么不让 MapTask 先给结果数据排序,最终 ReduceTask 拉取到所有数据再来做归并呢?利用分布式思想,本来该一个 ReduceTask 完成排序的,结果该排序的压力分散到多个上游 MapTask 之中,进一步提高效率。
  6. 所以 MapTask 的输出就不能直接写磁盘了,如果直接写磁盘,就只能最后做一次磁盘扫描将数据读取到内存再完成排序之后溢写到磁盘。这样做未免效率低,所以好方式是:MapTask 输出的时候,先将数据输出到内存,当然内存不可能是无限大,总有装不下的时候,所以当一定内存装满的时候,就对这部分输出数据进行排序,再刷写到磁盘,释放内存。
  7. 但是这样做,又发现一个新问题:当内存装满了之后,要先排序,然后溢写到内存,那这样是不是就阻塞了 MapTask 的继续输出呢?是的。所以又提供了优化方案:这个固定内存 100M 当写满 80% 的时候,就对这 80% 的数据执行排序后溢写到磁盘,这样子剩下的 20% 区域就可以继续接收 MapTask 的输出了。
  8. 最后再补充一点:当 MapTask 的输出做了排序之后,也可以让标记相同的待计算数据提前做预汇总,降低 Shuffle 中网络数据传输量,节省带宽。

MapReduce Shuffle 执行排序的局限性:

  1. 如果最终需要计算得到的结果集并不需要排序,这个排序则是多此一举。
  2. 多次溢写形成的多个临时磁盘文件需要做合并,这会导致磁盘 IO 的负担很重,这也是 MapReduce 效率低但是稳定的重要原因。

MapReduce Shuffle 为什么要文件合并?

  1. 由于 MapTask 输出数据的时候,是先写入 100M 的内存区间中,每次装满 80% 则执行一次溢写形成一个磁盘临时文件,这样必定会导致 MapTask 的输出磁盘文件会特别多,给文件系统带来负担。
  2. 如果不合并,那么 ReduceTask 过来拉取 MapTask 的输出数据的时候,需要打开很多的文件句柄,进一步增加负担。
  3. 每个 MapTask 输出的单个文件是有序的,但是不代表该 MapTask 输出的所有结果都是有序的,所以还需要做文件的合并来保证 MapTask 的输出有序。

Spark 当中的 Shuffle 机制介绍

大多数 Spark 作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。理解 Spark 的 shuffle 的工作原理,有助于对 spark application 进行调优,减少资源消耗,提升生产效率。
在 Spark 的源码中,负责 shuffle 过程的执行、计算和处理的组件主要就是 ShuffleManager,也即shuffle 管理器。

  • Spark-1.2 版本以前:默认实现是:HashShuffleManager
  • Spark-1.2 版本以后:默认实现是:SortShuffleManager
  • Spark-3.x 版本以后:彻底移除了:HashShuffleManager,只留下 SortShuffleManager

HashShuffleManager 的缺点是 shuffle 过程中会产生大量的临时结果文件,SortShuffleManager 的改进是让每个 Task 只产生一个结果文件(多个临时文件会合并到一个文件中),下游的 Task 过来拉取对应分区数据的时候,只需要去根据索引按需获取即可。

spark shuffle 概念介绍

Spark Job 依赖图:
在这里插入图片描述
将对应的 RDD 标注上去后:
在这里插入图片描述
其中的 shuffle 过程:
在这里插入图片描述
前一个 Stage 的 ShuffleMapTask 进行 Shuffle Write, 把数据存储在 BlockManager 上面, 并且把数据位置元信息上报到 Driver 的 MapOutTrack 组件中, 下一个 Stage 根据数据位置元信息, 进行 Shuffle Read, 拉取上个 Stage 的输出数据。

HashShuffle 过程详解

普通的 Hash Shuffle 机制

在这里插入图片描述
上图中,

  1. buffer 起到的是缓存作用,缓存能够加速写磁盘,提高计算的效率,buffer 的默认大小 32k。
  2. 分区器:根据 hash/numRedcue 取模决定数据由几个 Reduce 处理,也决定了写入几个 buffer 中。
  3. block file:磁盘小文件,从图中我们可以知道磁盘小文件的个数计算公式:block_file_cnt = M * R 。 M 为 map task 的数量,R 为 Reduce 的数量,一般 Reduce 的数量等于 buffer 的数量,都是由分区器决定的。
  4. Shuffle 阶段在磁盘上会产生海量的小文件,建立通信和拉取数据的次数变多,此时会产生大量耗时低效的 IO 操作 (因为产生过多的小文件)
  5. 大量耗时低效的 IO 操作 ,导致写磁盘时的对象过多,读磁盘时候的对象也过多,这些对象存储在堆内存中,会导致堆内存不足,相应会导致频繁的 GC,GC 会导致 OOM。由于内存中需要保存海量文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。

合并机制的 Hash shuffle

合并机制就是复用 buffer 缓冲区,开启合并机制的配置是 spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。
在这里插入图片描述
此时 block_file = Core * R ,Core 为 CPU 的核数,R 为 Reduce 的数量,但是如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

SortShuffle 过程详解

Sort shuffle 的普通机制

在这里插入图片描述
这个机制的好处:

  1. 小文件明显变少了,一个 task 只生成一个 file 文件
  2. file 文件整体有序,加上索引文件的辅助,查找变快,虽然排序浪费一些性能,但是查找变快很多

ByPass 模式的 SortShuffle 机制

bypass 机制运行条件是 shuffle map task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数(默认值 200)的值,且不是聚合类的 shuffle 算子(比如 reduceByKey)。
在这里插入图片描述

补充

  • hash-based shuffle 中,排序发生在 reduce 阶段
  • sort-based shuffle 中,排序发生在 shuffle 阶段

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620984.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【AIGC】电影风格的一组绝美高清图提示词解析

实际示例 女人主角,以时尚电影风格为灵感,追求照片般的逼真度,运用伦勃朗式光线,创造奇幻且细节丰富的场景,充满象征意义,使用3D渲染技术达到8K超高清晰度。 分类相关信息主角女人风格时尚电影风格逼真度…

LLM漫谈(三)| 使用Chainlit和LangChain构建文档问答的LLM应用程序

一、Chainlit介绍 Chainlit是一个开源Python包,旨在彻底改变构建和共享语言模型(LM)应用程序的方式。Chainlit可以创建用户界面(UI),类似于由OpenAI开发的ChatGPT用户界面,Chainlit可以开发类似…

5 个适用的免费数据恢复软件【2024 年 版本】

互联网上有许多免费的数据恢复软件产品。有些产品是免费软件,而其他产品则提供该工具的免费试用下载以进行评估。我们列出了2024 年 5 款最佳数据恢复工具 ,可以免费下载和试用。 5 个适用的免费数据恢复软件 1.奇客数据恢复(Windows和Mac&am…

Apache-Common-Pool2中对象池的使用方式

最近在工作中,对几个产品的技术落地进行梳理。这个过程中发现一些朋友对如何使用Apache的对象池存在一些误解。所以在写作“业务抽象”专题的空闲时间里,本人觉得有必要做一个关于对象池的知识点和坑点讲解。Apache Common-Pool2 组件最重要的功能&#…

中仕公考:2024年上半年中小学教师资格考试(笔试)报名已开始

2024年上半年中小学教师资格考试(笔试)报名工作于1月12日开始,此次笔试在31个省(自治区、直辖市)举办,各省(自治区、直辖市)的报名公告将陆续上网。 个别地区报名截止时间有所差异,上海1月13日报名截止,浙江、天津、河南1月14日截…

负荷预测 | Python基于CEEMDAN-VMD-BiGRU的短期电力负荷时间序列预测

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 提出一种分解去噪、重构分解的 CEEMDAN-VMD-BiGRU组合预测方法: 1 采用CEEMDAN将原始电力负荷数据分解成一组比较稳定的子序列,联合 小波阈值法将含有噪声的高频分量去噪,保留含有信…

微服务技术要点

一、服务注册到nacos 1.下载nacos,修改nacos启动模式为单机模式,另外需要在环境变量配置JAVA_HOME,否则启动不起来。 2.启动类加注解EnableDiscoveryClient 3.application.yml配置nacos地址 spring:cloud:nacos:discovery:server-addr: 127.0.0.1:884…

springboot怎样设置全局的traceId(包括MQ)

一、Controller打印TraceId 1、拦截所有的controller,输入输出将traceId放入MDC中: package com.perkins.ebicycle.mobile.trace;import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.stream.Collectors;import…

华为设备登录安全配置案例

知识改变命运,技术就是要分享,有问题随时联系,免费答疑,欢迎联系! 厦门微思网络​​​​​​ https://www.xmws.cn 华为认证\华为HCIA-Datacom\华为HCIP-Datacom\华为HCIE-Datacom Linux\RHCE\RHCE 9.0\RHCA\ Oracle O…

Python+甘特图及标签设置

图示 甘特图代码 import matplotlib.pyplot as plt import numpy as npclass ProjectEmement:def __init__(self, name_, starttime_: float, endtime_: float, fact_endtime_: float, grade_, rootlist_: list, keylist_: list, isover_=-1):self.name = name_self.starttime…

宝塔nginx部署前端页面刷新报404

问题: 当我们使用脚手架打包前端项目的时候,如果前端项目并没有静态化的配置,如以下 当我们刷新页面,或进行路由配置访问的时候就会报404的错误 原因: 这是因为通常我们做的vue项目属于单页面开发。所以只有index.html…

【教程】华为数据恢复的5个简单方法

您刚刚不小心从华为手机中删除了一些重要文件,现在您迫切希望将它们找回来。如果是这样,那么您现在可能会感到沮丧和无助。您可能已向您的朋友寻求帮助或在互联网上搜索答案,但似乎无济于事。 华为数据恢复的5个简单方法 好吧,别…

MyBatis第二课,灰度发布,@Results注解,使用xml书写mysql

目录 打印MyBatis的日志配置: 灰度发布:指发布环境,比如发布环境有200台机器,发布的时候是一批一批的机器的发布 2.删除与修改 使用Results注解,这样就和上面的别名一个意思,column是数据库的列 自动转驼峰&#…

学习资料: uni-app HBuilderX

编译器:HBuilderX HBuilderX-高效极客技巧 uni-app介绍:uni-app官网 uni-app 是一个使用 Vue.js 开发所有前端应用的框架,开发者编写一套代码,可发布到iOS、Android、Web(响应式)、以及各种小程序&#…

根据gbt81702008数值修约的C#函数

#region 修约函数/// </summary>/// <param name"data_val">输入数值</param>/// <param name"len">保留几位小数</param>/// <returns></returns>public static decimal round_gbt8170(decimal data_val,int l…

Unity组件开发--UI管理器

1.Canvas组件&#xff1a; 注意属性&#xff1a; &#xff08;1&#xff09;渲染模式是&#xff1a;屏幕空间相机 &#xff08;2&#xff09;创建一个UICamera节点&#xff0c;管理相机 &#xff08;3&#xff09;屏幕画布缩放模式 &#xff08;4&#xff09;画布下挂载两…

Android-基础

Activity生命周期 1.启动Activity&#xff1a;系统会先调用onCreate方法&#xff0c;然后调用onStart方法&#xff0c;最后调用onResume&#xff0c;Activity进入运行状态。 2.当前Activity被其他Activity覆盖其上或被锁屏&#xff1a;系统会调用onPause方法&#xff0c;暂停当…

Pandas实战100例-专栏介绍

Pandas&#xff0c;Python数据科学的心脏&#xff0c;是探索和分析数据世界的强大工具。想象一下&#xff0c;用几行代码就能洞察庞大数据集的秘密&#xff0c;无论是金融市场趋势还是社交媒体动态。 通过Pandas&#xff0c;你可以轻松地整理、清洗、转换数据&#xff0c;将杂…

山西电力市场日前价格预测【2024-01-15】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2024-01-15&#xff09;山西电力市场全天平均日前电价为399.10元/MWh。其中&#xff0c;最高日前电价为583.33元/MWh&#xff0c;预计出现在18:15。最低日前电价为275.09元/MWh&#xff0c;预计…

【MySQL】:探秘主流关系型数据库管理系统及SQL语言

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; MySQL从入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. MySQL概述1.1 数据库相关概念1.2 主流数据库1.3 数据模型1.3.1 关系型数据库…