Spark Streaming是Spark的上一代流媒体引擎。Spark Streaming不再有更新,它是一个遗留项目。Spark中有一个更新且更易于使用的流媒体引擎,称为结构化流媒体
概述
Spark Streaming是核心Spark API的扩展,支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源获取,如Kafka、Kinesis或TCP套接字,并且可以使用复杂的算法进行处理,这些算法用高级函数表示,如map、reduce、join和window。最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。事实上,您可以将Spark的机器学习和图形处理算法应用于数据流。
原理:Spark Streaming接收实时输入数据流,并将数据划分为多个批次,然后由Spark引擎进行处理,以批量生成最终的结果流。
离散流(DStreams)
离散流或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,或者是从源接收的输入数据流,要么是通过转换输入流生成的处理后的数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不可变的分布式数据集的抽象(有关更多详细信息,请参阅Spark编程指南)。DStream中的每个RDD都包含特定间隔的数据,如下图所示。
对数据流应用的任何操作都转换为对底层RDD的操作,这些底层的RDD转换是由Spark引擎计算的。DStream操作隐藏了大多数这些细节,并为开发人员提供了更高级别的API以方便使用
每个输入DStream(除了文件流)都与一个Receiver(Scala-doc,Java-doc)对象相关联,该对象从源接收数据并将其存储在Spark的内存中进行处理。
Input DStreams和Receivers
在Spark Streaming中,封装了输入数据流的两个主要组件:Input DStreams和Receivers。
Input DStreams是Spark Streaming用来表示从数据源接收输入数据的抽象。每个输入DStream都可以看作是一个连续的数据流,它由多个RDD组成,这些RDD代表在一段时间内接收到的数据。Spark Streaming支持多种类型的输入DStreams,如基于文件、基于套接字、基于Kafka等。
接收器(Receiver)是实际负责从数据源获取数据并将其传递给Spark Streaming的组件。在Spark Streaming中,接收器是在工作节点上运行的独立任务,用于从数据源接收数据并将其存储在分布式存储系统中(如HDFS)。一旦数据被接收器接收并存储,Spark Streaming就会周期性地将存储的数据转换为RDD,并将其交给Spark引擎进行处理。
当输入DStream启动时,它会自动配置并启动与之关联的接收器。接收器会以并行的方式从数据源中获取数据,并将其划分为一系列小的数据块。然后,这些数据块会被Spark Streaming的计算引擎处理,形成最终的结果。
总结来说,Input DStreams和Receivers是Spark Streaming中用于接收和处理输入数据的关键组件。Input DStreams代表连续的数据流,而Receivers负责从数据源接收数据,并将其传递给Spark Streaming进行处理。
两类内置流媒体源:
- 基本源:StreamingContext API中直接可用的源。示例:文件系统和套接字连接;
- 高级资源:Kafka、Kinesis等资源可以通过额外的实用程序类获得
在Spark中,目录的监控是由Spark Streaming和Structured Streaming提供的功能。下面分别说明这两种流处理的方式:
- Spark Streaming:对于Spark Streaming,可以使用
textFileStream
方法来监控一个目录中的文件,并将新增的文件作为新的输入源。它会周期性地检查目录中是否有新的文件出现,然后将新的文件内容作为DStream的一部分进行处理。这种监控方式是基于轮询的,Spark Streaming会定期轮询目录以检查是否有新的文件。
以下是一个使用Spark Streaming监控目录的示例代码片段:
import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sparkConf, Seconds(1))val directory = "/path/to/directory"
val lines = ssc.textFileStream(directory)
lines.foreachRDD { rdd =>// Process the RDD
}ssc.start()
ssc.awaitTermination()
- Structured Streaming:对于Structured Streaming,可以使用
readStream
方法来监控一个目录中的数据,并将新增的数据作为新的输入源。类似于Spark Streaming,这种监控方式也是基于轮询的,Structured Streaming会定期轮询目录以检查是否有新的数据。
以下是一个使用Structured Streaming监控目录的示例代码片段:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()val directory = "/path/to/directory"
val df = spark.readStream.text(directory)
val query = df.writeStream.format("console").start()query.awaitTermination()
这个示例将会从指定目录中读取文本文件,然后通过console
输出源将内容显示在控制台上。输出源可以根据需求进行修改,比如写入文件、写入到Kafka等。
无论是使用Spark Streaming还是Structured Streaming,监控目录时需要注意文件的命名规则和文件格式,以确保数据按照预期被输入到流处理任务中。
各种转换操作
在Spark Streaming中,DStream是一个连续的数据流抽象,可以应用各种转换操作进行实时处理和分析。以下是一些常见的DStream转换操作:
-
map(func):应用一个函数到DStream中的每个元素,并返回一个新的DStream,其中包含转换后的结果。
-
flatMap(func):应用一个函数到DStream中的每个元素,并返回一个包含所有转换后结果的新DStream。
-
filter(func):过滤DStream中的元素,只保留满足条件的元素。
-
union(otherStream):将当前DStream与另一个DStream合并,生成包含两个DStream元素的新DStream。
-
count():返回一个新的DStream,其中每个批次的元素是当前批次的元素数量。
-
reduce(func):将当前DStream中每个批次的元素使用给定的函数进行聚合操作,返回一个新的DStream,其中每个批次仅包含一个聚合结果。
-
window(windowDuration, slideDuration):创建一个滑动窗口,用于对窗口内的元素进行批处理操作。每个窗口都包含指定的窗口时长的元素,并且以指定的滑动间隔进行移动。
-
join(otherStream):将当前DStream中的元素与另一个DStream中的元素进行连接操作,生成一个新的DStream,其中每个元素是两个流中匹配的元素对。
-
foreachRDD(func):将一个RDD操作应用于DStream中的每个RDD,可以用于实现自定义的输出操作或将数据存储到外部系统中。
这些是常见的DStream转换操作,还有其他更高级的操作可以使用,例如窗口操作、状态操作、累加器等。根据实际需求,选择适当的转换操作来对DStream进行处理和转换。
接收到的数据是以微批次(micro-batches)的形式处理的
在Spark Streaming中,接收到的数据是以微批次(micro-batches)的形式处理的。Spark Streaming将实时数据流划分为一系列小的时间窗口,每个窗口称为一个微批次。每个微批次都由一些时间段内到达的数据组成。
接收到的数据在每个微批次内按时间顺序进行处理。具体而言,对于每个微批次,Spark Streaming会将接收到的数据收集到一个RDD(Resilient Distributed Dataset)中,然后应用在DStream上定义的转换操作。
有几种不同的语义来处理接收到的数据:
-
At-least-once:在这种语义下,Spark Streaming保证至少处理一次数据。它使用WAL(Write-Ahead Log)机制来记录接收到的数据,以便在故障恢复时进行重播,确保数据不会丢失。这种语义可以保证数据的可靠性,但可能会导致某些数据重复处理。
-
At-most-once:在这种语义下,Spark Streaming只处理数据一次,不保证重复数据的处理。这比较适用于实时处理对数据丢失更敏感的场景,但可能会导致一些数据丢失。
-
Exactly-once:这是最严格的语义,要求保证每条数据仅被处理一次,且不丢失。实现确切一次语义较为复杂,需要使用外部的数据存储系统(如Apache Kafka)和事务支持。Spark Streaming提供了与Kafka集成的功能,可以实现近似的确切一次语义。
值得注意的是,Spark Streaming的语义是基于微批次的处理,因此无法提供实时流处理系统(如Apache Flink或Apache Storm)所提供的低延迟。每个微批次的处理延迟取决于微批次的窗口大小和处理任务的复杂性,可能在几十毫秒到几秒之间。
选择适当的语义取决于应用的需求和容忍的数据处理保证级别。如果数据的准确性非常重要,可以使用At-least-once或Exactly-once语义。如果对数据处理的延迟更敏感,可以选择At-most-once语义。
输出批次的数据
在Spark Streaming中,DStream是一个连续的数据流抽象,可以应用各种输出操作来将处理结果发送到外部系统或执行其他特定操作。以下是一些常见的DStream输出操作:
-
print():将DStream中每个批次的数据打印到控制台。这对于调试和快速查看处理结果非常有用。
-
saveAsTextFiles(prefix, [suffix]):将每个批次的数据以文本文件的形式保存到指定的目录中。可提供前缀和后缀参数来自定义文件名。
-
saveAsObjectFiles(prefix, [suffix]):将每个批次的数据以序列化对象的形式保存到指定的目录中。同样可提供前缀和后缀参数。
-
foreachRDD(func):对DStream中每个RDD应用一个自定义函数。可以在这个函数中执行特定的操作,如将数据存储到外部数据库、发送到消息队列等。需要注意的是,这个函数必须是幂等的,因为RDD可以在故障恢复时被重新计算。
-
foreach(func):对DStream中每个批次的数据应用一个自定义函数。与foreachRDD不同的是,这个函数直接应用于DStream的每个元素,而不是RDD。
-
saveToHadoopFiles(prefix, [suffix]):将每个批次的数据以Hadoop文件格式保存到指定的目录中。
-
foreachPartition(func):对DStream中每个RDD的每个分区应用一个自定义函数。这对于批量处理每个分区的数据非常有用,在处理大规模数据时可以提高性能。
这些输出操作允许将Spark Streaming处理的结果发送到外部系统、存储到文件中,或执行自定义的操作。根据需求选择合适的输出操作,以满足数据处理的要求和目标。