一、上下文
《Structured-Streaming初识》博客中已经初步认识了Structured-Streaming,Kafka作为目前最流行的一个分布式的实时流消息系统,是众多实时流处理框架的最优数据源之一。下面我们就跟着官方例子来看看Structured-Streaming是如何集成Kafka的?
二、官方例子
这里我们先把官方例子贴出来,所属包路径为:org.apache.spark.examples.sql.streaming
该示例使用Kafka中一个或多个Topic的消息并进行字数统计。
object StructuredKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +"<subscribe-type> <topics> [<checkpoint-location>]")System.exit(1)}val Array(bootstrapServers, subscribeType, topics, _*) = argsval checkpointLocation =if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toStringval spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()import spark.implicits._// 创建表示来自kafka的输入行流的DataSetval lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option(subscribeType, topics).load().selectExpr("CAST(value AS STRING)").as[String]// 运行 word countval wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()// 开始运行将运行计数打印到控制台的查询val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()query.awaitTermination()}}
三、分析
1、参数解释
运行该官方示例需要3或4个参数,分别是
- Kafka的bootstrap-servers
- 订阅Kafka TopicPartition 的类型
- 订阅Kafka的Topic
- checkpointLocation(不是必须的)
bootstrap-servers用于连接Kafka集群。
订阅类型有3种,且只能选择1种:
- assign:手动指定分区消费,需要自己管理分区的分配和再平衡。需要指定一个Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
- subscribe:订阅一个或多个topic进行消费(逗号分割),Kafka会自动处理分区的分配和再平衡。
- subscribePattern:基于正则的topic订阅方式,但可能增加一些复杂性和性能开销。
Topic的指定根据订阅类型的变化而变化。
checkpointLocation如果不指定默认会在/tmp下存放。
2、将从Kafka订阅的数据做成一个DataSet
1、构建DataStreamReader
用于从外部存储系统(如文件系统、键值存储等)加载流式“数据集”的接口。使用`SparkSession.readStream`访问此内容。
2、指定输入源格式
默认的输入源格式是parquet,这里指定的是 kafka,输入源格式是DataStreamReader中的一个属性。
private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default").doc("The default data source to use in input/output.").version("1.3.0").stringConf.createWithDefault("parquet")
3、用输入的3个参数对DataStreamReader添加选项
DataStreamReader中维护了一个Map来接收这些选项,比如:
kafka.bootstrap.servers -> cdh1:9092
assign -> {"topicA":[0,1],"topicB":[2,4]}
subscribe -> topicA,topicB
subscribePattern -> topicP*
private var extraOptions = CaseInsensitiveMap[String](Map.empty)
4、加载输入流数据为DataFrame
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {def load(): DataFrame = loadInternal(None)private def loadInternal(path: Option[String]): DataFrame = {//*******//根据输入源格式获取相应的输入源提供者//这里的 source 为 kafka ,因此会返回KafkaSourceProvider//它是 所有Kafka readers 和 writers 的提供者类//此外还有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).getConstructor().newInstance()// 我们需要生成V1数据源,以便将其作为匀场传递给V2关系。目前我们无法确定是否真的要使用V2,因为我们不知道编写者,也不知道查询是否是连续的。val v1DataSource = DataSource(sparkSession,userSpecifiedSchema = userSpecifiedSchema,className = source,options = optionsWithPath.originalMap)val v1Relation = ds match {case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))case _ => None}ds match {//******Dataset.ofRows(sparkSession,StreamingRelationV2( //用于将[[表]]链接到流式[[LogicalPlan]]。Some(provider), source, table, dsOptions,table.schema.toAttributes, None, None, v1Relation))//******}}}
并将表中的数据设置成STRING
3、WordCount统计
在第2步的基础上进行数据处理:
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
4、开始运行并将结果打印到控制台
val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()
writeStream是用于将流式数据集的内容保存到外部存储的接口。将返回一个DataStreamWriter
outputMode是指定如何将流式DataFrame/Dataset的数据写入流式接收器。
- append:只有流式DataFrame/Dataset中的新行才会写入接收器
- complete:每次有更新时,流式DataFrame/Dataset中的所有行都将写入接收器
- update:每次有更新时,只有流式DataFrame/Dataset中更新的行才会写入接收器。如果查询不包含聚合,则相当于“append”模式
format是指定外部存储,这里的取值有6种:memory、foreach、foreachBatch、console、table、noop。
四、运行
1、创建Topic
kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
2、启动程序
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc
3、向topic推送数据
kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092
4、控制台查看结果
他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。
大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:
第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)
- 广州
- https://ais.cn/u/fi2yym
第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)
- 青岛
- https://ais.cn/u/nuQr6f
第六届大数据与信息化教育国际学术会议(ICBDIE 2025)
- 苏州
- https://ais.cn/u/eYnmQr
第三届通信网络与机器学习国际学术会议(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2