文章目录
- 1. Structured Streaming介绍
- 1.1 实时计算和离线计算
- 1.1.1 实时计算
- 1.1.2 离线计算
- 1.2 有界和无界数据
- 2. 简单使用
- 3. 编程模型
- 4. 数据处理流程
- 4.1 读取数据Source
- 4.1.1 文件数据处理
- 4.2 计算操作 Operation
- 4.3 数据输出 Sink
- 4.3.1 输出模式
- 4.3.2 指定输出位置
- 4.3.3 设置触发器Trigger
- 4.3.4 checkpoint检查目录设置
1. Structured Streaming介绍
spark进行实时数据流计算时有两个工具:Spark Streaming
和 Structured Streaming
。
Spark Streaming:编写RDD
代码处理数据流,可以解决非结构化的流式数据。
Structured Streaming:编写df
代码处理数据流,可以解决结构化和半结构化的流式数据。
Structured Streaming
是基于 Spark SQL 引擎构建的可扩展
和容错流处理引擎
,是在Spark 2.X
出来的流框架,采用了无界表(使用DataFrame)的概念,流数据相当于往一个表上不断追加行,数据会被转化为DataFrame和DataSets类型,可用采用SQL的方式操作结构化流数据。
1.1 实时计算和离线计算
1.1.1 实时计算
实时计算
,通常也称为“实时流计算”
、“流式计算”
流数据处理是指实时、连续地处理数据流。数据在被产生或接收后立即处理,并不需要等待所有数据到齐。数据的处理和传输是“逐条”进行的。
- 处理时间:由于数据被实时处理,系统响应时间非常短,通常在毫秒或者秒级。
- 数据量:流数据通常是无限的,数据持续不断地被生成和处理,系统需要持续运行。
1.1.2 离线计算
离线计算
,通常也称为“批处理”
,表示那些离线批量、延时较高的静态数据处理过程。
批数据处理是指在一个预定时间内收集一批数据,然后一次性对这批数据进行处理。数据是成批处理的,而不是逐条处理的。
- 处理时间:批处理通常不是实时的,处理的延迟可能是分钟、小时甚至更长,T+1.
- 数据量:批处理通常在所有数据收集完毕后进行,这意味着处理的数据集是固定大小的(如每日、每小时的数据)。数据处理完成后自动结束。
1.2 有界和无界数据
- 有界数据
有起始位置,有结束位置
。比如文件数据,有起始行,有结束行。有明确的数据容量大小
。处理数据时就能知道处理的数据大小。- 在处理数据时,按批次处理。
数据处理完程序就结束
。- 离线计算时处理的都是有界数据。
- 无界数据
有起始位置,没有结束位置
,知道数据的起始位置在哪里,但是数据的结束位置不知道(因为数据在不断产生,什么时候结束不知道)。- 流式数据都是无界数据。
- 无界数据的
总量是不确定的
,数据是不断产生的。 - 数据有时效性(有效期)。
- 处理无界数据时,
程序是持续运行的
。
2. 简单使用
- socket服务
- 安装ncat服务
- 在线安装
- yum install nc
- 离线安装
- rpm -ivh ncat-7.93-1.x86_64.rpm
- 在线安装
- 启动服务绑定9999端口
- ncat -lk 9999
- ncat -lk 9999
- 安装ncat服务
- Structured Streaming代码程序
- 使用的是SparkSQL,所以在进行代码编写时使用SparkSQL的方法进行编写。
- 使用 SparkSession
2,李四,22,男
3,王五,20,男
4,赵六,20,男
4,赵六,32,男
5,赵六,42,男
# 读取scoket产生的实时数据流
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#数据流读取使用readStream
opt = {#指定读取socket服务的IP地址'host':'192.168.88.100',#指定读取端口'port':9999
}
#使用socket方式读取网站服务的数据,得到一个无界的dataframe
df = ss.readStream.format('socket').options(**opt).load()#实时输出数据
df.writeStream.start(format='console', outputMode='append').awaitTermination()
运行结果:
3. 编程模型
在进行流式数据开发,代码实现的过程
Structured Streaming在处理数据时分为四个部分
-
Input Table输入数据表 无界表 readStream
-
Qurey 对数据进行查询计算 DSL或SQL计算过程
-
Result Table 保存计算结果
-
Output 输出结果 writerStream
-
读取指定数据源的数据得到一个无界表数据
-
对无界表数据进行计算处理
-
接收无界表计算的结果
-
将接收到的结果进行输出
4. 数据处理流程
4.1 读取数据Source
支持读取的数据源
- 文件数据
- 从文件中读取数据
- kafka数据 常用
- 从kafka中读取数据
- socket数据
- 从网络端口读取数据
- Rate数据
- 框架自己产生数据,测试性能,优化参数
4.1.1 文件数据处理
option参数 | 描述说明 |
---|---|
maxFilesPerTrigger | 每个batch最多的文件数,默认是没有限制。比如我设置了这个值为1,那么同时增加了5个文件,这5个文件会每个文件作为一波数据,更新streaming dataframe。 |
latestFirst | 是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)。 |
fileNameOnly | 是否检查新文件只有文件名而不是完整路径(默认值:false) 将此设置为 true 时,以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同: “file:///dataset.txt” “s3://a/数据集.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt”。 |
- 格式
spark.readStream.csv()spark.readStream.load(format='csv',**options)
#流式读取文件数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()
# 读取文件数据
# 指定目录 不要指定文件名
# df = ss.readStream.load(format='csv',path='hdfs://node1:8020/data_stream',schema='id int,name string,gender string,age int,cls string')
# df = ss.readStream.csv(path='hdfs://node1:8020/data_stream',header=False,sep=',',schema='id int,name string,gender string,age int,cls string')
# df = ss.readStream.text(path='hdfs://node1:8020/data_stream2')
df = ss.readStream.option('maxFilesPerTrigger',1).csv('hdfs://node1:8020/data_csv',header=False,sep=',',schema='id int,name string,gender string,age int,cls string')#输出
df.writeStream.start(format='console',outputMode='append').awaitTermination()
运行结果:
注意:
- 读取文件数据时,不能指定某个具体文件,而是指定文件所在的目录。
-
- 目录下的同一个文件只会被读取一次,处理过的文件数据不会再重新处理。
- 文件的读取方式在实际开发中用的比较少,生产一条数据,就要生成一个文件。但是,如果将多条数据收集之后写入统一文件,就变成了和批处理一样的开发。
- 实际开发中很少使用spark流读文件,可以使用flume工具流式读取文件,然后在通过spark读取产生flume。
4.2 计算操作 Operation
采用DSL的方式进行数据的计算
where 方法
groupby
orderby
#数据流的计算
from pyspark.sql import SparkSession, functions as Fss = SparkSession.builder.getOrCreate()
#流式读取数据,转为无界的dataframe
opt = {# 指定读取的socket服务的ip地址'host':'192.168.88.100',# 指定读取的端口'port':9999
}
df = ss.readStream.load(format='socket',**opt)
#流式数据的计算
#SQL
df.createTempView('stu')
sql_str = """
select gender,count(id),sum(age) from (select split(value,',')[0] as id,split(value,',')[1] as name,cast(split(value,',')[2] as int) as age,split(value,',')[3] as genderfrom stu) t1group by gender
"""
df_res = ss.sql(sql_str)#结果数据输出
df_res.writeStream.start(format='console',outputMode='complete').awaitTermination()
输入:
运行结果:
4.3 数据输出 Sink
在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下
- 输出模式(output mode):指定写入输出接收器的内容。
- 查询名称(query name):(可选)指定用于标识的查询的唯一名称。将数据输出到内存可以指定一个表名
- 触发间隔 (Trigger interval):(可选)指定触发间隔。如果未指定,系统将在上一次处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成而错过触发时间,则系统将立即触发处理。 写入数据的时间间隔
- 检查点位置(checkpoint Location):对于可以保证端到端容错的某些输出接收器,请指定系统将写入所有检查点信息的位置。这应该是与HDFS兼容的容错文件系统中的一个目录。 文件方式输出时,必须指定。
4.3.1 输出模式
outputMode指定输出模式
- append模式,默认的模式,每次只能看到新增的行的内容,不支持聚合操作,一般在进行查询展示时使用。
- complete模式,每次都是对所有数据进行处理,必须聚合操作。
- update模式,当数据只有新增,没有聚合类似append;如果对数据进行聚合,只会显示更新的数据。
append
模式:
# 输出模式
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')# 数据计算
df_result = df.select(df.id,df.name,df.age,df.gender).where('age >= 20')# 数据输出
# outputMode 输出模式
# append 输出最新的数据 支持 select,where
# complete 输出所有聚合计算后的数据 支持 groupby聚合计算
# update 输出最新的数据包括聚合计算 支持 groupby聚合计算 select,where
df_result.writeStream.start(format='console',outputMode='append').awaitTermination()
运行结果:
complete
模式:
# 输出模式
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')# 数据计算
# df_result = df.select(df.id,df.name,df.age,df.gender).where('age >= 20')
df_result = df.groupby('gender').avg('age')
# 数据输出
# outputMode 输出模式
# append 输出最新的数据 支持 select,where
# complete 输出所有聚合计算后的数据 支持 groupby聚合计算
# update 输出最新的数据包括聚合计算 支持 groupby聚合计算
df_result.writeStream.start(format='console',outputMode='complete').awaitTermination()
运行结果:
update
模式:
# 输出模式
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')# 数据计算
# df_result = df.select(df.id,df.name,df.age,df.gender).where('age >= 20')
df_result = df.groupby('gender').avg('age')
# 数据输出
# outputMode 输出模式
# append 输出最新的数据 支持 select,where
# complete 输出所有聚合计算后的数据 支持 groupby聚合计算
# update 输出最新的数据包括聚合计算 支持 groupby聚合计算
df_result.writeStream.start(format='console',outputMode='update').awaitTermination()
运行结果:
总结:三种模式的使用场景
- append 适合没有聚合操作的计算结果输出,将数据输出到文件需要用的是append。
- complete 适合进行聚合操作,并且显示所有的数据计算结果。
- update 适合进行聚合操作,只展示新增数据的结果。
4.3.2 指定输出位置
- File Sink 把结果输出到文件中,仅支持追加
append模式
。 - Kafka Sink 把结果输出到kafka的topic中,append complete update都支持。
- Foreach Sink ForeachBatch Sink ,可以接收函数/对象,其中可以定义复杂的计算逻辑,对数据进行处理。数据最终输出到哪里,自己决定,append complete update都支持。
- console Sink,直接在终端中显示,append complete update都支持。
- Memory Sink,把数据输出到内存中,以表的形式存在,可以使用SparkSQL进行查询,支持append complete模式。
输出到文件
: - 格式
writeStream.start(path='输出文件路径',format='输出源',outputMode='输出模式',checkpointLocation='检查点路径')
- 使用
from pyspark.sql import SparkSession,functions as Fss=SparkSession.builder.getOrCreate()opt = {# 指定读取的socket服务的ip地址'host': '192.168.88.100',# 指定读取的端口'port': 9999
}
#使用soc
df = ss.readStream.load(format='socket',**opt)#数据计算
df_res = df.select(F.split('value',',')[0].alias('id'),F.split('value',',')[1].alias('name'),F.split('value', ',')[2].cast('int').alias('age'),F.split('value', ',')[3].alias('gender')
).where('age >30')# 将数据输出到文件中
option ={'checkpointLocation':'hdfs://node1:8020/stream_checkpoint'
}
df_res.writeStream.start(format='csv',outputMode='append',path='hdfs://node1:8020/csv_data',**option).awaitTermination()
运行结果:
注意:
- 写文件需要指定checkpointLocation
- 不支持聚合数据写入文件
ForeachBatch
:
需要自定义方法,完成对数据的计算,然后按照需求写入对应位置。本质是将df输出的数据传递到一个方法中。 - 格式
# 函数
def foreach_batch_function(df,df_id):# Transform and write batchDF passstreamingDF.writeStream.foreachBatch(foreach_batch_function).start()
- 使用
# 流数据输出到自定义函数中,将无界表就会转为有界表数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')# 数据计算
df_result = df.groupby('gender').avg('age')# 数据输出
# 自定义函数
def func(df,batch_id):# 必须接受两个数据# df数据就是接受无界表的数据,将无界表转为了有界表的df# batch_id 处理数据的批次编号df.show()print(batch_id)# 将有界表数据写入文件df.write.csv('hdfs://node1:8020/foreach_data_csv',header=True,sep=',',mode='overwrite')# 在输出时执行foreachBatch方法
df_result.writeStream.foreachBatch(func).start(outputMode='complete').awaitTermination()
运行结果:
4.3.3 设置触发器Trigger
触发器决定多久执行一次,在流式处理中,等一会儿(等多久)就是由触发器决定。
- 格式
writeStream.trigger(processingTime=None, once=None, continuous=None).start().awaitTermination()
-
默认触发器
- 没有设置触发器,执行完上一个批次,立即执行下一个批次,下一批次没有数据则等待产生新数据, 请求时间间隔的长短由spark对数据计算的时间决定。
- spark是基于内存计算的,所有计算时间会很多,官方的文档中 时间最短能达到1ms。
- 固定时间间隔 processingTime=‘5 seconds’
- 如果前一个微批处理在间隔内完成,则引擎将等待间隔结束,然后再启动下一个微批处理。
- 如果前一个微批处理的完成时间比间隔时间长(即如果错过了一个间隔边界),那么下一个微批处理将在前一个完成后立即开始(即它不会等待下一个间隔边界)。
- 如果没有新数据可用,则不会启动微批处理。
- 没有设置触发器,执行完上一个批次,立即执行下一个批次,下一批次没有数据则等待产生新数据, 请求时间间隔的长短由spark对数据计算的时间决定。
# 获取数据时的请求时间间隔设置
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取数据
options = {'host':'192.168.88.100','port':9999
}
df = ss.readStream.load(format='socket',**options)# 数据计算
df_result = df.select(df.value)# 结果输出
df.writeStream.trigger(processingTime='5 seconds').start(format='console',outputMode='append').awaitTermination()
运行结果:
-
一次性微批处理 once=True 默认是false。
- 只会执行一次,适用于初始化 关闭资源这种只执行一次的场景。
# 获取数据时的请求时间间隔设置
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取数据
options = {'host':'192.168.88.100','port':9999
}
df = ss.readStream.load(format='socket',**options)# 数据计算
df_result = df.select(df.value)# 结果输出
df.writeStream.trigger(once=True).start(format='console',outputMode='append').awaitTermination()
运行结果:
- 以固定时间间隔连续处理 continuous=‘5 seconds’
- 对固定间隔进行优化, 从而减低了延迟性的问题。
- 和固定间隔微批处理行为上很像,无论是否有数据,到指定间隔时间后都会触发。
# 获取数据时的请求时间间隔设置
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取数据
options = {'host':'192.168.88.100','port':9999
}
df = ss.readStream.load(format='socket',**options)# 数据计算
df_result = df.select(df.value)# 结果输出
# trigger的continuous不支持文件数据源
df.writeStream.trigger(continuous='5 seconds').start(format='console',outputMode='append').awaitTermination()
运行结果:
4.3.4 checkpoint检查目录设置
当获取到数据进行计算时,有可能数据在计算时,会计算失败。此时spark会重新进行计算,此时就需要知道要计算哪个数据,就需要借助checkpoint机制,将当前计算的信息保存起立,方便重新进行计算。保证spark处理数据的容错
1、偏移量目录【offsets】:记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据,在处理数据前将其写入此日志记录。此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。
2、提交记录目录【commits】:记录已完成的批次,重启任务检查完成的批次与 offsets 批次记录比对,确定接下来运行的批次;
3、元数据文件【metadata】:metadata 与整个查询关联的元数据,目前仅保留当前job id
4、记录状态目录【state】:当有状态操作时,如累加聚合、去重、最大最小等场景,这个目录会被用来记录这些状态数据,根据配置周期性地生成.snapshot文件用于记录状态。
#流计算的检查点设置
from pyspark.sql import SparkSession, functions as Fss = SparkSession.builder.getOrCreate()#流式读取数据,转为无界的DataFrame
opt = {# 指定读取的socket服务的ip地址'host': '192.168.88.100',# 指定读取的端口'port': 9999
}
df = ss.readStream.load(format='socket',**opt)#流式数据计算
#SQL
df.createTempView('stu')
sql_str = """select gender,count(id),sum(age) from (select split(value,',')[0] as id, split(value,',')[1] as name, cast(split(value,',')[2] as int) as age,split(value,',')[3] as genderfrom stu) t1group by gender"""
df_res = ss.sql(sql_str)
#结果数据输出
option = {#指定checkpoint 每个计算任务都有独立的checkpoint位置'checkpointLocation':'hdfs://node1:8020/checkpoint_stream'
}
df_res.writeStream.start(format='console',outputMode='complete',**option).awaitTermination()
运行结果: