【Spark系列4】Task的执行

一、Task的执行流程

1.1、Task执行流程

DAGScheduler将Stage生成TaskSet之后,会将Task交给TaskScheduler进行处理,TaskScheduler负责将Task提交到集群中运行,并负责失败重试,为DAGScheduler返回事件信息等,整体如流程如下:

当任务提交到TaskScheduler时,TaskScheduler会通知SchedulerBackend分配计算资源,SchedulerBackend将所有可用的Executor的资源信息转换成WorkerOffer交给TaskScheduler,WorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。TaskScheduler负责根据WorkerOffer在相应的Executor分配TaskSet中的Task,并将Task转换为TaskDescription交给SchedulerBackend。最终有空闲的的CPU的Executor会被分配到一个或者多个TaskDescription,SchedulerBackend将这些TaskDescription提交到对应的Executor中执行。

1.2、集群资源管理

Task运行离不开集群中的计算资源,即在SparkContext初始化过程中创建的Executor资源。在Executor创建完毕后回向SchedulerBackend中注册。Executor在注册时发送的信息包含的内容有:executorId,Executor-Ref引用、Executor的hostname、可用的CPU核数。

SchedulerBackend收到后,会将Executor的注册信息转换为ExecutorData进行保存,并且在SchedulerBackend中使用Map结构保存每个executorId和ExecutorData的关系,ExecutorData中还记录了剩余的可用的CPU核数

在为计算任务分配资源时,只需遍历所有的ExecutorData,分配可用的资源即可。由ExecutorData分配的可用资源使用WorkerOffer表示,WorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。

1.3、任务的分配

TaskScheduler在接受到DAGScheduler提交的TaskSet以后,会为每个TaskSet创建一个TaskSetManager,用于管理TaskSet中所有任务的运行。TaskSetManager会根据Task中的最佳运行位置计算TaskSet的所有本地运行级别,本地运行级别决定Task最终在哪个Executor上运行。Spark中本地运行级别从小到达可分为:

  1. 进程本地化
  2. 节点本地化
  3. 无优先位置
  4. 机架本地化
  5. 任意节点

在TaskSetManager初始化时,根据着5个本地运行级别分别创建5个Map,分别记录其下可以运行的所有Task。这些映射关系的建立,时根据生成Task时Task运行的最佳位置确定的。。在这5种映射关系中,某个Task可能会重复存在于几个本地化级别中。

当有新的TaskSet加入、由Task执行完成、由新的Executor加入时,都会触发SchedulerBackend重新计算可用资源。TaskScheduler根据调度的顺序,依次调度TaskSetManager中的TaskSet,对于每个TaskSet遍历所有本地化级别,从小到大尝试在Executor分配Task,根据每个WorkerOffer的executorId和hostname,使用TaskSetManager判断在当前本地化级别中,是否可以在该Executor或Host上分配任务,直到该本地化级别无法分配Task,再将本地化级别提高一级再次尝分配Task。经过对本地化级别的便利,即可实现WorkerOffer分配任务或将所有待执行的任务分配完成。TaskSet中部分任务分配完成以后会生成一组TaskDescription,每个TaskDescription中包含executorId和Task的其他运行信息。SchedulerBackend根据TaskDescription的executorId,将每个任务封装成LaunchTask消息提交到不同的Executor中

二、Task的执行

Executor收到SchedulerBackend提交的LaunchTask消息后,即可运行该消息中包含的Task。Executor将接收到的Task封装到TaskRunner中,TaskRunner是一个Runnable接口,从而可以将该任务提交到线程池中运行。

2.1、Executor可以并行运行Task的数量

在创建Executor时,每个Executor可能会分配多个CPU核数,而Executor运行的所有任务都是在线程池中运行。Executor运行的时候其本身没有记录CPU使用的情况,对于Executor能够同时运行多少个任务是由SchedulerBackend控制的,SchedulerBackend每在一个Executor中提交一个任务时,便在ExecutorData中减少该Executor可用的CPU核数,直到该Executor生成的WorkerOffer可用的CPU核数为0,便不再为Executor分配任务了。默认每个Task使用一个CPU核心运行,该变量可以通过Spark的配置spark.task.CPUs修改

2.2、Executor中资源共享

当在一个Executor上运行多个Task时,多个Task共享Executor中的SparkEnv的所有组件,共用Executor中分配的内存。如使用Spark广播变量时,每个Executor中会存在一份,Executor所有任务共享这一份变量。当Executor中的BlockManager缓存了某个rdd某分区的数据时,在该Executor上调度使用这个RDD的这个分区的数据的Task执行,可以有效的减少网络加载数据的过程,减少网络传输

2.3、ResultTask运行

在执行ResultTask时,首先会反序列化出该Task执行计算的RDD和对该RDD执行的操作。根据是否涉及Shuffle操作,分为两种

  1. 用户编写的RDDtransformation中,不涉及Shuffle操作,一个Job就只涉及一个ResultStage,rdd1直接从数据源中加载

  2. 过程中涉及Shuffle操作,划分为两个Stage,rdd1位Shuffle的Reduce阶段。由于DAGScheduler在划分Stage,必先会先计算父Stage,所以执行到ResultStage时,,其父Stage的Map阶段已经完成,并且计算结果已经保存到了BlockManager中,ResultStage中的rdd1之需要根据MapOutputTrackerMaster的计算结果位置信息加载该分区的数据即可

2.4、ShuffleMapTask运行

在计算ShuffleMapTask时,首先会反序列化出Task包含的计算的RDD和划分此Stage的ShulffleDependency。ShulffleDependency包含RDD需要执行分组操作的分区器partitioner,并且通过ShulffleDependency可以获取ShulffleManager的写入器,将本分区的分组计算结果通过写入器写入文件中进行保存。在这个过程中,一个分区的数据生成的多个分组的数据分别属于下游Reduce阶段的不同的分区的数据

ShuffleMapTask中计算的RDD同样为这个Stage中最后的一个RDD。

下图是多个ShuffleMapStage的RDD转换过程

三、Task结果处理

当Executor中Task运行完成时,需要将Task运行结果返回Driver程序,Driver程序根据结果判断该Stage是否计算完成

3.1、ResultTask结果

ResultTask完成后,会将其结果返回直Driver端。根据运行结果的大小返回的结果 被分为直接运行结果和非直接间接运行结果。 当运行结果大于Spark配置的最大直接结果大小的参数时, 会将运行结果保存至当前Executor的BlockManager中,并将保存的地址序列化后返回,否则直接将运行结果序列化后返回

3.2、ShuffleMapTask结果

ShuffleMapTask运行完成后,会将运行结果直接保存至当前Executor的BlockManager中,并将保存结果的位置封装到MapStatus中,最终ShuffleMapTask运行完成结果都为MapStatus类型

3.3、返回至Driver端

Executor将Task的运行结果序列化后,通过Driver的Endpoint-Ref发送至Driver端,Driver的Endpoint收到运行结果后,通知TaskScheduler Task运行完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/655151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenGL ES 渲染 NV21、NV12 格式图像有哪些“姿势”?

使用2个纹理实现 NV21 格式图像渲染 前文提到渲染 NV21 格式图像需要使用 2 个纹理,分别用于保存 Y plane 和 UV plane 的数据,然后在片段着色器中分别对 2 个纹理进行采样,转换成 RGB 数据。 OpenGLES 渲染 NV21或 NV12 格式图像需要用到 GL_LUMINANCE 和 GL_LUMINANCE_A…

http和https的区别是什么?https有什么优缺点?

HTTP(Hypertext Transfer Protocol,超文本传输协议)是一个简单的请求-响应协议,它通常运行在TCP之上。它指定了客户端可能发送给服务器什么样的消息以及得到什么样的响应。这个简单模型是早期Web成功的有功之臣,因为它…

The following untracked working tree files would be overwritten by merge问题的解决

作者:朱金灿 来源:clever101的专栏 为什么大多数人学不会人工智能编程?>>> 在更新git仓库时出现了一个The following untracked working tree files would be overwritten by merge的错误,具体如下图: 分析…

ES 分词器

概述 分词器的主要作用将用户输入的一段文本,按照一定逻辑,分析成多个词语的一种工具 什么是分词器 顾名思义,文本分析就是把全文本转换成一系列单词(term/token)的过程,也叫分词。在 ES 中,Ana…

2024年新提出的算法:(凤头豪猪优化器)冠豪猪优化算法Crested Porcupine Optimizer(附Matlab代码)

本次介绍一种新的自然启发式元启发式算法——凤头豪猪优化器(Crested Porcupine Optimizer,CPO)。该成果于2024年1月发表在中科院1区SCI top期刊Knowledge-Based Systems(IF 8.8)上。 1、简介 受到凤头豪猪(CP)各种…

iOS 自动打包如何配置配置打包证书和profile provision文件【脚本方式配置】

iOS 最新Jenkins自动化打包总结 本文主要内容: 1.Xcode和Jenkins的相关设置,以及环境切换 2.通过shell脚本将证书和描述文件拷贝到自动化打包的机器,并archive导出ipa包 3.上传到蒲公英 4.解决Swift不支持use_frameworks!的问题 开搞&…

【开源】SpringBoot框架开发天然气工程业务管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、使用角色3.1 施工人员3.2 管理员 四、数据库设计4.1 用户表4.2 分公司表4.3 角色表4.4 数据字典表4.5 工程项目表4.6 使用材料表4.7 使用材料领用表4.8 整体E-R图 五、系统展示六、核心代码6.1 查询工程项目6.2 工程物资…

[Grafana]ES数据源Alert告警发送

简单的记录一下使用es作为数据源,如何在发送告警是带上相关字段 目录 前言 一、邮件配置 二、配置 1.Query 2.Alerts 总结 前言 ES作为数据源,算是Grafana中比较常见的,Alerts告警是我近期刚接触,有一个需求是当表空间大于…

flutter实现:使用三方组件syncfusion_flutter_datagrid

Syncfusion Flutter DataGrid 是一个用于 Flutter 的数据网格组件,它提供了丰富的功能来显示和编辑数据。这个组件提供了灵活的配置选项,使得开发者能够根据需要定制数据的显示和编辑方式。 项目中有两个需求,一是在列表中要使用可变高度&am…

OpenCV 5 - 图像混合处理addWeighted()

图像混合 1 理论-线性混合操作 其中α的取值范围为0~1之间,表示图像的所占的权重 2 混合处理函数addWeighted() 3 代码示例 Mat src1, src2, dst;src1 imread("./1.png");src2 imread("./2.png");if (!src1.data && src2.empty()) //判断图片是…

云安全中的常见云漏洞和威胁,有哪些防范措施

随着企业在数字化时代的脚步中愈发倚重云托管服务,云安全问题成为不容忽视的焦点。云服务的便捷性为企业提供了强大的存储和计算能力,然而,与之伴随而来的攻击风险也日益显著。最新的研究数据揭示,云安全漏洞可能导致的数据泄露&a…

ETCD监控方法以及核心指标

文章目录 1. 监控指标采集1.1 监控指标采集1.2 配置promethues采集和大盘 2. 核心告警指标3. 参考文章 探讨etcd的监控数据采集方式以及需要关注的核心指标,便于日常生产进行监控和巡检。 1. 监控指标采集 etcd默认通过/metrics指标暴露相关指标,因此不…

opencv#37 形态学操作——腐蚀

图像腐蚀目的 去除图像中微小物体 分离较近的两个物体 我们对图像中所有米粒进行二值化处理,之后进行连通域分割以求去整个图像中共用多少米粒,处理结果在可以发现,在上图中有一小块区域上有个小点(非米粒)&#xff…

Kotlin快速入门系列7

Kotlin的数据类、密封类、枚举类 数据类 与Java等语言不同,kotlin创建只包含数据的类时,需要在类名前声明关键字:data。 data class KotlinBean (val brand : String) 在kotlin中,编译器会自动的从主构造函数中根据所有声明的…

RabbitMQ-如何保证消息不丢失

RabbitMQ常用于 异步发送,mysql,redis,es之间的数据同步 ,分布式事务,削峰填谷等..... 在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么…

档案数字化转型面临问题

档案数字化转型面临以下问题: 1. 技术问题:档案数字化需要借助先进的技术手段和设备,包括扫描仪、存储设备和数据管理软件等。这些技术的成本高、操作复杂,需要专业的人员进行操作和维护。 2. 安全问题:档案数字化后的…

【Java程序设计】【C00176】基于SSM的图书管理系统(论文+PPT)

基于SSM的图书管理系统(论文PPT) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的图书管理系统 本系统分为前台系统、后台管理员以及后台学员3个功能模块。 前台系统:当游客打开系统的网址后,首先看到的就…

gmsh 01 对多个面,及体进行剖分

#include <set> #include <cmath> #include <gmsh.h>#include <iostream>int main(int argc, char** argv) {gmsh::initialize(argc, argv); // 初始化gmsh::model::add("t2"); // 创建 t2 modeldouble lc 0.05; gmsh::model::geo::add…

二手交易|校园二手交易小程序|基于微信小程序的闲置物品交易平台设计与实现(源码+数据库+文档)

校园二手交易小程序目录 目录 基于微信小程序的闲置物品交易平台设计与实现 一、前言 二、系统功能设计 三、系统实现 1、用户信息管理 2、商品信息管理 3、公告信息管理 4、论坛信息管理 四、数据库设计 1、实体ER图 五、核心代码 六、论文参考 七、最新计算机毕…

斜率优化dp模型整理

300. 任务安排1&#xff08;300. 任务安排1 - AcWing题库&#xff09; 思路&#xff1a;很明显这些任务是按顺序排好的&#xff0c;我们能执行的操作只是对它们进行分批&#xff0c;我们可以发现每一批之前的开始时间s&#xff0c;影响的不仅仅是当前这一批的结束时间&#xff…