详解 Spark Streaming 的 DStream 对象

一、DStream 的创建

1. 通过 RDD 队列

DStream 在内部实现上是一系列连续的 RDD 来表示。每个 RDD 包含有采集周期内的数据

/**
基本语法:StreamingContext.queueStream(queueOfRDDs: Queue, oneAtATime = false)
*/
object DStreamFromRddQueue {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val queueOfRdds = mutable.Queue[RDD[Int]]()val ds = ssc.queueStream(queueOfRdds, oneAtATime = false)ds.print()ssc.start()// 向 RDD 队列中添加元素for(i <- 1 to 5) {queueOfRdds += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}

2. 通过自定义数据源

通过继承 Receiver 抽象类,并实现 onStart、onStop 方法来自定义数据源采集

/**实现步骤:1.继承 Receiver[T]() 抽象类,定义泛型,并传递参数1.1 泛型是采集的数据类型1.2 传递的参数是存储级别,StorageLevel 中的枚举值2.实现 onStart、onStop 方法3.使用 receiverStream(receiver) 创建 DStream
*/
object DStreamFromDiy {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))// 使用自定义数据源采集数据val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())ds.print()ssc.start()ssc.awaitTermination()}
}// 自定义数据源采集
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {private val flag = true// 当 ssc.start() 调用后,启动一个独立的线程去采集数据override def onStart(): Unit = {new Thread(new Runnable(){override def run() {while(flag) {val data = "数据为:" + new Random().nextInt(10)// 将数据存储封装为 DStreamstore(data)Thread.sleep(500)}}}, "receiver").start()}// 停止数据采集override def onStop(): Unit = {flag = false}
}

3. 通过 Kafka 数据源

3.1 版本选型
  • ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。所以当接收数据的 Executor 和计算的 Executor 速度不同时,特别在接收数据的 Executor 速度大于计算的 Executor 速度时,会导致计算数据的节点内存溢出。(早期版本中提供此方式,当前版本不适用)
  • DirectAPI:是由计算的 Executor 来主动接收消费 Kafka 的数据,速度由自身控制
3.2 实现
  • 引入依赖

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
    </dependency>
    <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version> 
    </dependency>
    
  • 编码

    /**
    基本语法:使用 KafkaUtils 工具类的 createDirectStream[K, V] 方法连接 Kafka 创建
    相关参数:1.StreamingContext:环境对象2.LocationStrategies:位置策略,PreferConsistent 表示自动匹配3.ConsumerStrategies:消费策略,Subscribe[K,V](Set(topic)) 订阅主题4.Map[String, Object]:Kafka 连接配置参数
    */
    object DStreamFromKafka {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))// 封装 Kafka 配置参数val kafkaConf: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->  "linux1:9092,linux2:9092,linux3:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer")// 创建 Kafka 数据源的 DStreamval ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("topic1")),kafkaConf)// 打印输出val data: DStream[String] = ds.map(_.value())data.print()ssc.start()ssc.awaitTermination()}
    }
    
  • 测试

    • 启动 Zookeeper 和 Kafka 集群
    • 运行程序 main 方法
    • 向 Kafka 的主题中生产数据,并查看程序控制台输出

二、DStream 的转换

1. 无状态转换操作

无状态的操作只作用于一个采集周期的 RDD 中,不同采集周期的 RDD 之间的操作结果不会归约汇总

1.1 常见操作
/**
常见原语:map/flatMap/filter/repartition/reduceByKey/groupByKey
*/
object DStreamNoStateChange {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))val wordCount = wordAsOne.reduceByKey(_ + _)wordCount.print()/*测试:在 cmd 窗口执行 nc -lp 999,然后分次输入 10 个 hello结果:由于采集周期为 3 秒,所以输出结果为多个 (hello, num),数量与采集周期个数一致,不同的采集周期结果是独立输出的*/ssc.start()ssc.awaitTermination()}
}
1.2 transform
/**
功能:可以将 DStream 中底层的 RDD 获取进行操作,可以扩展功能和实现周期性代码执行
基本语法:Dstream.transform(func: RDD => RDD): Dstream
*/
object DStreamTransform {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val word = ssc.socketTextStream("localhost", 9999)word.transform(rdd => {// Driver端:此处的代码会周期性的执行,每个采集周期执行一次rdd.map(str => {// Executor 端str    })})ssc.start()ssc.awaitTermination()}
}
1.3 join
/**
功能:对当前批次(采集周期)内的两个 DStream 中各自的 RDD 中相同的 key 进行 join,效果与两个 RDD 的 join 相同
基本语法:Dstream1.join(Dstream2)
*/
object DStreamTransform {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val ds9999 = ssc.socketTextStream("localhost", 9999)val ds8888 = ssc.socketTextStream("localhost", 8888)val data: DStream[(String, (Int, Int))] = ds9999.map((_, 1)).join(ds888.map((_, 2)))data.print()ssc.start()ssc.awaitTermination()}
}

2. 有状态转换操作

有状态转换操作会将一个采集周期的结果(状态)保存到检查点,并且不断将下一个采集周期的结果(状态)更新保存到检查点中,最终输出所有采集周期归约汇总的结果

2.1 updateStateByKey
/**基本语法:DStream.updateStateByKey(func: (seq: Seq[T], op: Option[T]) => op)参数:1.seq 表示当前采集周期相同 key 的 Value 集合2.op 表示检查点中相同 key 的总 Value (Some 或 None)说明:1.使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态2.updateStateByKey会根据 key 对数据的状态进行更新
*/
object DStreamStateChange {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))// 必须设置检查点保存路径ssc.checkpoint("cp")val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))// val wordCount = wordAsOne.reduceByKey(_ + _)val wordCount = wordAsOne.updateStateByKey((seq: Seq[Int], op: Option[Int]) => {val sum = seq.sumval newVal = op.getOrElse(0) + sumOption(newVal)})wordCount.print()/*测试:在 cmd 窗口执行 nc -lp 999,然后分次输入 10 个 hello结果:最终的输出结果为 (hello, 10)*/ssc.start()ssc.awaitTermination()}
}
2.2 window 操作
/**基本语法:1.DStream.window(windowSize: Duration, step: Duration)参数:1.windowSize 表示窗口大小2.step 表示窗口滑动步长说明:1.窗口大小和步长必须为采集周期大小的整数倍2.步长默认为一个采集周期大小2.countByWindow(windowSize: Duration, step: Duration):统计滑动窗口计数流中的元素个数3.reduceByWindow(func, windowSize: Duration, step: Duration):通过自定义函数聚合滑动窗口流中的元素4.reduceByKeyAndWindow(func, windowSize: Duration, step: Duration, [numTasks]):通过自定义函数聚合滑动窗口流中相同 key 的 value5.reduceByKeyAndWindow(func, invFunc, windowSize: Duration, step: Duration, [numTasks])参数说明:1.func 表示窗口中相同 key 的聚合计算方式2.invFunc 表示删除在窗口滑动后不再存在的数据值
*/
object DStreamWindow {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))ssc.checkpoint("cp")val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))// val ds = wordAsOne.window(Seconds(6)) // 会有重复数据val ds = wordAsOne.window(Seconds(6), Seconds(6))val wordCount = ds.reduceByKey(_ + _)// 必须设置检查点保存路径val wordCount1 = wordAsOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,(x: Int, y: Int) => x - y,Seconds(6), Seconds(6))wordCount.print()// wordCount1.print()ssc.start()ssc.awaitTermination()}
}

三、DStream 的输出

SparkStreaming 也有惰性机制,执行输出操作才会触发所有 DStream 计算的执行

/**基本语法:1.print():将 DStream 输出到控制台,只有这个输出会带时间戳2.saveAsTextFiles(prefix, [suffix]):将 DStream 保存为 text 格式文件,每一批次的存储文件名基于参数中的 prefix 和 suffix (prefix-Time_IN_MS[.suffix])3.saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 DStream 中的数据保存为SequenceFiles,每一批次的存储文件名为 "prefix-TIME_IN_MS[.suffix]"4.saveAsHadoopFiles(prefix, [suffix]):将 DStream 中的数据保存为 Hadoop files,每一批次的存储文件名为 "prefix-TIME_IN_MS[.suffix]"5.foreachRDD(func):最通用的输出操作,将函数 func 用于 DStream 的每一个 RDD,可以将 RDD 存入文件或者通过网络将其写入数据库说明:使用foreachRDD(func)把数据写到 MySQL 的外部数据库的注意事项:1.创建连接对象不能写在 driver 层面(因为所有的连接对象都不能序列化)2.如果写在 foreachRDD 中则每个 RDD 中的每一条数据都会创建连接,影响性能和资源;3.推荐使用 RDD 的 foreachPartition() 算子,在每个分区迭代中创建连接
*/
object DStreamOutput {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))val wordCount = wordAsOne.reduceByKey(_ + _)// wordCount.print() // SparkStreaming 没有输出操作会报错wordCount.foreachRDD(rdd => {rdd.foreach(println)})ssc.start()ssc.awaitTermination()}
}

四、SparkStreaming 优雅的关闭

SparkStreaming 任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,而分布式程序没办法做到一个个进程去停止,所以需要使用第三方系统 (MySQL/Redis/Zookeepr/HDFS) 来控制内部程序关闭

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}class MonitorStop(ssc: StreamingContext) extends Runnable {override def run(): Unit = {val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "hello")while(true) {try{Thread.sleep(5000)} catch {case e: InterruptedException => e.printStackTrace()}val state: StreamingContextState = ssc.getStateval bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))if(bool) {if(state == StreamingContextState.ACTIVE) {// 优雅地关闭,停止接收新数据,并将已有的数据处理完后再关闭ssc.stop(stopSparkContext = true, stopGracefully = true)System.exit(0)}}}}
}import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkTest {def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: 
Option[Int]) => {//当前批次内容的计算val sum: Int = values.sum//取出状态信息中上一次状态 val lastStatu: Int = status.getOrElse(0)Some(sum + lastStatu)}val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")//设置优雅的关闭sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")val ssc = new StreamingContext(sparkConf, Seconds(5))ssc.checkpoint("./ck")val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)val word: DStream[String] = line.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] = word.map((_, 1))val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)wordAndCount.print()ssc}def main(args: Array[String]): Unit = {// 从检查点恢复数据val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())new Thread(new MonitorStop(ssc)).start()ssc.start()ssc.awaitTermination()}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/25303.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java——简单图书管理系统

前言&#xff1a; 一、图书管理系统是什么样的&#xff1f;二、准备工作分析有哪些对象&#xff1f;画UML图 三、实现三大模块用户模块书架模块管理操作模块管理员操作有这些普通用户操作有这些 四、Test测试类五、拓展 哈喽&#xff0c;大家好&#xff0c;我是无敌小恐龙。 写…

Spark作业运行异常慢的问题定位和分析思路

一直很慢 &#x1f422; 运行中状态、卡住了&#xff0c;可以从以下两种方式入手&#xff1a; 如果 Spark UI 上&#xff0c;有正在运行的 Job/Stage/Task&#xff0c;看 Executor 相关信息就好。 第一步&#xff0c;如果发现卡住了&#xff0c;直接找到对应的 Executor 页面&a…

「前端+鸿蒙」鸿蒙应用开发-TS函数

在 TypeScript 中&#xff0c;函数是一等公民&#xff0c;这意味着函数可以作为参数传递、作为其他函数的返回值&#xff0c;甚至可以赋值给变量。TypeScript 为 JavaScript 的函数增加了类型系统&#xff0c;使得函数的参数和返回值都具有明确的类型。 TS快速入门-函数 基本函…

论文写作神器:15大参考文献来源网站推荐

撰写论文需要大量的参考文献支撑论点,这对在校学生和学者而言都是一大挑战。今天,我们可以充分利用网络资源,高效获取优质文献,摆脱遍查图书馆和杂志社的艰辛。本文就为大家推荐15大优质的参考文献来源网站,助力论文写作! AIPaperDone 拥有完美文献综述的AI论文网站 - AIPaperD…

模糊控制器实现对某个对象追踪输入

MATLAB是一个十分便捷的软件&#xff0c;里面提供了许多集成的组件&#xff0c;本文利用simulink实现模糊控制器实现对某个对象追踪输入。 这里的对象根据自己的需求可以修改&#xff0c;那么搭建一个闭环控制系统并不是难事儿&#xff0c;主要是对于模糊控制器参数的设置&…

win设置ftp服务器~java通过ftp下载文件

1.先设置ftp 2.打开服务 3.设置站点 4.起名字 这样就可以了 5.剩下的就是设置权限和账号了&#xff0c;找到对应的按钮就可以了 6.下载文件的代码 public byte[] downloadFile(File file) throws IOException{ByteArrayOutputStream out new ByteArrayOutputStream();toDi…

seerfar选品功能,OZON运营插件工具seerfar

在当今这个数字化、信息化的时代&#xff0c;电子商务的飞速发展使得越来越多的商家开始关注如何更高效地运营自己的在线店铺。其中&#xff0c;选品作为电商运营的重要一环&#xff0c;直接影响着店铺的流量、转化率和利润。在OZON这样的电商平台上&#xff0c;如何快速、准确…

第四篇红队笔记-百靶精讲之Prime-wfuzz-wpscan-openssl enc

靶机Prime渗透 主机发现 nmap扫描与分析 目录爆破与模糊测试 dirb 目录扫描 dev secret.txt wfuzz发现 file参数 根据secret.txt-location.txt 和 file参数结合 secrettier360 根据filelocation.txt得到的on some other php page&#xff08;改用之前扫到image.p…

chrony内网同步服务器时间

当前需要在10.26.24.62和10.26.24.61两个服务器上设置chrony同步时间&#xff0c;其中10.26.24.62为NTP时间服务器&#xff0c;10.26.24.61去10.26.24.62同步时间 检查Chrony配置文件&#xff1a; 确认10.26.24.62&#xff08;NTP服务器&#xff09;的配置文件 /etc/chrony/c…

Spring 自动配置 condition

目录 前言 1. 自定义condition加载bean 1.1. 自定义一个condition注解 1.2. 实现自定义注解对应的实现类 1.3. 使用如上注解 1.4. 使用Spring上下文获取一下改bean 2. 我们来看看Spring是如何加载redisTemplate的。 2.1. 找到Spring的autoconfigure的jar包&#xff0c;我们…

Web前端高亮:深度解析高亮技术的四个方面、五个方面、六个方面与七个方面

Web前端高亮&#xff1a;深度解析高亮技术的四个方面、五个方面、六个方面与七个方面 在Web前端开发中&#xff0c;高亮技术以其独特的功能和广泛的应用场景&#xff0c;为网页增添了丰富的交互体验。然而&#xff0c;高亮技术的实现并非一蹴而就&#xff0c;它涉及到多个方面…

第二十章 SOAP 错误处理 - 产生故障的方法

文章目录 第二十章 SOAP 错误处理 - 产生故障的方法产生故障的方法MakeFault()MakeFault12()MakeSecurityFault()MakeStatusFault() 第二十章 SOAP 错误处理 - 产生故障的方法 产生故障的方法 MakeFault() classmethod MakeFault(pFaultCode As %String, pFaultString As %S…

Steam下载游戏很慢?一个设置解决!

博主今天重装系统后&#xff0c;用steam下载发现巨慢 500MB&#xff0c;都要下载半小时。 平时下载软件&#xff0c;一般1分钟就搞定了&#xff0c;于是大致就知道&#xff0c;设置应该出问题了 于是修改了&#xff0c;如下设置之后&#xff0c;速度翻了10倍。 另外&#x…

【案例分享】印前制版工单系统:“鹿山科技”助力“铭匠数据”重塑业务流程

内容概要 本文介绍了鹿山信息科技通过明道云HAP平台的数字化解决方案提升了铭匠数据在印前制版行业的效率。周口铭匠数据科技有限公司位于河南省周口市沈丘县&#xff0c;是一家专注于印前制版设计服务的公司&#xff0c;成立于2023年。企业在销售业务、版材制作生产和美工设计…

计算机组成原理复习笔记

前言 就是按照考试的题型写的总结 非常应试版 题型 一、进制转换 只考 十进制 二进制 十六进制 之间的相互转换 一个个看 &#xff08;1&#xff09;十进制 转其他 转二进制&#xff1a;除以2 从小到大取余数&#xff08;0或1&#xff09; 转十六进制 &#xff1a; 除以1…

爬虫可以不必自己写,使用ChatGPT编写抓取电影评论数据脚本

经常去新华书店看看有没有什么新书上架&#xff0c;还是更新挺及时的&#xff0c;可以反映新的技术趋势。这不&#xff0c;最近就看到了这本《巧用 ChatGPT 快速搞定数据分析》&#xff0c;作者是个大牛&#xff0c;第一次看到prompt可以这么写&#xff0c;得写这么长&#xff…

htb-linux-6-beep

nmap web渗透 目录扫描 漏洞关键词 shell py脚本执行 flag root 目前的权限 nmap root

【深度学习】深度学习之巅:在 CentOS 7 上打造完美Python 3.10 与 PyTorch 2.3.0 环境

【深度学习】深度学习之巅&#xff1a;在 CentOS 7 上打造完美Python 3.10 与 PyTorch 2.3.0 环境 大家好 我是寸铁&#x1f44a; 总结了一篇【深度学习】深度学习之巅&#xff1a;在 CentOS 7 上打造完美Python 3.10 与 PyTorch 2.3.0 环境✨ 喜欢的小伙伴可以点点关注 &#…

本地部署AI大模型 —— Ollama文档中文翻译

写在前面 来自Ollama GitHub项目的README.md 文档。文档中涉及的其它文档未翻译&#xff0c;但是对于本地部署大模型而言足够了。 Ollama 开始使用大模型。 macOS Download Windows 预览版 Download Linux curl -fsSL https://ollama.com/install.sh | sh手动安装说明 …