Spark(四) -- Spark工作机制

一、应用执行机制

一个应用的生命周期即,用户提交自定义的作业之后,Spark框架进行处理的一系列过程。

在这个过程中,不同的时间段里,应用会被拆分为不同的形态来执行。

1、应用执行过程中的基本组件和形态

Driver:
运行在客户端或者集群中,执行Application的main方法并创建SparkContext,调控整个应用的执行。

Application:
用户自定义并提交的Spark程序。

Job:
一个Application可以包含多个Job,每个Job由Action操作触发。

Stage:
比Job更小的单位,一个Job会根据RDD之间的依赖关系被划分为多个Stage,每个Stage中只存有RDD之间的窄依赖,即Transformation算子。

TaskSet:
每个Stage中包含的一组相同的Task。

Task:
最后被分发到Executor中执行的具体任务,执行Stage中包含的算子。

明确了一个应用的生命周期中会有哪些组件参与之后,再来看看用户是怎么提交Spark程序的。

2、应用的两种提交方式

Driver进程运行在客户端(Client模式):

即用户在客户端直接运行程序。
程序的提交过程大致会经过以下阶段:

  1. 用户运行程序。
  2. 启动Driver进行(包括DriverRunner和SchedulerBackend),并向集群的Master注册。
  3. Driver在客户端初始化DAGScheduler等组件。
  4. Woker节点向Master节点注册并启动Executor(包括ExecutorRunner和ExecutorBackend)。
  5. ExecutorBackend启动后,向Driver内部的SchedulerBackend注册,使得Driver可以找到计算节点。
  6. Driver中的DAGScheduler解析RDD生成Stage等操作。
  7. Driver将Task分配到各个Executor中并行执行。

Driver进程运行在集群中(某个Worker节点,Cluster模式):

即用户将Spark程序提交给Master分配执行。
大致会经过一下流程:

  1. 用户启动客户端,提交Spark程序给Master。
  2. Master针对每个应用分发给指定的Worker启动Driver进行。
  3. Worker收到命令之后启动Driver进程(即DriverRunner和其中的SchedulerBackend),并向Master注册。
  4. Master指定其他Worker启动Executor(即ExecutorRunner和其内部的ExecutorBackend)。
  5. ExecutorBackend向Driver中的SchedulerBackend注册。
  6. Driver中的DAGScheduler解析RDD生产Stage等。
  7. Executor内部启动线程池并行化执行Task。

可以看到,两种程序的提交方式在处理过程中,仅仅是在哪个地方启动Driver进程的区别而已。
为Client模式中时(使用Spark Shell直接执行的程序),Driver就在客户端上。
为Cluster模式时(提交Spark程序到Master),Driver运行与集群中的某个Worker节点。

二、调度与任务分配模块

Spark框架就像一个操作系统一样,有着自己的作业调度策略,当集群运行在不同的模式下,调度不同级别的单位,使用的策略也是有所不同的。

1、Application之间的调度

当有多个用户提交多个Spark程序时,Spark是如何调度这些应用并合理地分配资源呢?

Standalone模式下,默认使用FIFO,每个app会独占所有资源

可以通过以下几个参数调整集群相关的资源:

  • spark.cores.max:调整app可以在整个集群中申请的CPU core数量
  • spark.deploy.defaultCores:默认的CPU core数量
  • spark.executor.memory:限制每个Executor可用的内存

在Mesos模式下,可以使用

  • spark.mesos.coarse=true设置静态配置资源的策略
  • 使用mesos://URL且不配置spark.mesos.coarse=true(每个app会有独立固定的内存分配,空闲时其他机器可以使用其资源)

在Yarn模式下,提交作业时可以使用

  • 通过–num-executors控制分配多少个Executor给app
  • –executor-memory和–executor-cores分别控制Executor的内存和CPU core

2、Application内部的Job调度机制

一个Application中,由各个Action触发的多个Job之间也是存在调度关系的。

Action操作实现上是调用了SparkContext的runJob方法提交Job。

Spark中调度Job有两种策略

FIFO:

  • 第一个Job分配其所需的所有资源
  • 第二个Job如果还有剩余资源的话就分配,否则等待

FAIR:

  • 使用轮询的方式调度Job

可以通过配置spark.scheduler.mode调整Job的调度方式

另外也可以配置调度池,具体参考官方文档
或者参考conf/fairscheduler.xml.template文件。

3、Job中的Stage调度

Stage是由DAGScheduler组件生产的,在源码中,有三个比较特殊的变量:

  • waitingStages:存储等待执行的Stages
  • runningStages:存储正在执行的Stages
  • failedStages:存储执行失败的Stage

Spark会通过广度优先遍历找到最开始的Stage执行,若有父Stage没有执行完则等待。

4、Stage中的Task调度

暂未了解。。。

三、I/O制度

Spark虽然是基于内存计算的框架,但是不可避免的也会接触到一些存储层,那么在和存储层交互的时候,Spark做了哪些工作?

1、序列化

序列化的本质就是将对象转换为字节流,可以理解为将链表中存储的非连续空间的数据存储转化为连续空间存储的数组中

Spark为什么要做序列化操作?

内存或磁盘中RDD会含有对象的存储,而在节点间数据的传输时,序列化之后的数据可以节约空间和提高效率。

2、压缩

压缩是日常生活中的一个常见操作,好处显而易见,节约空间,从而就可以获得时间上的效率。

Spark中序列化之后的数据可以进行压缩以减少空间开销。
Spark支持两种压缩算法

  • Snappy算法:高压缩速度
  • LZF算法:高压缩比

在不同的场景中选择不同的压缩算法可以有效的提高程序运行的效率。

压缩配置方式:

  1. 启动前在spark-env.sh中设置:export SPARK_JAVA_OPTS=”-Dspark.broadcast.compress”
  2. 在应用程序中配置
    conf.getBoolean(“spark.broadcast.compress,true”)
    conf.set(“spark.broadcast.compress”,true)

3、块管理

RDD从物理上看是一个元数据结构,记录着Block和Node之间的映射关系。

存储RDD是以Block块为单位的,每个分区对应一个块,PartitionID通过元数据信息可以映射到Block。

BlockManager管理和接口、块读写流程、数据块读写管理等细节待继续深入了解。

四、通信模块

Spark中使用Akka作为通信框架

  • Actors是一组包含状态和行为的对象
  • 一个Actor接收到其他Actor的信息之后可以根据需求做出各种反应
  • Client、Master、Worker等都是一个Actor

Spark各个组件的之间协调工作都是基于Akka机制来的,待深入了解的有:

  • Client Actor通信代码逻辑
  • Master Actor通信代码逻辑
  • Worker Actor消息处理逻辑

五、容错机制

之前讲过,RDD之间的算子操作会形成DAG图,RDD之间的依赖关系会形成Lineage。

要理解Lineage机制首先要明确两种依赖的概念:

  • Shuffle Dependencies(宽依赖)
    父分区可以被多个子分区所用
    即多对多的关系

  • Narrow Dependencies(窄依赖)
    父分区最多被一个子分区所用
    即一对一或者多对一的关系

当出现某个节点计算错误的时候,会顺着RDD的操作顺序往回走

一旦是Narrow Dependencies错误,重新计算父RDD分区即可,因为其不依赖其他节点

而如果Shuffle Dependencies错误,重算代价较高,因为一旦重新计算其依赖的父RDD分区,会造成冗余计算

这时候就需要人为的添加检查点来提高容错机制的执行效率

什么情况下需要加CheckPoint

  • DAG中的Lineage过长,如果重算开销太大,故在特定几个Shuffle Dependencies上做CheckPoint是有价值的。
  • Checkpoint会产生磁盘开销,因为其就是将数据持久化到磁盘中,所以做检查点的RDD最好是已经在内存中缓存了。

六、Shuffle机制

Shuffle的定义:对无规则的数据进行重组排序等过程

为什么要Shuffle:分布式计算中数据是分布在各个节点上计算的,而汇总统计等操作需要在所有数据上执行

Spark中Shuffle经历的阶段:

Shuffle Write将各个节点数据写入到指定分区1、根据下一个Stage分区数分成相应的Bucket2、将Bucket写入磁盘
Shuffle Fetch获取各个分区发送的数据1、在存储有Shuffle数据节点的磁盘Fetch需要的数据2、Fetch到本地之后进行自定义的聚集函数操作

最后记录一下提交Spark作业的方法
在spark的bin目录下
执行spark-submit脚本
./spark-submit \
–class 入口函数所在的类名全称 \
–master spark master节点的地址(默认端口7077)\
–executor-memory 指定worker中Executor的内存 \
–total-executor-cores 100 \
jar文件所在的目录 \

转载于:https://www.cnblogs.com/jchubby/p/5449398.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/405855.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux下查看文件和文件夹大小的df和du命令

From: http://www.yayu.org/look.php?id162 当磁盘大小超过标准时会有报警提示,这时如果掌握df和du命令是非常明智的选择。 df可以查看一级文件夹大小、使用比例、档案系统及其挂入点,但对文件却无能为力。 du可以查看文件及文件夹的大小。 两者配…

php打开就执行url,php执行URL解析

方法一:$url"http://www.baidu.com";file_get_contents($url);方法二:// CURL 方法$url"http://www.baidu.com";$ch curl_init( );curl_setopt( $ch,CURLOPT_URL,$url );curl_setopt( $ch,CURLOPT_HEADER,0 );curl_setopt( $ch,CUR…

java的classpath

转自:http://takkymj.javaeye.com/blog/734408 1、class搜索路径的重要性 理解class搜索路径对所有Java开发人员来说都很重要,但是,IDE的广泛使用掩盖了这项技术,使大家普遍对它缺乏了解,甚至包括好多老鸟。这个问题在…

深入react技术栈(12):组件内通信

我是歌谣 放弃很容易 但是坚持一定很酷 微信公众号关注前端小歌谣获取前后端知识 父组件向子组件传值 子组件向父组件传值 跨级组件通信 没有嵌套关系的组件通信 文章参考深入React技术栈

《迷宫》之站立会议—5.15

会议时间:7:10 内容: 还没怎么做,组长有些着急,表示快快做。 转载于:https://www.cnblogs.com/little-clever/p/4505113.html

Ubuntu出现没有正确安装GNOME电源管理器的默认配置

From: http://forum.ubuntu.org.cn/viewtopic.php?f139&t236972 Ubuntu启动的时候,在用户登录的界面,右上角出现“没有正确安装GNOME电源管理器的默认配置,请与计算机管理员联系”,如果是英文版,出现“GNOME Pow…

php青茶什么时候拆,青茶的香味应该如何评判(天赐露)

青茶的香味应该如何评判关注天赐露安溪铁观音。一站式采购青茶的香味评判 ---青茶是一个大茶类,商业上习惯称之为“乌龙茶”。其实,乌龙茶仅是青茶中的一个品种而已。青茶产于福建、广东和台湾三省,其他产茶省区几乎不生产,因此&a…

学习笔记----linux下编译samba

linux下编译samba一、samba的启动命令:#cd /usr/local/samba/sbin#smbd关闭samba可以杀死smbd进程二、samba 的配置文件和rpm的存放路径一样#vi /etc/samba/smb.conf三、samba的内部命令#cd /usr/local/samba/bin#smbpasswd u1...................四、编译samba的一…

C#中使用post请求方法请求表单-用于两个网站的交互

网站A中的后台代码: 1 //请求的URL2 HttpWebRequest request WebRequest.Create("http://192.168.1.132:86/test01.aspx") as HttpWebRequest;3 request.Method "po…

Samba服务器(一):windows访问samba服务器共享文件的简单实现(图文并茂)

From:http://linux.chinaunix.net/techdoc/beginner/2009/07/09/1122864.shtml 一、linux的文件共享(当然也包括和windows进行文件共享)一般有NFS, FTP, SSH SAMB等,但是以samba最为普遍和流行。下面就对samba服务器做个全面的介绍和讲解&…

matlab变压器损耗仿真,基于Matlab的变压器运行特性仿真专题报告.docx

PAGE 1PAGE变压器运行特性数字仿真专题报告学 生 姓 名:班 级:学 号:指 导 教 师:所 在 单 位:电气工程学院提 交 日 期:2018作业评分一、概述(一)电压调整率对于负载来说,变压器相当于一个交流…

PAZU 是4Fang 为配合“四方在线”软件于2004年开发的WEB打印控件,适用于各种WEB软件项目的打印。...

PAZU 是4Fang 为配合“四方在线”软件于2004年开发的WEB打印控件,适用于各种WEB软件项目的打印。 PAZU是客户端软件,使用于IE作为客户端的所有应用,与服务器端开发语言无关,即PAZU支持所有的开发语言开发的WEB应用,无论…

GDOI2015 解题报告

首先嘛现在发现题目这么水我还啥都没想出来正是呵呵了。接下来就口胡下GDOI的题解吧 PS:代码什么的要请联系我 题目:快戳我 Day1: T1:这个嘛,可以先找到起点所能到达的每个点然后判断该点能否到达终点,后一步可以发现如果从终点沿…

linux上进程状态查询

From: http://www.cnblogs.com/dkblog/archive/2011/03/11/1980556.html linux上进程有5种状态: 1. 运行(正在运行或在运行队列中等待) 2. 中断(休眠中, 受阻, 在等待某个条件的形成或接受到信号) 3. 不可中断(收到信号不唤醒和不可运行, 进程必须等待直到有中断发生) 4. 僵…

python 3.9特性,开发者应该知道的Python 3.9新特性

导读:Python 3.9来了,有哪些新特性值得学,值得用?Python 2.9(5.10.2020)已经发布了一段时间,最后一个Alpha版(3.9.0a5)近期发布了测试版。开发者应该看新版有哪些新功能,改进和修复。安装测试版本节尝鲜安装…

React开发(193):react无障碍使用程序获取焦点第一种

class CustomTextInput extends React.Component {constructor(props) {super(props);// 创造一个 textInput DOM 元素的 ref this.textInput React.createRef(); }render() {// 使用 ref 回调函数以在实例的一个变量中存储文本输入 DOM 元素//(比如&#xff…

Oracle发布更新使数据库性能优化达到75%

甲骨文公司今天在甲骨文全球技术与应用大会(Oracle OpenWorld)上宣布,为优化Oracle软件和硬件而推出一款快速、现代和可靠的Linux内核——Oracle Unbreakable企业级内核(Unbreakable Enterprise Kernel)。 Oracle Unbr…

python一维数组定义,python一维数组保存

python中向一维数组添加元素的方法例如爬虫爬取到的数据只有[b11dab7a2f48d131fc9c26678636294381aedd41,供参考: old_array [b11dab7a2f48d131fc9c26678636294381aedd41, 5183353435e0a7b1681010bd71d3d7791492685e]new_array [magnet:?xturn:btih: …