Spark Shuffle Tracking 原理分析

Shuffle Tracking

Shuffle Tracking 是 Spark 在没有 ESS(External Shuffle Service)情况,并且开启 Dynamic Allocation 的重要功能。如在 K8S 上运行 spark 没有 ESS。本文档所有的前提都是基于以上条件的。

如果开启了 ESS,那么 Executor 计算完后,把 shuffle 数据交给 ESS, Executor 没有任务时,可以安全退出,下游任务从 ESS 拉取 shuffle 数据。

1. 背景

如果 Executor 执行了上游的 Shuffle Map Task 并且把 shuffle 数据些到本地。并且现在 Executor 没有 Task 运行,那么此 Executor 是否能销毁?

现状是如果 Executor 没有 active 的 shuffle 数据,则可以被销毁。
active shuffle 的定义:如果 Shuffle Map Stage 的 task 把 shuffle 数据输出到本地。如果依赖此 shuffle 的Stage 没有计算完毕,则称此 shuffle 为 active shuffle。因为依赖此 shuffle 的 Task 可能从 Driver 端获取了 MapStatus,但是还没有拉取完 shuffle 数据。

为了达到此目的,需要跟踪每个 Stage 和每个 Task 的运行信息。并且启动定时任务,定时扫描每个 Executor,判断是否有任务运行,是否有 active 的 shuffle,如果没有则可以退出。

退出有两种,如果开启了 decommission,则到期的 executors 进入 decommission 模式,否则执行 killExecutors。

参数配置

spark.dynamicAllocation.shuffleTracking.enabled: 默认 true,是否开启 shuffle tracking。
spark.dynamicAllocation.shuffleTracking.timeout: 默认 Long.MaxValue,

2. 设计

ExecutorMonitor 为每个 Executor 创建一个 Tracker, 用于跟踪此 Executor 的状态。

private val executors = new ConcurrentHashMap[String, Tracker]()

定时任务间隔时间查找 timeout 的 executor,然后处理。

timedOutExecutors 方法的主要逻辑,就是遍历 executors。如果 executor 没有 active 的 shuffle 并且当前时间大于 executor 的超时时间 timeoutAt,则此 executor 可以被安全释放。

为什么 executor 有 active shuffle 数据就不能 kill?
在这里插入图片描述

  • Shuffle 的过程:
  1. MapTask 把 shuffle 写到本地,并且把状态汇报给 Driver.
  2. Reduce Task 从 Driver 获取 shuffle status,并从 shuffle status 获取每个 shuffle 数据的地址。
  3. 连接对应的 executor 获取 shuffle 数据。

如果在 reduce 获取完 shuffle status 后,MapTask 所在的 Executor 被 kill 掉,Reduce Task 就无法获取 shuffle 数据。

如果执行 decommission 逻辑,把 MapTask 的 shuffle 数据长传到 bos 等分布式存储是否可以?

也是不可以的,因为 reduce 可能已经把 shuffle status 拿走,获取的 shuffle status 没有记录 shuffle 数据在分布式存储上。

参考: ExecutorMonitor,ExecutorAllocationManager

Executor 状态的更新

ExecutorMonitor 实现了 SparkListner 接口,当 Job, Stage, Task 等 start 和 end 时,都会执行回调。

以 hasActiveShuffle 为例
每个 executor 用一个集合 shuffleIds 存储其上拥有的 shuffle 数据。 当其为空时,说明没有 shuffle 数据。

在 onTaskEnd 和 onBlockUpdated 时调用 addShuffle 向 shuffleIds 添加数据。

在以下时机删除 shuffleIds 里的数据。

  1. 依赖 driver 端的 ContextCleaner,当 ShuffleRDD 仅有 weakReference 时触发。
  2. rdd.cleanShuffleDependencies 方法,但是此方法仅在 org.apache.spark.ml.recommendation.ALS 使用。

timeoutAt 的计算逻辑

总结:timeoutAt 根据 idle 的时间,spark.dynamicAllocation.cachedExecutorIdleTimeout 和 spark.dynamicAllocation.shuffleTracking.timeout 这 3 个值中最大的值。

详细计算逻辑:
timeoutAt 在一些事件发生时触发计算,如 onBlockUpdated, onUnpersistRDD, updateRunningTasks, removeShuffle, updateActiveShuffles
timeoutAt 的计算逻辑:
当执行器有计算任务时 为 Long.MaxValue。
否则为 max(_cacheTimeout, _shuffleTimeout, idleTimeoutNs)
_cacheTimeout: 如果没有 cache 数据,为0,否则为参数 spark.dynamicAllocation.cachedExecutorIdleTimeout 的值(默认 Long.MaxValue)。

_shuffleTimeout: 如果没有 shuffle数据,为 0, 否则为参数 spark.dynamicAllocation.shuffleTracking.timeout 的值(默认 Long.MaxValue)。
idleTimeoutNs 为 spark.dynamicAllocation.executorIdleTimeout

3. 测试

测试命令

spark-shell  \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.initialExecutors=2 \--conf spark.dynamicAllocation.maxExecutor=400 \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.shuffle.service.enabled=false \--conf spark.dynamicAllocation.shuffleTracking.enabled=true

参考资料:

https://www.waitingforcode.com/apache-spark/what-new-apache-spark-3-shuffle-service-changes/read

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/713350.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 表的基本操作,结合项目的表自动初始化来讲

有了数据库以后,我们就可以在数据库中对表进行增删改查了,这也就意味着,一名真正的 CRUD Boy 即将到来(😁)。 查表 查看当前数据库中所有的表,使用 show tables; 命令 由于当前数据库中还没有…

基于Python3的数据结构与算法 - 09 希尔排序

一、引入 希尔排序是一种分组插入排序的算法。 二、排序思路 首先取一个整数d1 n/2,将元素分为d1个组,每组相邻量取元素距离为d1,在各组内直接进行插入排序;取第二个整数d2 d1/2, 重复上述分组排序过程&#xff0…

Angular 2 中的样式绑定和 NgStyle

在 Angular 2 模板中绑定内联样式很容易。以下是一个绑定单个样式值的示例&#xff1a; 你还可以指定单位&#xff0c;例如在这里我们将单位设置为 em&#xff0c;但也可以使用 px、% 或 rem&#xff1a; <p [style.font-size.em]"3">A paragraph at 3em! &l…

CSS 自测题 -- 用 flex 布局绘制骰子(一、二、三【含斜三点】、四、五、六点)

一点 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>css flex布局-画骰子</title><sty…

vue3 滚动条触底监听

问题&#xff1a;指定区域内&#xff0c;显示返回的数据&#xff0c;要求先显示20条&#xff0c;区域超出部分滚动显示&#xff0c;对滚动条进行监听&#xff0c;滚动条触底后&#xff0c;继续显示下20条... 解决过程&#xff1a; 1.在区域的div上&#xff0c;添加scroll事件…

Unity 切换场景

场景切换前必须要将场景拖动到Build中 同步加载场景 using System.Collections; using System.Collections.Generic; //using UnityEditor.SearchService; using UnityEngine; // 场景管理 需要导入该类 using UnityEngine.SceneManagement;public class c3 : MonoBehaviour {…

redis五大基础类型【重点】

之前写过一点小知识&#xff1a;https://blog.csdn.net/qq_45927881/article/details/134959181?spm1001.2014.3001.5501 参考链接 https://xiaolincoding.com/redis/data_struct/command.html#%E4%BB%8B%E7%BB%8D 目录 1. string&#xff08;字符串&#xff09;2. Hash&#…

MySql安全加固:配置不同用户不同账号禁止使用旧密码禁止MySql进程管理员权限

MySql安全加固&#xff1a;配置不同用户不同账号&禁止使用旧密码&禁止MySql进程管理员权限 1.1 检查是否配置不同用户不同账号1.2 检查是否禁止使用旧密码1.3 禁止MySql进程管理员权限 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496…

【c++】通讯录管理系统

1.系统功能介绍及展示 2.创建项目 3.菜单实现 4.退出功能实现 5.添加联系人—结构体设计 6.添加联系人—功能实现 7.显示联系人 8.删除练习人—检测联系人是否存在 9.删除联系人—功能实现 10.查找联系人 11.修改联系人 12.清空通讯录 #include <iostream> #include <…

什么是VR虚拟社区|VR元宇宙平台|VR主题馆加盟

VR虚拟社区是指一种基于虚拟现实技术构建的在线社交平台或环境&#xff0c;用户可以在其中创建虚拟化的个人形象&#xff08;也称为avatars&#xff09;并与其他用户进行交流、互动和合作。在VR虚拟社区中&#xff0c;用户可以选择不同的虚拟场景和环境&#xff0c;如虚拟公园、…

fly-barrage 前端弹幕库(3):滚动弹幕的设计与实现

项目官网地址&#xff1a;https://fly-barrage.netlify.app/&#xff1b; &#x1f451;&#x1f40b;&#x1f389;如果感觉项目还不错的话&#xff0c;还请点下 star &#x1f31f;&#x1f31f;&#x1f31f;。 Gitee&#xff1a;https://gitee.com/fei_fei27/fly-barrage&a…

显示器开机正常,插入HDMI线却不显示画面,换了HDMI线还是不行?

环境&#xff1a; 惠普/P24VG4 DELL笔记本 问题描述&#xff1a; 显示器开机正常&#xff0c;插入HDMI线却不显示画面&#xff0c;换了HDMI线还是不行&#xff0c;是不是显示器坏了&#xff1f; 解决方案&#xff1a; 1.前往显示器设置菜单里面查看input 2.把输入源默认设…

二百二十五、海豚调度器——用DolphinScheduler调度执行Flume数据采集任务

一、目的 数仓的数据源是Kafka&#xff0c;因此离线数仓需要用Flume采集Kafka中的数据到HDFS中 在实际项目中&#xff0c;不可能一直在Xshell中启动Flume任务&#xff0c;一是项目的Flume任务很多&#xff0c;二是一旦Xshell页面关闭Flume任务就会停止&#xff0c;这样非常不…

案例研究|DataEase助力众陶联应对产业链数据可视化挑战

佛山众陶联供应链服务有限公司&#xff08;以下简称为“众陶联”&#xff09;成立于2016年&#xff0c;是由34家陶瓷企业共同创办的建陶行业工业互联网平台&#xff0c;股东产值占整个行业的22.5%。众陶联以数据赋能为核心&#xff0c;积极探索新的交易和服务模式&#xff0c;构…

ant-design-vue如何限制图片上传的尺寸?

handleBeforeUpload(file, fileList) {// fileList 只包含了当次上传的文件列表&#xff0c;不包含已上传的文件列表// 所以长度要加上已上传的文件列表的长度const isLimit this.fileList.length fileList.length > this.limit;const indexOfFile fileList.findIndex(it…

C++ STL 之容器 vector 常见用法

一. 什么是vector vector为“变长数组”&#xff0c;即长度根据需要而自动改变的数组。 头文件&#xff1a; #include <vector>using namespace std;单独定义一个vector&#xff1a;vector<typename> name&#xff0c;相当于一维数组 name[SIZE] &#xff0c;其长…

mac-docker-php容器连接mac中的pgsql数据库失败以及出现table_msg存错误时的解决方法

以php中的thinkphp 5.1为例&#xff0c;php容器连接mac中的pgsql数据库失败时&#xff0c;出现如下错误 [7] PDOException in Connection.php line 528 SQLSTATE[08006] [7] could not connect to server: Connection refused Is the server running on host "localhost&…

Git 配置处理客户端无法正常访问到 github 原网站时,npm 下载依赖包失败的问题

Git 配置处理客户端无法正常访问到 github 原网站时&#xff0c;npm 下载依赖包失败的问题 使用 github 的镜像网站地址或类似的替代产品地址&#xff0c;代替到 npm 拉取依赖包的 git 地址本地Git配置 例如&#xff1a;执行一下命令&#xff0c;则是以https://kgithub.com 替…

requests库/urllib3库返回WEB响应内容的处理差异

requests库是一个广泛使用的HTTP库&#xff0c;用于发送HTTP请求和处理响应。 以下是requests库中一些主要类和方法的详细介绍&#xff1a;requests库主要类和方法 类:requests.models.Response: status_code: 响应状态码。text: 以Unicode形式返回响应内容。content: 以字节形…

MySQL的主从同步原理

MySQL的主从同步&#xff08;也称为复制&#xff09;是一种数据同步技术&#xff0c;用于将一个MySQL服务器&#xff08;主服务器&#xff09;上的数据和变更实时复制到另一个或多个MySQL服务器&#xff08;从服务器&#xff09;。这项技术支持数据备份、读写分离、故障恢复等多…