Spark(37):Streaming DataFrame 和 Streaming DataSet 创建

目录

0. 相关文章链接

1. 概述

2. socket source

3. file source

3.1. 读取普通文件夹内的文件

3.2. 读取自动分区的文件夹内的文件

4. kafka source

4.1. 导入依赖

4.2. 以 Streaming 模式创建 Kafka 工作流

4.3. 通过 Batch 模式创建 Kafka 工作流

5. Rate Source


0. 相关文章链接

 Spark文章汇总 

1. 概述

        使用 Structured Streaming 最重要的就是对 Streaming DataFrame 和 Streaming DataSet 进行各种操作。从 Spark2。0 开始, DataFrame 和 DataSet 可以表示静态有界的表, 也可以表示流式无界表。与静态 Datasets/DataFrames 类似,我们可以使用公共入口点 SparkSession 从流数据源创建流式 Datasets/DataFrames,并对它们应用与静态 Datasets/DataFrames 相同的操作。通过spark.readStream()得到一个DataStreamReader对象, 然后通过这个对象加载流式数据源, 就得到一个流式的 DataFrame。

// 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()
import spark.implicits._// 2. 从数据源(socket)中加载数据.
val lines: DataFrame = spark.readStream.format("socket") // 设置数据源.option("host", "localhost").option("port", 9999).load

spark 内置了几个流式数据源, 基本可以满足我们的所有需求:

  • File source 读取文件夹中的文件作为流式数据。 支持的文件格式: text, csv, josn, orc, parquet。 注意, 文件必须放置的给定的目录中, 在大多数文件系统中, 可以通过移动操作来完成。
  • kafka source 从 kafka 读取数据。 目前兼容 kafka 0。10。0+ 版本
  • socket source 用于测试。 可以从 socket 连接中读取 UTF8 的文本数据。 侦听的 socket 位于驱动中。 注意, 这个数据源仅仅用于测试。
  • rate source 用于测试。 以每秒指定的行数生成数据,每个输出行包含一个 timestamp 和 value。其中 timestamp 是一个 Timestamp类型(信息产生的时间),并且 value 是 Long 包含消息的数量。 用于测试和基准测试。
SourceOptionsFault-tolerantNotes
File sourcepath: path to the input directory, and common to all file formats. maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max) latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false) fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to true, the following files would be considered as the same file, because their filenames, “dataset.txt”, are the same: “file:///dataset.txt” “s3://a/dataset.txt” “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” For file-format-specific options, see the related methods in DataStreamReader(Scala/Java/Python/R). E.g. for “parquet” format options see DataStreamReader.parquet(). In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for “parquet”, see Parquet configuration section.YesSupports glob paths, but does not support multiple comma-separated paths/globs.
Socket Sourcehost: host to connect to, must be specified port: port to connect to, must be specifiedNo
Rate SourcerowsPerSecond (e.g. 100, default: 1): How many rows should be generated per second. rampUpTime (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. numPartitions (e.g. 10, default: Spark’s default parallelism): The partition number for the generated rows. The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.Yes
Kafka SourceSee the Kafka Integration Guide.Yes

2. socket source

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQueryobject StreamTest {def main(args: Array[String]): Unit = {// 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 2. 从数据源(socket)中加载数据.val lines: DataFrame = spark.readStream.format("socket") // 设置数据源.option("host", "localhost").option("port", 9999).load// 3. 把每行数据切割成单词val words: Dataset[String] = lines.as[String].flatMap((_: String).split("\\W"))// 4. 计算 word countval wordCounts: DataFrame = words.groupBy("value").count()// 5. 启动查询, 把结果打印到控制台val query: StreamingQuery = wordCounts.writeStream.outputMode("complete").format("console").startquery.awaitTermination()spark.stop()}
}

3. file source

3.1. 读取普通文件夹内的文件

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{LongType, StringType, StructType}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 定义 Schema, 用于指定列名以及列中的数据类型val userSchema: StructType = new StructType().add("name", StringType).add("job", StringType).add("age", LongType)// 使用SparkSession通过readStream方法读取文件(必须是目录, 不能是文件名)val user: DataFrame = spark.readStream.format("csv").schema(userSchema).load("/Project/Data/csv")// DataStreamReader中还有csv、json、text等方法,可以直接读取对应的文件val userCopy: DataFrame = spark.readStream.schema(userSchema).csv("/Project/Data/csv")// 将对应的数据输出(trigger表示触发器:数字表示毫秒值. 0 表示立即处理)val query: StreamingQuery = user.writeStream.outputMode("append").trigger(Trigger.ProcessingTime(0)).format("console").start()// 启动执行器query.awaitTermination()spark.stop()}
}

模板数据:

lisi,male,18
zhiling,female,28

结果输出:

3.2. 读取自动分区的文件夹内的文件

        当文件夹被命名为 “key=value” 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区。如果文件夹的命名规则不是“key=value”形式, 则不会触发自动分区。 另外, 同级目录下的文件夹的命名规则必须一致。

步骤一:创建如下目录结构

year=2023month=07month=08
year=2024month=07

步骤二:写入文件数据

lisi,male,18
zhiling,female,28

步骤三:编写代码(如上 读取普通文件夹内的文件 代码完全一致)

步骤四:启动运行打印日志

4. kafka source

4.1. 导入依赖

在其余Spark依赖的情况下,还需要导入如下SparkSQL的kafka依赖,参考文档: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.4.1 Documentation

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>2.4.3</version>
</dependency>

4.2. 以 Streaming 模式创建 Kafka 工作流

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用spark通过readStream方法可以以流的方式读取kafka里面的数据// 通过format设置 kafka 数据源// 通过 kafka.bootstrap.servers 设置kafka的参数// 通过 subscribe 设置订阅的主题,也可以订阅多个主题:   "topic1,topic2"// load后会返回一个DataFrame类型, 其schema是固定的: key,value,topic,partition,offset,timestamp,timestampTypeval df: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092").option("subscribe", "topic1").load// 通过 selectExpr 只获取其中的value字段// 通过as转换成 Datasetval lines: Dataset[String] = df.selectExpr("CAST(value AS string)").as[String]// 可以对 Dataset 进行各种操作val query: DataFrame = lines.flatMap((_: String).split("\\W+")).groupBy("value").count()// 进行输出,并且可以通过checkpointLocation来设置checkpoint// 下次启动的时候, 可以从上次的位置开始读取query.writeStream.outputMode("complete").format("console").option("checkpointLocation", "./ck1") .start.awaitTermination()// 关闭执行环境spark.stop()}
}

4.3. 通过 Batch 模式创建 Kafka 工作流

        这种模式一般需要设置消费的其实偏移量和结束偏移量, 如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest。该模式为一次性作业(批处理), 而非持续性的处理数据。

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用 read 方法,而不是 readStream 方法val lines: Dataset[String] = spark.read.format("kafka").option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092").option("subscribe", "topic1").option("startingOffsets", "earliest").option("endingOffsets", "latest").load.selectExpr("CAST(value AS STRING)").as[String]// 同样对 Dataset[String] 进行各种操作val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()// 使用 write 而不是 writeStreamquery.write.format("console").save()// 关闭执行环境spark.stop()}
}

5. Rate Source

以固定的速率生成固定格式的数据, 用来测试 Structured Streaming 的性能

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._val rows: DataFrame = spark.readStream.format("rate") // 设置数据源为 rate.option("rowsPerSecond", 10) // 设置每秒产生的数据的条数, 默认是 1.option("rampUpTime", 1) // 设置多少秒到达指定速率 默认为 0.option("numPartitions", 2) /// 设置分区数  默认是 spark 的默认并行度.loadrows.writeStream.outputMode("append").trigger(Trigger.Continuous(1000)).format("console").start().awaitTermination()// 关闭执行环境spark.stop()}
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13922.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

随笔03 考研笔记整理

图源&#xff1a;文心一言 上半年的博文整理&#xff0c;下半年依然会更新考研类的文章&#xff0c;有需要的小伙伴看向这里~~&#x1f9e9;&#x1f9e9; 另外&#xff0c;这篇文章可能是我上半年的努力成果之一&#xff0c;因此仅关注博主的小伙伴能够查看它~~&#x1f9e…

嵌入式系统中的GPIO控制:从理论到实践与高级应用

本文将探讨嵌入式系统中的GPIO(通用输入输出)控制,着重介绍GPIO的原理和基本用法。我们将使用一个实际的示例项目来演示如何通过编程配置和控制GPIO引脚。将基于ARM Cortex-M微控制器,并使用C语言进行编写。 GPIO是嵌入式系统中最常见且功能最强大的接口之一。它允许硬件工…

基于RK3588+FPGA+AI算法定制的智慧交通与智能安防解决方案

随着物联网、大数据、人工智能等技术的快速发展&#xff0c;边缘计算已成为当前信息技术领域的一个热门话题。在物联网领域&#xff0c;边缘计算被广泛应用于智慧交通、智能安防、工业等多个领域。因此&#xff0c;基于边缘计算技术的工业主板设计方案也受到越来越多人的关注。…

linux 指令 第3期

cat cat 指令&#xff1a; 首先我们知道一个文件内容属性 我们对文件操作就有两个方面&#xff1a;对文件内容和属性的操作 扩展&#xff1a;echo 指令 直接打印echo后面跟的字符串 看&#xff1a; 这其实是把它打印到了显示器上&#xff0c;我们也可以改变一下它的打印位置…

网络层中一些零碎且易忘的知识点

异构网络&#xff1a;指传输介质、数据编码方式、链路控制协议以及数据单元格式和转发机制不同&#xff0c;异构即物理层和数据链路层均不同RIP、OSPF、BGP分别是哪一层的协议&#xff1a; -RIPOSPFBGP所属层次应用层网络层应用层封装在什么协议中UDPIPTCP 一个主机可以有多个I…

Bootloader

Bootloader 一段有下载和引导功能的程序 下载应用程序引导使MCU运行在应用程序中&#xff0c;只在有更新请求或者APP无效的时候才会激活 APP和Bootloader都存在Flash中Flash Driver用来擦除APP&#xff0c;下载临时存放在RAM中&#xff0c;下载完成后复位释放。一般随用随下&a…

Pytorch个人学习记录总结 玩俄罗斯方块の深度学习小项目

目录 前言 模型成果演示 训练过程演示 代码实现 deep_network tetris test train 前言 当今&#xff0c;深度学习在各个领域展现出了惊人的应用潜力&#xff0c;而游戏开发领域也不例外。俄罗斯方块作为经典的益智游戏&#xff0c;一直以来深受玩家喜爱。在这个项目中&…

Python web实战 | 用 Flask 框架快速构建 Web 应用【实战】

概要 Python web 开发已经有了相当长的历史&#xff0c;从最早的 CGI 脚本到现在的全栈 Web 框架&#xff0c;现在已经成为了一种非常流行的方式。 Python 最早被用于 Web 开发是在 1995 年&#xff08;90年代早期&#xff09;&#xff0c;当时使用 CGI 脚本编写动态 Web 页面…

spring启动流程 (6完结) springmvc启动流程

SpringMVC的启动入口在SpringServletContainerInitializer类&#xff0c;它是ServletContainerInitializer实现类(Servlet3.0新特性)。在实现方法中使用WebApplicationInitializer创建ApplicationContext、创建注册DispatcherServlet、初始化ApplicationContext等。 SpringMVC…

68. 文本左右对齐

题目链接&#xff1a;力扣 解题思路&#xff1a;遍历单词数组&#xff0c;确定每一行的单词数量&#xff0c; 之后就可以得到每一个需要补充的空格数量。从而得到单词之间需要补充的空格数量。具体算法如下&#xff1a; 确定每一行的单词数量 初始值&#xff1a; num 0&…

【JavaWeb】正则表达式

&#x1f384;欢迎来到边境矢梦的csdn博文&#xff0c;本文主要讲解Java 中正则表达式 的相关知识&#x1f384; &#x1f308;我是边境矢梦&#xff0c;一个正在为秋招和算法竞赛做准备的学生&#x1f308; &#x1f386;喜欢的朋友可以关注一下&#x1faf0;&#x1faf0;&am…

2023年的深度学习入门指南(22) - 百川大模型13B的运行及量化

2023年的深度学习入门指南(22) - 百川大模型13B的运行及量化 不知道上一讲的大段代码大家看晕了没有。但是如果你仔细看了会发现&#xff0c;其实代码还是不全的。比如分词器我们就没讲。 另外&#xff0c;13B比7B的改进点也没有讲。 再有&#xff0c;对于13B需要多少显存我们…

ios 查看模拟器沙盒的路径

打一个断点运行程序&#xff0c;在xcode consol底部控制台输入&#xff1a; po NSHomeDirectory() 复制路径粘帖到前往文件夹打开沙盒缓存文件夹

golang pprof

pprof是一个用于分析数据的可视化和分析工具&#xff0c;由谷歌公司的开发团队使用go语言编写成的。一般用于对golang资源占用进行分析。不是原创&#xff0c;参考&#xff1a;https://juejin.cn/post/7122473470424219656 1. 通过页面查看golang运行情况 访问 http://127.0.0…

ppt怎么压缩到10m以内?分享好用的压缩方法

PPT是一种常见的演示文稿格式&#xff0c;有时候文件过大&#xff0c;我们会遇到无法发送、上传的现象&#xff0c;这时候简单的解决方法就是压缩其大小&#xff0c;那怎么才能将PPT压缩到10M以内呢&#xff1f; PPT文件大小受到影响的主要因素就是以下几点&#xff1a; 1、图…

Keepalived 在CentOS安装

下载 有两种下载方式&#xff0c;一种为yum源下载&#xff0c;另一种通过源代码下载&#xff0c;本文章使用源代码编译下载。 官网下载地址&#xff1a;https://www.keepalived.org/download.html wget https://www.keepalived.org/software/keepalived-2.0.20.tar.gz --no-…

CNN卷积详解

转载自&#xff1a;https://blog.csdn.net/yilulvxing/article/details/107452153 仅用于自己学习过程中经典文章讲解的记录&#xff0c;防止原文失效。 1&#xff1a;单通道卷积 以单通道卷积为例&#xff0c;输入为&#xff08;1,5,5&#xff09;&#xff0c;分别表示1个通道…

支配树学习笔记

学习链接【学习笔记】支配树_cz_xuyixuan的博客-CSDN博客 主要的求法是最后两个结论&#xff1a; 定理4用来求sdom&#xff0c;先搞一个dfs树&#xff0c;然后将点按dfs序从大到小加入&#xff0c;对每个点维护到当前根&#xff08;即已加入点&#xff09;路径上sdom最小是哪个…

sky-notes-01

1、DTO类 DTO&#xff08;Data Transfer Object&#xff09;&#xff1a;数据传输对象&#xff0c;Service 或 Manager 向外传输的对象。 详见阿里巴巴Java开发手册中的DO、DTO、BO、AO、VO、POJO定义 当前端提交的数据和实体类中对应的属性差别比较大时&#xff0c;建议使用…

级联选择框

文章目录 实现级联选择框效果图实现前端工具版本添加依赖main.js导入依赖级联选择框样式 后端数据库设计 实现级联选择框 效果图 实现 前端 工具版本 node.js v16.6.0vue3 级联选择框使用 Element-Plus 实现 添加依赖 在 package.json 添加依赖&#xff0c;并 npm i 导入…