Spark Master的注册机制与状态管理

目录

 

1、Master接收注册的主要对象

2、Master接收Worker的注册

3、Master接收Driver的注册

4、Master处理Driver状态变化

5、Master接收Application的注册

6、Master处理Executor状态变化


1、Master接收注册的主要对象

Master主要接受注册的对象是:Application,Driver,Worker。

注意:Executor不是注册给master而是注册给Driver中的SchedulerBackend

2、Master接收Worker的注册

Worker启动后主动向Master注册,所以在生产环境下不需要重启集群就能够使用新的Worker。

(1)worker是一个消息循环体,因为继承了ThreadSafeRPCEndpoint。他启动的时候有一大堆我们找到onStart方法。

(2)这里面调用了registerWithMaster,这里面用了tryRegisterAllMaster方法向所有的master提交。

(3)在具体注册向所有的master提交的时候,是用线程池的中一个线程来提交。然后就获得了masterEndpoint,获得了masterEndpoint之后,将其作为参数传入registerWithMaster方法。masterRpcAddresses是所有的master的地址,这里的map是进行对master进行逐个注册。这里发送给所有的master而不时发送给active是因为:如果不这样做的话,worker启动的时候就要搞明白谁是active的master,这样他的负担就加重了,就不符合强内聚,弱耦合的架构设计。

(4)在registerWithMaster的时候传进去的消息体就是RegisterWorker,它是个case class

这个worker是自己的引用,master可以通过这个ref来通信

(5)worker发出的消息经过通信之后,master会接收到注册请求,worker发送的是RegisterWorker,master的消息接收器,接收到了这个消息,如下

这个代码逻辑是:收到了worker启动注册的信息之后,首先判断一下自己的状态,如果是standby那肯定没戏。然后判断idToWorker这个HashMap中有没有这个worker,他的key就代表了worker本身字符串级别的描述,WorkerInfo和注册的信息基本是一致的。所以WorkerInfo就包含了对worker所又了解的内容。也就是说他构建了一个内存数据结构,包含了所有已经注册的信息,如果已经注册过的话就不会注册。

如果master不是standby,worker也没注册过的话我们会构建workerInfo,这里是接收通过模式匹配匹配到的具体的worker发过来的信息并报存。在这基础之上,把workerInfo作为参数传入到registerWorker,执行具体的注册的过程。

注册其实就是登记一下,保留信息。这里会过滤掉workState为dead的work,也就是说如果worker已经dead掉了,将来某段时间注册的话,是不会接受的(因为曾经已经dead掉了,现在突然活了,认为是不可以思议的事情)。

也就是首先会判断worker的状态是否是DEAD的状态则直接过滤掉,对于UNKOWN状态的内容会调用removeWorker清理(包括清理该worker下的executors和drivers),因为在具体的机器上,这个节点向executor和driver被当前的worker,也就是要remove的worker生成和管理的,worker挂了没人管理了。如果都没问题就加入到workers,idToWorker,addressWorker这3个内存数据结构中

(6)回到receiveAndReply中注册完毕之后,persistenceEngine持久化引擎要把注册的worker持久化起来(HA)。然后就reply回复。再然后Schedule()调度

3、Master接收Driver的注册

Driver注册给Master,Master会将Driver的信息放入内存缓存中,然后默认加入等待调度的队列,然后再次用持久化引擎将Driver持久化,然后使用schedule进行调度

(1)在Client启动的时候会创建一个ClientEndpoint。

一个RpcEndpoint经历的过程依次是:构建->onStart→receive→onStop。其中onStart在接收任务消息前调用,receive和receiveAndReply分别用来接收另一个RpcEndpoint(也可以是本身)send和ask过来的消息。在他的onStart方法中调用ayncSendToMasterAndForwardReply方法向每个master发送RequestSubmitDriver这个case class他的参数是driverDescription也是个case class包含了driver的信息

(2)master在接收到client发送过来的RequestSubmitDriver消息后,创建一个DriverInfo,然后persistenceEngine持久化引擎要把driver持久化起来,并且加入到waitingDrivers这个等待队列中

4、Master处理Driver状态变化

Driver的状态有SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR。当进行改变时会发送DriverStateChanged的消息给Master。如果状态是ERROR、FINISHED、KILLED、FAILED这几种情况,则调用removeDriver。

removeDriver里面首先根据driverId来find看下有没有这个driver,你说你的状态发生变化我要看你曾经有没有登记,如果没有登记的话就打印日志如果有的话就从drivers中把当前这个driver去掉,同时看下completedDrivers的个数是不是>=RETAED_DRIVERS的个数

5、Master接收Application的注册

Application本身就是通过spark-submit的方式提交application的时候是通过schedulerBackend注册,然后将Application的信息放入内存缓存中,然后application加入等待调度的application队列,然后再次用持久化引擎将drievr持久化,然后使用schedule进行调度。

注意:注册的时候是先注册Driver然后再注册Application

在schedulerBackend的实现类StandaloneSchedulerBackend中的start方法中会创建一个StandaloneAppClient。

在StandaloneAppClient发送RegisterApplication的消息向master注册

Master收到RegisterApplication消息后Application的信息放入内存缓存中,然后application加入等待调度的application队列,然后再次用持久化引擎将drievr持久化,然后使用schedule进行调度。

 

6、Master处理Executor状态变化

当Executor状态发送变化时也同时会更新在Master端的注册信息,收到的消息是ExecutorStateChanged

首先查一下已注册的executors中有没有这个executor。任何获得executorState信息 如果是RUNNING的状态resetRetryCount重置一下次数。就是Executor挂掉的时候会尝试一定次数的重启(最多重试10次)。这里只是记录次数。

这个时候Executor的状态发生变化,他会告诉driver,send给他,就是driver要知道这个内容。如果是状态是Finish的话就把他remove掉,executor的lost大多是有Shuffle的输出导致的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

排序:插入排序与希尔排序

插入排序 插入排序(英语:Insertion Sort)是一种简单直观的排序算法。它的工作原理是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。插入排序在实现上,在从…

jieba分词提取小说人名

文章目录1. 读入文本2. 分词3. 计数4. 排序5. 添加用户字典以《神雕侠侣》为例: 使用 jieba.posseg获取词性,人名的词性为 nr 1. 读入文本 import jieba.posseg as psg with open(shendiaoxialv.txt,encodingutf-8) as f:text f.readlines()print(te…

Spark Worker源码

目录 1、概述 2、LaunchDriver 3、LaunchDriver 4、总结 1、概述 worker肯定是实现RPC通信的,否则别人没法给你发消息。他继承的是ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint是线程安全的,意味着处理一条消息完成后再处理下一个消息。换…

排序:快速排序与归并排序

快速排序 快速排序(英语:Quicksort),又称划分交换排序(partition-exchange sort),通过一趟排序将要排序的数据分割成独立的两部分,其中一部分的所有数据都比另外一部分的所有数据都…

LeetCode 764. 最大加号标志(DP)

文章目录1. 题目2. 解题1. 题目 在一个大小在 (0, 0) 到 (N-1, N-1) 的2D网格 grid 中,除了在 mines 中给出的单元为 0,其他每个单元都是 1。网格中包含 1 的最大的轴对齐加号标志是多少阶?返回加号标志的阶数。如果未找到加号标志&#xff…

机器学习基础—Kaggle泰坦尼克预测(完整分析)

1.引言 我们先找个简单的实际例子,来看看,所谓的数据挖掘或者机器学习实际应用到底是怎么样一个过程。 2.背景 2.1 关于Kaggle Kaggle是一个数据分析建模的应用竞赛平台,有点类似KDD-CUP(国际知识发现和数据挖掘竞赛)&…

Spark Executor解析

目录 1、Spark Executor如何工作 2、Spark Executor工作源码 1、Spark Executor如何工作 当Driver发送过来Task的时候,其实是发送给CoarseGrainedExecutorBackend这个RPCEndpoint,而不是直接发送给Executor(Executor由于不是消息循环体永远…

LeetCode 381. O(1) 时间插入、删除和获取随机元素 - 允许重复(vector + 哈希)

文章目录1. 题目2. 解题1. 题目 设计一个支持在平均 时间复杂度 O(1) 下, 执行以下操作的数据结构。 注意: 允许出现重复元素。 insert(val):向集合中插入元素 val。remove(val):当 val 存在时,从集合中移除一个 val。getRando…

Stage划分和Task最佳位置

目录 1、Job Stage划分 2、Task最佳位置 3、总结 3.1 Stage划分总结: 3.2 Task最佳位置总结: 1、Job Stage划分 Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是…

LeetCode 1636. 按照频率将数组升序排序(哈希+排序)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums ,请你将数组按照每个值的频率 升序 排序。 如果有多个值的频率相同,请你按照数值本身将它们 降序 排序。 请你返回排序后的数组。 示例 1: 输入:nums [1,1,2,2,2,3] 输出…

TaskScheduler相关

目录 1、DAGScheduler与TaskScheduler 2、TaskScheduler与SchedulerBackend 3、任务调度过程总结 1、DAGScheduler与TaskScheduler DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stag…

LeetCode 1637. 两点之间不包含任何点的最宽垂直面积

文章目录1. 题目2. 解题1. 题目 给你 n 个二维平面上的点 points ,其中 points[i] [xi, yi] ,请你返回两点之间内部不包含任何点的 最宽垂直面积 的宽度。 垂直面积 的定义是固定宽度,而 y 轴上无限延伸的一块区域(也就是高度为…

Task执行流程

1、源码走读 (1)当Driver中的SchedulerBackend(Standalone模式为CoarseGrainedSchedulerBackend)给ExecutorBackend(Standalone模式为CoarseGrainedExecutorBackend)发送LaunchTask之后,Coarse…

LeetCode 1638. 统计只差一个字符的子串数目(DP)

文章目录1. 题目2. 解题2.1 暴力枚举2.2 DP1. 题目 给你两个字符串 s 和 t ,请你找出 s 中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t 串的子串。 换言之,请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串…

Airflow简介

1、什么是Airflow Airflow 是一个 Airbnb 的 Workflow 开源项目,使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统,可以简单理解为是高级版的crontab,但是它解决了crontab无法解决的任务…

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年,是Armin ronacher(人名)用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Fl…

LeetCode 1640. 能否连接形成数组(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 arr ,数组中的每个整数 互不相同 。 另有一个由整数数组构成的数组 pieces,其中的整数也 互不相同 。请你以 任意顺序 连接 pieces 中的数组以形成 arr 。但是,不允许 对每个数组 pieces[i]…

python基本知识、数据库、网络、编程等总结

Python语言特性 1 Python的函数参数传递 看两个例子: a 1 def fun(a):a 2 fun(a) print a # 1 a [] def fun(a):a.append(1) fun(a) print a # [1] 所有的变量都可以理解是内存中一个对象的“引用”,或者,也可以看似c中void*的感觉。 通过id来看引…

LeetCode 1641. 统计字典序元音字符串的数目(DP)

文章目录1. 题目2. 解题1. 题目 给你一个整数 n,请返回长度为 n 、仅由元音 (a, e, i, o, u) 组成且按 字典序排列 的字符串数量。 字符串 s 按 字典序排列 需要满足:对于所有有效的 i,s[i] 在字母表中的位置总是与 s[i1] 相同或在 s[i1] 之…

LeetCode 1642. 可以到达的最远建筑(二分查找 / 优先队列贪心)

文章目录1. 题目2. 解题2.1 二分查找2.2 优先队列贪心1. 题目 给你一个整数数组 heights ,表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程,不断向后面的建筑物移动,期间可能会用到砖块或梯子。 当从建筑…