Resilient Distributed Datasets: A fault-tolerant abstraction for in-Memory cluster computing, 是讲述 Spark RDD 的基础论文,通读论文能给我们带来全景的 Spark 知识面
摘要:RDD,全称Resilient Distributed Dataset,可伸缩性数据集。使用它编程,可以有效利用大规模集群的内存,并且兼顾容错。RDD的流行,完美解决了两类应用难题:迭代算法(Iterative Algorithm)和交互性数据挖掘工具。在这两类应用中,RDD缓存中间结果集的办法,使得程序运行性能提高了一个量级。在容错方面,RDD使用了粗放型的共享内存转换方法,而不是对其(共享内存)做精控更新。RDD完全可以胜任迭代算法(此前这类任务都由Pregel这样的编程模型完成),并且对新数据分析算法、应用都提供更好的支持。通过大量的用户应用和压力测试,最终Spark实现了RDD.
1 简介:
像MapReduce和Dryad这样的集群计算框架,已经广泛应用于大规模数据分析。这类计算框架,最大的两大优点,旨在帮助程序员专注业务编程,而非花精力分发计算任务和实现程序容错。
当今的计算框架虽然对利用集群中的计算资源做了各类抽象,但还没有实现对集群内存的抽象封装。这样对那些需要重复利用中间结果集的应用就很不友好,比如机器学习和图算法,PageRank,K-means 聚类以及逻辑回归等等。另一类计算,比如交互式数据分析,因涉及大量的即席数据查询,为确保下一次数据集可以被重用,需要借助存储物化结果集,这引发大量写入实体磁盘的操作,导致执行时间拉长。
意识到这个问题的存在,专家们做了大量尝试,比如 Pregel,把大量中间数据缓存起来,专为图计算封装了框架;HaLoop 则提供了实现迭代算法的MapReduce接口。但这些仅仅对个案有帮助,回到通用的计算上来,毫无优势。比如最常见的数据分析,装载多样化多源头数据,展开即席查询。
RDD弥补了“专家型计算框架”的缺陷,支持通用型分布式并行计算。使用集群中所有节点内存来装载同一应用所需的数据,兼容并包图形数据,二维数据,非结构化数据。且提供容错机制,控制并行数据结构和持久化中间数据集。最神奇的地方是,RDD根据分区有效控制数据分布,利用高度抽象丰富的API去操作数据。
设计RDD遇到最大的难点是容错。现存的对集群内存的抽象,包括分布式共享内存,键值对,数据库和Piccolo,提供的接口都是针对稳定状态的精控更新,比如二维表中的单元格。利用这类接口,保证容错的方法只能制作跨节点数据副本,或者异地日志备份。显而易见,这些操作对于大数据量的支持不够友好,既浪费网络流量还增加了存储开销。
与这些老式的设计相比,RDD的优势在于存储计算方法而不是数据。数据经过一系列计算得到最终的结果,如果要保存这些数据的中间状态来完成容错,那还不如保存如何得到这些数据的计算方法来的开销少。就如前面所说,保存这些中间数据集好处是可以提高性能。与容错机制并不矛盾。举例:读取数据源后,原始RDD就初始化成功,经过map,filter,reduce得到一系列新的RDD,一旦RDD失效,只要重新按照RDD的生成路径执行,数据还能复原。RDD天然还有分区属性,即他的数据是分区存储于集群中某些节点上,同一时间点不会所有分区都失效,那么重新计算某一个或几个失效分区,需花费的时间肯定比重新计算所有分区来的少。
在RDD发明之前,很多特殊的计算需求只能靠不断引入新的计算框架才能解决,比如 MapReduce, DryadLINQ,SQL, Pregel和HaLoop. 而RDD发明之后,对于这类在多个数据集上重复一组运算的操作,变得简单和通用了。乍看上去,RDD似乎有很多缺陷,但在解决实际问题上,RDD却是把合适的利剑。
以RDD为编程核心的Spark,广泛用于 UC Berkeley 和众多公司。它以Scala为首要编程语言,提供方便的集成编程接口,有点类似DryadLINQ.除此之外,Spark提供的Scala解释器,可以很容易让编程人员完成大规模数据集的集成处理。大概Spark是第一个使用通用编程语言来达到在集群中完成交互式数据挖掘的工具。
通过压力测试,Spark处理迭代计算的速度是Hadoop的20倍,完成一份数据报表的分析,总耗时比之前的技术快40倍。甚至在5-7秒的延迟内,可以处理1TB的数据扫描。从更底层的角度出发,实际上在Spark中还继承了Pregel和HaLoop编程模式,并采用了代码优化,使得编程库只有200行代码那么轻便。
2 可伸缩性分布式数据集(RDD-Resilient Distributed Datasets)
在本小节,主要探讨以下方面内容:
1)RDD 的编程接口
2)RDD 与精控共享内存的对比
3)RDD 的缺陷
2.1 RDD 抽象
首先,RDD 最明显的两个特征:1)只读;2)分区。
只读的属性注定了RDD的产生方式只有新建,要么从其他数据源读取而来,要么从已有的RDD修剪出来。
看到这里我有两个问题:1)其他数据源读取,如何分区并行读取?比如读一张数据库二维表,如何并行地去读? 2)RDD从另一个RDD派生出来,会造成大量数据重复,占用大量内容,如何优化?
RDD 的这类生产方式,叫做转换操作(Transformation).这类操作并没有直接作用在RDD本身的数据结构上,而是重新生成新的RDD.那么为什么不是直接在原RDD上做转换呢,这是需要思考的问题。
常见的转换操作有map(),filter(),join()等,后续详解。
但,RDD并不总需要物化数据。它记录了足够多的继承、转换步骤等信息,即血统,以便在必要的时候,实现自我修复,从头再生一个RDD。RDD的分区属性,又帮助再生RDD的过程执行得非常高效,仅再生丢失的RDD分区即可。如果RDD丢失了血统信息,它将不能被任何程序调用。那么RDD的血统数据结构又是如何的呢?
用户可控的两个RDD属性是分区和持久化。持久化提高RDD数据的可复用性,可以存储在内存,可以存在硬盘(当然是在逼不得已的情况下)。分区是特别优雅的属性,它方便程序员灵活的部署数据分布,使得最终需要JOIN的两个数据集,按照同一个键值做哈希分区(Hash-Partition),这样在Join时加快了处理速度。
2.2 Spark编程接口
与 DryadLINQ,FlumeJava 一样,Spark 操控RDD同样使用语言集成化编程接口(language-integrated API), 即把RDD当做对象,使用对象的方法来操作RDD.
在编写 Spark 应用程序时,程序员首先做的事情,便是通过转换函数,将源数据抽取过来,生成一组RDD;在RDD上执行动作函数(Action),使得结算结果返回驱动程序(Spark程序发起点),或是单值,或是数据集,或装载到其他存储设备或文件。整个过程中,最有技巧的地方是,RDD的动作函数(Action)才是真正的程序起始点,第一个动作函数开始执行时,整个数据流和任务流才开始。这是RDD典型的惰性计算。
在复杂的Spark程序中,转换函数在动作函数之前可能会有很多,每一步的转换函数都能重生一个RDD,当这些RDD需要在长链条的转换函数中重复利用时,把特定的RDD固化下来,是提高性能的不二法门。Spark做得完美的地方在于,他允许我们将中间结果集(RDD)用persist方法暂存于内存中。比如对于从Hive来的数据,我们既需要做总计,还需要分维度做分计,那么计算整理出来的原始数据,就最好存入内存。除非内存不够大,则选择存入硬盘,或复制到更远的远程服务器。甚至还可以控制RDD存盘的优先级别。
实例:使用控制台挖掘日志
当运营需要对上T的网络日志做错误分析时,如果使用Hadoop平台HDFS格式存储,要分析日志,首先要编写MapReduce程序,在程序中筛选错误日志,之后聚合汇总;也可以使用Hive来查询,前提是搭建Hive环境,并设计好表结构。
如果采用Spark查询,会是下面的编程脚本,非常简易:
image
图解:图中的方框代表RDD,箭头则表示一个转换函数。
lines=spark.textFile("hdfs://...")errors=lines.filter(_.startsWith("ERROR"))errors.persist()
这三行代码就能解决查询所有错误日志的信息。具体展开说明下:
lines=spark.textFile("hdfs://...")
lines 是RDD,作用是从 hdfs 读取日志;
errors=lines.filter(_.startsWith("ERROR"))
errors是另一个新RDD,用来存储含有ERROR的错误日志;
errors.persist()
是将errors的数据固化在内存中,以供之后程序反复使用。但此时,spark并未开始执行。
若要执行这个Spark程序,需要执行一个动作函数,比如:
errors.count()
这是在计算总共有多少次错误发生,此时Spark程序就执行了。这就是典型的“惰计算”,Spark独有的特性。
再举个具体的例子:比如MySQL数据库的错误日志,归档之后放在了HDFS上面,那么用Spark计算总数就简单了:
errors.filter(_.contains("MySQL")).count()
除了count()这个总计动作函数外,还有很多动作函数也可以使得Spark程序立即运行起来:
errors.filter(_.contains("HDFS")).map(_.split('')(3)).collect()
这是取了包含HDFS错误信息的第三个字段的值,并返回前台。
当Spark的第一个动作函数执行时,lines,errors就相继建立, lines因为没有将其他非错信息剔除,所以数据量巨大,全部装载到内存里就容易溢出。但errors就不一样了,因数据量小,适合暂留在内存中,为后续的复用提供准备。
最后,RDD是如何做到容错的呢?在开始的简易计算谱系图中,每一步转换操作都会被记录下来。一旦errors RDD其中的一个分区丢失,重新按照这份谱系图执行一遍,相当丢失分区的数据就回来了。
2.3 RDD模型的优点
image
(图1)
分布式共享内存的概念
Distributed Shared Memory, 分布式共享内存
https://en.wikipedia.org/wiki/Distributed_shared_memory
分布式共享内存,最大的优点在于写一次,多机同步。集群中的所有计算机节点,在同一内存位置存储了同一份数据。
弊端也很明显,一旦数据损坏,所有数据都要重新还原或重做;同步导致的延迟会很高,因为系统要保障数据的完整性。这在分布式数据库中常见。
RDD 与 DSM 的区别在于,前者是粗放式写入,通过转换函数生成,而后者在内存任意位置均可写入。 RDD不能很好地支持大批量写入,却可以很好的支持分区容错。前面也说道,谱系图是RDD容错的利器,丢失分区可重生。
RDD的第二大优势在于,备份节点可以迅速的被唤起,去代替那些缓慢节点执行任务。即在缓慢节点执行任务的同时,备份节点同时也执行相同的任务,哪个节点快就用那个节点的结果。而DSM则会被备份节点干扰,引起大家同时缓慢,因为共享内存之间会同步状态,互相干扰。
RDD的另外两大优点,基于数据存储分发任务和溢出缓存至硬盘。在大量写入的操作中,比如生成RDD,会选择离数据最近的节点开始任务(如下图所示);而在只读操作中,大量数据没发存入内存时,会自动存到硬盘上而不是报错停止执行。
image
(图2)
上图所示的,便是驱动器程序(Driver)将计算任务分发到数据分区所在节点,执行转换操作。多节点并行执行一个巨大数据量的操作得以完成。
不适合使用RDD的场景
如前所述,RDD的最大优点是,并行处理只读数据。RDD之间有完整的血统关系,称之为谱系图。其中之一丢失后,可以凭借谱系图恢复数据。但对于大量写入的程序,比如爬虫就不适合了。保障爬虫数据的完整性,需要做及时的checkpoint,实现多重副本的建立。这种异步机制,只能靠传统的日志型系统完成,比如数据库, RAMCloud, Percolator, Piccolo.
3 Spark编程接口
Spark提供了Scala,一种类DryadLINQ的Java vm函数编程语言,用来封装 RDD 的编程接口(Api). Scala有两个好处,一是方便交互式操作;二是静态类型的效率极高。
Scala 是静态类型的语言,即在编译时就已经完成了数据类型的检查,比起动态类型,是要提高不少效率
如图2所示,Spark是由Driver程序启动,分发任务到各节点上运行,这些节点称为worker程序,生成的RDD数据分区会在worker程序里面保存起来,直到程序结束。Driver还负责每个RDD分区的血统记录,即每个RDD分区的父分区或者数据源是什么,以便丢失后恢复。
在Spark的编程接口里,有个很重要的特性是传递函数闭包(function closure).函数闭包被当做变量可以传递到转换函数或动作函数中去,而闭包中的变量,常量都可以被共享访问。因此当转换函数与动作函数有闭包函数传入时,事实上每个RDD分区都会接收到相同的一个闭包函数。
比如:
var x = 5;rdd.map(_ + x)
就把 x 传到了每个RDD分区的map函数中。
Scala是门静态语言,RDD的元素类型需要首先定义好,但支持隐式转换,比如RDD[Int]理论上需要存储整型(int)元素,但事实上Int可以省却,因为一旦存储可以隐式转换成int的字符串,也没问题。
RDD及其操作非常简单,但理解RDD的重点却在于闭包函数。闭包函数在传递过程中,需要序列化,反射。这些都需要严肃处理。
3.1 RDD的操作
image
上图给出的是Spark支持的转换函数与动作函数,方括号[]中的T代表元素类型。转换函数用来生成RDD,而动作函数用来计算值或保存计算值到外部存储。最大的特性是惰性执行,即只有第一个动作函数的执行,才会引起数据流真正的流动。
详细解释下这些函数。比如:
-Join: 必须两个RDD都是键值对RDD;
-map:一对一匹配,输入与输出同数量,一条输入产生一条输出;
-flatMap:一对多匹配,输入与输出可不同数量,一条输入产生多条输出;
-groupByKey,reduceByKey,sort:自动产生一个哈希(hash)或范围(range)分区
3.2 应用一,逻辑回归
很多机器学习的算法都采用了迭代处理,使得最终算法更加优化。那么在迭代过程中,显然能把之前的结果保留下来,重复使用,使得迭代时间更快。
比如,逻辑回归,最常见的分类算法,用来计算最恰当的超平面分割线(比如区分垃圾邮件)。算法使用了梯度下降,从随机数开始,每一次迭代更优化一次求值。
val points = spark.textFile(...).map(parsePoint).persist()var w = //random initial vectorfor(i p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y }.rduce((a,b) => a+b) w -=gradient}
把 points 固化在内存中,可以使得计算时间缩短 20倍左右。
3.2 应用二,PageRank
PageRank是知名的网页排名(网页影响力)算法。一个网页被指向的次数越多,在搜索引擎中的排名越高。除了计算网页影响力之外,还可以用来计算社交网络中的影响力。
在计算过程中,每一次迭代更新,增加的是被指向网页的权重。每一个带有出链的网页,都将带给其出链网页r/n的贡献值,这些贡献值的总计,就是出链网页的排名。
a/N + (1-a)∑Ci
PageRank算法详细解答,可看这里 https://www.cnblogs.com/jpcflyer/p/11180263.html
用Spark来计算PageRank,可以这么写:
// 从源文件抽取RDD[URL,outlinks]val links = spark.textFile(...).map(...).persist()var ranks = // RDD[URL, rank]for(i links.map(dest =>(dest,rank/links.size)) } ranks = contribs.reduceByKey((x,y) => x+y).mapValues(sum => a/N + (1-a)*sum)}
下图是对这段代码的谱系图,每一次的迭代都会重新计算并生成ranks RDD.
image
从图中很明显的可以看出,ranks RDD的数量随着link的增加而长度变得越来越长,当 ranks RDD 有一次失效(丢失或者故障)时,重新计算会耗时很多。因此,需要将这些中间步骤的ranks RDD保存或者另存副本,执行这个操作,可以使用 persist函数的 RELIABLE 开关。
计算中有一处Join,如果links, ranks的分区都在同一个节点上,那么计算并不需要通信节点,假如不巧的是同一URL,links,ranks的分区却在不同的分区上,那通信成本就高了。所以控制links,ranks的分区就很讲究,尽量(使用相同分区方式,比如hash分区)使得参加join的两个分区都分配在同一个节点上。
控制分区的分配,也可以通过自定义分区类Partitioner,来完成:
links = spark.textFile(...).map(...).partitionBy(myPartFunc).persist()
如果源文件在分布式系统比如hdfs上的分区,与 Spark 的分区不一致,在使用转换函数前,一定会经过混洗(shuffle),这是最大的耗时。
4 RDDs的表达手法
在长串的转换函数链条中,抽象地表现RDD的谱系,是非常困难的。从完美的角度来讲,一个实现了RDD的系统,必须能提供一系列丰富的转换函数,而且还要让用户自由的重组这些函数。Spark提供了图化的RDD表现形式,达到了这些目的。
总之,RDD的表现方式,在Spark中是常用接口,涵盖了5个方面的信息:
分区集合:
每个分区是最小的原子单位;
父RDD依赖:
每个子分区都依赖父分区;
转换函数:
每个父分区只有通过转换函数,才能生成子分区;
分区形式和分区数据地址:
分区形式(partitioning schema),即分区标准。比如按照销售区域(华东,华北,华西,华南,华中)分区);分区数据地址(partition data placement),按照标准分好的区,数据应该保存到哪些节点上。比如以HDFS文件为数据源,并要以HDFS文件数据块为分区,那么Spark创建RDD的时候,会从当前含有这些数据块的节点上,直接创建RDD分区。倘若要在RDD上应用转换函数,直接操作数据所在节点的本地内存即可,无需通过网络传输,非常高效。
image
partitions():
查询分区集合包含的所有分区;
preferredLocations(p):
根据数据归属地,查询能迅速找到数据分区的所在节点地址;
dependencies():
查询RDD的谱系图;
iterator(p,parentIters):
基于给定的父RDD,查找对应子分区所有对象;
partitioner():
确定分区方法是hash还是range分区
设计RDD接口的有趣之处,在于如何去表达依赖关系。最终,获得认可的有效方法是定义为两类,一是窄依赖(narrow dependencies),二是宽依赖(wide dependencies) 。窄依赖是指父RDD顶多能产生一个子RDD,比如map;宽依赖指父RDD能产生多个子RDD,比如Join.
之所以这么区分宽窄依赖关系,有两个原因:
1)窄依赖关系,使得父子分区可以在同一个节点上完成转换,比如map,filter;而宽依赖关系,则需要所有上层分区都同时存在,且大概率是要从不同的数据分区,抽取数据到一个分区或多个分区进行计算,这个过程称之为 shuffle, shuffle是 Spark 最具有破坏性能的操作。
2)故障恢复:窄依赖的数据分区如果故障了,只要从上层的RDD分区重新生成,而且就在本地即可高效完成,就算是多个分区损坏,也可以并行完成恢复;但宽依赖关系就需要多个RDD分区联合执行恢复,不亚于重新执行Spark程序。
image
最有意思的地方是Join操作。父RDD分区的方法决定了子RDD生成的方式,比如父RDD按照hash来分区,Join的时候,就不需要shuffle了。
5 Spark系统实现
Spark是以Scala写就的,总共有14000行代码(初始化版本,现在不止)。Spark程序运行在 Mesos 集群管理器上,但也可与 Hadoop, MAPI等做互连,利用Hadoop提供的输入接口插件,读取HDFS,HBase的数据。每个Spark程序作为一个单独应用运行在Mesos上,程序间的交互由Mesos处理。一个完整的Spark程序由Driver和Worker组成,Driver是主程,用来协调和收集各个Worker的工作。
接下来,主要阐述系统调度器,交互式程序解释器,内存管理和checkpointing技术。
5.1 任务调度
image
总体来说,任务调度器(scheduler)按照 driver, workder 中的程序,在集群中分配任务。上图是经典的有向无环图(DAG),每一步都是在生成一个新的RDD,只有第一个作用在RDD上的动作函数开始时,正式的数据流才开启。图中矩形框代表一个RDD,有背景色(不管蓝黑)的矩形代表一个分区,黑色代表该分区是持久化驻留在内存中的。
持久化驻留,只在当前程序中生效,一旦程序执行完毕,还是销毁,其他程序不能访问。
任务调度器最有特点的功能在于它对数据归属非常敏感。如果程序需要的RDD分区数据在某台节点的内存里,任务就优先分发到那台节点上;如果集群中所有内存都没有需要的分区数据,任务调取器则会根据RDD提供的优选地址,将任务分配到那些节点上。
窄依赖的RDD谱系比较简单,每次分区失效都可以高效重生,但宽依赖的RDD在恢复时就比较复杂,需要所有父RDD都存在,若父RDD也失效了,则需要更上层的RDD,依次类推,直到源RDD全部重生,才能恢复当前RDD,程序才能进行下去。所以宽依赖RDD通常会在产生时,将其所有父RDD都物化下来,以使得恢复时更快。
如果任务执行失败,原因有很多,内存不够,机器故障等等,任务调度器会安排另一台节点来继续执行失败的任务,只要父RDD都还存在。若父RDD失效了,也没关系,根据图谱自动再生成这些父RDD即可。但若任务调度器失败,则整个程序就是失败,并不会重新自动跑起来。
目前Spark的程序设计,都是在针对RDD的动作做响应式启动执行,当然另一种尝试也是有意义的,那就是针对动作中涉及的RDD,一步步往前推,少了什么RDD,根据图谱去生成。这种想法暂时还只是处于试验阶段。
5.2 集成的解释器(Interpreter Integration)
Spark计算框架允许用户在Scala提供的解释器窗口(与Python,Ruby类似的解释器窗口),交互式的利用大数据集群提供的算力,查询和操控大规模数据库集。交互式操作,即一次运算表达式,可以操作数千台计算机的计算资源,并且得益于集群内存计算模式,而非MapReduce借助硬盘的低效模式,以低延迟的方式得到该步计算的结果。
看以下简单代码,一窥Scala编程的不同:
var x = 5 ;println(x);
每一行Scala代码,会被解释为单行类,执行时,实际上运行的便是这单行类的赋值或者函数调用。
因此上面这两行代码,可以解释为:
println(Line1.getInstance().x)
Line1 就是将单行代码抽象为一个类并实例化后的结果对象。
实际上,我觉得更确切的说,应该是 Line2.getInstance().println(Line1.getInstance().x).但原论文并没有这么解释
最神秘的事情,并不是scala独特的解释器特性,而是Spark如何分发scala程序。就拿上面两行代码来说,Spark把这两行代码,分发到了1000台计算机上,并行地跑了一次批处理,得到最终结果,且中间有任何机器故障,都没有影响到程序的执行和结果的正确。
因此,探索Spark如何完成这整个执行过程就变得非常有意义。事实上,Spark解释器就暗藏了答案:
1)类运送(class shipping): 为了让每个工作节点(workder node)都能得到可执行代码字节(bytecode),scala提供的解释器,就负责为这些节点提供类运送,且是通过http传送的方式.
为什么 http 传送方式在这里会被指定为传送协议,值得思考!
2)改变代码的产生方式(modified code generation):让所有的工作节点(worker node)都得到相同的程序代码,最大的问题是同时传送闭包引用的上下文,包括闭包中引用的变量。如果变量是在闭包之前定义的,工作节点上的Java就无法定位闭包之前的变量。所以改变代码的产生方式就解决了这一点,也就是为什么每一行 Scala代码要被解释为当行类,这行里定义的变量或方法,在闭包中引用时,会被追溯到变量或方法定义的单行类,从而这些单行类会被遗弃运送到工作节点上。
在实际的业务应用场景里,在交互式解释器中查询大规模数据集,比如从HDFS上分析日志文件,非常实用。后期加入的 Spark SQL 更是将 Spark 的分布式计算能力扩大化到极致,普惠了每个数据分析师。
image
上面的示意图,很好地解释了单行类的同步运送,对于工作节点的意义。当闭包中引用了上行的变量,则需要将上行封装成一个类实例,同时运送到其他节点。
5.3 内存管理
Spark 为 RDD 提供了三种存储格式:
- 内存中反序列化的Java对象;
- 内存中序列化的Java对象;
- 以及硬盘存储
访问速度从快到慢,即第一种方式最快,无需任何转换就可以被自由访问。最后一种最慢,因每次使用,需从硬盘抽取数据,有不必要的IO开销
当内存吃紧,新建的RDD分区没有足够内存存储时,Spark会采用回收分区方式,以给新分区提供空间。除非新的分区和要回收的老分区在同一个RDD。回收机制采用的是常规LRU(Least Recently Used)算法,即最近最少使用的算法。这套回收机制很有用,至少目前来说是。但分权机制也很有用,比如设定RDD的权限等级,控制RDD分区被回收的可能性。
5.4 支持 checkpointing
checkpointing的技术本质是为长链操作尤其是依赖宽关系的计算做结果缓存。
长链操作:经由一系列转换操作得来的RDD,在故障之后,恢复需要经历同样多步骤,会导致时间过多的消耗,这就是长链操作。
实现checkpointing的api是persist的replicate开关,即:
rdd.persist(REPLICATE)
通过将数据暂存至稳定的存储设备,以防备RDD失效后的重算。
checkpointing的决策是留给用户的,但也可以做成自动化。在保障数据一致性角度看,自动在RDD创建成功后保留一份副本,不会引起数据不一致的尴尬,看起来是件一劳永逸的事情。为什么不这么做呢?我想这其中涉及的一个判断是,是否有足够的必要去消耗原本应该留给其他Spark程序的资源,来保障仅有百万分之一的可能会丢掉的分区。
6 性能评估
Spark 在性能方面的出众,对标物是Hadop,以下是基于 Amazon EC2做出的4相对比数据:
1)在图运算和迭代机器学习方面,优先Hadoop 20倍速度。性能的提高得益于无需硬盘I/O,且在内存中的Java对象计算,没有序列化和反序列化的开销
2)性能与扩展性都很好。单测一张分析报表,就比Hadoop提高了40倍性能
3)当有节点故障时,Spark能自动恢复已丢失的分区
4)查询1TB的数据,延迟仅在5-7秒
image
7 一些讨论
学习一门技术,就要彻底了解其历史,知其应用。从这些应用着手,由点到面的知悉这门技术的优势。而不至于学得茫然而不知所措。
7.1 囊括众多集群编程模式
当年Spark发明的时候,市面上有很多独立的软件解决方案,来完成大规模数据应用。这些独立的解决方案仅仅是某类应用中的佼佼者,换个场景,效果就没那么突出了。Spark的出现,统一了这些独立的软件解决方案,使得用户只需Spark一个框架,即可完成原本需要4-5个独立解决方案才能解决的问题。
因此,首先就要讨论Spark出现之前,市面上有哪些应用:
1) MapReduce
2) DryadLINQ
3) SQL
4) Pregel
5) Iterative MapReduce
6) Batched Stream Processing
这些应用就不再过多阐述了,Spark 将他们集成起来,提供方便的api供使用,原本这些技术的细节就不用深究了。
***7.2 RDD调试 ***
在分区故障时,如何快速恢复是个痛点。依赖RDD的谱系图,可以保障分区故障后的数据一致性。记录RDD的谱系图,对于程序的健壮性变得非常重要。与先前的分布式系统调试器,最大的优势在于,不需要记录每个事件在不同节点上的执行顺序。
8 其他相关工作进展
集群编程模式:在Spark出现之前,大规模利用集群计算资源处理数据应用已经有成熟的方案了,比如MapReduce,Dryad和Ciel. 这些方案靠的是移动硬盘数据来实现分布式进程之间的数据共享。Spark出现之后,数据共享有了新的突破,虽然稳定的存储依旧可以使用,但更多利用了高效的存储,实现了无盘(不需要借助硬盘)计算,之前借盘运算的开销,比如序列化,反序列化和刻录副本都可以去掉。
第二种高级编程语言的集群编程模式,就像 DryadLINQ 和 FlumeJava, 提供了语言集成的编程接口(API),用户需要调用集群处理大规模数据时,只要使用这些高级语言提供的编程接口,比如map, join 即可。这些系统唯一的缺点在于,他们无法把数据高效方便地共享到下一个查询中去,只能在同一个查询中,比如map接着一个map中,共享数据流。Spark 实现的 RDD,借用了同样的编程语言集成接口,仅仅是完成一次分布式数据的抽象,就完美的实现了在多个查询中共享数据流。
第三种集群编程模式,采用的是特殊高级接口定制,采用这种定制支持特定的应用,比如图运算和迭代计算。Pregel 系统支持迭代图计算,而 Twister 和 HaLoop 则是迭代的MapReduce计算运行时刻库。他们都不支持通用计算,比如建立数据集,装载到内存中,使用任何方式去查询这份数据集。而Spark使用的是分布式数据抽象,基于抽象做出灵活的操作标准,因此类似及时分析这样的操作,完全受到Spark的支持。
最后,有些分布式系统,比如Piccolo, 分布式共享内存(DSM)系统和键值对系统都采取的是共享可变状态集。用户既可以读也可以写入这些共享内存。由于系统状态可变,可被更新,只有依靠checkpoint技术才能保障数据完整性,一致性,因此开销会比Spark多很多。
缓存系统:Nectar 系统可以在任意的 DryadLINQ应用程序之间共享中间数据集,实现的方法是将数据集输出到稳定的存储设备上,而不是内存。并且Nectar也不允许用户倾倒指定的分区,连分区方法也不受用户控制。Ciel和FlumeJava提供结果缓存,但不支持用户自定义缓存内容。
谱系图: 在科学计算和数据库领域,谱系图或源数据管理一直是重点研究对象。一旦数据丢失,从从源头开始重新计算是最慢的一项恢复操作,如果自动修复能从丢失的上一级开始追溯,那是最快的。很多系统能保障断点恢复,但所用的措施却是耗时耗资源最多的构建副本方法。而谱系图在单个MapReduce任务之后,被丢失的无影无踪。
关系型数据库: 在数据库中,视图就像是RDD,物化视图就像是持久化的RDD,但数据库在更新这些对象时,都需要做日志登记的操作,有些类似构建副本的方法,开销巨大。