Spark AQE 导致的 Driver OOM问题

背景

最近在做Spark 3.1 升级 Spark 3.5的过程中,遇到了一批SQL在运行的过程中 Driver OOM的情况,排查到是AQE开启导致的问题,再次分析记录一下,顺便了解一下Spark中指标的事件处理情况

结论

SQLAppStatusListener 类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息,在AQE中 一个job会被拆分成很多job,甚至几百上千的job,这个时候 stageMetrics的数据就会成百上倍的被存储在内存中,从而导致Driver OOM
解决方法:

  1. 关闭AQE spark.sql.adaptive.enabled false
  2. 合并对应的PR-SPARK-45439

分析

背景知识:对于一个完整链接的sql语句来说(比如说从 读取数据源,到 数据处理操作,再到插入hive表),这可以称其为一个最小的SQL执行单元,这最小的数据执行单元在Spark内部是可以跟踪的,也就是用executionId来进行跟踪的。
对于一个sql,举例来说 :

insert into  TableA select * from TableB;

在生成 物理计划的过程中会调用 QueryExecution.assertOptimized 方法,该方法会触发eagerlyExecuteCommands调用,最终会到SQLExecution.withNewExecutionId方法:

  def assertOptimized(): Unit = optimizedPlan...lazy val commandExecuted: LogicalPlan = mode match {case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP => analyzed}...lazy val optimizedPlan: LogicalPlan = {// We need to materialize the commandExecuted here because optimizedPlan is also tracked under// the optimizing phaseassertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// clone the plan to avoid sharing the plan instance between different stages like analyzing,// optimizing and planning.val plan =sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// We do not want optimized plans to be re-analyzed as literals that have been constant// folded and such can cause issues during analysis. While `clone` should maintain the// `analyzed` state of the LogicalPlan, we set the plan as analyzed here as well out of// paranoia.plan.setAnalyzed()plan}def assertCommandExecuted(): Unit = commandExecuted...private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {case c: Command =>// Since Command execution will eagerly take place here,// and in most cases be the bulk of time and effort,// with the rest of processing of the root plan being just outputting command results,// for eagerly executed commands we mark this place as beginning of execution.tracker.setReadyForExecution()val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)val name = commandExecutionName(c)val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {SQLExecution.withNewExecutionId(qe, Some(name)) {qe.executedPlan.executeCollect()}}  

SQLExecution.withNewExecutionId主要的作用是设置当前计划的所属的executionId:

    val executionId = SQLExecution.nextExecutionIdsc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

EXECUTION_ID_KEY的值会在JobStart的时候传递给Event,以便记录跟踪整个执行过程中的指标信息。
同时我们在方法中eagerlyExecuteCommands看到qe.executedPlan.executeCollect()这是具体的执行方法,针对于insert into 操作来说,物理计划就是
InsertIntoHadoopFsRelationCommand,这里的run方法最终会流转到DAGScheduler.submitJob方法:

    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleJobSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理,这里有会涉及到指标信息的存储,这里我们截图出dump的内存占用情况:
在这里插入图片描述

可以看到 SQLAppStatusListener 的 LiveStageMetrics 占用很大,也就是 accumIdsToMetricType占用很大

那在AQE中是怎么回事呢?
我们知道再AQE中,任务会从source节点按照shuffle进行分割,从而形成单独的job,从而生成对应的shuffle指标,具体的分割以及执行代码在AdaptiveSparkPlanExec.getFinalPhysicalPlan中,如下:

      var result = createQueryStages(currentPhysicalPlan)val events = new LinkedBlockingQueue[StageMaterializationEvent]()val errors = new mutable.ArrayBuffer[Throwable]()var stagesToReplace = Seq.empty[QueryStageExec]while (!result.allChildStagesMaterialized) {currentPhysicalPlan = result.newPlanif (result.newStages.nonEmpty) {stagesToReplace = result.newStages ++ stagesToReplaceexecutionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))// SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting// for tasks to be scheduled and leading to broadcast timeout.// This partial fix only guarantees the start of materialization for BroadcastQueryStage// is prior to others, but because the submission of collect job for broadcasting is// running in another thread, the issue is not completely resolved.val reorderedNewStages = result.newStages.sortWith {case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => falsecase (_: BroadcastQueryStageExec, _) => truecase _ => false}// Start materialization of all new stages and fail fast if any stages failed eagerlyreorderedNewStages.foreach { stage =>try {stage.materialize().onComplete { res =>if (res.isSuccess) {events.offer(StageSuccess(stage, res.get))} else {events.offer(StageFailure(stage, res.failed.get))}// explicitly clean up the resources in this stagestage.cleanupResources()}(AdaptiveSparkPlanExec.executionContext)

这里就是得看stage.materialize()这个方法,这两个stage只有两类:BroadcastQueryStageExec 和 ShuffleQueryStageExec
这两个物理计划稍微分析一下如下:

  • BroadcastQueryStageExec
    数据流如下:
    broadcast.submitBroadcastJob||\/
    promise.future||\/
    relationFuture||\/
    child.executeCollectIterator()
    其中 promise的设置在relationFuture方法中,而relationFuture 会被doPrepare调用,而submitBroadcastJob会调用executeQuery,从而调用doPrepare,executeCollectIterator()最终也会发送JobSubmitted事件,分析和上面的一样
  • ShuffleQueryStageExec
     shuffle.submitShuffleJob||\/sparkContext.submitMapStage(shuffleDependency)||\/dagScheduler.submitMapStage

submitMapStage会发送MapStageSubmitted事件:

    eventProcessLoop.post(MapStageSubmitted(jobId, dependency, callSite, waiter, JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleMapStageSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理:

  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()...override def onJobStart(event: SparkListenerJobStart): Unit = {val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)if (executionIdString == null) {// This is not a job created by SQLreturn}val executionId = executionIdString.toLongval jobId = event.jobIdval exec = Option(liveExecutions.get(executionId))

该方法会获取事件中的executionId,在AQE中,同一个执行单元的executionId是一样的,所以stageMetrics内存占用会越来越大。
而这里指标的更新是在AdaptiveSparkPlanExec.onUpdatePlan等方法中。

这样整个事件的数据流以及问题的产生原因就应该很清楚了。

其他

为啥AQE以后多个Job还是共享一个executionId呢?因为原则上来说,如果没有开启AQE之前,一个SQL执行单元的是属于同一个Job的,开启了AQE之后,因为AQE的原因,一个Job被拆成了了多个Job,但是从逻辑上来说,还是属于同一个SQL处理单元的所以还是得归属到一次执行中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/830456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hadoop之路---伪分布式环境搭建

hadoop更适合在liunx环境下运行,会节省后期很多麻烦,而用虚拟器就太占主机内存了,因此后面我们将把hadoop安装到wsl后进行学习,后续学习的环境是Ubuntu-16.04 (windows上如何安装wsl) 千万强调,创建完hado…

读天才与算法:人脑与AI的数学思维笔记14_人脑的极限

1. 数学研究 1.1. 数学研究变得更为艰难了 1.1.1. 学科分支越发密集,问题越发复杂 1.1.2. 攻读博士学位的3年时间,只够去理解导师所给题目的含义 1.1.3. 随后,再花费数年时间去研究、探索,运气不错的话,会得到一些…

CVE-2022-2602:unix_gc 错误释放 io_uring 注册的文件从而导致的 file UAF

前言 复现该漏洞只是为了学习相关知识,在这里仅仅做简单记录下 exp,关于漏洞的详细内容请参考其他文章,最后在 v5.18.19 内核版本上复现成功,v6.0.2 复现失败 漏洞利用 diff --git a/include/linux/skbuff.h b/include/linux/s…

10GMAC层设计系列-(1)10G Ethernet PCS/PMA

一、引言 对于10G以太网MAC层的实现,Xilinx提供了 3种IP核,分别是 10G Ethernet MAC、10G Ethernet PCS/PMA、10G Ethernet Subsystem。 10G Ethernet MAC只包含MAC层,外部需要提供一个PHY芯片进行数据对齐,10G Ethernet MAC与P…

软考 系统架构设计师系列知识点之软件可靠性基础知识(7)

接前一篇文章:软考 系统架构设计师系列知识点之软件可靠性基础知识(6) 所属章节: 第9章. 软件可靠性基础知识 第1节 软件可靠性基本概念 9.1.5 广义的可靠性测试和狭义的可靠性测试 广义软件可靠性测试 广义的软件可靠性测试是…

sql注入工具-​sqlmap

介绍: sqlmap是一款开源的自动化SQL注入工具,用于自动化检测和利用Web应用程序中的SQL注入漏洞。它具有强大的参数化查询和自定义注入脚本的功能,可以通过检测和利用SQL注入漏洞来获取数据库的敏感信息,如用户名、密码和其他重要…

C++ | Leetcode C++题解之第60题排列序列

题目&#xff1a; 题解&#xff1a; class Solution { public:string getPermutation(int n, int k) {vector<int> factorial(n);factorial[0] 1;for (int i 1; i < n; i) {factorial[i] factorial[i - 1] * i;}--k;string ans;vector<int> valid(n 1, 1);…

gateway全局token过滤器

添加gateway依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency>创建一个tokenFilter 实现全局过滤器GlobalFilter,并且实现fitler方法 Value("${…

WebGL/Cesium 大空间相机抖动 RTE(Relative to Eye)实现原理简析

在浏览器中渲染大尺寸 3D 模型&#xff1a;Speckle 处理空间抖动的方法 WebGL/Cesium 大空间相机抖动 RTE(Relative to Eye)实现原理简析 注: 相机空间和视图空间 概念等效混用 1、实现的关键代码 const material new THREE.RawShaderMaterial({uniforms: {cameraPostion: {…

【全开源】Java上门老人护理老人上门服务类型系统小程序APP源码

功能&#xff1a; 服务分类与选择&#xff1a;系统提供详细的老人护理服务分类&#xff0c;包括日常照护、康复训练、医疗护理等&#xff0c;用户可以根据老人的需求选择合适的服务项目。预约与订单管理&#xff1a;用户可以通过系统预约护理服务&#xff0c;并查看订单详情&a…

final原理

文章目录 1. 设置 final 变量的原理2. 获取 final 变量的原理 1. 设置 final 变量的原理 理解了 volatile 原理&#xff0c;再对比 final 的实现就比较简单了 public class TestFinal {final int a 20; }字节码 0: aload_0 1: invokespecial #1 // Method java/lang/Object…

Python进阶之-traceback详解

✨前言&#xff1a; 在日常开发中&#xff0c;我们会做一些基本的异常处理&#xff0c;但是有时候只能打印我们处理的结果或者将异常打印出来&#xff0c;不能直观的知道在哪个文件中的哪一行出错。在Python中&#xff0c;traceback是一个用来跟踪异常错误信息的标准库&#x…

Docker搭建LNMP+Wordpress

一.项目模拟 1.项目环境 公司在实际的生产环境中&#xff0c;需要使用 Docker 技术在一台主机上创建 LNMP 服务并运行 Wordpress 网站平台。然后对此服务进行相关的性能调优和管理工作。 安装包下载&#xff1a; wget http://101.34.22.188/lnmp_wordpress/mysql-boost-5.7…

记录k8s以docker方式安装Kuboard v3 过程

原本是想通过在k8s集群中安装kuboad v3的方式安装kuboard&#xff0c;无奈在安装过程中遇到了太多的问题&#xff0c;最后选择了直接采用docker安装的方式&#xff0c;后续有时间会补上直接采用k8s安装kuboard v3的教程。 1.kuboard安装文档地址&#xff1a; 安装 Kuboard v3 …

【机器学习】视觉基础模型的三维意识:前沿探索与局限

视觉基础模型的三维意识&#xff1a;前沿探索与局限 一、引言二、视觉基础模型的三维意识三、当前模型的局限性四、实验与结果五、总结与展望 大规模预训练的进展已经产生了具有强大能力的视觉基础模型。最近的模型不仅可以推广到任意图像的训练任务&#xff0c;而且它们的中间…

【AIGC调研系列】LLaVA++整合Phi-3和Llama-3能够实现什么

LLaVA能够为Phi-3和Llama-3带来的主要好处包括&#xff1a; 视觉处理能力的增强&#xff1a;通过整合Phi-3和Llama-3模型&#xff0c;创建了具备视觉处理能力的Phi-3-V和Llama-3-V版本&#xff0c;这意味着这些模型现在能够理解和生成与图像相关的内容[1]。这种能力的增加&…

第G9周:ACGAN理论与实战

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制&#x1f680; 文章来源&#xff1a;K同学的学习圈子 上一周已经给出代码&#xff0c;需要可以跳转上一周的任务 第G8周&#xff1a;ACGAN任…

ARP学习及断网攻击

1.什么是ARP ARP&#xff08;Address Resolution Protocol&#xff09;是一种用于在IPv4网络中将IP地址映射到MAC地址的协议。在计算机网络中&#xff0c;每个网络接口都有一个唯一的MAC地址&#xff08;Media Access Control address&#xff09;&#xff0c;用于识别网络设备…

c#创建新项目

确保已安装.NET Core SDK。&#xff08;visual studio installer中可安装&#xff09; cmd中先引用到文件夹目录下。 mkdir MyConsoleApp MyConsoleApp是项目文件夹的名字。 mkdir 是一个命令行工具&#xff0c;用于在文件系统中创建新的目录&#xff08;文件夹&#xff09;…

TCP协议在物联网中实战

一、TCP协议介绍 网上对TCP协议介绍众多&#xff0c;本人按照自己的理解简单介绍一下。 TCP&#xff08;Transmission Control Protocol&#xff0c; 传输控制协议&#xff09;是一种面向连接的、可靠的、基于字节流的传输控制层通信协议。 1.1 协议机制 1.1.1 三次握手 &…