Structured-Streaming集成Kafka

一、上下文

《Structured-Streaming初识》博客中已经初步认识了Structured-Streaming,Kafka作为目前最流行的一个分布式的实时流消息系统,是众多实时流处理框架的最优数据源之一。下面我们就跟着官方例子来看看Structured-Streaming是如何集成Kafka的?

二、官方例子

这里我们先把官方例子贴出来,所属包路径为:org.apache.spark.examples.sql.streaming

该示例使用Kafka中一个或多个Topic的消息并进行字数统计。

object StructuredKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +"<subscribe-type> <topics> [<checkpoint-location>]")System.exit(1)}val Array(bootstrapServers, subscribeType, topics, _*) = argsval checkpointLocation =if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toStringval spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()import spark.implicits._// 创建表示来自kafka的输入行流的DataSetval lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option(subscribeType, topics).load().selectExpr("CAST(value AS STRING)").as[String]// 运行 word countval wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()// 开始运行将运行计数打印到控制台的查询val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()query.awaitTermination()}}

三、分析

1、参数解释

运行该官方示例需要3或4个参数,分别是

  • Kafka的bootstrap-servers
  • 订阅Kafka TopicPartition 的类型
  • 订阅Kafka的Topic
  • checkpointLocation(不是必须的)

bootstrap-servers用于连接Kafka集群。

订阅类型有3种,且只能选择1种:

  1. assign:手动指定分区消费,需要自己管理分区的分配和再平衡。需要指定一个Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
  2. subscribe:订阅一个或多个topic进行消费(逗号分割),Kafka会自动处理分区的分配和再平衡。
  3. subscribePattern:基于正则的topic订阅方式,但可能增加一些复杂性和性能开销。

Topic的指定根据订阅类型的变化而变化。

checkpointLocation如果不指定默认会在/tmp下存放。

2、将从Kafka订阅的数据做成一个DataSet

1、构建DataStreamReader

用于从外部存储系统(如文件系统、键值存储等)加载流式“数据集”的接口。使用`SparkSession.readStream`访问此内容。

2、指定输入源格式

默认的输入源格式是parquet,这里指定的是 kafka,输入源格式是DataStreamReader中的一个属性。

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
  val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default").doc("The default data source to use in input/output.").version("1.3.0").stringConf.createWithDefault("parquet")

3、用输入的3个参数对DataStreamReader添加选项

DataStreamReader中维护了一个Map来接收这些选项,比如:

kafka.bootstrap.servers -> cdh1:9092

assign -> {"topicA":[0,1],"topicB":[2,4]}

subscribe -> topicA,topicB

subscribePattern -> topicP*

private var extraOptions = CaseInsensitiveMap[String](Map.empty)

4、加载输入流数据为DataFrame

final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {def load(): DataFrame = loadInternal(None)private def loadInternal(path: Option[String]): DataFrame = {//*******//根据输入源格式获取相应的输入源提供者//这里的 source 为 kafka ,因此会返回KafkaSourceProvider//它是 所有Kafka readers 和 writers 的提供者类//此外还有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).getConstructor().newInstance()// 我们需要生成V1数据源,以便将其作为匀场传递给V2关系。目前我们无法确定是否真的要使用V2,因为我们不知道编写者,也不知道查询是否是连续的。val v1DataSource = DataSource(sparkSession,userSpecifiedSchema = userSpecifiedSchema,className = source,options = optionsWithPath.originalMap)val v1Relation = ds match {case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))case _ => None}ds match {//******Dataset.ofRows(sparkSession,StreamingRelationV2( //用于将[[表]]链接到流式[[LogicalPlan]]。Some(provider), source, table, dsOptions,table.schema.toAttributes, None, None, v1Relation))//******}}}

并将表中的数据设置成STRING

3、WordCount统计

在第2步的基础上进行数据处理:

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

4、开始运行并将结果打印到控制台

val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()

writeStream是用于将流式数据集的内容保存到外部存储的接口。将返回一个DataStreamWriter

outputMode是指定如何将流式DataFrame/Dataset的数据写入流式接收器。

  1. append:只有流式DataFrame/Dataset中的新行才会写入接收器
  2. complete:每次有更新时,流式DataFrame/Dataset中的所有行都将写入接收器
  3. update:每次有更新时,只有流式DataFrame/Dataset中更新的行才会写入接收器。如果查询不包含聚合,则相当于“append”模式

format是指定外部存储,这里的取值有6种:memory、foreach、foreachBatch、console、table、noop。

四、运行

1、创建Topic

kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/

bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc

3、向topic推送数据

kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、控制台查看结果

 他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/65299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot 项目中集成 Kafka-03

在 Spring Boot 项目中集成 Kafka 有多种方式&#xff0c;适应不同的应用场景和需求。以下将详细介绍几种常用的集成方法&#xff0c;包括&#xff1a; 使用 Spring Kafka (KafkaTemplate 和 KafkaListener)使用 Spring Cloud Stream 与 Kafka Binder使用 Spring for Apache K…

生物医学信号处理--绪论

前言 参考书籍&#xff1a;刘海龙&#xff0c;生物医学信号处理&#xff0c;化学工业出版社 生物医学信号分类 1、由生理过程自发或者诱发产生的电生理信号和非电生理信号 • 电生理信号&#xff1a;ECG/心电、EEG/脑电、EMG/肌电、 EGG/胃电、 EOG/眼电 • 非电生理信号&am…

unity 播放 序列帧图片 动画

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、方法一&#xff1a;代码控制播放序列帧1、设置图片属性2、创建Image组件3、简单的代码控制4、挂载代码并赋值 二、方法二&#xff1a;直接使用1.Image上添加…

QT c++ 自定义按钮类 加载图片 美化按钮

如果你有需要利用图片美化按钮的情况&#xff0c;本文能帮助你。 鼠标左键按下按钮和松开&#xff0c;按钮显示不同的图片。 1.按钮类 //因为此类比较简单&#xff0c;1个头文件搞定&#xff0c;没有cpp文件 #ifndef CUSTOMBUTTON_H #define CUSTOMBUTTON_H #include <Q…

web漏洞之文件包含漏洞

一、文件包含漏洞 1、把DVWA页面改为low级别&#xff0c;然后点击File Inclusion页面 文件包含漏洞有四种include()/require()/include_once()/require_once() 常见的文件包含漏洞代码如下 <?php$file$_GET[filename]; filename随意定义include($file); ?> -----…

小程序与物联网(IoT)融合:开启智能生活新篇章

一、引言 随着移动互联网技术的飞速发展&#xff0c;小程序作为一种轻量级的应用形式&#xff0c;凭借其无需下载安装、即用即走的特点&#xff0c;迅速渗透到人们生活的各个领域。与此同时&#xff0c;物联网&#xff08;IoT&#xff09;技术也在不断进步&#xff0c;将各种物…

Ubuntu无法创建python venv环境

排查步骤如下 1. python3 -m venv venv he virtual environment was not created successfully because ensurepip is not available. On Debian/Ubuntu systems, you need to install the python3-venv package using the following command.apt install python3.8-venvYou…

如何很快将文件转换成另外一种编码格式?编码?按指定编码格式编译?如何检测文件编码格式?Java .class文件编码和JVM运行期内存编码?

如何很快将文件转换成另外一种编码格式? 利用VS Code右下角的"选择编码"功能&#xff0c;选择"通过编码保存"可以很方便将文件转换成另外一种编码格式。尤其&#xff0c;在测试w/ BOM或w/o BOM, 或者ANSI编码和UTF编码转换&#xff0c;特别方便。VS文件另…

PCL点云库入门——PCL库点云特征之PFH点特征直方图(Point Feature Histograms -PHF)

1、算法原理 PFH点&#xff08;Point Feature Histogram&#xff09;特征直方图的原理涉及利用参数化查询点与邻域点之间的空间差异&#xff0c;并构建一个多维直方图以捕捉点的k邻域几何属性。这个高维超空间为特征表示提供了一个可度量的信息空间&#xff0c;对于点云对应曲面…

5. CSS引入方式

5.1 CSS的三种样式 按照 CSS 样式书写的位置(或者引入的方式)&#xff0c;CSS样式表可以分为三大类&#xff1a; 1.行内样式表&#xff08;行内式&#xff09; 2.内部样式表&#xff08;嵌入式&#xff09; 3. 外部样式表&#xff08;链接式&#xff09; 5.2 内部样式表 …

为什么ip属地一会河南一会江苏

在使用互联网的过程中&#xff0c;许多用户可能会遇到这样一个问题&#xff1a;自己的IP属地一会儿显示为河南&#xff0c;一会儿又变成了江苏。这种现象可能会让人感到困惑&#xff0c;甚至产生疑虑&#xff0c;担心自己的网络活动是否受到了某种影响。为了解答这一疑问&#…

unity3d-搞个场景漫游如何实现Alpha

要处理两个问题&#xff1a; 如何设置地面人不掉下去 方法一、 游戏物体加刚体&#xff0c;将游戏物体和地面加collider。如果是地形&#xff0c;可以使用 Terrain Collider&#xff1b;如果是简单的平面&#xff0c;可以添加 Box Collider 或者 Mesh Collider&#xff08;如果…

git merge rebase

merge操作 Git自己分支合并dev分支 rebase 操作 git rebase

doris 2.1 temporay partition 测试及总结

测试步骤 创建表 drop table order_info_shuffle; CREATE TABLE order_info_shuffle ( order_id varchar(20), user_id varchar(20), goods_id

jmeter性能测试例子

目录 一、介绍 二、操作例子 设置线程数 添加同步定时器 添加聚合报告 一、介绍 在软件测试中&#xff0c;一般用jmeter来对接口做性能测试&#xff0c;对对接口进行一个压力的测试。 简述&#xff1a; 在接口的线程中设置线程的数量和时间&#xff0c;添加一个定时器…

C# 设计模式(行为型模式):解释器模式

C# 设计模式&#xff08;行为型模式&#xff09;&#xff1a;解释器模式 (Interpreter Pattern) 什么是解释器模式&#xff1f; 解释器模式&#xff08;Interpreter Pattern&#xff09;是一种行为型设计模式&#xff0c;用于定义一种语言的语法表示&#xff0c;并提供一个解释…

ubuntu16 重启之后lvm信息丢失故障恢复

一、背景 1、问题背景 业务有一台物理开发服务器&#xff0c;文件系统有损坏&#xff1b;由于重启时没有检查&#xff0c;导致重启卡住。后面通过断电重新启动之后&#xff0c;无法进入系统&#xff1b;进入救援模式&#xff0c;注释数据盘挂载。重启之后进入系统&#xff0c…

React函数组件中与生命周期相关Hooks详解

React 函数组件及其钩子渲染流程是 React 框架中的核心概念之一。以下是对该流程的详细解析&#xff1a; 一、React 函数组件基础 定义&#xff1a; React 函数组件是一个接收 props 作为参数并返回 React 元素的函数。它们通常用于表示 UI 的一部分&#xff0c;并且不保留内部…

水一篇水水水

为了拿推广卷&#xff0c;但不想把我原本完整的文章拆成零散的多篇&#xff0c;只能出此下策随便发一篇&#xff0c;认真写的都笔记专栏里 网络是由若干节点和连接这些节点的链路构成&#xff0c;表示诸多对象及其相互联系。 在1999年之前&#xff0c;人们一般认为网络的结构都…