一、Structure Streaming
结构化流是基于Spark SQL引擎构建的可伸缩且容错的流处理引擎。可以像对静态数据进行批处理计算一样,来表示流计算。
当流数据继续到达时,Spark SQL引擎将负责递增地,连续地运行它并更新最终结果。可以在Scala,Java,Python或R中使用Dataset / DataFrame API来表示流聚合,事件时间窗口,流到批处理联接等。计算是在同一优化的Spark SQL引擎上执行的。最后,该系统通过检查点和预写日志来确保端到端的一次容错保证。简而言之,结构化流提供了快速,可扩展,容错,端到端的精确一次流处理,而用户无需推理流。
在内部,默认情况下,结构化流查询是使用微批量处理引擎处理的,该引擎将数据流作为一系列小批量作业处理,从而实现了低至100毫秒的端到端延迟以及一次精确的容错保证。但是,从Spark 2.3开始,我们引入了一种称为“连续处理”的新低延迟处理模式,该模式可以实现一次最少保证的低至1毫秒的端到端延迟。在不更改查询中的Dataset / DataFrame操作的情况下,您将能够根据应用程序需求选择模式。
二、Structure Streaming与Spark Streaming区别
(1)流模型
- Spark Streaming
Spark Streaming采用微批的处理方法。每一个批处理间隔的为一个批,也就是一个RDD,对RDD进行操作就可以源源不断的接收、处理数据。
- Structured Streaming
Structured Streaming将实时数据当做被连续追加的表。流上的每一条数据都类似于将一行新数据添加到表中。
“输出”定义为写到外部存储器的内容。可以在不同的模式下定义输出:
-
完整模式-整个更新后的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。
-
追加模式-仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。
-
更新模式-仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则等效于追加模式。
注意:每种模式都适用于某些类型的查询。
(2)数据集API
- Spark Streaming中的DStream编程接口是RDD
- Structured Streaming使用DataFrame和Dataset
(3)处理事件时间和延迟数据
Process Time:流处理引擎接收到数据的时间,Event Time:数据产生的时间
- Spark Streaming
Spark Streaming中由于其微批的概念,会将一段时间内接收的数据放入一个批内,进而对数据进行处理。划分批的时间是Process Time,而不是Event Time,Spark Streaming没有提供对Event Time的支持。
- Structured Streaming
Structured Streaming提供了基于事件时间处理数据的功能,如果数据包含事件的时间戳,就可以基于事件时间进行处理。
事件时间是嵌入数据本身的时间。对于许多应用程序,您可能希望在此事件时间进行操作。
例如,如果要获取每分钟由IoT设备生成的事件数,则可能要使用生成数据的时间(即数据中的事件时间),而不是Spark收到的时间。他们。此事件时间在此模型中非常自然地表达-设备中的每个事件都是表中的一行,而事件时间是该行中的列值。这允许基于窗口的聚合(例如,每分钟的事件数)只是事件时间列上的一种特殊类型的分组和聚合-每个时间窗口都是一个组,每行可以属于多个窗口/组。
由于Spark正在更新结果表,因此它具有完全控制权,可以在有较晚数据时更新旧聚合,并可以清除旧聚合以限制中间状态数据的大小。
(3)容错性
两者在容错性都使用了checkpoint机制。
checkpoint通过设置检查点,将数据保存到文件系统,在出现出故障的时候进行数据恢复。
- Spark Streaming
在spark streaming中,如果程序的代码修改重新提交任务时,是不能从checkpoint中恢复数据,需要删除checkpoint目录。
- Structured Streaming
在structured streaming中,对于指定的代码修改操作,不影响修改后从checkpoint中恢复数据。
(4)Output Sinks
- Spark Streaming
Spark Streaming只提供Foreach sink
- Structured Streaming
Structured Streaming提供File sink、Kafka sink、Foreach sink、Console sink、Memory sink
三、Structured Streaming例子
侦听TCP套接字的数据服务器接收到的文本数据的字数
SparkSession sparkSession = SparkSession.builder().appName("structuredStreaming").getOrCreate();//创建输入数据源
Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 8100).load();//转换格式
Dataset<String> words = lines.as(Encoders.STRING()).flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());//统计
Dataset<Row> wordCounts = words.groupBy("value").count();//创建输出流
StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();//等待结束
query.awaitTermination();