大数据计算平台Spark内核全面解读

1Spark介绍

Spark是起源于美国加州大学伯克利分校AMPLab的大数据计算平台,在2010年开源,目前是Apache软件基金会的顶级项目。随着Spark在大数据计算领域的暂露头角,越来越多的企业开始关注和使用。201411月,SparkDaytona Gray Sort 100TB Benchmark竞赛中打破了由Hadoop MapReduce保持的排序记录。Spark利用1/10的节点数,100TB数据的排序时间从72分钟提高到了23分钟

Spark在架构上包括内核部分和4个官方子模块--Spark SQLSpark Streaming、机器学习库MLlib和图计算库GraphX。图1所示为Spark在伯克利的数据分析软件栈BDASBerkeley Data Analytics Stack)中的位置。可见Spark专注于数据的计算,而数据的存储在生产环境中往往还是由Hadoop分布式文件系统HDFS承担。

1 SparkBDAS中的位置 

Spark被设计成支持多场景的通用大数据计算平台,它可以解决大数据计算中的批处理,交互查询及流式计算等核心问题。Spark可以从多数据源的读取数据,并且拥有不断发展的机器学习库和图计算库供开发者使用。数据和计算在Spark内核及Spark的子模块中是打通的,这就意味着Spark内核和子模块之间成为一个整体。Spark的各个子模块以Spark内核为基础,进一步支持更多的计算场景,例如使用Spark SQL读入的数据可以作为机器学习库MLlib的输入。表1列举了一些在Spark平台上的计算场景。

1 Spark的应用场景举例

在本文写作是,Spark的最新版本为1.2.0,文中的示例代码也来自于这个版本。

2Spark内核介绍 

相信大数据工程师都非常了解Hadoop MapReduce一个最大的问题是在很多应用场景中速度非常慢,只适合离线的计算任务。这是由于MapReduce需要将任务划分成mapreduce两个阶段,map阶段产生的中间结果要写回磁盘,而在这两个阶段之间需要进行shuffle操作。Shuffle操作需要从网络中的各个节点进行数据拷贝,使其往往成为最为耗时的步骤,这也是Hadoop MapReduce慢的根本原因之一,大量的时间耗费在网络磁盘IO中而不是用于计算。在一些特定的计算场景中,例如像逻辑回归这样的迭代式的计算,MapReduce的弊端会显得更加明显。

Spark是如果设计分布式计算的呢?首先我们需要理解Spark中最重要的概念--弹性分布数据集(Resilient Distributed Dataset),也就是RDD。 

2.1 弹性分布数据集RDD

RDDSpark中对数据和计算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可变的并能够被并行操作的数据集合。对RDD的操作分为两种transformationactionTransformation操作是通过转换从一个或多个RDD生成新的RDDAction操作是从RDD生成最后的计算结果。在Spark最新的版本中,提供丰富的transformationaction操作,比起MapReduce计算模型中仅有的两种操作,会大大简化程序开发的难度。

RDD的生成方式只有两种,一是从数据源读入,另一种就是从其它RDD通过transformation操作转换。一个典型的Spark程序就是通过Spark上下文环境(SparkContext)生成一个或多个RDD,在这些RDD上通过一系列的transformation操作生成最终的RDD,最后通过调用最终RDDaction方法输出结果。

每个RDD都可以用下面5个特性来表示,其中后两个为可选的:

  • 分片列表(数据块列表)

  • 计算每个分片的函数

  • 对父RDD的依赖列表

  • key-value类型的RDD的分片器(Partitioner)(可选)

  • 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)(可选)

虽然Spark是基于内存的计算,但RDD不光可以存储在内存中,根据useDiskuseMemoryuseOffHeap, deserializedreplication五个参数的组合Spark提供了12种存储级别,在后面介绍RDD的容错机制时,我们会进一步理解。值得注意的是当StorageLevel设置成OFF_HEAP时,RDD实际被保存到Tachyon中。Tachyon是一个基于内存的分布式文件系统,目前正在快速发展,本文不做详细介绍,可以通过其官方网站进一步了解。

  1. class StorageLevel private(

  2.     private var _useDisk: Boolean,

  3.     private var _useMemory: Boolean,

  4.     private var _useOffHeap: Boolean,

  5.     private var _deserialized: Boolean

  6.     private var _replication: Int = 1)

  7.   extends Externalizable { //… }

  8.  

  9. val NONE = new StorageLevel(false, false, false, false)

  10.   val DISK_ONLY = new StorageLevel(true, false, false, false)

  11.   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  12.   val MEMORY_ONLY = new StorageLevel(false, true, false, true)

  13.   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  14.   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

  15.   val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  16.   val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

  17.   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  18.   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

  19.   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  20.   val OFF_HEAP = new StorageLevel(false, false, true, false)

2.2 DAGStage与任务的生成

Spark的计算发生在RDDaction操作,而对action之前的所有transformationSpark只是记录下RDD生成的轨迹,而不会触发真正的计算。

Spark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。举个例子,在图2中,从输入中逻辑上生成AC两个RDD,经过一系列transformation操作,逻辑上生成了F,注意,我们说的是逻辑上,因为这时候计算没有发生,Spark内核做的事情只是记录了RDD的生成和依赖关系。当F要进行输出时,也就是F进行了action操作,Spark会根据RDD的依赖生成DAG,并从起点开始真正的计算。

2 逻辑上的计算过程:DAG 

有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集,也就是Stage,这样可以将任务提交到计算节点进行真正的计算。Spark计算的中间结果默认是保存在内存中的,Spark在划分Stage的时候会充分考虑在分布式计算中可流水线计算(pipeline)的部分来提高计算的效率,而在这个过程中,主要的根据就是RDD的依赖类型。根据不同的transformation操作,RDD的依赖可以分为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency,在代码中为ShuffleDependency)两种类型。窄依赖指的是生成的RDD中每个partition只依赖于父RDD(s) 固定的partition。宽依赖指的是生成的RDD的每一个partition都依赖于父 RDD(s) 所有partition。窄依赖典型的操作有map, filter, union等,宽依赖典型的操作有groupByKey, sortByKey等。可以看到,宽依赖往往意味着shuffle操作,这也是Spark划分stage的主要边界。对于窄依赖,Spark会将其尽量划分在同一个stage中,因为它们可以进行流水线计算。

3 RDD的宽依赖和窄依赖

我们再通过图4详细解释一下Spark中的Stage划分。我们从HDFS中读入数据生成3个不同的RDD,通过一系列transformation操作后再将计算结果保存回HDFS。可以看到这幅DAG中只有join操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage. 同时我们可以注意到,在图中Stage2中,从mapunion都是窄依赖,这两步操作可以形成一个流水线操作,通过map操作生成的partition可以不用等待整个RDD计算结束,而是继续进行union操作,这样大大提高了计算的效率。

4 Spark中的Stage划分 

Spark在运行时会把Stage包装成任务提交,有父StageSpark会先提交父Stage。弄清楚了Spark划分计算的原理,我们再结合源码看一看这其中的过程。下面的代码是DAGScheduler中的得到一个RDDStage的函数,可以看到宽依赖为划分Stage的边界。

  1. /**

  2.    * Get or create the list of parent stages for a given RDD. The stages will be assigned the

  3.    * provided jobId if they haven't already been created with a lower jobId.

  4.    */

  5.  

  6.   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {

  7.     val parents = new HashSet[Stage]

  8.     val visited = new HashSet[RDD[_]]

  9.     // We are manually maintaining a stack here to prevent StackOverflowError

  10.     // caused by recursively visiting

  11.     val waitingForVisit = new Stack[RDD[_]]

  12.     def visit(r: RDD[_]) {

  13.       if (!visited(r)) {

  14.         visited += r

  15.         // Kind of ugly: need to register RDDs with the cache here since

  16.         // we can't do it in its constructor because # of partitions is unknown

  17.         for (dep <- r.dependencies) {

  18.           dep match {

  19.             case shufDep: ShuffleDependency[_, _, _] =>

  20.               parents += getShuffleMapStage(shufDep, jobId)

  21.             case _ =>

  22.               waitingForVisit.push(dep.rdd)

  23.           }

  24.         }

  25.       }

  26.     }

  27.  

  28.     waitingForVisit.push(rdd)

  29.     while (!waitingForVisit.isEmpty) {

  30.       visit(waitingForVisit.pop())

  31.     }

  32.     parents.toList

  33.   }

上面提到Spark的计算是从RDD调用action操作时候触发的,我们来看一个action的代码

RDDcollect方法是一个action操作,作用是将RDD中的数据返回到一个数组中。可以看到,在此action中,会触发Spark上下文环境SparkContext中的runJob方法,这是一系列计算的起点。

  1. abstract class RDD[T: ClassTag](

  2.     @transient private var sc: SparkContext,

  3.     @transient private var deps: Seq[Dependency[_]]

  4.   ) extends Serializable with Logging {

  5.   //….

  6. /**

  7.    * Return an array that contains all of the elements in this RDD.

  8.    */

  9.   def collect(): Array[T] = {

  10.     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

  11.     Array.concat(results: _*)

  12.   }

  13. }

SparkContext拥有DAGScheduler的实例,在runJob方法中会进一步调用DAGSchedulerrunJob方法。在此时,DAGScheduler会生成DAGStage,将Stage提交给TaskSchedulerTaskSchdulerStage包装成TaskSet,发送到Worker节点进行真正的计算,同时还要监测任务状态,重试失败和长时间无返回的任务。整个过程如图5所示。

 

5 Spark中任务的生成 

2.3 RDD的缓存与容错

上文提到,Spark的计算是从action开始触发的,如果在action操作之前逻辑上很多transformation操作,一旦中间发生计算失败,Spark会重新提交任务,这在很多场景中代价过大。还有一些场景,如有些迭代算法,计算的中间结果会被重复使用,重复计算同样增加计算时间和造成资源浪费。因此,在提高计算效率和更好支持容错,Spark提供了基于RDDcache机制和checkpoint机制。

我们可以通过RDDtoDebugString来查看其递归的依赖信息,图6展示了在spark shell中通过调用这个函数来查看wordCount RDD的依赖关系,也就是它的Lineage.

6 RDD wordCountlineage 

如果发现Lineage过长或者里面有被多次重复使用的RDD,我们就可以考虑使用cache机制或checkpoint机制了。

我们可以通过在程序中直接调用RDDcache方法将其保存在内存中,这样这个RDD就可以被多个任务共享,避免重复计算。另外,RDD还提供了更为灵活的persist方法,可以指定存储级别。从源码中可以看到RDD.cache就是简单的调用了RDD.persist(StorageLevel.MEMORY_ONLY)

  1. /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

  2.   def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  3.   def cache(): this.type = persist()

同样,我们可以调用RDDcheckpoint方法将其保存到磁盘。我们需要在SparkContext中设置checkpoint的目录,否则调用会抛出异常。值得注意的是,在调用checkpoint之前建议先调用cache方法将RDD放入内存,否则将RDD保存到文件的时候需要重新计算。 

  1.   /**

  2.    * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint

  3.    * directory set with SparkContext.setCheckpointDir() and all references to its parent

  4.    * RDDs will be removed. This function must be called before any job has been

  5.    * executed on this RDD. It is strongly recommended that this RDD is persisted in

  6.    * memory, otherwise saving it on a file will require recomputation.

  7.    */

  8.   def checkpoint() {

  9.     if (context.checkpointDir.isEmpty) {

  10.       throw new SparkException("Checkpoint directory has not been set in the SparkContext")

  11.     } else if (checkpointData.isEmpty) {

  12.       checkpointData = Some(new RDDCheckpointData(this))

  13.       checkpointData.get.markForCheckpoint()

  14.     }

  15.   }

Cache机制和checkpoint机制的差别在于cacheRDD保存到内存,并保留Lineage,如果缓存失效RDD还可以通过Lineage重建。而checkpointRDD落地到磁盘并切断Lineage,由文件系统保证其重建。

2.4 Spark任务的部署

Spark的集群部署分为StandaloneMesosYarn三种模式,我们以Standalone模式为例,简单介绍Spark程序的部署。如图7示,集群中的Spark程序运行时分为3种角色,driver, masterworkerslave)。在集群启动前,首先要配置masterworker节点。启动集群后,worker节点会向master节点注册自己,master节点会维护worker节点的心跳。Spark程序都需要先创建Spark上下文环境,也就是SparkContext。创建SparkContext的进程就成为了driver角色,上一节提到的DAGSchedulerTaskScheduler都在driver中运行。Spark程序在提交时要指定master的地址,这样可以在程序启动时向master申请worker的计算资源。Drivermasterworker之间的通信由Akka支持。Akka 也使用 Scala 编写,用于构建可容错的、高可伸缩性的Actor 模型应用。关于Akka,可以访问其官方网站进行进一步了解,本文不做详细介绍。

7 Spark任务部署

3、更深一步了解Spark内核

了解了Spark内核的基本概念和实现后,更深一步理解其工作原理的最好方法就是阅读源码。最新的Spark源码可以从Spark官方网站下载。源码推荐使用IntelliJ IDEA阅读,会自动安装Scala插件。读者可以从core工程,也就是Spark内核工程开始阅读,更可以设置断点尝试跟踪一个任务的执行。另外,读者还可以通过分析Spark的日志来进一步理解Spark的运行机制,Spark使用log4j记录日志,可以在启动集群前修改log4j的配置文件来配置日志输出和格式。

【编辑推荐】

  1. Spark:利用Eclipse构建Spark集成开发环境

  2. Spark实战:单节点本地模式搭建Spark运行环境

  3. Spark:为大数据处理点亮一盏明灯

  4. 专访Spark亚太研究院王家林:从技术的角度探索Spark

  5. StormSpark:谁才是我们的实时处理利器


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/543942.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javascript对话框_JavaScript中的对话框

javascript对话框JavaScript对话框 (JavaScript Dialog Boxes) Dialog boxes are a great way to provide feedback to the user when they submit a form. In JavaScript, there are three kinds of Dialog boxes, 对话框是向用户提交表单时提供反馈的好方法。 在JavaScript中…

排查死锁的 4 种工具,秀~

作者 | 磊哥来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;死锁&#xff08;Dead Lock&#xff09;指的是两个或两个以上的运算单元&#xff08;进程、线程或协程&#xff09;&#xf…

MySQL 常见的 9 种优化方法

大家好&#xff0c;我是磊哥&#xff01;今天给大家分享一些简单好用的数据库优化方式&#xff01;1、选择最合适的字段属性Mysql是一种关系型数据库&#xff0c;可以很好地支持大数据量的存储&#xff0c;但是一般来说&#xff0c;数据库中的表越小&#xff0c;在它上面执行的…

oracle中dbms_DBMS中的实例和架构

oracle中dbms1)实例 (1) Instances) What is the Instance? If we look towards it in real life, we refer instance as an occurrence of something at a particular moment of time. In Database Management system, there are a lot of changes occurring over time to th…

acess() 判断目录是否存在

acess()功能描述&#xff1a; 检查调用进程是否可以对指定的文件执行某种操作。 <pre lang"c" escaped"true">#include <unistd.h>int access(const char *pathname, int mode); </pre>参数说明&#xff1a;pathname: 需要测试的文件路径…

过滤器和拦截器的 5 个区别!

作者 | 磊哥来源 | Java面试真题解析&#xff08;ID&#xff1a;aimianshi666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;过滤器&#xff08;Filter&#xff09;和拦截器&#xff08;Interceptor&#xff09;都是基于 AOP&#xff08;Aspec…

简单的求和(打表)

简单的求和 Time Limit: 1 Sec Memory Limit: 128 MB Submit: 130 Solved: 20SubmitStatusWeb BoardDescription 定义f(i)代表i的所有因子和(包括1和i)&#xff0c;给定一个l,r。求f(l)f(l1)...f(r)。 Input 第一行输入一个t(t<1000)&#xff0c;代表有t组测试数据&#x…

chroot函数使用_PHP chroot()函数与示例

chroot函数使用PHP chroot()函数 (PHP chroot() function) The full form of chroot is "Change Root", the function chroot()" is used to change the root directory, and, also changes the current working directory to "/". chroot的完整格式为…

面试突击第一季完结:共 91 篇!

感谢各位读者的支持与阅读&#xff0c;面试突击系列第一季到这里就要和大家说再见了。希望所写内容对大家有帮助&#xff0c;也祝你们找到满意的工作。青山不改&#xff0c;细水长流&#xff0c;我们下一季再见&#xff01;91&#xff1a;MD5 加密安全吗&#xff1f;90&#xf…

linux升级python

Centos 6.6自带的是Python 2.6.6, 现在升级为2.7.6[rootoffice-vps4052 ~]# python -VPython 2.6.6操作步骤如下:1) 下载并解压python 2.7.6源码包[rootoffice-vps4052 ~]# cd /usr/local/src[rootoffice-vps4052 ~]# wget http://python.org/ftp/python/2.7.6/Python-2.7.6.tg…

SpringBoot官方热部署和远程调试神器

平时使用SpringBoot开发应用时&#xff0c;修改代码后需要重新启动才能生效。如果你的应用足够大的话&#xff0c;启动可能需要好几分钟。有没有什么办法可以加速启动过程&#xff0c;让我们开发应用代码更高效呢&#xff1f;今天给大家推荐一款SpringBoot官方的热部署工具spri…

c# 小程序支付后台示例_C中的#if指令示例| C预处理程序

c# 小程序支付后台示例The #if is a preprocessor directive in C programming language and it is used for conditional compilation. #if是C编程语言中的预处理程序指令&#xff0c;用于条件编译。 General for of the #if directive is: #if指令的常规为&#xff1a; #if…

MySQL 优化:Explain 执行计划详解

昨天中午在食堂&#xff0c;和部门的技术大牛们坐在一桌吃饭&#xff0c;作为一个卑微技术渣仔默默的吃着饭&#xff0c;听大佬们高谈阔论&#xff0c;研究各种高端技术&#xff0c;我TM也想说话可实在插不上嘴。聊着聊着突然说到他上午面试了一个工作6年的程序员&#xff0c;表…

c语言中的逻辑运算符_C / C ++中的逻辑运算符

c语言中的逻辑运算符逻辑运算符 (Logical Operators) Logical operators are used to check the combinations of the two conditional expressions. 逻辑运算符用于检查两个条件表达式的组合。 The following are the types of logical operators. 以下是逻辑运算符的类型 。…

顶级 Javaer 常用的 14 个类库

作者&#xff1a;小姐姐味道昨天下载下来Java16尝尝鲜。一看&#xff0c;好家伙&#xff0c;足足有176MB大。即使把jmc和jvisualvm给搞了出去&#xff0c;依然还是这么大&#xff0c;真的是让人震惊不已。但即使JDK足够庞大&#xff0c;它的功能也已经不够用了。我们需要借助于…

势头迅猛的儿童手表:恐陷下一个文曲星之地?

历史的节奏&#xff0c;就是不断重复此前发生过的事。虽然表现形态不一&#xff0c;但蕴藏的规律、趋势总是有着惊人的相似。在科技行业&#xff0c;同样如此——iPhone开启的智能手机时代走过的大兴—→平稳→下降态势&#xff0c;与PC的历程几乎是一样的。而在国内&#xff0…

scala 类中的对象是类_Scala中的类和对象

scala 类中的对象是类Scala中的课程 (Classes in Scala) A class is a blueprint for objects. It contains the definition of all the members of the class. There are two types of members of the class in Scala, 类是对象的蓝图。 它包含该类的所有成员的定义。 Scala中…

2022年终总结:不再用“拼命”来应对极度的不安全感

作者 | 磊哥来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;人生匆匆三十四余载&#xff0c;今天又到了辞旧迎新和 2022 年说再&#xff08;也不&#xff09;见的时刻了&#xff0c;所以…

c++中std::find_std :: find()与C ++中的示例

c中std::findfind()作为STL函数 (find() as a STL function) find() is an STL function that comes under the <algorithm> header file which returns an iterator to the first occurrence of the searching element within a range. find()是STL函数&#xff0c;位于…

Java 最常见的 200+ 面试题:面试必备

这份面试清单是从我 2015 年做了 TeamLeader 之后开始收集的&#xff0c;一方面是给公司招聘用&#xff0c;另一方面是想用它来挖掘在 Java 技术栈中&#xff0c;还有那些知识点是我不知道的&#xff0c;我想找到这些技术盲点&#xff0c;然后修复它&#xff0c;以此来提高自己…