Spark Streaming DStream

Spark Streaming DStream

DStream

Discretized Stream,中文叫做离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。

DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。

DStream的内部,其实是一系列持续不断产生的RDD,RDD是Spark Core的核心抽象,即不可变的,分布式的数据集。

DStream中的每个RDD都包含了一个时间段内的数据。
spark streaming 离散数据流

对DStream应用的算子,其实在底层会被翻译为对DStream中每个RDD的操作,比如对一个DStream执行一个map操作,会产生一个新的DStream,其底层原理为,对输入DStream中的每个时间段的RDD,都应用一遍map操作,然后生成的RDD,即作为新的DStream中的那个时间段的一个RDD。

底层的RDD的transformation操作,还是由Spark Core的计算引擎来实现的,Spark Streaming对Spark core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次API。
spark streaming DStream

操作DStream

对于从数据源得到的DStream,用户可以在其基础上进行各种操作。

与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三类:普通的转换操作、窗口转换操作和输出操作。

普通的转换操作

转换描述
map(func)源 DStream的每个元素通过函数func返回一个新的DStream。
flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。
filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。
repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。
union(otherStream)返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。
count()对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。
reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。
countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。
reduceByKey(func, [numTasks])当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。
join(otherStream, [numTasks])当被调用类型分别为(K,V)和(K,W)键值对的2个DStream 时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。
cogroup(otherStream, [numTasks])当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。
transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。
updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。
transform(func)

该transform操作(转换操作)及其类似的transformWith操作,允许在DStream上应用任意的RDD-to-RDD函数。它可以实现DStream API中未提供的操作,比如两个数据流的连接操作。

示例代码如下:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
}
updateStateByKey操作

我们使用的一般操作都是不记录历史数据的,也就说只记录当前定义时间段内的数据,跟前后时间段无关。如果要统计历史时间内的总共数据并且实时更新,如何解决呢?该updateStateByKey操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用updateStateByKey操作,必须进行下面两个步骤 :

  • 定义状态: 状态可以是任意的数据类型。
  • 定义状态更新函数:用一个函数指定如何使用先前的状态和从输入流中获取的新值更新状态。

对DStream通过updateStateByKey(updateFunction)来实现实时更新。

更新函数有两个参数 :

  • newValues是当前新进入的数据。
  • runningCount 是历史数据,被封装到了Option中。

为什么历史数据要封装到Option中呢?有可能我们没有历史数据,这个时候就可以用None,有数据可以用Some(x)。当然我们的当前结果也要封装到Some()中,以便作为之后的历史数据。

我们并不用关心新进入的数据和历史数据,系统会自动帮我们产生和维护,我们只需要专心写处理方法就行。

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {//定义的更新函数val newCount = ...  // add the new values with the previous running count to get the new countSome(newCount)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunction)//应用

示例:

  1. 首先我们需要了解数据的类型

  2. 编写处理方法

  3. 封装结果

    //定义更新函数
    //我们这里使用的Int类型的数据,因为要做统计个数
    def updateFunc(newValues : Seq[Int],state :Option[Int]) :Some[Int] = {//传入的newVaules将当前的时间段的数据全部保存到Seq中//调用foldLeft(0)(_+_) 从0位置开始累加到结束   val currentCount = newValues.foldLeft(0)(_+_) //获取历史值,没有历史数据时为None,有数据的时候为Some//getOrElse(x)方法,如果获取值为None则用x代替val  previousCount = state.getOrElse(0)//计算结果,封装成Some返回Some(currentCount+previousCount) 
    }
    //使用
    val stateDStream = DStream.updateStateByKey[Int](updateFunc)
    

窗口转换函数

Spark Streaming 还提供了窗口的计算,它允许你通过滑动窗口对数据进行转换,窗口转换操作如下:

转换描述
window(windowLength, slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。
countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。
reduceByWindow(func, windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。
countByValueAndWindow(windowLength,slideInterval, [numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的。因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

对于窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同,而窗口间隔一般设置的要比它们两个大。

在这里必须注意的一点是滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。
spark streaming 窗口转换函数

如图所示,批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time 1-time 3,只有窗口间隔满足了定义的长度也就是3才触发数据的处理,不够3继续等待。当间隔满足3之后进行计算后然后进行窗口滑动,滑动2个单位,会有新的数据流入窗口。然后重复等待满足窗口间隔执行计算。

输出操作

Spark Streaming允许DStream的数据被输出到外部系统,如数据库或文件系统。由于输出操作实际上使transformation操作后的数据可以通过外部系统被使用,同时输出操作触发所有DStream的transformation操作的实际执行(类似于RDD操作)。

以下表列出了目前主要的输出操作:

转换描述
print()在Driver中打印出DStream中数据的前10个元素。
saveAsTextFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

DStream持久化

与RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/768618.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

主干网络篇 | YOLOv8更换主干网络之MobileNetV3

前言:Hello大家好,我是小哥谈。MobileNetV3是一种轻量级的卷积神经网络架构,用于图像分类和目标检测任务。它是MobileNet系列的第三个版本,旨在在保持高准确性的同时减少模型的计算量和参数数量。MobileNetV3引入了一些新的设计思想和技术,以提高模型的性能。其中一项重要…

【微服务】Gateway服务网关

📝个人主页:五敷有你 🔥系列专栏:微服务 ⛺️稳中求进,晒太阳 Spring Cloud Gateway 是 Spring Cloud 的一个全新项目,该项目是基于 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等响…

Windows 7 静态 IP 地址

Windows 7 静态 IP 地址 References 静态 IP 地址设置为 192.168.1.198 控制面板 -> 查看网络状态和任务 更改适配器设置 网络连接 -> 属性 TCP / IPv4 警告 - 已计划将多个默认网关用于提供单一网络 (例如 intranet 或者 Internet) 的冗余。 6.1. 关闭 redundancy VMw…

个人博客系列-后端项目-系统角色配置(8)

系统角色配置需要设置的接口 用户可以绑定多个角色,角色对应有多个路由权限。用户绑定角色后,可以访问当前角色下的各个api路由和菜单路由。 用户注册时设置用户角色修改用户角色(同时对应用户可以访问的路由将会同步变更)添加修…

【Linux】写个日志和再谈线程池

欢迎来到Cefler的博客😁 🕌博客主页:折纸花满衣 🏠个人专栏:信号量和线程池 目录 👉🏻日志代码Log.cppMain.cc 👉🏻线程池代码LockGuard.hpp(自定义互斥锁,进…

Spring Boot集成zxing实现生成二维码功能

1.二维码介绍 二维码QR Code(Quick Response Code) 由Denso公司于1994年9月研制的一种矩阵二维码符号,它具有一维条码及其它二维条码所具有的信息容量大、可靠性高、可表示汉字及图象多种文字信息、保密防伪性强等优点。 ZXing 一个支持在图像中解码和生成条形码(如…

【Web】NKCTF 2024 个人wp(部分)

目录 my first cms 全世界最简单的CTF attack_tacooooo 属实太菜了,3/4 my first cms 一眼搜版本2.2.19 CVE -CVE-2024-27622 GitHub - capture0x/CMSMadeSimple 访问/admin/login.php 爆出弱口令,后台登录 admin Admin123 Extensions > User D…

鸿蒙实战开发-使用关系型数据库实现对账单的增、删、改、查

介绍 本Codelab以记账为例,使用关系型数据库的相关接口实现了对账单的增、删、改、查操作。实现效果如图所示: 相关概念 关系型数据库:基于关系模型来管理数据的数据库,提供了增、删、改、查等接口,也可运行输入的SQ…

Mac电脑高清媒体播放器:Movist Pro for mac下载

Movist Pro for mac是一款专为Mac操作系统设计的高清媒体播放器,支持多种常见的媒体格式,包括MKV、AVI、MP4等,能够流畅播放高清视频和音频文件。Movist Pro具有强大的解码能力和优化的渲染引擎,让您享受到更清晰、更流畅的观影体…

Matlab|【免费】基于数据驱动的模型预测控制电力系统机组组合优化

目录 1 主要内容 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序复现文章《Feature-Driven Economic Improvement for Network-Constrained Unit Commitment: A Closed-Loop Predict-and-Optimize Framework》,程序主要做的是一个基于数据驱动的电力系统机…

51单片机学习笔记——LED闪烁和流水灯

任务分析 首先要知道LED闪烁主要是怎么工作的,闪烁亮灭自然是一下为高一下为低,亮灭的频率则需要延时来进行控制。 上节已经知道了如何点亮那延时如何做呢首先先编写主框架 这样是否可以通过循环将LED灯一直循环闪烁。 以为while一直在循环所以其实是可…

[Java基础揉碎]final关键字

目录 介绍 在某些情况下,程序员可能有以下需求,就会使用到final final注意事项和讨论细节 1) final修饰的属性又叫常量,一般用XX_XX_XX来命名 2) final修饰的属性在定义时,必须赋初值,并且以后不能再修改&#…

Spring Cloud五:Spring Cloud与持续集成/持续部署(CI/CD)

Spring Cloud一:Spring Cloud 简介 Spring Cloud二:核心组件解析 Spring Cloud三:API网关深入探索与实战应用 Spring Cloud四:微服务治理与安全 文章目录 一、Spring Cloud在CI/CD中的角色1. 服务注册与发现:自动化管理…

Java安全 反序列化(4) CC1链-LazyMap版

Java安全 反序列化(4) CC1链-LazyMap版 实验环境:存在漏洞的版本 commons-collections3.1-3.2.1 jdk 8u71之后已修复不可利⽤ 文章目录 Java安全 反序列化(4) CC1链-LazyMap版一.跟踪挖掘CC1_LazyMap原理二.完整CC1_Lazy版Poc 接着上一篇文章我们通过ChainedTransFormer实现任意…

client-go中ListAndWatch机制,informer源码详解

文章首发地址: 学一下 (suxueit.com)https://suxueit.com/article_detail/s9UMb44BWZdDRfKqFv22 先上一张,不知道是那个大佬画的图 简单描述一下流程 client-go封装部分 以pod为例 、先List所有的Pod资源,然后通过已经获取的pod资源的最大版…

SQLiteC/C++接口详细介绍sqlite3_stmt类(八)

返回:SQLite—系列文章目录 上一篇:SQLiteC/C接口详细介绍sqlite3_stmt类(七) 下一篇: SQLiteC/C接口详细介绍sqlite3_stmt类(九) 27、sqlite3_column_int 函数 sqlite3_column_int 用于返…

0.2W超精密金属膜电阻器-陶瓷封装-塑封

高精度欧姆值,温度系数低 长期稳定性,无感设计 UPSC 欧姆范围:40 Ω 至 5 MΩ UPR 欧姆范围:10 Ω 至 5 MΩ 温度系数:2 ppm/C 至 25 ppm/C 符合 RoHS 规范 由EE1/10(RE55)成品高精密金属膜电阻器选配组装而成&am…

java数据结构与算法刷题-----LeetCode75. 颜色分类

java数据结构与算法刷题目录(剑指Offer、LeetCode、ACM)-----主目录-----持续更新(进不去说明我没写完):https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 双指针两次遍历2. 三指针 1. 双指针两次遍历 解题思路&#…

以行动激发消费活力,加多宝引领高品质消费浪潮

2024年“315”期间,加多宝携手全国多地市场监督管理局、消费者协会等单位,围绕今年“激发消费活力”主题,积极配合各地相关政府部门开展系列宣传活动,以实际行动呼吁切实保护消费者合法权益,共建诚信消费环境&#xff…

Python综合实战案例-数据清洗分析

写在前面: 本次是根据前文讲解的爬虫、数据清洗、分析进行的一个纵隔讲解案例,也是对自己这段时间python爬虫、数据分析方向的一个总结。 本例设计一个豆瓣读书数据⽂件,book.xlsx⽂件保存的是爬取豆瓣⽹站得到的图书数据,共 6067…