Spark Executor解析

目录

 

1、Spark Executor如何工作

2、Spark Executor工作源码


1、Spark Executor如何工作

当Driver发送过来Task的时候,其实是发送给CoarseGrainedExecutorBackend这个RPCEndpoint,而不是直接发送给Executor(Executor由于不是消息循环体永远也无法接收远程发过来的信息)。

Driver向ExecutorBackend发送LaunchTask,这里实际上是把内容交给线程池中的线程去执行。首先判断Executor是否为空,反序列化TaskDescription,然后调用Executor.launchTask ,launchTask 里面是将Task封装在TaskRunner(是一个runnable对象)里面然后交给线程池中的线程处理。在TaskRunner的run方法里面会导致runTask的执行。

2、Spark Executor工作源码

(1)Worker接收到Master发送过来启动Executor的指令,通过ExecutorRunner启动另外一个进程来运行Executor。在这个基础上会启动一个CoarseGrainedExecutorBackend(粗粒度),ExecutorBackend启动的时候这个时候要向Driver注册,通过RegisterExecutor。Driver(CoarseGrainedSchedulerBackend的内部成员DriverEndpoint接收注册)在接收到RegisterExecutor信息后会返回一个信息RegisteredExecutor给CoarseGrainedExecutorBackend。

CoarseGrainedExecutorBackend:是Executor运行所在的进程的名称,他本身不会完成具体任务的计算,这个进程里面有个Executor对象和CoarseGrainedExecutorBackend是一一对应的。

Executor:完成具体的计算,真正处理Task的对象,内部通过线程池的方式完成Task的计算。

(2)Driver的CoarseGrainedSchedulerBackend内部的成员DriverEndpoint收到RegisterExecutor后,首先判断ExecutorDataMap中是否有这个ExecutorId。ExecutorDataMap是个内存数据结构(他是CoarseGrainedSchedulerBackend的成员,所以最终是注册给CoarseGrainedSchedulerBackend),他是一个HashMap。

如果ExecutorDataMap中不存在,就进行注册,注册的时候先看看address存在不存在,如果存在直接获取,如果不存在就获取senderAddress。这从实现的角度看adress就是senderAddress。然后相关数据结构添加数据

CoarseGrainedSchedulerBackend.this.synchronized这里加上synchronized是因为集群中有很多ExecutorBackend向Driver注册,担心注册的时候写冲突,所以加上一个同步代码块

最后把消息发还给CoarseGrainedExecutorBackend

(3)CoarseGrainedExecutorBackend收到RegisterExecutor后就new了一个Executor这个Executor对象是事实上负责Task计算的

(4)在Executor默认构造器中有一个非常关键的内容,有个成员threadPool(线程池)

创建线程池,线程池里面要有线程,线程怎么产生的呢?不会平白无故的产生,所以就搞了一个线程工厂,就是按照某种你需要的格式去产生线程,背后还是new出一个线程

setDamon(True)设置每个线程是后台运行的方式

(5)线程池也准备好后就是等待Driver端发任务过来,是发给CoarseGrainedExecutorBackend不是Executor,因为Executor不可能接收到消息的他本身就不是一个消息循环体。CoarseGrainedExecutorBackend收到LaunchTask后,这里实际上是把内容交给线程池中的线程去执行。判断Executor是否为空,反序列化TaskDescription,然后调用Executor.launchTask

(6)launchTask 里面是将Task封装在TaskRunner(是一个runnable对象)里面然后交给线程池中的线程处理。并把这个任务加入ConcurrentHashMap类型的名称为runningTasks的数据结构中管理。

(7)在TaskRunner的run方法里面会导致runTask的执行

补充:为什么要在worker接收到master发送过来的指令后为什么要启动另外一个进程,也就是说为什么开辟另外一个进程,在另外一个进程中注册给Driver,然后启动Executor。必须启动另外一个进程的原因:①Worker本身是管理当前机器上的资源的,当前机器上的资源变动的时候要汇报给Master,Worker不是用来做计算的不能在Worker里面计算;②Spark集群中可能有很多应用程序就可能有很多的Executor,如果不是为每个Executor启动一个进程而是所有Executor在Worker里面,那么一个程序奔溃了会导致其他程序奔溃。

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473637.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 381. O(1) 时间插入、删除和获取随机元素 - 允许重复(vector + 哈希)

文章目录1. 题目2. 解题1. 题目 设计一个支持在平均 时间复杂度 O(1) 下, 执行以下操作的数据结构。 注意: 允许出现重复元素。 insert(val):向集合中插入元素 val。remove(val):当 val 存在时,从集合中移除一个 val。getRando…

Stage划分和Task最佳位置

目录 1、Job Stage划分 2、Task最佳位置 3、总结 3.1 Stage划分总结: 3.2 Task最佳位置总结: 1、Job Stage划分 Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是…

LeetCode 1636. 按照频率将数组升序排序(哈希+排序)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums ,请你将数组按照每个值的频率 升序 排序。 如果有多个值的频率相同,请你按照数值本身将它们 降序 排序。 请你返回排序后的数组。 示例 1: 输入:nums [1,1,2,2,2,3] 输出…

TaskScheduler相关

目录 1、DAGScheduler与TaskScheduler 2、TaskScheduler与SchedulerBackend 3、任务调度过程总结 1、DAGScheduler与TaskScheduler DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stag…

LeetCode 1637. 两点之间不包含任何点的最宽垂直面积

文章目录1. 题目2. 解题1. 题目 给你 n 个二维平面上的点 points ,其中 points[i] [xi, yi] ,请你返回两点之间内部不包含任何点的 最宽垂直面积 的宽度。 垂直面积 的定义是固定宽度,而 y 轴上无限延伸的一块区域(也就是高度为…

Task执行流程

1、源码走读 (1)当Driver中的SchedulerBackend(Standalone模式为CoarseGrainedSchedulerBackend)给ExecutorBackend(Standalone模式为CoarseGrainedExecutorBackend)发送LaunchTask之后,Coarse…

LeetCode 1638. 统计只差一个字符的子串数目(DP)

文章目录1. 题目2. 解题2.1 暴力枚举2.2 DP1. 题目 给你两个字符串 s 和 t ,请你找出 s 中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t 串的子串。 换言之,请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串…

Airflow简介

1、什么是Airflow Airflow 是一个 Airbnb 的 Workflow 开源项目,使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统,可以简单理解为是高级版的crontab,但是它解决了crontab无法解决的任务…

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年,是Armin ronacher(人名)用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Fl…

LeetCode 1640. 能否连接形成数组(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 arr ,数组中的每个整数 互不相同 。 另有一个由整数数组构成的数组 pieces,其中的整数也 互不相同 。请你以 任意顺序 连接 pieces 中的数组以形成 arr 。但是,不允许 对每个数组 pieces[i]…

python基本知识、数据库、网络、编程等总结

Python语言特性 1 Python的函数参数传递 看两个例子: a 1 def fun(a):a 2 fun(a) print a # 1 a [] def fun(a):a.append(1) fun(a) print a # [1] 所有的变量都可以理解是内存中一个对象的“引用”,或者,也可以看似c中void*的感觉。 通过id来看引…

LeetCode 1641. 统计字典序元音字符串的数目(DP)

文章目录1. 题目2. 解题1. 题目 给你一个整数 n,请返回长度为 n 、仅由元音 (a, e, i, o, u) 组成且按 字典序排列 的字符串数量。 字符串 s 按 字典序排列 需要满足:对于所有有效的 i,s[i] 在字母表中的位置总是与 s[i1] 相同或在 s[i1] 之…

LeetCode 1642. 可以到达的最远建筑(二分查找 / 优先队列贪心)

文章目录1. 题目2. 解题2.1 二分查找2.2 优先队列贪心1. 题目 给你一个整数数组 heights ,表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程,不断向后面的建筑物移动,期间可能会用到砖块或梯子。 当从建筑…

ClickHouse高可用及副本测试

1 概述 ​ 对于默认的分布式表的配置,每个分片只有一份,这种多分片单副本集群,挂掉一个节点的话查询分布式表会报错。为了解决这个问题的话可以使用ClickHouse高可用集群,对于每个分片具有2个或2个以上的副本,当某个节…

阿里云Redis读写分离典型场景:如何轻松搭建电商秒杀系统

秒杀活动是绝大部分电商选择的低价促销,推广品牌的方式。不仅可以给平台带来用户量,还可以提高平台知名度。一个好的秒杀系统,可以提高平台系统的稳定性和公平性,获得更好的用户体验,提升平台的口碑,从而提…

LeetCode 756. 金字塔转换矩阵(回溯)

文章目录1. 题目2. 解题1. 题目 现在,我们用一些方块来堆砌一个金字塔。 每个方块用仅包含一个字母的字符串表示。 使用三元组表示金字塔的堆砌规则如下: 对于三元组(A, B, C) ,“C”为顶层方块,方块“A”、“B”分别作为方块“…

Flask框架项目实例:**租房网站(一)

Flask是一款MVC框架,主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识,通过完成作者-读书功能,先来熟悉Flask框架的完整使用步骤。 操作步骤为: 1.创建项目2.配置数据库3.定义模型类4.定义视图并配置URL 5.定义模板…

LeetCode 316. 去除重复字母 / 1081. 不同字符的最小子序列(单调栈)

文章目录1. 题目2. 解题1. 题目 LC 316: 给你一个字符串 s ,请你去除字符串中重复的字母,使得每个字母只出现一次。需保证 返回结果的字典序最小(要求不能打乱其他字符的相对位置)。 示例 1: 输入&#…

LeetCode 809. 情感丰富的文字

文章目录1. 题目2. 解题1. 题目 有时候人们会用重复写一些字母来表示额外的感受,比如 "hello" -> "heeellooo", "hi" -> "hiii"。 我们将相邻字母都相同的一串字符定义为相同字母组,例如:&qu…

网站部署nginx--uwsgi

网站代码写完之后就是项目部署,主要包括两个方面: 1.nginx安装与配置: 1、Nginx 安装 系统平台:CentOS release 6.6 (Final) 64位。 一、安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl open…