Structured-Streaming集成Kafka

一、上下文

《Structured-Streaming初识》博客中已经初步认识了Structured-Streaming,Kafka作为目前最流行的一个分布式的实时流消息系统,是众多实时流处理框架的最优数据源之一。下面我们就跟着官方例子来看看Structured-Streaming是如何集成Kafka的?

二、官方例子

这里我们先把官方例子贴出来,所属包路径为:org.apache.spark.examples.sql.streaming

该示例使用Kafka中一个或多个Topic的消息并进行字数统计。

object StructuredKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +"<subscribe-type> <topics> [<checkpoint-location>]")System.exit(1)}val Array(bootstrapServers, subscribeType, topics, _*) = argsval checkpointLocation =if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toStringval spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()import spark.implicits._// 创建表示来自kafka的输入行流的DataSetval lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option(subscribeType, topics).load().selectExpr("CAST(value AS STRING)").as[String]// 运行 word countval wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()// 开始运行将运行计数打印到控制台的查询val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()query.awaitTermination()}}

三、分析

1、参数解释

运行该官方示例需要3或4个参数,分别是

  • Kafka的bootstrap-servers
  • 订阅Kafka TopicPartition 的类型
  • 订阅Kafka的Topic
  • checkpointLocation(不是必须的)

bootstrap-servers用于连接Kafka集群。

订阅类型有3种,且只能选择1种:

  1. assign:手动指定分区消费,需要自己管理分区的分配和再平衡。需要指定一个Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
  2. subscribe:订阅一个或多个topic进行消费(逗号分割),Kafka会自动处理分区的分配和再平衡。
  3. subscribePattern:基于正则的topic订阅方式,但可能增加一些复杂性和性能开销。

Topic的指定根据订阅类型的变化而变化。

checkpointLocation如果不指定默认会在/tmp下存放。

2、将从Kafka订阅的数据做成一个DataSet

1、构建DataStreamReader

用于从外部存储系统(如文件系统、键值存储等)加载流式“数据集”的接口。使用`SparkSession.readStream`访问此内容。

2、指定输入源格式

默认的输入源格式是parquet,这里指定的是 kafka,输入源格式是DataStreamReader中的一个属性。

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
  val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default").doc("The default data source to use in input/output.").version("1.3.0").stringConf.createWithDefault("parquet")

3、用输入的3个参数对DataStreamReader添加选项

DataStreamReader中维护了一个Map来接收这些选项,比如:

kafka.bootstrap.servers -> cdh1:9092

assign -> {"topicA":[0,1],"topicB":[2,4]}

subscribe -> topicA,topicB

subscribePattern -> topicP*

private var extraOptions = CaseInsensitiveMap[String](Map.empty)

4、加载输入流数据为DataFrame

final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {def load(): DataFrame = loadInternal(None)private def loadInternal(path: Option[String]): DataFrame = {//*******//根据输入源格式获取相应的输入源提供者//这里的 source 为 kafka ,因此会返回KafkaSourceProvider//它是 所有Kafka readers 和 writers 的提供者类//此外还有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).getConstructor().newInstance()// 我们需要生成V1数据源,以便将其作为匀场传递给V2关系。目前我们无法确定是否真的要使用V2,因为我们不知道编写者,也不知道查询是否是连续的。val v1DataSource = DataSource(sparkSession,userSpecifiedSchema = userSpecifiedSchema,className = source,options = optionsWithPath.originalMap)val v1Relation = ds match {case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))case _ => None}ds match {//******Dataset.ofRows(sparkSession,StreamingRelationV2( //用于将[[表]]链接到流式[[LogicalPlan]]。Some(provider), source, table, dsOptions,table.schema.toAttributes, None, None, v1Relation))//******}}}

并将表中的数据设置成STRING

3、WordCount统计

在第2步的基础上进行数据处理:

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

4、开始运行并将结果打印到控制台

val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()

writeStream是用于将流式数据集的内容保存到外部存储的接口。将返回一个DataStreamWriter

outputMode是指定如何将流式DataFrame/Dataset的数据写入流式接收器。

  1. append:只有流式DataFrame/Dataset中的新行才会写入接收器
  2. complete:每次有更新时,流式DataFrame/Dataset中的所有行都将写入接收器
  3. update:每次有更新时,只有流式DataFrame/Dataset中更新的行才会写入接收器。如果查询不包含聚合,则相当于“append”模式

format是指定外部存储,这里的取值有6种:memory、foreach、foreachBatch、console、table、noop。

四、运行

1、创建Topic

kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/

bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc

3、向topic推送数据

kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、控制台查看结果

 他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/65299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生物医学信号处理--绪论

前言 参考书籍&#xff1a;刘海龙&#xff0c;生物医学信号处理&#xff0c;化学工业出版社 生物医学信号分类 1、由生理过程自发或者诱发产生的电生理信号和非电生理信号 • 电生理信号&#xff1a;ECG/心电、EEG/脑电、EMG/肌电、 EGG/胃电、 EOG/眼电 • 非电生理信号&am…

unity 播放 序列帧图片 动画

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、方法一&#xff1a;代码控制播放序列帧1、设置图片属性2、创建Image组件3、简单的代码控制4、挂载代码并赋值 二、方法二&#xff1a;直接使用1.Image上添加…

小程序与物联网(IoT)融合:开启智能生活新篇章

一、引言 随着移动互联网技术的飞速发展&#xff0c;小程序作为一种轻量级的应用形式&#xff0c;凭借其无需下载安装、即用即走的特点&#xff0c;迅速渗透到人们生活的各个领域。与此同时&#xff0c;物联网&#xff08;IoT&#xff09;技术也在不断进步&#xff0c;将各种物…

如何很快将文件转换成另外一种编码格式?编码?按指定编码格式编译?如何检测文件编码格式?Java .class文件编码和JVM运行期内存编码?

如何很快将文件转换成另外一种编码格式? 利用VS Code右下角的"选择编码"功能&#xff0c;选择"通过编码保存"可以很方便将文件转换成另外一种编码格式。尤其&#xff0c;在测试w/ BOM或w/o BOM, 或者ANSI编码和UTF编码转换&#xff0c;特别方便。VS文件另…

PCL点云库入门——PCL库点云特征之PFH点特征直方图(Point Feature Histograms -PHF)

1、算法原理 PFH点&#xff08;Point Feature Histogram&#xff09;特征直方图的原理涉及利用参数化查询点与邻域点之间的空间差异&#xff0c;并构建一个多维直方图以捕捉点的k邻域几何属性。这个高维超空间为特征表示提供了一个可度量的信息空间&#xff0c;对于点云对应曲面…

5. CSS引入方式

5.1 CSS的三种样式 按照 CSS 样式书写的位置(或者引入的方式)&#xff0c;CSS样式表可以分为三大类&#xff1a; 1.行内样式表&#xff08;行内式&#xff09; 2.内部样式表&#xff08;嵌入式&#xff09; 3. 外部样式表&#xff08;链接式&#xff09; 5.2 内部样式表 …

为什么ip属地一会河南一会江苏

在使用互联网的过程中&#xff0c;许多用户可能会遇到这样一个问题&#xff1a;自己的IP属地一会儿显示为河南&#xff0c;一会儿又变成了江苏。这种现象可能会让人感到困惑&#xff0c;甚至产生疑虑&#xff0c;担心自己的网络活动是否受到了某种影响。为了解答这一疑问&#…

jmeter性能测试例子

目录 一、介绍 二、操作例子 设置线程数 添加同步定时器 添加聚合报告 一、介绍 在软件测试中&#xff0c;一般用jmeter来对接口做性能测试&#xff0c;对对接口进行一个压力的测试。 简述&#xff1a; 在接口的线程中设置线程的数量和时间&#xff0c;添加一个定时器…

PDFelement 特别版

Wondershare PDFelement Pro 是一款非常强大的PDF编辑软件&#xff0c;它允许用户轻松地编辑、转换、创建和管理PDF文件。这个中文特别版的软件具有许多令人印象深刻的功能&#xff0c;PDFelement Pro 提供了丰富的编辑功能&#xff0c;可以帮助用户直接在PDF文件中添加、删除、…

【OpenCV】使用Python和OpenCV实现火焰检测

1、 项目源码和结构&#xff08;转&#xff09; https://github.com/mushfiq1998/fire-detection-python-opencv 2、 运行环境 # 安装playsound&#xff1a;用于播放报警声音 pip install playsound # 安装opencv-python&#xff1a;cv2用于图像和视频处理&#xff0c;特别是…

深入理解Mybatis原理》MyBatis的sqlSessi

sqlSessionFactory 与 SqlSession 正如其名&#xff0c;Sqlsession对应着一次数据库会话。由于数据库会话不是永久的&#xff0c;因此Sqlsession的生命周期也不应该是永久的&#xff0c;相反&#xff0c;在你每次访问数据库时都需要创建它&#xff08;当然并不是说在Sqlsession…

《HarmonyOS第一课》焕新升级,赋能开发者快速掌握鸿蒙应用开发

随着HarmonyOS NEXT发布&#xff0c;鸿蒙生态日益壮大&#xff0c;广大开发者对于系统化学习平台和课程的需求愈发强烈。近日&#xff0c;华为精心打造的《HarmonyOS第一课》全新上线&#xff0c;集“学、练、考”于一体&#xff0c;凭借多维融合的教学模式与系统课程设置&…

springboot集成整合工作流,activiti审批流,整合实际案例,流程图设计,流程自定义,表单配置自定义,代码demo流程

前言 activiti工作流引擎项目&#xff0c;企业erp、oa、hr、crm等企事业办公系统轻松落地&#xff0c;一套完整并且实际运用在多套项目中的案例&#xff0c;满足日常业务流程审批需求。 一、项目形式 springbootvueactiviti集成了activiti在线编辑器&#xff0c;流行的前后端…

《探秘计算机视觉与深度学习:开启智能视觉新时代》

《探秘计算机视觉与深度学习&#xff1a;开启智能视觉新时代》 一、追溯起源&#xff1a;从萌芽到崭露头角二、核心技术&#xff1a;解锁智能视觉的密码&#xff08;一&#xff09;卷积神经网络&#xff08;CNN&#xff09;&#xff1a;图像识别的利器&#xff08;二&#xff0…

Vmware安装centos

用来记录自己安装的过程 一、创建虚拟机安装centos镜像 点击完成后&#xff0c;等待一会会进入centos的系统初始化界面 二、centos初始化配置 三、配置网络 1、虚拟网络编辑器&#xff0c;开启VMnet1、VMnet8的DHCP vmware左上角工具栏&#xff0c;点击【编辑】->【虚拟网…

Unity-Mirror网络框架-从入门到精通之Chat示例

文章目录 前言Chat聊天室Authentication授权ChatAuthenticatorChat示例中的授权流程聊天Chat最后 前言 在现代游戏开发中&#xff0c;网络功能日益成为提升游戏体验的关键组成部分。Mirror是一个用于Unity的开源网络框架&#xff0c;专为多人游戏开发设计。它使得开发者能够轻…

uniapp-vue3 实现, 一款带有丝滑动画效果的单选框组件,支持微信小程序、H5等多端

采用 uniapp-vue3 实现, 是一款带有丝滑动画效果的单选框组件&#xff0c;提供点状、条状的动画过渡效果&#xff0c;支持多项自定义配置&#xff0c;适配 web、H5、微信小程序&#xff08;其他平台小程序未测试过&#xff0c;可自行尝试&#xff09; 可到插件市场下载尝试&…

深度学习GPU服务器推荐:打造高效运算平台

文章来源于百家号&#xff1a;GPU服务器厂家 在深度学习和人工智能领域&#xff0c;一个高性能的GPU服务器是研究和开发工作的关键。今天&#xff0c;我们将为大家推荐一款基于详细硬件配置表的深度学习GPU服务器&#xff0c;它专为高效运算和数据处理而设计。 一、机箱设计 …

78、使用爱芯派2_AX630C开发板 3.2T高有效算力 低功耗 支持AI-ISP真黑光实验

基本思想:使用爱心元智最新的版本开发板进行实验 AX630C、AX620Q 都是 620E 这一代 一、参考这个官方教程,先把代码在本地交叉编译完成 https://github.com/AXERA-TECH/ax620e_bsp_sdk 然后在拷贝到620c设备上 root@ax630c:~/ax620e_bsp_sdk/msp/out/arm64_glibc/bin# ./…