Structured Streaming的模型介绍与实际操作

微批处理(Micro-Batching)

微批处理是 Structured Streaming 默认的处理模型。

微批处理 (Micro-batching):

  • 在微批处理模型中,实时数据流被分割成小的批次。
  • 这些批次按顺序处理,每个批次处理像一个小的批处理作业。
  • 处理完一个批次后,结果被输出,然后处理下一个批次。
  • 这意味着会有一个小的延迟,等于批次的大小,因为系统需要等待整个批次处理完毕才输出结果。
  • 微批处理模型中通常有一个处理周期的概念,即系统以固定的时间间隔处理数据。

优点:

  • 容错性: 基于 Spark 的容错机制,可以容易地恢复状态和输出。
  • 简单性: 开发人员可以使用与批处理相同的API进行流处理,降低了学习曲线。
  • 集成性: 可以与Spark的其他组件(如MLlib、Spark SQL)无缝集成。

缺点:

  • 延迟性: 因为处理是按批次进行的,所以有固有的延迟,通常是秒级。

持续处理(Continuous Processing)

持续处理是 Structured Streaming 在 Spark 2.3 版本中引入的实验性功能。在这种模型中,实时数据流被视为连续的记录流,Spark 引擎以较低的延迟(毫秒级)持续处理每条记录。

持续处理 (Continuous Processing):

  • 持续处理模型中,数据是随着其到达即时处理的。
  • 没有将数据分批处理,而是持续不断地处理流入的数据。
  • 这种模式可以减少延迟,因为数据一到达就开始处理,不必等待。
  • 持续处理模型通常能够提供更低的端到端延迟,但可能需要更复杂的管理状态和容错机制。

优点:

  • 低延迟: 可以实现毫秒级的处理延迟,适用于对延迟敏感的应用。
  • 高吞吐: 由于不需要划分批次,可以连续不断地处理数据,提高了吞吐量。

缺点:

  • 复杂性: 相对于微批处理,需要更复杂的容错机制。
  • 成熟度: 这是一个较新的功能,可能不如微批处理稳定。

这两种模型可以用以下表格进行比较:

特性微批处理持续处理
处理延迟秒级毫秒级
容错性中到高
API一致性与批处理一致与批处理一致
成熟度
吞吐量
复杂性
状态管理容易较复杂
与其他Spark组件集成无缝无缝

在选择模型时,需要根据具体的应用场景、延迟要求和资源情况来决定使用哪种模型。如果应用可以容忍秒级的延迟,微批处理是一个成熟且简单的选择。如果应用需要极低的延迟,可以尝试使用持续处理模型。

​ 微批处理模型中,“写日志”通常是指在处理批次之前记录其信息以便于故障恢复。而在持续处理模型中,“写日志”可能更多地关联于实时记录每个事件或数据项的处理状态。

编写Structured Streaming程序

导入pyspark模块

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
#如果直接使用pyspark交互就不需要导入,但是如果是自己编写python就需要导入模块

创建SparkSession对象:

  • 在任何Spark应用程序中,第一步是创建一个SparkSession对象。这是Structured Streaming编程的入口点。
from pyspark.sql import SparkSessionspark = SparkSession \.builder \.appName("Structured Streaming App") \ .getOrCreate()
spark.sparkContext.setLogLevel("warn")

稍微讲解一下appName得是被唯一标识的,spark.sparkContext.setLogLevel(“warn”)是设置日志显示级别,无关紧要的就不输出

定义输入源:

  • 定义输入数据源以及如何读取数据。Structured Streaming支持多种输入源,如Kafka、文件系统、Socket等。
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()

定义转换:

  • 对数据流进行转换处理,比如选择需要的字段、进行聚合等操作。
  • 这是需要我们自己定义如何操作的
from pyspark.sql.functions import col, windowwords = df.selectExpr("CAST(value AS STRING)")
wordCounts = words.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"),col("word")
).count()

定义输出接收器:

  • 定义如何输出处理后的数据流。输出可以写入到多种外部存储系统中,如文件系统、数据库、控制台等。
query = wordCounts \.writeStream \.outputMode("complete") \.format("console") \.start()

启动流处理:

  • 最后,启动流处理。启动后,Spark会持续运行,处理实时进入的数据。
query.awaitTermination()

监控和异常处理:

  • 你可以监控流处理的进度和性能,以及添加异常处理逻辑。
try:query.awaitTermination()
except KeyboardInterrupt:query.stop()

输入源

file源

Structured Streaming中的文件源允许你监视指定目录中的新文件,并从中读取数据。这里是一些常见的选项:

  1. path: 需要监控的目录路径。
  2. maxFilesPerTrigger: 每个触发器处理的最大文件数。这个选项可以限制在每个触发器批次中应该读取的文件数量,有助于控制流处理的速率。
  3. latestFirst: 是否先处理最新的文件。设置为true会首先处理最新的文件,这可能对某些实时性要求较高的应用程序有用。
  4. fileFormat: 文件的格式,如jsoncsvparquet等。
# 创建一个DataFrame表示从目录`/path/to/directory`中连续读取的数据
csvDF = spark \.readStream \.option("sep", ",") \.schema(userSchema) \  # 可以定义一个schema.csv("/path/to/directory")# 启动流查询,输出模式为追加模式,并将结果输出到控制台
query = csvDF.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

在这个例子中,我们首先使用readStream来创建一个DataFrame读取流。我们通过.option("sep", ",")指定了CSV值之间的分隔符为逗号。schema(userSchema)部分定义了CSV文件的结构,你需要在代码中提前定义userSchema

然后,我们指定了监视的目录路径。csv("/path/to/directory")表示我们希望读取的文件类型是CSV。

最后,我们定义了一个查询,该查询将输出模式设置为append,这意味着仅将新的数据行附加到结果中。我们使用.format("console")将输出结果发送到控制台,这对于调试和开发是很有用的。start()方法启动流查询,而awaitTermination()方法则是让应用程序等待流处理的终止,以进行长时间运行。

kafka源

在Structured Streaming中,Kafka源允许你从Kafka主题读取数据流。这里是一些常见的Kafka源选项:

  1. kafka.bootstrap.servers: Kafka集群的地址列表,通常是"host1:port1,host2:port2"的形式。
  2. subscribe: 要订阅的Kafka主题的名称或用逗号分隔的多个主题的列表。
  3. startingOffsets: 指定流应从何处开始读取。它可以是"latest"(默认值),“earliest”,或是JSON字符串指定每个主题的分区起始偏移量。
  4. endingOffsets: 流查询终止时的偏移量。仅在批处理查询中使用。
  5. failOnDataLoss: 是否在数据丢失(例如,Kafka主题被删除)时使查询失败。默认为true

下面是一个Kafka源的操作示例,它从Kafka主题读取数据流,并将其加载为DataFrame:

# 创建一个DataFrame表示从名为"updates"的Kafka主题读取的数据
kafkaDF = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "updates") \.option("startingOffsets", "latest") \.load()# 选择我们需要的字段并将value字段从字节转换为字符串
selectedDF = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 启动流查询,将结果输出到控制台
query = selectedDF.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

在这个例子中,我们使用.format("kafka")来告诉Spark我们正在使用Kafka源,并通过option来设置Kafka的相关参数。.load()方法加载Kafka主题为DataFrame。

以下是.option()方法设置的参数的解释:

  1. kafka.bootstrap.servers: 指定Kafka集群的服务器地址及其端口。这个设置是必须的,因为它告诉Spark Streaming在哪里可以找到Kafka集群。格式是一个逗号分隔的主机和端口对的列表(例如:“host1:port1,host2:port2”)。这里的hostport分别对应Kafka服务器的IP地址和监听端口。
  2. subscribe: 这个选项用于指定一个或多个Kafka主题来订阅。只要这些主题有数据写入,Spark Streaming就会读取这些数据。在这个例子中,"updates"是你想要订阅的Kafka主题的名称。
  3. startingOffsets: 定义当你的Spark应用第一次启动并且没有设置偏移量的时候,它应该从Kafka主题的哪里开始读取数据。"latest"表示只读取启动应用程序后生成的数据,而"earliest"表示从可用的最早的数据开始读取。你还可以指定一个JSON字符串来表示每个主题的每个分区的确切开始偏移量。

selectExpr是一个转换操作,它允许你运行SQL表达式。在这里,我们将keyvalue列从字节转换为字符串类型,便于阅读和处理。

查询的其余部分和之前的例子类似,这次我们也是以追加模式输出到控制台,并启动流查询。

请注意,你需要有一个运行中的Kafka集群,并已经创建了相关的主题,以及在Spark集群上配置了适当的Kafka依赖。

socket源

在Structured Streaming中,使用socket源意味着数据流将来自于一个TCP套接字连接。这是最简单的流式数据源之一,通常用于测试和原型设计阶段。它允许您从通过TCP连接发送的数据流中读取文本数据。

以下是如何在Structured Streaming中设置socket源:

from pyspark.sql import SparkSession# 初始化SparkSession
spark = SparkSession.builder \.appName("StructuredSocketRead") \.getOrCreate()# 创建流式DataFrame,连接到指定的socket
lines = spark.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9999) \.load()# 使用DataFrame API进行数据处理
# ...# 启动流查询
query = lines.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

上面的代码片段执行了以下操作:

  1. 通过SparkSession.builder初始化了一个SparkSession。
  2. 使用readStream方法创建了一个流式DataFrame,它将会连接到在localhost上的9999端口监听的TCP套接字。
  3. 通过format("socket")指定了数据源格式为socket。
  4. 使用.option("host", "localhost").option("port", 9999)设置了监听的主机地址和端口号。
  5. .load()方法触发了对socket源的连接。
  6. 之后可以在lines DataFrame上应用各种转换操作,如过滤、选择、聚合等。
  7. writeStream定义了如何输出处理后的流数据,这里通过.format("console")指定了输出到控制台。
  8. .start()开始接收数据并处理,.awaitTermination()方法让程序持续运行,直到手动停止或者遇到错误。

使用socket源进行Structured Streaming是一个好方法,可以实时测试您的流处理逻辑,因为您可以很容易地通过如netcat之类的工具来发送数据。

然后在另外一个打开虚拟机另外窗口输入

nc -lk 你的端口号

rate源

在Structured Streaming中,rate源每秒生成特定的数据行,两个列的数据流:timestampvalue。这个源非常适合生成简单的数据流进行测试和调试。

每个输出行包含一个timestampvaluetimestamp是每个触发器触发时的当前时间戳,而value是从程序开始以来的触发器触发的次数。

下面是如何在Structured Streaming中设置rate源的例子:

from pyspark.sql import SparkSession# 初始化SparkSession
spark = SparkSession.builder \.appName("StructuredRateRead") \.getOrCreate()# 创建流式DataFrame,用rate源
df = spark.readStream \.format("rate") \.option("rowsPerSecond", "1") \.load()# 使用DataFrame API进行数据处理
# ...# 启动流查询,输出到控制台
query = df.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

上面的代码片段执行了以下操作:

  1. 使用SparkSession.builder初始化了一个SparkSession。
  2. 使用readStream方法创建了一个流式DataFrame,它将会使用rate源。
  3. 通过.option("rowsPerSecond", "1")设置每秒生成的行数。
  4. .load()方法触发了对rate源的连接。
  5. 使用.writeStream定义了如何输出处理后的流数据。
  6. 通过.format("console")指定了输出到控制台。
  7. .start()开始接收数据并处理,.awaitTermination()方法让程序持续运行,直到手动停止或者遇到错误。

rate源非常适合开发和测试时生成连续的、预测的数据流,但它不适用于生产环境,因为它不是从实际的数据源读取数据。

可以使用option方法来配置这个源的行为。以下是rate源的一些常见选项和它们的功能:

  1. rowsPerSecond:指定每秒生成的行数。这个选项可以帮助你控制数据生成的速率。

    示例:.option("rowsPerSecond", "10") 表示每秒生成10行数据。

  2. rampUpTime:在指定时间内逐渐增加到rowsPerSecond指定的速率。这通常用于模拟数据源在启动时从没有数据到达指定速率的过渡过程。

    示例:.option("rampUpTime", "1m") 在1分钟内逐渐增加生成的数据行数。

  3. numPartitions:指定生成的数据将在多少个分区中分布。这可以帮助模拟并行数据流的情况。

    示例:.option("numPartitions", "2") 表示数据将分布在两个分区中。

使用这些选项的例子:

df = spark.readStream \.format("rate") \.option("rowsPerSecond", "100") \.option("rampUpTime", "5s") \.option("numPartitions", "2") \.load()

这将会创建一个数据流,初始时每秒100行数据,5秒内逐渐增加到这个速率,并且数据在两个分区中分布。

接收器

Structured Streaming中的输出接收器(Sink)是指数据流最终输出到的地方。Spark Structured Streaming提供了多种不同的输出接收器,以支持将数据流输出到各种外部系统和格式。以下是一些常见的输出接收器:

  1. 文件接收器(File Sink):输出数据到文件系统。支持的格式包括文本、JSON、CSV、Parquet等。可以指定文件输出目录和文件格式。

  2. Kafka接收器(Kafka Sink):输出数据到Kafka主题。可以指定Kafka服务器的地址和要写入的主题。

  3. 控制台接收器(Console Sink):将数据输出到控制台,主要用于调试和开发。

  4. 内存接收器(Memory Sink):输出数据到内存表中,允许在内存中查询数据。这主要用于快速测试和原型开发。

  5. Foreach接收器:提供了一个通用接口,允许你对数据流中的每个记录执行任意操作。这可以用于实现自定义的输出逻辑,例如写入自定义外部存储或调用外部API。

使用Structured Streaming的输出接收器时,你需要指定输出模式(如"append"、“complete"或"update”),输出接收器类型(如"console"、“kafka”、"file"等),以及任何必要的配置选项。

例如,将数据流输出到控制台的代码示例:

query = df.writeStream \.outputMode("append") \.format("console") \.start()

将数据流输出到Kafka的代码示例:

query = df.writeStream \.outputMode("update") \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("topic", "updates") \.start()

这些代码示例展示了如何将数据流输出到不同类型的接收器。每种类型的接收器可能有不同的配置选项和限制,所以在使用时需要查阅具体的文档。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/184332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

恋上数据结构与算法之二叉堆

文章目录 需求分析Top K 问题堆堆的基本接口设计二叉堆(Binary Heap)最大堆添加思路交换位置的优化实现 删除思路流程图解实现 replace批量建堆自上而下的上滤自下而上的下滤效率对比复杂度计算实现 完整代码 最小堆比较器解析Top K 问题问题分析代码实现内部方法分析问题 2 堆…

【程序员养生心得】—— 编程之路,健康同行

身为程序员,我们似乎总和亚健康、熬夜、颈椎病等标签紧密相连。但工作虽重要,健康价更高。在此,我想与大家分享一些在编程之路上的养生心得,希望我们都能在职业发展的同时,照顾好自己。 定时休息,活动身体&…

小程序云开发中引入vant

首先看一下云开发中的小程序的目录结构 安装 vant 上面是官方的方法 具体到我们的项目是这样子的 最后,构建一下就可以了

rv1126-rv1109-rk809

是这样的,新来板子走的是rk809部分 然后我的编译方式里面没有,走的是别的方式,打印到log如下,然后就卡死 DDR V1.09 8fef64cfb9 wesley.yao 22/10/25-20:03:00 DDR4, 328MHz BW=16 Col=10 Bk=4 BG=2 CS0 Row=15 CS=1 Die BW=16 Size=512MB change to: 328MHz change to: 528MHz…

重要端口及服务速查

重要端口及服务 TCP/UDP端口列表K8s端口列表portservicedescription21ftp/tftp/vsftpd文件传输协议爆破/嗅探/溢出/后门22ssh远程连接爆破/openssh漏洞23telnet远程连接爆破/嗅探/弱口令25smtp邮件服务邮件伪造53dns域名解析系统域传送/劫持/缓存投毒/欺骗67/68dhcp服务劫持/欺…

微信小程序踩坑记录

一、引言 作者在开发微信小程序《目的地到了》的过程中遇到过许多问题,这里讲讲一些技术和经验问题。 基本目录机构: 二、问题 1、定位使用 获取定位一定要在app.json里面申明,不然是没办法获取定位信息的 "requiredPrivateInfos"…

【分享】Java Helper 与 Utility 类的区别

什么是Helper类? Helper类是一个包含一些常用方法或功能的类,用来辅助完成某个模块或任务的功能。它们通常不是直接提供业务功能的类,而是被其他类调用来完成一些特定的任务。Helper类的作用是提高代码的重用率、可维护性和可测试性&#xf…

Linux | Ubuntu设置 netstat(网络状态)

netstat命令用于显示与IP、TCP、UDP和ICMP协议相关的统计数据,一般用于检验本机各端口的网络连接情况。netstat是在内核中访问网络及相关信息的程序,它能提供TCP连接,TCP和UDP监听,进程内存管理的相关报告。 1.netstat的安装 搜…

JVM执行引擎以及调优

1.JVM内部的优化逻辑 1.1JVM的执行引擎 javac编译器将Person.java源码文件编译成class文件[我们把这里的编译称为前期编译],交给JVM运行,因为JVM只能认识class字节码文件。同时在不同的操作系统上安装对应版本的JDK,里面包含了各自屏蔽操作…

C#学习相关系列之数组---常用方法使用(二)

1、声明和初始化数组 int[] arr1 new int[5]; // 声明一个长度为5的整型数组 int[] arr2 {1, 2, 3, 4, 5}; // 声明并初始化一个整型数组 2、访问数组元素 int[] arr {1, 2, 3, 4, 5}; Console.WriteLine(arr[0]); // 输出:1 3、获取数组长度 int[] arr {1, …

网络通信与TCP.IP协议

网络通信与TCP.IP协议 URI 用字符串标识某一互联网资源,而 URL 表示资源的地点(互联网上所处的位置)。可见 URL 是 URI 的子集 URL (Uniform Resource Locator),统一资源定位符 ,用于描述一个网络上的资源 DNS: &#…

element-plus 使用密码输入框的自定义图标

<el-inputv-model"ruleFormPassword.newPassword"placeholder"请输入新密码":type"showPassword ? text : password":style"{ width: 360px }"><template #suffix><span class"input_icon" click"swit…

linux环境下编译安装OpenCV For Java(CentOS 7)

最近在业余时间学习了一些有关图像处理的代码&#xff0c;但是只能本地处理&#xff0c;满足不了将来开放远程服务的需求。 因此&#xff0c;查找并参考了一些资料&#xff0c;成功在centos7环境安装上了opencv 460。 下面上具体安装步骤&#xff0c;希望能帮到有需要的同学。 …

FP5207 DC-DC 电源升压模块/12V升24V(5A) 升压板/升压电路/直流稳压/直流升压-应用蓝牙音箱、快充、应急电源、车载设备等

目录 概述 特征 应用 概述 FP5207是异步升压控制IC&#xff0c;透过EXT Pin控制外部NMOS&#xff0c;输入低启动电压2.8V与宽工作电压5V~24V&#xff0c;单节锂电池3V~4.2V应用&#xff0c;将Vout接到HVDD Pin&#xff1b;精准的反馈电压1.2V&#xff0c;内置软启动&#x…

文件存储、块存储、对象存储是什么?

文件存储&#xff1a;允许将数据组织为传统的文件系统。数据保存在一个文件中&#xff0c;该文件具有名称和一些相关的元数据&#xff0c;例如修改时间戳、所有者和访问权限。提供基于文件的存储使用目录和子目录的层次结构来组织文件的存储方式。 块存储&#xff1a;块存储提…

Flutter App混淆加固、保护与优化原理

​ 引言 在移动应用程序开发中&#xff0c;保护应用程序的代码和数据安全至关重要。本文将探讨如何对Flutter应用程序进行混淆、优化和保护&#xff0c;以提高应用程序的安全性和隐私。 一、混淆原理 混淆是一种代码保护技术&#xff0c;通过修改源代码或编译后的代码&#…

c/c++概念辨析-指针常量常量指针、指针函数函数指针、指针数组数组指针

概念澄清&#xff1a; 统一规则&#xff1a; 不管是XX指针&#xff0c;还是指针XX&#xff0c;后者是本体&#xff0c;前者只是个定语&#xff0c;前者也可以替换为其他同类&#xff08;例如字符串&#xff09;&#xff0c;帮助理解。 XX指针&#xff1a; 可简单理解为&#…

Image Segmentation Using Deep Learning: A Survey

论文标题&#xff1a;Image Segmentation Using Deep Learning:A Survey作者&#xff1a;发表日期&#xff1a;阅读日期 &#xff1a;研究背景&#xff1a;scene understanding,medical image analysis, robotic perception, video surveillance, augmented reality, and image…

安卓+charles实现抓包(主要解决证书网站无法打开问题)

安装 官网下载 https://www.charlesproxy.com/latest-release/download.do 使用介绍 Charles介绍 上面链接看一至三即可 初步代理配置 如何获取代理服务器IP和手机端IP 代理服务器IP 点击help&#xff0c;选中ssl 代理&#xff0c;点击在移动设备或远程浏览器上安装Cha…

Vue系列:Vue Element UI中,使用按钮实现视频的播放、停止、停止后继续播放、播放完成后重新播放功能

最近在工作中有个政务大屏用到了视频播放&#xff1b; 技术栈是Vue2、Element UI&#xff1b; 要实现的功能是&#xff1a;使用按钮实现视频的播放、停止、停止后继续播放、播放完成后重新播放功能 具体可以按照以下步骤进行操作&#xff1a; 引入插件&#xff1a; 在Vue组件…