结构化流(Structured Streaming)

结构化流介绍

有界和无界数据

  • 有界数据:
指的数据有固定的开始和固定的结束,数据大小是固定。我们称之为有界数据。对于有界数据,一般采用批处理方案(离线计算)特点:1-数据大小是固定2-程序处理有界数据,程序最终一定会停止
  • 无界数据:
指的数据有固定的开始,但是没有固定的结束。我们称之为无界数据
对于无界数据,我们一般采用流式处理方案(实时计算)特点:1-数据没有明确的结束,也就是数据大小不固定2-数据是源源不断的过来3-程序处理无界数据,程序会一直运行不会结束

基本介绍

结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL …

Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次

​ 真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集)

实时数据案例–词频统计

需求:在这里插入图片描述
代码实现:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('structured_streaming_wordcount')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.readStream\.format("socket")\.option("host","192.168.88.161")\.option("port","55555")\.load()# 3- 数据处理result_df = init_df.select(F.explode(F.split('value',' ')).alias('word')).groupBy('word').agg(F.count('word').alias('cnt'))# init_df.show()# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()

程序运行结果:
在这里插入图片描述
代码测试操作步骤:

首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据
yum -y install nc执行nc命令, 开启端口号, 写入数据:
nc -lk 55555注意: 要先启动nc,再启动我们的程序查看端口号是否被使用命令:
netstat -nlp | grep 要查询的端口

在这里插入图片描述
可能遇到的错误:
在这里插入图片描述

结构化流的编程模型

数据结构

在这里插入图片描述
在结构化流中,我们可以将DataFrame称为无界的DataFrame或者无界的二维表

数据源部分

结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作。目前提供了如下数据源:

  • Socket Source:网络套接字数据源,一般用于测试。也就是从网络上消费/读取数据
  • File Source:文件数据源。读取文件系统,一般用于测试。如果文件夹下发生变化,有新文件产生,那么就会触发程序的运行
  • Kafka Source:Kafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。
  • Rate Source:速率数据源。一般用于测试。通过配置参数,由结构化流自动生成测试数据。## Operation操作

对应官网文档内容:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources
在这里插入图片描述
File Source
将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet…
相关的参数:

option参数描述说明
maxFilesPerTrigger每次触发时要考虑的最大新文件数 (默认: no max)
latestFirst是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)
fileNameOnly是否检查新文件只有文件名而不是完整路径(默认值:false)将此设置为 true 时,以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同: “file:///dataset.txt” “s3://a/dataset.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt”

读取代码通用格式:

sparksession.readStream.format('CSV|JSON|Text|Parquet|ORC...').option('参数名1','参数值1').option('参数名2','参数值2').option('参数名N','参数值N').schema(元数据信息).load('需要监听的目录地址')针对具体数据格式,还有对应的简写API格式,例如:sparksession.readStream.csv(path='需要监听的目录地址',schema=元数据信息。。。)

代码操作

import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions","1")\.appName('file_source')\.master('local[*]')\.getOrCreate()# 2- 数据输入:File Source文件数据源"""File Source总结1- 只能监听目录,不能监听具体的文件2- 可以通过*通配符的形式监听目录中满足条件的文件3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况"""init_df = spark.readStream.csv(path="file:///export/data/",sep=",",encoding="UTF-8",schema="id int,name string")# 3- 数据处理# 4- 数据输出# 5- 启动流式任务init_df.writeStream.format("console").outputMode("append").start().awaitTermination()

可能遇到的错误一:
在这里插入图片描述

原因: 如果是文件数据源,需要手动指定schema信息

可能遇到的错误二:
在这里插入图片描述

原因: File source只能监听目录,不能监听具体文件
文件数据源特点:
1- 不能够监听具体的文件,否则会报错误java.lang.IllegalArgumentException: Option 'basePath' must be a directory
2- 可以通过通配符的形式,来监听目录下的文件,符合要求的才会被读取
3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况

Operations操作

指的是数据处理部分,该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理,也可以使用DSL方式进行处理。

Sink输出操作

在结构化流中定义好DataFrame或者处理好DataFrame之后,调用writeStream()方法完成数据的输出操作。在输出的过程中,我们可以设置一些相关的属性,然后启动结构化流程序运行。
在这里插入图片描述

输出模式

在进行数据输出的时候,必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式:

  • 1- append模式:增量模式

    特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。

  • 2- complete模式:完全(全量)模式

    特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。

  • 3- update模式:更新模式

    特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。

append模式:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('structured_streaming_wordcount')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.readStream\.format("socket")\.option("host","192.168.88.161")\.option("port","55555")\.load()init_df.createTempView("tmp_table")# 3- 数据处理# 正常:没有聚合操作,也没有排序result_df = spark.sql("""selectexplode(split(value,' ')) as wordfrom tmp_table""")# 异常:有聚合操作,没有排序# result_df = spark.sql("""#     select#         word,count(1) as cnt#     from (#         select#             explode(split(value,' ')) as word#         from tmp_table#     )#     group by word# """)# 异常:没有聚合操作,有排序# result_df = spark.sql("""#     select#         word#     from (#         select#             explode(split(value,' ')) as word#         from tmp_table#     )#     order by word# """)# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format('console').outputMode('append').start().awaitTermination()

如果有了聚合操作,会报如下错误:
在这里插入图片描述
如果有了排序操作,会报如下错误:
在这里插入图片描述

complete模式:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('structured_streaming_wordcount')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.readStream\.format("socket")\.option("host","192.168.88.161")\.option("port","55555")\.load()init_df.createTempView("tmp_table")# 3- 数据处理# 异常:没有聚合操作# result_df = spark.sql("""#     select#         explode(split(value,' ')) as word#     from tmp_table# """)# 正常:有聚合操作,没有排序result_df = spark.sql("""selectword,count(1) as cntfrom (selectexplode(split(value,' ')) as wordfrom tmp_table)group by wordorder by cnt""")# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()

如果没有聚合操作,会报如下错误:
在这里插入图片描述
update模式:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('structured_streaming_wordcount')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.readStream\.format("socket")\.option("host","192.168.88.161")\.option("port","55555")\.load()init_df.createTempView("tmp_table")# 3- 数据处理# 正常:没有聚合操作result_df = spark.sql("""selectexplode(split(value,' ')) as wordfrom tmp_table""")# 正常:有聚合操作,没有排序# result_df = spark.sql("""#     select#         word,count(1) as cnt#     from (#         select#             explode(split(value,' ')) as word#         from tmp_table#     )#     group by word# """)# 异常:有排序result_df = spark.sql("""selectwordfrom (selectexplode(split(value,' ')) as wordfrom tmp_table)order by word""")# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format('console').outputMode('update').start().awaitTermination()

如果有了排序操作,会报如下错误:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625023.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式学习-网络编程-Day2

思维导图 tcp通信流程 udp通信流程 作业1 写一个基于TCP协议的客户端来控制RobArm机械臂 代码 #include <myhead.h> #define SER_PORT 8888 #define SER_IP "192.168.122.71" #define CLI_PORT 6666 #define CLI_IP "192.168.122.36"int main(int…

分布式搜索引擎ElasticSearch——搜索功能

分布式搜索引擎ElasticSearch——搜索功能 文章目录 分布式搜索引擎ElasticSearch——搜索功能DSL查询文档DSL查询分类全文检索查询精确查询地理查询复合查询Function Score QueryBoolean Query 搜索结果处理排序![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/37d…

Qt 使用vs2019制作Qt静态库( *.lib )并使用

一 .创建静态库 1.创建Qt Class Library(Qt静态类库)项目 2.设置项目名以及项目路径(注意:不能有中文字符) 点击next 3.选则需要的模式以及Qt 模块 然后点击next,Finish完成创建 4. 然后手动添加Qt Widget Form File (.ui)并对设计ui 5. tpendialog.h #pragma once #includ…

书生.浦语大模型实战一

从专用模型到通用大模型 数据 书生.万卷1.0 文本图像-文本视频数据 OpenDataLab开放平台 图像&#xff1a;ImageNettokens语料&#xff1a;WikiQA音频视频&#xff1a;MovieNet3D模型 预训练 微调 增量续训 使用场景&#xff1a;让基座模型学习到一些新知识&#xff0…

Ubuntu 20.04 Intel RealSense D435i 相机标定教程

下载编译code_utils mkdir -p ~/imu_catkin_ws/src cd ~/imu_catkin_ws/src catkin_init_workspace source ~/imu_catkin_ws/devel/setup.bash git clone https://github.com/gaowenliang/code_utils.git cd .. catkin_make报错&#xff1a;sumpixel_test.cpp:2:10: fatal err…

vivado 使用IP Integrator源

使用IP Integrator源 在Vivado Design Suite中&#xff0c;您可以在RTL中添加和管理IP子系统块设计&#xff08;.bd&#xff09;项目或设计。使用Vivado IP集成程序&#xff0c;您可以创建IP子系统块设计。IP集成程序使您能够通过实例化和将Vivado IP目录中的多个IP核互连。可…

改进YOLOv8:添加CBAM注意力机制(涨点明显)

1、计算机视觉中的注意力机制 计算机视觉中的注意力机制是一种聚焦于局部信息的机制&#xff0c;其基本思想是让系统学会忽略无关信息而关注重点信息。这种机制在图像识别、物体检测和人脸识别等任务中都发挥了重要作用。 注意力机制的实现方法有多种&#xff0c;其中包括空间…

亲手打造一个本地LLM语音助手来管理智能家居

经历过 Siri 和 Google 助手之后&#xff0c;我发现尽管它们能够控制各种设备&#xff0c;但却无法进行个性化定制&#xff0c;并且不可避免地依赖于云服务。出于对新知识的渴望以及想在生活中使用一些酷炫的东西&#xff0c;我下定决心&#xff0c;要追求更高的目标。我的要求…

【RTOS】快速体验FreeRTOS所有常用API(2)任务管理

目录 二、任务管理2.1 任务创建&#xff08;三种方式&#xff09;1&#xff09;动态内存分配方式创建任务2&#xff09;静态内存分配方式创建任务3&#xff09;带有任务参数方式创建任务 2.2 任务删除2.3 两种delay 二、任务管理 该部分在上份代码基础上修改得来&#xff0c;代…

​HDD回暖于2024,与SSD决战于2028--part2

东芝和西部数据在2023年的硬盘产品中都没有采用类似希捷的HAMR技术产品&#xff0c;而是采用了其他的技术方案用于提升存储容量。 东芝采用了MAMR技术&#xff0c;通过微波磁通控制现象来提高高密度区域的写入信号质量。根据厂商的测试数据发现&#xff0c;MAMR的磁头可靠性比H…

Flink-容错机制

Flink中的容错机制 流式数据连续不断地到来&#xff0c;无休无止&#xff1b;所以流处理程序也是持续运行的&#xff0c;并没有一个明确的结束退出时间。机器运行程序&#xff0c;996 起来当然比人要容易得多&#xff0c;不过希望“永远运行”也是不切实际的。因为各种硬件软件…

HCIP ISIS实验

拓扑图&IP划分如下图&#xff1a; 第一步&#xff0c;配置IP地址&环回地址 以R1为例&#xff0c;R2~R8同理 interface GigabitEthernet 0/0/0 ip address 18.1.1.1 24 interface GigabitEthernet 0/0/1 ip address 12.1.1.1 24 interface LoopBack 0 ip address 1.1.…

第07章_面向对象编程(进阶)拓展练习(关键字:this,继承性和方法重写,关键字:super,多态性,Object类)

文章目录 第07章_面向对象编程&#xff08;进阶&#xff09;拓展练习01-关键字&#xff1a;this1、Circle类2、MyDate类3、Card类 02-继承性和方法重写4、Person、Student、Teacher类5、DepositCard、CreditCard类6、Employee、Programmer、Designer、Architect类7、判断输出结…

统计学-R语言-4.6

文章目录 前言列联表条形图及其变种---单式条形图条形图及其变种---帕累托图条形图及其变种---复式条形图条形图及其变种---脊形图条形图及其变种---马赛克图饼图及其变种---饼图饼图及其变种---扇形图直方图茎叶图箱线图小提琴图气泡图总结 前言 本篇文章是对数据可视化的补充…

Centos7.9忘记Root密码找回

Centos7.9忘记Root密码找回 1. 背景2. 目的3. 具体操作3.1 重启系统3.2 增加代码3.3 单用户模式3.4 单用户模式3.5 修改密码3.6 创建文件3.7 重启验证 1. 背景 由于物理主机上安装了多个虚拟机&#xff0c;部分虚拟机忘记了root密码&#xff0c;前段时间刚好要用这个虚拟机&…

智慧康养项目:智能技术与产品提升老年人生活品质

智慧康养项目需要集成的一些独特的技术和产品&#xff0c;其中包括&#xff1a; 智能健康监测设备&#xff1a;我们开发了一款能够实时监测老年人身体状况的智能健康监测设备&#xff0c;包括血压、血糖、心率等指标。该设备通过数据分析处理&#xff0c;能够提供个性化的健康…

内存泄漏问题

内存泄漏是一种常见的问题&#xff0c;它可能导致系统内存不断增加&#xff0c;最终耗尽可用内存。解决内存泄漏问题通常需要进行调试和分析。下面是一些可能有助于解决内存泄漏问题的步骤&#xff1a; 1. 监控内存使用情况&#xff1a; a. 使用 malloc 记录日志&#xff1a;…

【Dart】=> [05] Dart初体验-函数

文章目录 函数函数特点可选和默认参数函数对象箭头函数匿名函数综合案例 能够定义并使用Dart函数 学习内容&#xff1a; 函数定义可选和默认参数函数对象箭头函数匿名函数 函数 函数定义 Dart函数的结构&#xff1a; 调用函数&#xff1a; 案例&#xff1a;定义计算任意…

短视频账号矩阵剪辑分发系统无人直播技术开发源头

一、全行业独家源头最全面的核心技术 短视频矩阵新玩法是指利用批量自动混剪系统来处理大量短视频&#xff0c;通过智能算法自动进行视频剪辑、场景切换、特效添加等操作&#xff0c;最终生成高质量、精彩纷呈的混剪视频作品的方法和技术。这一方法的出现使得大规模短视频制作…

牛客周赛 Round 3 解题报告 | 珂学家 | 贪心思维场

前言 寒之不寒无水也&#xff0c;热之不热无火也。 整体评价 感觉比较简单&#xff0c;更加侧重于思维吧。和前几场的Round系列&#xff0c;风格不太一样。 A. 游游的7的倍数 因为连续7个数&#xff0c;比如有一个数是7的倍数 因此从个位数中着手添加&#xff0c;是最好的选…