Spark Shuffle Tracking 原理分析

Shuffle Tracking

Shuffle Tracking 是 Spark 在没有 ESS(External Shuffle Service)情况,并且开启 Dynamic Allocation 的重要功能。如在 K8S 上运行 spark 没有 ESS。本文档所有的前提都是基于以上条件的。

如果开启了 ESS,那么 Executor 计算完后,把 shuffle 数据交给 ESS, Executor 没有任务时,可以安全退出,下游任务从 ESS 拉取 shuffle 数据。

1. 背景

如果 Executor 执行了上游的 Shuffle Map Task 并且把 shuffle 数据些到本地。并且现在 Executor 没有 Task 运行,那么此 Executor 是否能销毁?

现状是如果 Executor 没有 active 的 shuffle 数据,则可以被销毁。
active shuffle 的定义:如果 Shuffle Map Stage 的 task 把 shuffle 数据输出到本地。如果依赖此 shuffle 的Stage 没有计算完毕,则称此 shuffle 为 active shuffle。因为依赖此 shuffle 的 Task 可能从 Driver 端获取了 MapStatus,但是还没有拉取完 shuffle 数据。

为了达到此目的,需要跟踪每个 Stage 和每个 Task 的运行信息。并且启动定时任务,定时扫描每个 Executor,判断是否有任务运行,是否有 active 的 shuffle,如果没有则可以退出。

退出有两种,如果开启了 decommission,则到期的 executors 进入 decommission 模式,否则执行 killExecutors。

参数配置

spark.dynamicAllocation.shuffleTracking.enabled: 默认 true,是否开启 shuffle tracking。
spark.dynamicAllocation.shuffleTracking.timeout: 默认 Long.MaxValue,

2. 设计

ExecutorMonitor 为每个 Executor 创建一个 Tracker, 用于跟踪此 Executor 的状态。

private val executors = new ConcurrentHashMap[String, Tracker]()

定时任务间隔时间查找 timeout 的 executor,然后处理。

timedOutExecutors 方法的主要逻辑,就是遍历 executors。如果 executor 没有 active 的 shuffle 并且当前时间大于 executor 的超时时间 timeoutAt,则此 executor 可以被安全释放。

为什么 executor 有 active shuffle 数据就不能 kill?
在这里插入图片描述

  • Shuffle 的过程:
  1. MapTask 把 shuffle 写到本地,并且把状态汇报给 Driver.
  2. Reduce Task 从 Driver 获取 shuffle status,并从 shuffle status 获取每个 shuffle 数据的地址。
  3. 连接对应的 executor 获取 shuffle 数据。

如果在 reduce 获取完 shuffle status 后,MapTask 所在的 Executor 被 kill 掉,Reduce Task 就无法获取 shuffle 数据。

如果执行 decommission 逻辑,把 MapTask 的 shuffle 数据长传到 bos 等分布式存储是否可以?

也是不可以的,因为 reduce 可能已经把 shuffle status 拿走,获取的 shuffle status 没有记录 shuffle 数据在分布式存储上。

参考: ExecutorMonitor,ExecutorAllocationManager

Executor 状态的更新

ExecutorMonitor 实现了 SparkListner 接口,当 Job, Stage, Task 等 start 和 end 时,都会执行回调。

以 hasActiveShuffle 为例
每个 executor 用一个集合 shuffleIds 存储其上拥有的 shuffle 数据。 当其为空时,说明没有 shuffle 数据。

在 onTaskEnd 和 onBlockUpdated 时调用 addShuffle 向 shuffleIds 添加数据。

在以下时机删除 shuffleIds 里的数据。

  1. 依赖 driver 端的 ContextCleaner,当 ShuffleRDD 仅有 weakReference 时触发。
  2. rdd.cleanShuffleDependencies 方法,但是此方法仅在 org.apache.spark.ml.recommendation.ALS 使用。

timeoutAt 的计算逻辑

总结:timeoutAt 根据 idle 的时间,spark.dynamicAllocation.cachedExecutorIdleTimeout 和 spark.dynamicAllocation.shuffleTracking.timeout 这 3 个值中最大的值。

详细计算逻辑:
timeoutAt 在一些事件发生时触发计算,如 onBlockUpdated, onUnpersistRDD, updateRunningTasks, removeShuffle, updateActiveShuffles
timeoutAt 的计算逻辑:
当执行器有计算任务时 为 Long.MaxValue。
否则为 max(_cacheTimeout, _shuffleTimeout, idleTimeoutNs)
_cacheTimeout: 如果没有 cache 数据,为0,否则为参数 spark.dynamicAllocation.cachedExecutorIdleTimeout 的值(默认 Long.MaxValue)。

_shuffleTimeout: 如果没有 shuffle数据,为 0, 否则为参数 spark.dynamicAllocation.shuffleTracking.timeout 的值(默认 Long.MaxValue)。
idleTimeoutNs 为 spark.dynamicAllocation.executorIdleTimeout

3. 测试

测试命令

spark-shell  \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.initialExecutors=2 \--conf spark.dynamicAllocation.maxExecutor=400 \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.shuffle.service.enabled=false \--conf spark.dynamicAllocation.shuffleTracking.enabled=true

参考资料:

https://www.waitingforcode.com/apache-spark/what-new-apache-spark-3-shuffle-service-changes/read

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/713350.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 表的基本操作,结合项目的表自动初始化来讲

有了数据库以后,我们就可以在数据库中对表进行增删改查了,这也就意味着,一名真正的 CRUD Boy 即将到来(😁)。 查表 查看当前数据库中所有的表,使用 show tables; 命令 由于当前数据库中还没有…

基于Python3的数据结构与算法 - 09 希尔排序

一、引入 希尔排序是一种分组插入排序的算法。 二、排序思路 首先取一个整数d1 n/2,将元素分为d1个组,每组相邻量取元素距离为d1,在各组内直接进行插入排序;取第二个整数d2 d1/2, 重复上述分组排序过程&#xff0…

CSS 自测题 -- 用 flex 布局绘制骰子(一、二、三【含斜三点】、四、五、六点)

一点 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>css flex布局-画骰子</title><sty…

Unity 切换场景

场景切换前必须要将场景拖动到Build中 同步加载场景 using System.Collections; using System.Collections.Generic; //using UnityEditor.SearchService; using UnityEngine; // 场景管理 需要导入该类 using UnityEngine.SceneManagement;public class c3 : MonoBehaviour {…

redis五大基础类型【重点】

之前写过一点小知识&#xff1a;https://blog.csdn.net/qq_45927881/article/details/134959181?spm1001.2014.3001.5501 参考链接 https://xiaolincoding.com/redis/data_struct/command.html#%E4%BB%8B%E7%BB%8D 目录 1. string&#xff08;字符串&#xff09;2. Hash&#…

MySql安全加固:配置不同用户不同账号禁止使用旧密码禁止MySql进程管理员权限

MySql安全加固&#xff1a;配置不同用户不同账号&禁止使用旧密码&禁止MySql进程管理员权限 1.1 检查是否配置不同用户不同账号1.2 检查是否禁止使用旧密码1.3 禁止MySql进程管理员权限 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496…

【c++】通讯录管理系统

1.系统功能介绍及展示 2.创建项目 3.菜单实现 4.退出功能实现 5.添加联系人—结构体设计 6.添加联系人—功能实现 7.显示联系人 8.删除练习人—检测联系人是否存在 9.删除联系人—功能实现 10.查找联系人 11.修改联系人 12.清空通讯录 #include <iostream> #include <…

什么是VR虚拟社区|VR元宇宙平台|VR主题馆加盟

VR虚拟社区是指一种基于虚拟现实技术构建的在线社交平台或环境&#xff0c;用户可以在其中创建虚拟化的个人形象&#xff08;也称为avatars&#xff09;并与其他用户进行交流、互动和合作。在VR虚拟社区中&#xff0c;用户可以选择不同的虚拟场景和环境&#xff0c;如虚拟公园、…

fly-barrage 前端弹幕库(3):滚动弹幕的设计与实现

项目官网地址&#xff1a;https://fly-barrage.netlify.app/&#xff1b; &#x1f451;&#x1f40b;&#x1f389;如果感觉项目还不错的话&#xff0c;还请点下 star &#x1f31f;&#x1f31f;&#x1f31f;。 Gitee&#xff1a;https://gitee.com/fei_fei27/fly-barrage&a…

显示器开机正常,插入HDMI线却不显示画面,换了HDMI线还是不行?

环境&#xff1a; 惠普/P24VG4 DELL笔记本 问题描述&#xff1a; 显示器开机正常&#xff0c;插入HDMI线却不显示画面&#xff0c;换了HDMI线还是不行&#xff0c;是不是显示器坏了&#xff1f; 解决方案&#xff1a; 1.前往显示器设置菜单里面查看input 2.把输入源默认设…

二百二十五、海豚调度器——用DolphinScheduler调度执行Flume数据采集任务

一、目的 数仓的数据源是Kafka&#xff0c;因此离线数仓需要用Flume采集Kafka中的数据到HDFS中 在实际项目中&#xff0c;不可能一直在Xshell中启动Flume任务&#xff0c;一是项目的Flume任务很多&#xff0c;二是一旦Xshell页面关闭Flume任务就会停止&#xff0c;这样非常不…

案例研究|DataEase助力众陶联应对产业链数据可视化挑战

佛山众陶联供应链服务有限公司&#xff08;以下简称为“众陶联”&#xff09;成立于2016年&#xff0c;是由34家陶瓷企业共同创办的建陶行业工业互联网平台&#xff0c;股东产值占整个行业的22.5%。众陶联以数据赋能为核心&#xff0c;积极探索新的交易和服务模式&#xff0c;构…

帝国cms7.5仿非小号区块链门户资讯网站源码 带手机版

帝国cms7.5仿非小号区块链门户资讯网站源码 带手机版 带自动采集 开发环境&#xff1a;帝国cms 7.5 安装环境&#xff1a;phpmysql 包含火车头采集规则和模块&#xff0c;采集目标站非小号官网。 专业的数字货币大数据平台模板&#xff0c;采用帝国cms7.5内核仿制&#xff0…

Ai-WB2-32S在window下使用vs 和 msys2编译以及烧录

文章目录 前言一、使用前准备第一步 安装vscode第二步 安装msys2 二、使用步骤1.打开MSYS2 MINGW64&#xff08;1&#xff09;在开始栏中找到MSYS2 MINGW64并打开&#xff08;2&#xff09;安装git&#xff08;3&#xff09;安装make&#xff08;4&#xff09;安装好之后的文件…

Redis 之五:Redis 的主从复制

概念 主从复制&#xff0c;是指将一台 Redis 服务器的数据&#xff0c;复制到其他的Redis服务器。前者称为主节点(master)&#xff0c;后者称为从节点(slave)&#xff1b;数据的复制是单向的&#xff0c;只能由主节点到从节点。 默认情况下&#xff0c;每台Redis服务器都是主节…

云天励飞战略投资神州云海,布局机器人市场

日前,AI上市企业云天励飞(688343.SH)完成了对深圳市神州云海智能科技有限公司(以下简称“神州云海”)的B轮战略投资。 公开资料显示,自2015年于深圳创立以来,神州云海始终聚焦人工智能与服务机器人广阔的应用市场,依托自主的核心算法能力,深耕机器人硬件本体研发,整合上下游产…

RabbitMQ-TTL/死信队列/延迟队列高级特性

文章目录 TTL死信队列消息成为死信的三种情况队列如何绑定死信交换机 延迟队列RabbitMQ如何实现延迟队列 总结来源B站黑马程序员 TTL TTLTTL(Time To Live):存活时间/过期时间当信息到达存活时间后&#xff0c;还没有被消费&#xff0c;会被自动清除。RabbitMQ可以对消息设置过…

Win10系統如何重置系统

Win10系統如何重置 大家可以使用Win10內建的重設電腦設定&#xff0c;如以下操作&#xff1a; 首先&#xff0c;可以先到桌面左下角的【開始】 選擇【設定】 在【設定】裡找到【更新與安全性】 在左側欄有一項【復原】 在復原的標題下&#xff0c;副標題有一項【重設此電腦】…

【algorithm】算法基础课---排序算法(附笔记 | 建议收藏)

&#x1f680;write in front&#x1f680; &#x1f4dd;个人主页&#xff1a;认真写博客的夏目浅石. &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd; &#x1f4e3;系列专栏&#xff1a;AcWing算法学习笔记 &#x1f4ac;总结&#xff1a;希望你看完…

​The Sandbox的南极之旅|链接世界:从南极洲到元宇宙

真正的发现之旅不在于寻找新的景观&#xff0c;而在于拥有新的眼光。 - 马塞尔-普鲁斯特 在这个数字世界和物理世界日益交织的时代&#xff0c;The Sandbox 的联合创始人 Arthur Madrid 和 Sebastien Borget 踏上了远离数字空间的旅程&#xff0c;前往地球上未被开发的宝藏地点…