Spark Streaming的基本数据流

先来介绍一下按照动静对数据的区分

静态数据

​ 静态数据(Static Data)指的是在一段时间内不会或很少发生变化的数据。这种类型的数据通常是固定的,并且不会随着时间的推移而更新或仅偶尔更新。静态数据的典型例子包括配置文件、参考表、历史记录、已发布的研究报告等。

​ 在大数据环境中,尤其是使用 Hadoop 分布式文件系统(HDFS)时,数据通常被认为是静态的,这是因为 HDFS 被设计成适合一次写入和多次读取的场景(Write Once, Read Many,即 WORM 模型)。这意味着一旦数据写入 HDFS,它通常不会被修改——这是 HDFS 的一个重要特性,也是它能够高效处理大规模数据的原因之一。

​ 但是hdfs中有些生态组件可以实现数据的更新修改操作

流数据

流数据处理的目的确实通常是为了实时分析和立即反馈,而不是将所有原始流数据长期存储。由于流数据是连续的,并且可能以非常高的速度产生,长期全量存储可能会非常昂贵且不实用。因此,流处理系统通常会进行如下操作:
实时处理:在数据流入的同时进行处理,如聚合、过滤或转换数据。
结果存储:仅存储处理结果,如聚合的统计信息、检测到的事件或触发的警报。
窗口操作:在流处理中常用“窗口”概念,即在指定时间或数据量范围内处理数据,这样可以限制存储和计算的范围。
摘要信息:为了后续分析,可能会保存一些原始数据的摘要信息,如摘要统计、样本或数据摘要(例如布隆过滤器或基数估计)。
滚动存储:在某些情况下,系统可能会采用滚动存储策略,即保留最近的数据流快照,并定期删除旧数据。

尽管原始流数据不常被完整存储,但在某些场景下,可能还是需要将原始流数据或其子集进行存储,以便于后续的批量分析或回溯分析。这可以通过以下方式实现:
热存储:将数据流的最新部分存储在高速访问存储系统中。
冷存储:将历史数据归档到成本较低的存储系统中,如云存储或Hadoop HDFS。
混合存储:结合热存储和冷存储,根据数据的访问频率和价值进行数据的层次化存储。

StreamingContext对象

在rdd编程中需要生成一个saprkcontext对象,在sparksql编程需要生成一个sparksession对象,同理运行sparkstreaming就需要生成一个streamingContext对象,他是sparkstreaming程序的主要入口

在 Spark Streaming 中,StreamingContext 对象是流处理的主要入口点。它是连接到 Spark 的核心,用于处理实时数据流的对象。以下是 StreamingContext 的一些关键特点:

  1. 初始化StreamingContext 通过连接到 SparkContext 来初始化。它负责创建 DStreams(离散流),这是 Spark Streaming 处理的基本抽象。

  2. 创建 DStreams:可以通过连接到不同的输入源(如 Kafka、Flume、套接字或文件系统)来创建 DStreams。

  3. 转换和操作StreamingContext 允许对 DStreams 进行多种转换和操作,例如 map、reduce、join、window 等。

  4. 启动和停止流处理:通过调用 start() 方法启动流处理,并持续处理数据直到调用 stop() 方法。

  5. 容错性和状态管理:它支持检查点机制,这有助于恢复状态和容错。

  6. 调度和管理StreamingContext 负责调度和管理流处理作业的生命周期。

在使用 Spark Streaming 编写应用程序时,StreamingContext 是编写和管理流处理逻辑的核心组件。

好的,这里是一个使用 StreamingContext 的 Spark Streaming 应用程序的基本示例:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 初始化 SparkContext
sc = SparkContext("local[2]", "NetworkWordCount")# 创建 StreamingContext,设置批处理间隔为1秒
ssc = StreamingContext(sc, 1)# 创建一个将连接到 localhost:9999 的 DStream
lines = ssc.socketTextStream("localhost", 9999)# 将收到的每行文本切分为单词
words = lines.flatMap(lambda line: line.split(" "))# 计算每个批次的单词频率
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)# 打印每个批次的单词频率
wordCounts.pprint()# 启动 Spark Streaming
ssc.start()# 等待流处理结束
ssc.awaitTermination()

在这个例子中,StreamingContext 用于设置流处理,并创建了一个套接字流(socketTextStream),它从本地主机的 TCP 端口 9999 读取数据。然后对数据进行处理,计算单词出现的频率,并在控制台上打印每个批次处理的结果。最后,通过 start() 方法启动流处理,并使用 awaitTermination() 使处理持续进行。

是的,socketTextStream 是连接到 TCP 套接字的一种方式,但 Spark Streaming 也可以连接到其他类型的输入源。以下是一些不同输入源连接方式的示例:

# 监控一个目录以获取新文件
fileStream = ssc.textFileStream("file:///path/to/directory")#在 Spark 2.4 及更高版本中,结合 Structured Streaming 使用:
df = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "topic1") \.load()#接受flume数据流    
flumeStream = FlumeUtils.createStream(ssc, "[flume_hostname]", [flume_port])#接受Twitter数据流
twitterStream = TwitterUtils.createStream(ssc, None)#这里创建了一个 RDDs 的队列,并通过 `queueStream` 方法创建了一个 DStream。
rddQueue = []
for i in range(5):rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
inputStream = ssc.queueStream(rddQueue)

尽管输入源不同,但在 Spark Streaming 中接收到数据后,处理方法是统一的。无论数据来自哪种源(如套接字、Kafka、文件系统等),一旦它们被转换成 DStreams(离散流),就可以应用一系列通用的转换和操作

基本数据流

在 Spark Streaming 中,文件流和套接字流确实可以被视为较为基础或“低级”的数据源。这主要是因为它们提供了最基本的数据输入功能,而没有像一些更高级数据源那样的复杂特性。例如:

文件流:简单地从指定目录读取新文件,但不支持更复杂的数据处理或状态管理。

套接字流:从指定的 TCP 套接字接收文本数据,主要用于测试和原型开发,但缺乏生产环境中所需的可靠性和伸缩性。

文件流

*spark Streaming 中的文件流监听主要针对指定目录下的文件变动。这个机制专门用于监控指定目录并处理新添加到该目录下的文件。关键点包括:
新增文件:Spark Streaming 只处理在监听开始之后新增的文件。
完整性:只有完全写入的文件会被处理,避免处理正在写入中的文件。
一次性处理:文件一旦被读取和处理,就不会再次被处理,即使应用程序重启。
需要注意的是,文件流不会响应目录内现有文件的任何修改或删除操作,也不会处理在监听开始之前就已经存在的文件。

以下是一个使用 Spark Streaming 监听文件系统目录并处理新文件的简单示例:

  1. 首先,您需要一个运行的 Spark Streaming 程序。 我们假设您希望监控的目录是 /path/to/directory

  2. 代码示例

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext# 初始化 SparkContext 和 StreamingContext
    sc = SparkContext("local[2]", "FileStreamExample")
    ssc = StreamingContext(sc, 10)  # 10秒的批处理间隔# 创建一个指向指定目录的文件流
    lines = ssc.textFileStream("file:///path/to/directory")# 对读取的数据进行简单处理
    counts = lines.flatMap(lambda line: line.split(" "))\.map(lambda word: (word, 1))\.reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()             # 启动流计算
    ssc.awaitTermination()  # 等待流计算结束
    
  3. 操作流程

    • 将上述代码保存为 Python 文件,例如 filestream_example.py
    • 运行这个 Python 脚本以启动 Spark Streaming 应用。
    • 将新文件添加到 /path/to/directory 目录中。这些文件应当在启动应用程序后添加。
    • Spark Streaming 将处理这些新文件,运行指定的转换和操作。

这个程序会持续监控指定目录,任何在程序运行期间新增的文件都会被读取和处理。文件内容被分割成单词,并计算每个批次中每个单词出现的次数。

假设你的 /path/to/directory 目录中新增了一个包含以下文本的文件:

vi test
hello world
hello pyspark

运行上述程序后,你可能会在控制台看到如下输出(根据实际文件内容和时间间隔可能略有不同):

-------------------------------------------
Time: [时间戳]
-------------------------------------------
('hello', 2)
('world', 1)
('pyspark', 1)

这个输出表示,程序读取了新增文件的内容,并成功计算了单词 “hello” 出现了 2 次,而 “world” 和 “pyspark” 各出现了 1 次。每次新文件被添加到监控目录时,类似的结果将会显示。

套接字流

开启端口号,关闭端口号

ps -ef | grep 9999
netstat -an | grep 9999 #查看是谁在使用端口号9999
ps -L | grep 9999#查看仅在监听的进程

查询结果:

用户PIDPPIDVSZRSSTTYSTATTIMECOMMAND
root1000?Ss0:00/sbin/init
root2000?S0:00[kthreadd]
root3200?S0:00[ksoftirqd/0]
root470741129202600?T0:00/usr/sbin/sshd: root@pts/0
root470754707400pts/0Ss0:00-bash
kill -9 12345  #关闭占用端口号的进程#这是需要下载的命令
fuser -k 9999
fuser -k :::9999#释放端口号

在 Spark Streaming 中,使用套接字流涉及读取从特定 IP 地址和端口发送的数据。以下是创建和运行一个简单套接字流应用程序的操作流程:

  1. 编写 Spark Streaming 程序

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext# 初始化 SparkContext 和 StreamingContext
    sc = SparkContext("local[2]", "SocketStreamExample")
    ssc = StreamingContext(sc, 1)  # 批处理间隔1秒# 连接到 localhost 的 9999 端口
    lines = ssc.socketTextStream("localhost", 9999)# 将行切分为单词
    words = lines.flatMap(lambda line: line.split(" "))
    words.pprint()ssc.start()             # 启动流计算
    ssc.awaitTermination()  # 等待流计算结束
    
  2. 运行 Spark Streaming 程序

    • 将上述代码保存为 Python 文件,例如 socketstream_example.py
    • 运行这个 Python 脚本以启动 Spark Streaming 应用。
  3. 发送数据到套接字

    • 可以使用 nc(netcat)工具向端口 9999 发送数据。在命令行运行 nc -lk 9999 并输入一些文本。
    • 输入的每一行文本都将作为一个数据批次发送给 Spark Streaming 应用。
  4. 观察结果

    • 在 Spark Streaming 应用的控制台输出中,你将看到处理的单词。

这个程序会持续监听 localhost 上的 9999 端口,任何发送到这个端口的数据都会被读取和处理。输入的每行文本被切分为单词,并在控制台上打印。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/179572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试题:说一下MyBatis动态代理原理?

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1.MyBatis简介2.使用步骤2.1、引入依赖2.2、配置文件2.3、接口定义2.4、加载执行 3.原理解析 1.MyBatis简介 MyBatis是一个ORM工具,封装了JDBC的操作&a…

Redis 主从架构,Redis 分区,Redis哈希槽的概念,为什么要做Redis分区

文章目录 Redis 主从架构redis replication 的核心机制redis 主从复制的核心原理过程原理Redis集群的主从复制模型是怎样的?生产环境中的 redis 是怎么部署的?机器是什么配置?你往内存里写的是什么数据?说说Redis哈希槽的概念&…

前端入门(四)Ajax、Promise异步、Axios通信、vue-router路由

文章目录 AjaxAjax特点 Promise 异步编程(缺)Promise基本使用状态 - PromiseState结果 - PromiseResult Axios基本使用 Vue路由 - vue-router单页面Web应用(single page web application,SPA)vue-router基本使用路由使…

Rabbitmq发送邮件并消费邮件

📑前言 本文主要是【Rabbitmq】——Rabbitmq发送邮件并消费邮件的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是听风与他🥇 ☁️博客首页:CSDN主页听风与他 &#x1…

jvm的相关知识点

Java Virtual Machine(JVM)是Java程序的运行环境,是Java技术的核心和关键之一。JVM负责执行Java字节码,并提供了一种平台无关性的执行环境,使得Java程序可以在不同的硬件和操作系统上运行。 下面是关于JVM的一些重要知…

spring应用在afterPropertiesSet方法中获取ApplicationContext

在afterPropertiesSet方法中获取ApplicationContext是可以的。Spring容器在初始化bean后,会自动调用afterPropertiesSet方法。在这个方法中,您可以获取到ApplicationContext对象。 以下是一个示例代码: import org.springframework.context…

【数学】旋转矩阵

参考链接 OpenGL from OpenGL.GL import * from OpenGL.GLUT import * from math import * import numpy as np def draw_axes():glClear(GL_COLOR_BUFFER_BIT)# 绘制坐标轴glColor3f(1.0, 1.0, 1.0) # 设置坐标轴颜色为白色glBegin(GL_LINES)glVertex2f(-1.0, 0.0) # x 轴g…

Python中使用matplotlib库绘图中如何给图形的图例设置中文字体显示

问题:当使用matplotlib绘图时遇到绘图,图例显示不出来中文字体 解决方式: 1)加载字体管理库 from matplotlib.font_manager import FontProperties 2)设置系统上字体的路径 font FontProperties(fname"C:\\W…

直线(蓝桥杯)

直线 题目描述 本题为填空题,只需要算出结果后,在代码中使用输出语句将所填结果输出即可。 在平面直角坐标系中,两点可以确定一条直线。如果有多点在一条直线上, 那么这些点中任意两点确定的直线是同一条。 给定平面上 2 3 个…

VMD扩展molUP安装与高斯接口使用

molUP是一个VMD扩展,提供了一个简单的方式来加载和保存高斯文件,并分析相关的结果。 molUP为VMD提供了一个图形界面,用户可以加载和保存高斯文件格式的化学结构。这个扩展包括一组工具来设置高斯支持的任何计算,包括ONIOM通过互动…

静态内部类(内部类) - Java

静态内部类 StaticInnerClass01.java 说明:静态内部类是定义在外部类的成员位置,并且有static修饰 可以直接访问外部类的所有静态成员,包含私有的,但不能直接访问非静态成员。可以添加任意访问修饰符(public.protec…

计算机网络基础知识自用

示例: 域名如:alibaba.com (URL地址) IP地址为:xx.233.xxs.12 (访问) 端口:80 1.ip Internet Protocol Address,又译为网际协议地址,常见的IP地址分为IPv4与IPv6两大类。目前我们使用的都是IPv4的地址&am…

音视频学习(十九)——rtsp收流(tcp方式)

前言 本文主要介绍以tcp方式实现rtsp拉流。 流程图 流程说明: 客户端发起tcp请求,如向真实相机设备请求,端口一般默认554;tcp连接成功,客户端与服务端开始rtsp信令交互;客户端收到play命令响应后,开启线…

Leetcode 501 二叉搜索树中的众数

题意理解: 首先明确: 二叉搜索树中序遍历是严格的单调递增序列,也就是说,传统意义上得到二叉搜索树不存在相同的数,也不可能存在众数。 所以: 这里的二叉搜索树不是严格意义上的二叉搜索树&#xf…

PVE中CT容器安装openwrt X86的极简方法

下载推荐:https://openwrt.ai/ 使用环境PVE8.0,openwrt是以上网址的最新版,内涵及其丰富组件。 问题来源: 在PVE虚拟机可以很方便的使用img文件,转换qm 成一个硬盘文件,加入到虚拟机也就完成了&#xff0c…

五、双向NAT

学习防火墙之前,对路由交换应要有一定的认识 双向NAT1.1.基本原理1.2.NAT Inbound NAT Server1.3.域内NATNAT Server —————————————————————————————————————————————————— 双向NAT 经过前面介绍,…

fiddler设置手机端抓包看这篇文章就足够了,轻松解决!

fiddler设置手机端抓包 安卓手机抓包 第一步:配置电脑和安卓的相关设置 1、手机和fiddler位于同一个局域网内;首先从fiddler处获取到ip地址和端口号: 添加图片注释,不超过 140 字(可选) ,点…

【Linux】coredump 文件的例子分析

1. 生成 core 文件 网上很多教程,我这里举一种 临时开启 生成 core 文件 # 0 就是没有开 coredump 功能 rootswd-Lenovo-G40-80:/home/swd/pros/c--learn/0.test_codes/demos# ulimit -c 0 # 设置值临时为 unlimited rootswd-Lenovo-G40-80:/home/swd/pros/c--le…

Java基础之常用类

Java基础之常用类 一、包装类1.1、Java基本数据类型及其对应的包装类1.2、包装类的自动装箱、自动拆箱机制1.3、包装类的优点 二、String类三、StringBuffer类和StringBuilder类3.1、主要区别:3.2、StringBuffer/StringBuilder用法(两者用法一致) 四、日期类4.1、Da…

setLineWrapMode 是 QTextEdit 类的成员函数,用于设置文本换行模式(Line Wrap Mode)

setLineWrapMode 是 QTextEdit 类的成员函数,用于设置文本换行模式(Line Wrap Mode)。 在 Qt 中,文本换行模式指定了文本编辑器中长行文本的显示方式。通过设置不同的换行模式,可以控制是否自动换行、如何换行以及是否…