Spark Worker源码

目录

 

1、概述

2、LaunchDriver

3、LaunchDriver

4、总结


1、概述

worker肯定是实现RPC通信的,否则别人没法给你发消息。他继承的是ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint是线程安全的,意味着处理一条消息完成后再处理下一个消息。换句话说,在处理下一条消息时,可以看到对ThreadSafeRpcEndpoint的内部字段的更改,并且ThreadSafeRpcEndpoint中的字段不需要是volatile或等效的。但是,不能保证同一个线程将为不同的消息执行相同的ThreadSafeRpcEndpoint。即顺序处理消息,不能同时并发处理。

Worker本身在实际运行的时候是作为一个进程,他会接收master的指令,有几个非常重要的指令,如LaunchExecutor,LaunchDriver等。这两个指令是Schedule进行资源调度(Master的schedule方法中)的时候发送的。

2、LaunchDriver

Worker在收到Driver发送的LaunchDriver类型的信息后。

(1)首先首先打印一个日志,master传进来的时候肯定会告诉你driverId的,然后new 一个DriverRunner,然后把这个实例通过driverId交给一个HashMap数据结构val drivers = new HashMap[String, DriverRunner]。key就是driverId,value就是DriverRunner。这个数据结构非常重要,因为我们在worker上可能启动很多Executor,需要根据ExecutorId管理具体的DriverRunner,DriverRunner内部通过线程的方式启动了另外一个进程,所以可以简单理解DriverRunner是Driver所在进程中本身的process,这个就是一个代理模式。

(2)管理Driver的执行,包括在Driver失败的时候自动重启,要是在Standalone的模式下。失败是否重试是看DriverDescription中的supervise是否为true,如果指定了这个参数为true,driver在失败的时候worker会负责启动这个Driver

(3)构建好DriverRunner实例,并且已经将其加入到drivers中后,调用DriverRunner的start方法。在start方法中通过一个线程启动Driver,并管理Driver,线程运行的时候run方法会被执行。

在run方法中有个prepareAndRunDriver用于准备Driver需要的jar并运行Driver

(4)在prepareAndRunDriver方法中,会创建工作目录,下载jar包到本地,并封装好Driver的启动Command,通过buildProcessBuilder来启动Driver。

driverDesc.command这个指定他启动的时候应该运行什么类,就是类的入口。driverDesc是Master远程发送过来的,为CoarseGrainedExecutorBackend

进入到runDriver方法中,有个initialize方法,里面重定向输出和error,将stout和stderr重定向到baseDir下,这样就可以通过log看一下曾经执行的情况。

然后执行runCommandWithRetry,在参数中会构造ProcessBuilderLike。ProcessBuilderLike在apply的时候就new ProcessBuilderLike,在这里面processBuilder.start()

在runCommandWithRetry方法中,会一直循环,孵化出一个进程,有个方法这个是阻塞的,言外之意就是当他回复过来的时候估计就有问题了,那就判断一下

(5)在prepareAndRunDriver方法中启动Driver之后,如果运行出状况了,出状况后会给自己发一个消息

(6)在这里不同的情况打印log日志,最关键的是sendToMaster(driverStateChanged)发送给master。发送的类型是DriverStateChanged

(7)到Master方法中。master收到这个消息后就把他remove掉,是从自己的内存数据结构中remove,同时把这个driver曾经占用的数据,还有持久化的都remove,然后再次发消息给worker确认下,因为发生了资源的变动再次进行schedule

(8)回到Worker中,start之后记录耗了多少内存CPU

3、LaunchDriver

Worker在收到Driver发送的LaunchExecutor类型的信息后首先new 一个ExecutorRunner,然后start。过程与启动Driver类似,就不细说。

在ExecutorRunner的start方法中会通过一个线程启动Executor,并管理Executor,线程运行的时候run方法会被执行。

fetchAndRunExecutor方法中类似driver中创建该executor工作目录,下载运行的jar,开启执行application的进程executor。并向worker发送ExecutorStateChanged的事件通知,

worker先向自己发送ExecutorStateChanged的消息

在start方法之后,记录耗了多少内存CPU,然后向master发送接收到的ExecutorStateChanged的事件通知

 

4、总结

driver进程就是executor进程;ExecutorBackend是进程名称,standalone模式下是CoarseGrainedExecutorBackend。在CoarseGrainedExecutorBackend中有我们的Executor对象本身,Executor和ExecutorBackend是一对一的关系,就是一个ExecutorBackend进程里面有一个Executor,在Executor内部是通过线程池并发处理的方式来处理Spark提交过来的Task。

注意:Executor启动之后要向我们的driver注册。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473642.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

排序:快速排序与归并排序

快速排序 快速排序(英语:Quicksort),又称划分交换排序(partition-exchange sort),通过一趟排序将要排序的数据分割成独立的两部分,其中一部分的所有数据都比另外一部分的所有数据都…

LeetCode 764. 最大加号标志(DP)

文章目录1. 题目2. 解题1. 题目 在一个大小在 (0, 0) 到 (N-1, N-1) 的2D网格 grid 中,除了在 mines 中给出的单元为 0,其他每个单元都是 1。网格中包含 1 的最大的轴对齐加号标志是多少阶?返回加号标志的阶数。如果未找到加号标志&#xff…

机器学习基础—Kaggle泰坦尼克预测(完整分析)

1.引言 我们先找个简单的实际例子,来看看,所谓的数据挖掘或者机器学习实际应用到底是怎么样一个过程。 2.背景 2.1 关于Kaggle Kaggle是一个数据分析建模的应用竞赛平台,有点类似KDD-CUP(国际知识发现和数据挖掘竞赛)&…

Spark Executor解析

目录 1、Spark Executor如何工作 2、Spark Executor工作源码 1、Spark Executor如何工作 当Driver发送过来Task的时候,其实是发送给CoarseGrainedExecutorBackend这个RPCEndpoint,而不是直接发送给Executor(Executor由于不是消息循环体永远…

LeetCode 381. O(1) 时间插入、删除和获取随机元素 - 允许重复(vector + 哈希)

文章目录1. 题目2. 解题1. 题目 设计一个支持在平均 时间复杂度 O(1) 下, 执行以下操作的数据结构。 注意: 允许出现重复元素。 insert(val):向集合中插入元素 val。remove(val):当 val 存在时,从集合中移除一个 val。getRando…

Stage划分和Task最佳位置

目录 1、Job Stage划分 2、Task最佳位置 3、总结 3.1 Stage划分总结: 3.2 Task最佳位置总结: 1、Job Stage划分 Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是…

LeetCode 1636. 按照频率将数组升序排序(哈希+排序)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums ,请你将数组按照每个值的频率 升序 排序。 如果有多个值的频率相同,请你按照数值本身将它们 降序 排序。 请你返回排序后的数组。 示例 1: 输入:nums [1,1,2,2,2,3] 输出…

TaskScheduler相关

目录 1、DAGScheduler与TaskScheduler 2、TaskScheduler与SchedulerBackend 3、任务调度过程总结 1、DAGScheduler与TaskScheduler DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stag…

LeetCode 1637. 两点之间不包含任何点的最宽垂直面积

文章目录1. 题目2. 解题1. 题目 给你 n 个二维平面上的点 points ,其中 points[i] [xi, yi] ,请你返回两点之间内部不包含任何点的 最宽垂直面积 的宽度。 垂直面积 的定义是固定宽度,而 y 轴上无限延伸的一块区域(也就是高度为…

Task执行流程

1、源码走读 (1)当Driver中的SchedulerBackend(Standalone模式为CoarseGrainedSchedulerBackend)给ExecutorBackend(Standalone模式为CoarseGrainedExecutorBackend)发送LaunchTask之后,Coarse…

LeetCode 1638. 统计只差一个字符的子串数目(DP)

文章目录1. 题目2. 解题2.1 暴力枚举2.2 DP1. 题目 给你两个字符串 s 和 t ,请你找出 s 中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t 串的子串。 换言之,请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串…

Airflow简介

1、什么是Airflow Airflow 是一个 Airbnb 的 Workflow 开源项目,使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统,可以简单理解为是高级版的crontab,但是它解决了crontab无法解决的任务…

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年,是Armin ronacher(人名)用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Fl…

LeetCode 1640. 能否连接形成数组(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 arr ,数组中的每个整数 互不相同 。 另有一个由整数数组构成的数组 pieces,其中的整数也 互不相同 。请你以 任意顺序 连接 pieces 中的数组以形成 arr 。但是,不允许 对每个数组 pieces[i]…

python基本知识、数据库、网络、编程等总结

Python语言特性 1 Python的函数参数传递 看两个例子: a 1 def fun(a):a 2 fun(a) print a # 1 a [] def fun(a):a.append(1) fun(a) print a # [1] 所有的变量都可以理解是内存中一个对象的“引用”,或者,也可以看似c中void*的感觉。 通过id来看引…

LeetCode 1641. 统计字典序元音字符串的数目(DP)

文章目录1. 题目2. 解题1. 题目 给你一个整数 n,请返回长度为 n 、仅由元音 (a, e, i, o, u) 组成且按 字典序排列 的字符串数量。 字符串 s 按 字典序排列 需要满足:对于所有有效的 i,s[i] 在字母表中的位置总是与 s[i1] 相同或在 s[i1] 之…

LeetCode 1642. 可以到达的最远建筑(二分查找 / 优先队列贪心)

文章目录1. 题目2. 解题2.1 二分查找2.2 优先队列贪心1. 题目 给你一个整数数组 heights ,表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程,不断向后面的建筑物移动,期间可能会用到砖块或梯子。 当从建筑…

ClickHouse高可用及副本测试

1 概述 ​ 对于默认的分布式表的配置,每个分片只有一份,这种多分片单副本集群,挂掉一个节点的话查询分布式表会报错。为了解决这个问题的话可以使用ClickHouse高可用集群,对于每个分片具有2个或2个以上的副本,当某个节…

阿里云Redis读写分离典型场景:如何轻松搭建电商秒杀系统

秒杀活动是绝大部分电商选择的低价促销,推广品牌的方式。不仅可以给平台带来用户量,还可以提高平台知名度。一个好的秒杀系统,可以提高平台系统的稳定性和公平性,获得更好的用户体验,提升平台的口碑,从而提…

LeetCode 756. 金字塔转换矩阵(回溯)

文章目录1. 题目2. 解题1. 题目 现在,我们用一些方块来堆砌一个金字塔。 每个方块用仅包含一个字母的字符串表示。 使用三元组表示金字塔的堆砌规则如下: 对于三元组(A, B, C) ,“C”为顶层方块,方块“A”、“B”分别作为方块“…