Spark
Spark 是什么?
Apache Spark?是用于大规模数据处理的快速和通用引擎.
速度:在内存中,运行程序比Hadoop MapReduce快100倍,在磁盘上则要快10倍.
Apache Spark具有支持非循环数据流和内存计算的高级DAG执行引擎.
易用:可以使用Java,Scala,Python,R快速编写程序.
Spark提供80+高级操作方法,可以轻松构建并行应用程序.
Spark提供了一堆库,包括SQL和DataFrame,MLlib,GraphX和Spark Streaming。您可以在相同的应用程序中无缝地组合这些库. Spark在Hadoop,Mesos,独立或云端运行。它可以访问各种数据源,包括HDFS,Cassandra,HBase和S3
一,RDD 弹性分布式数据集
定义, TA 容错的,并行的数据结构,存储到磁盘和内存,控制数据分区。本质上是一个只读的分区记录集合,RDD包含多个分区,每个分区是一个dataset片段.
依赖, RDD可以相互依赖。如果RDD的每个分区最多只能被一个Child RDD的一个分区使用,窄依赖;若多个Child RDD分区都可以依赖,宽依赖.
首先,窄依赖被划分到同一个stage,支持在同一个cluster node上以管道形式执行多条命令,eg,先map,紧接着filter.相反,宽依赖由于依赖的上游节点不止一个,往往跨界点传输数据.
其次从容灾角度讲,窄依赖的只需要执行父RDD的丢失分区的计算即可恢复.而宽依赖需要考虑恢复所有父RDD的丢失分区.
本质, RDD是Spark中的抽象数据结构类型,从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。本质是一个抽象类,如下:
abstract class RDD[T: ClassTag](@transient private var _sc: SparkContext,@transient private var deps: Seq[Dependency[_]]) extends Serializable with Logging {}
type | function | use |
---|---|---|
transformation | map() | 函数应用于RDD每一个元素,返回值是新的RDD |
transformation | flatMap() | 函数应用于RDD每一个元素,将元素数据进行拆分变成迭代器返回值是新的RDD |
transformation | filter() | 过滤,返回值是新的RDD |
transformation | distinct() | 去重,返回值是新的RDD |
transformation | union() | 并集,返回值是新的RDD |
transformation | intersection() | 交集,返回值是新的RDD |
transformation | subtract() | 原RDD里和参数RDD里相同的元素去掉 |
transformation | cartesian() | 函数应用于RDD每一个元素,返回值是新的RDD |
type | function | use |
---|---|---|
action | collect() | 返回RDD所有元素 |
action | count() | RDD里元素个数 |
action | countByValue() | 各元素在RDD中出现次数 |
action | reduce() | 并行整合所有RDD数据,例如求和操作 |
action | fold(0)(func) | 和reduce功能一样,不过fold带有初始值 |
action | aggregate(0)(seqOp,combop) | 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样 |
action | foreach(func) | 对RDD每个元素都是使用特定函数 |
DAG 有向无环图
容错处理
传统关系型数据库:采用日志记录容灾,数据恢复都依赖于重新执行日志中的SQL;
Hadoop:通过把数据备份到其他机器来容灾;
RDD:本身是一个不可变的数据集,当某个worker节点上的任务失败时,可以利用DAG重新调度计算这个失败的任务,由于不用复制数据,从而大大降低了网络通信.在流式计算场景中,Spark需要记录日志和检查点,以便利用checkpoint和日志对数据进行恢复;
二,Discretized Streams (DStreams)
DStream是一系列连续的RDD,是Spark Streaming提供的基本抽象如下图所示:
对DStream应用的任何操作都将转换为底层RDD上的操作
三,Initializing StreamingContext
要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口.
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
四,Input DStreams and Receivers
Spark Streaming提供两类内置流式传输源:
基本数据源:StreamingContext API中直接提供的源.比如:文件系统和套接字连接.(file 和 socket) 高级源:Kafka,Flume,Kinesis等资源可以通过额外的实用类来获得.
Spark Streaming 提供两种接收器:
可靠的接收器 - 当数据已被接收并且通过复制存储在Spark中时,可靠的接收器正确地向可靠的源发送确认。 不可靠的接收器 - 不可靠的接收器不向源发送确认。这可以用于不支持确认的源,或者甚至当不需要或需要进入确认的复杂性时,用于可靠的源。
五,Transformations on DStreams
Transformation | Meaning |
---|---|
map(func) | 通过func传递源DStream的每个元素,返回新的DStream |
flatMap(func) | 与map类似,但每个输入项可以映射到0个或更多的输出项 |
filter(func) | 过滤 |
repartition(numPartitions) | 通过修改分区来更改DStream中的并发数 |
union(otherStream) | 求两个DStream的并集 |
count() | 计算源DStream的每个RDD中的元素数量,返回RDD的新DStream |
reduce(func) | 使用函数func聚合源DStream的每个RDD中的元素来返回单个元素RDD的新DStream |
countByValue() | 根据value计算key. |
reduceByKey(func, [numTasks]) | 根据Key进行特定的计算 |
join(otherStream, [numTasks]) | 当(K,V)和(K,W)对的两个DStream被调用时,返回一个新的(K,(V,W))对的DStream与每个键的所有元素对 |
transform(func) | 通过对源DStream的每个RDD应用RDD到RDD函数来返回新的DStream。这可以用于对DStream进行任意RDD操作 |
updateStateByKey(func) | 返回一个新的“状态”DStream,其中每个key的状态通过在key的先前状态应用给定的功能和key的新值来更新。这可以用于维护每个key的任意状态数据 |
六,Output Operations on DStreams
Output Operation | Meaning |
---|---|
print() | 打印10个元素,用于调试 |
saveAsTextFiles(prefix, [suffix]) | 将此DStream的内容另存为文本文件。每个批处理间隔的文件名是根据前缀和后缀“prefix-TIME_IN_MS [.suffix]”生成的 |
saveAsObjectFiles(prefix, [suffix]) | 将此DStream的内容保存为序列化Java对象的SequenceFiles。每个批处理间隔的文件名是根据前缀和后缀“prefix-TIME_IN_MS [.suffix]”生成的。 |
saveAsHadoopFiles(prefix, [suffix]) | 将此DStream的内容另存为Hadoop文件。每个批处理间隔的文件名是根据前缀和后缀“prefix-TIME_IN_MS [.suffix]”生成的。 |
foreachRDD(func) | 对从流中生成的每个RDD应用函数func的最通用的输出运算符。此功能应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或将其通过网络写入数据库 |