Spark Streaming流媒体引擎

Spark Streaming是Spark的上一代流媒体引擎。Spark Streaming不再有更新,它是一个遗留项目。Spark中有一个更新且更易于使用的流媒体引擎,称为结构化流媒体

概述

Spark Streaming是核心Spark API的扩展,支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源获取,如Kafka、Kinesis或TCP套接字,并且可以使用复杂的算法进行处理,这些算法用高级函数表示,如map、reduce、join和window。最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。事实上,您可以将Spark的机器学习和图形处理算法应用于数据流。
原理:Spark Streaming接收实时输入数据流,并将数据划分为多个批次,然后由Spark引擎进行处理,以批量生成最终的结果流。
在这里插入图片描述

离散流(DStreams)

离散流或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,或者是从源接收的输入数据流,要么是通过转换输入流生成的处理后的数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不可变的分布式数据集的抽象(有关更多详细信息,请参阅Spark编程指南)。DStream中的每个RDD都包含特定间隔的数据,如下图所示。
对数据流应用的任何操作都转换为对底层RDD的操作,这些底层的RDD转换是由Spark引擎计算的。DStream操作隐藏了大多数这些细节,并为开发人员提供了更高级别的API以方便使用

每个输入DStream(除了文件流)都与一个Receiver(Scala-doc,Java-doc)对象相关联,该对象从源接收数据并将其存储在Spark的内存中进行处理。

Input DStreams和Receivers

在Spark Streaming中,封装了输入数据流的两个主要组件:Input DStreams和Receivers。

Input DStreams是Spark Streaming用来表示从数据源接收输入数据的抽象。每个输入DStream都可以看作是一个连续的数据流,它由多个RDD组成,这些RDD代表在一段时间内接收到的数据。Spark Streaming支持多种类型的输入DStreams,如基于文件、基于套接字、基于Kafka等。

接收器(Receiver)是实际负责从数据源获取数据并将其传递给Spark Streaming的组件。在Spark Streaming中,接收器是在工作节点上运行的独立任务,用于从数据源接收数据并将其存储在分布式存储系统中(如HDFS)。一旦数据被接收器接收并存储,Spark Streaming就会周期性地将存储的数据转换为RDD,并将其交给Spark引擎进行处理。

当输入DStream启动时,它会自动配置并启动与之关联的接收器。接收器会以并行的方式从数据源中获取数据,并将其划分为一系列小的数据块。然后,这些数据块会被Spark Streaming的计算引擎处理,形成最终的结果。

总结来说,Input DStreams和Receivers是Spark Streaming中用于接收和处理输入数据的关键组件。Input DStreams代表连续的数据流,而Receivers负责从数据源接收数据,并将其传递给Spark Streaming进行处理。
两类内置流媒体源:

  • 基本源:StreamingContext API中直接可用的源。示例:文件系统和套接字连接;
  • 高级资源:Kafka、Kinesis等资源可以通过额外的实用程序类获得

在Spark中,目录的监控是由Spark Streaming和Structured Streaming提供的功能。下面分别说明这两种流处理的方式:

  1. Spark Streaming:对于Spark Streaming,可以使用textFileStream方法来监控一个目录中的文件,并将新增的文件作为新的输入源。它会周期性地检查目录中是否有新的文件出现,然后将新的文件内容作为DStream的一部分进行处理。这种监控方式是基于轮询的,Spark Streaming会定期轮询目录以检查是否有新的文件。

以下是一个使用Spark Streaming监控目录的示例代码片段:

import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sparkConf, Seconds(1))val directory = "/path/to/directory"
val lines = ssc.textFileStream(directory)
lines.foreachRDD { rdd =>// Process the RDD
}ssc.start()
ssc.awaitTermination()
  1. Structured Streaming:对于Structured Streaming,可以使用readStream方法来监控一个目录中的数据,并将新增的数据作为新的输入源。类似于Spark Streaming,这种监控方式也是基于轮询的,Structured Streaming会定期轮询目录以检查是否有新的数据。

以下是一个使用Structured Streaming监控目录的示例代码片段:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()val directory = "/path/to/directory"
val df = spark.readStream.text(directory)
val query = df.writeStream.format("console").start()query.awaitTermination()

这个示例将会从指定目录中读取文本文件,然后通过console输出源将内容显示在控制台上。输出源可以根据需求进行修改,比如写入文件、写入到Kafka等。

无论是使用Spark Streaming还是Structured Streaming,监控目录时需要注意文件的命名规则和文件格式,以确保数据按照预期被输入到流处理任务中。

各种转换操作

在Spark Streaming中,DStream是一个连续的数据流抽象,可以应用各种转换操作进行实时处理和分析。以下是一些常见的DStream转换操作:

  1. map(func):应用一个函数到DStream中的每个元素,并返回一个新的DStream,其中包含转换后的结果。

  2. flatMap(func):应用一个函数到DStream中的每个元素,并返回一个包含所有转换后结果的新DStream。

  3. filter(func):过滤DStream中的元素,只保留满足条件的元素。

  4. union(otherStream):将当前DStream与另一个DStream合并,生成包含两个DStream元素的新DStream。

  5. count():返回一个新的DStream,其中每个批次的元素是当前批次的元素数量。

  6. reduce(func):将当前DStream中每个批次的元素使用给定的函数进行聚合操作,返回一个新的DStream,其中每个批次仅包含一个聚合结果。

  7. window(windowDuration, slideDuration):创建一个滑动窗口,用于对窗口内的元素进行批处理操作。每个窗口都包含指定的窗口时长的元素,并且以指定的滑动间隔进行移动。

  8. join(otherStream):将当前DStream中的元素与另一个DStream中的元素进行连接操作,生成一个新的DStream,其中每个元素是两个流中匹配的元素对。

  9. foreachRDD(func):将一个RDD操作应用于DStream中的每个RDD,可以用于实现自定义的输出操作或将数据存储到外部系统中。

这些是常见的DStream转换操作,还有其他更高级的操作可以使用,例如窗口操作、状态操作、累加器等。根据实际需求,选择适当的转换操作来对DStream进行处理和转换。

接收到的数据是以微批次(micro-batches)的形式处理的

在Spark Streaming中,接收到的数据是以微批次(micro-batches)的形式处理的。Spark Streaming将实时数据流划分为一系列小的时间窗口,每个窗口称为一个微批次。每个微批次都由一些时间段内到达的数据组成。

接收到的数据在每个微批次内按时间顺序进行处理。具体而言,对于每个微批次,Spark Streaming会将接收到的数据收集到一个RDD(Resilient Distributed Dataset)中,然后应用在DStream上定义的转换操作。

有几种不同的语义来处理接收到的数据:

  1. At-least-once:在这种语义下,Spark Streaming保证至少处理一次数据。它使用WAL(Write-Ahead Log)机制来记录接收到的数据,以便在故障恢复时进行重播,确保数据不会丢失。这种语义可以保证数据的可靠性,但可能会导致某些数据重复处理。

  2. At-most-once:在这种语义下,Spark Streaming只处理数据一次,不保证重复数据的处理。这比较适用于实时处理对数据丢失更敏感的场景,但可能会导致一些数据丢失。

  3. Exactly-once:这是最严格的语义,要求保证每条数据仅被处理一次,且不丢失。实现确切一次语义较为复杂,需要使用外部的数据存储系统(如Apache Kafka)和事务支持。Spark Streaming提供了与Kafka集成的功能,可以实现近似的确切一次语义。

值得注意的是,Spark Streaming的语义是基于微批次的处理,因此无法提供实时流处理系统(如Apache Flink或Apache Storm)所提供的低延迟。每个微批次的处理延迟取决于微批次的窗口大小和处理任务的复杂性,可能在几十毫秒到几秒之间。

选择适当的语义取决于应用的需求和容忍的数据处理保证级别。如果数据的准确性非常重要,可以使用At-least-once或Exactly-once语义。如果对数据处理的延迟更敏感,可以选择At-most-once语义。

输出批次的数据

在Spark Streaming中,DStream是一个连续的数据流抽象,可以应用各种输出操作来将处理结果发送到外部系统或执行其他特定操作。以下是一些常见的DStream输出操作:

  1. print():将DStream中每个批次的数据打印到控制台。这对于调试和快速查看处理结果非常有用。

  2. saveAsTextFiles(prefix, [suffix]):将每个批次的数据以文本文件的形式保存到指定的目录中。可提供前缀和后缀参数来自定义文件名。

  3. saveAsObjectFiles(prefix, [suffix]):将每个批次的数据以序列化对象的形式保存到指定的目录中。同样可提供前缀和后缀参数。

  4. foreachRDD(func):对DStream中每个RDD应用一个自定义函数。可以在这个函数中执行特定的操作,如将数据存储到外部数据库、发送到消息队列等。需要注意的是,这个函数必须是幂等的,因为RDD可以在故障恢复时被重新计算。

  5. foreach(func):对DStream中每个批次的数据应用一个自定义函数。与foreachRDD不同的是,这个函数直接应用于DStream的每个元素,而不是RDD。

  6. saveToHadoopFiles(prefix, [suffix]):将每个批次的数据以Hadoop文件格式保存到指定的目录中。

  7. foreachPartition(func):对DStream中每个RDD的每个分区应用一个自定义函数。这对于批量处理每个分区的数据非常有用,在处理大规模数据时可以提高性能。

这些输出操作允许将Spark Streaming处理的结果发送到外部系统、存储到文件中,或执行自定义的操作。根据需求选择合适的输出操作,以满足数据处理的要求和目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/12895.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

条款38:对变化多端的线程句柄析构函数行为保持关注

条款37解释过,可联结的线程对应着一个底层系统执行线程,未推迟任务(参见条款36)的期值和系统线程有类似关系。这么一来,std::thread型别对象和期值对象都可以视作系统线程的句柄。 从这个视角来看,std::th…

Opencv的Mat内容学习

来源&#xff1a;Opencv的Mat内容小记 - 知乎 (zhihu.com) 1.Mat是一种图像容器&#xff0c;是二维向量。 灰度图的Mat一般存放<uchar>类型 RGB彩色图像一般存放<Vec3b>类型。 (1)单通道灰度图数据存放样式&#xff1a; (2)RGB三通道彩色图存放形式不同&#x…

Flutter 添加 example流程

一、已有Flutter工程&#xff08;命令&#xff09;添加 example 1、cd 工程(flutter_plugin ,是自己创建的)根目录 例: flutter create example 执行命令创建example PS&#xff1a;cd example 后执行flutter doctor 后就可以看到效果 2、如果需要指定iOS/Android 语言,请添加…

如何建立Docker私有仓库?

文章目录 docker私有仓库harborHarbor仓库部署Harbor仓库使用 docker私有仓库 Docker 私有仓库是一个用于存储和管理 Docker 镜像的私有存储库。它允许你在内部网络中创建和管理 Docker 镜像&#xff0c;并提供了更好的安全性和控制&#xff0c;因为你可以完全控制谁能够访问和…

ansible自动化运维(一)

&#x1f618;作者简介&#xff1a;正在努力的99年公司职员。 &#x1f44a;宣言&#xff1a;人生就是B&#xff08;birth&#xff09;和D&#xff08;death&#xff09;之间的C&#xff08;choise&#xff09;&#xff0c;做好每一个选择。 &#x1f64f;创作不易&#xff0c;…

机器学习 day31(baseline、学习曲线)

语音识别的Jtrain、Jcv和人工误差 对于逻辑回归问题&#xff0c;Jtrain和Jcv可以用分类错误的比例&#xff0c;这一方式来代替单单只看Jtrain&#xff0c;不好区分是否高偏差。可以再计算人类识别误差&#xff0c;即人工误差&#xff0c;作为基准线来进行比较Jtrain与baselin…

回归预测 | MATLAB实现TCN-BiLSTM时间卷积双向长短期记忆神经网络多输入单输出回归预测

回归预测 | MATLAB实现TCN-BiLSTM时间卷积双向长短期记忆神经网络多输入单输出回归预测 目录 回归预测 | MATLAB实现TCN-BiLSTM时间卷积双向长短期记忆神经网络多输入单输出回归预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现TCN-BiLSTM时间卷积…

Thymeleaf入门

Thymeleaf是前端开发模板&#xff0c;springboot默认支持。前端模板用法大多数是类似的jsp、thymeleaf、vue.js都有while\for\if\switch等使用&#xff0c;页面组件化等。 1.前端模板区别 jsp是前后端完全不分离的&#xff0c;jsp页面写一堆Java逻辑。 thymeleaf好处是html改…

非Spring环境 | Mybatis-Plus插入数据返回主键两种方式(注解或XML)

废话不多说&#xff0c;直接撸代码: <?xml version"1.0" encoding"UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace&qu…

【Spring】什么是Bean的生命周期及作用域,什么是Spring的执行流程?

博主简介&#xff1a;想进大厂的打工人博主主页&#xff1a;xyk:所属专栏: JavaEE进阶 在前面的播客中讲解了如何从Spring中存取Bean对象&#xff0c;那么本篇我们来讲解Bean对象的生命周期是什么&#xff0c;Bean对象的6种作用域分别是什么&#xff0c;都有哪些区别&#xff…

通过STM32内部ADC将烟雾传感器发送的信号值显示在OLED上

一.CubeMX配置 首先我们在CubeMX配置ADC1, 设置一个定时器TIM2定时1s采样一次以及刷新一次OLED&#xff0c; 打开IIC用于驱动OLED显示屏。 二.程序 在Keil5中添加好oled的显示库&#xff0c;以及用来显示的函数、初始化函数、清屏函数等。在主程序中初始化oled,并将其清屏。…

【RTT驱动框架分析02】-串口驱动分析

串口驱动学习 0.串口驱动的使用方法 //定义一个时间 struct rt_event system_event; #define SYS_EVENT_UART_RX_FINISH 0x00000001 /* UART receive data finish event *//*串口接收回调函数 Receive data callback function */ static rt_err_t uart_input(rt_device_t …

掌握Python的X篇_16_list的切片、len和in操作

接上篇掌握Python的X篇_15_list容器的基本使用&#xff0c;本篇进行进一步的介绍。 文章目录 1. list的索引下标可以是负数2. 切片&#xff08;slice&#xff09;2.1 切片基础知识2.2 如何“取到尽头”2.3 按照步长取元素2.4 逆序取值 3. len函数获取lis的元素个数4. in操作符…

rocketmq客户端本地日志文件过大调整配置(导致pod缓存cache过高)

现象 在使用rocketmq时&#xff0c;发现本地项目中文件越来越大&#xff0c;查找发现在/home/root/logs/rocketmqlog目录下存在大量rocketmq_client.log日志文件。 配置调整 开启slf4j日志模式&#xff0c;在项目启动项中增加-Drocketmq.client.logUseSlf4jtrue因为配置使用的…

手把手教你从0入门线段树~

1. 什么是线段树? 1.1 初探线段树 定义&#xff1a;线段树是一种用于解决区间查询问题的数据结构&#xff0c;是一种广义上的二叉搜索树。 原理&#xff1a;它将一个区间划分为多个较小的子区间&#xff0c;并为每个子区间存储一些有用的信息&#xff0c;例如最大值、最小值…

如何降低TCP在局域网环境下的数据传输延迟

以Ping为例。本案例是一个测试题目&#xff0c;只有现象展示&#xff0c;不含解决方案。 ROS_Kinetic_26 使用rosserial_windows实现windows与ROS master发送与接收消息_windows 接收ros1 消息 什么是ping&#xff1f; AI&#xff1a; ping是互联网控制消息协议&#xff08;…

【Spring Boot】

目录 &#x1f36a;1 Spring Boot 的创建 &#x1f382;2 简单 Spring Boot 程序 &#x1f370;3 Spring Boot 配置文件 &#x1f36e;3.1 properties 基本语法 &#x1fad6;3.2 yml 配置文件说明 &#x1f36d;3.2.1 yml 基本语法 &#x1f369;3.3 配置文件里的配置类…

如何将ubuntu LTS升级为Pro

LTS支持周期是5年&#xff1b; Pro支持周期是10年。 Ubuntu Pro专业版笔记 步骤&#xff1a; 打开“软件和更新” 可以看到最右侧的标签是Ubuntu Pro。 在没有升级之前&#xff0c;如果使用下面两步&#xff1a; sudo apt updatesudo apt upgrade 出现如下提示&#xff…

【低代码专题方案】iPaaS运维方案,助力企业集成平台智能化高效运维

01 场景背景 随着IT行业的发展和各家企业IT建设的需要&#xff0c;信息系统移动化、社交化、大数据、系统互联、数据打通等需求不断增多&#xff0c;企业集成平台占据各个企业领域&#xff0c;成为各业务系统数据传输的中枢。 集成平台承接的业务系统越多&#xff0c;集成平台…

【数据结构】时间复杂度和空间复杂度

&#x1f4d9; 作者简介 &#xff1a;RO-BERRY &#x1f4d7; 学习方向&#xff1a;致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f4d2; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;欢迎各位关注&#xff0c;谢谢各位的支持 时间复杂度和空间复杂度 前…