大数据——SparkStreaming学习笔记

Spark

一、SparkStreaming

​ Spark Streaming 用于流式数据的处理(准实时,微序列)。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语,如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
在这里插入图片描述

  • DStream 离散化流,discretized stream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以 简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。
  • 背压机制(即 Spark Streaming Backpressure),根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。即协调处理数据与接收数据的速率。
  • 处理流程

​ Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
在这里插入图片描述

二、DStream创建

——RDD队列创建

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。

//1.初始化 Spark 配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(4))
//3.创建 RDD 队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)
}
ssc.awaitTermination()

——自定义数据源

​ 需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

/**
自定义数据源
*/
class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var flage = true//最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Sparkoverride def onStart(): Unit = {new Thread(new Runnable {override def run(): Unit = {while (flage) {val message = "采集的数据为:" + new Random().nextInt(10).toString()store(message)Thread.sleep(300)}}}).start()}override def onStop(): Unit =  {flage = false}
}val message = sc.receiverStream(new MyReceiver())
message.print()

——Kafka数据源

ReceiverAPI(早期版本):需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在 的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor 速度大于计算的 Executor 速度,会导致计算数据的节点内存溢出。

DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

/** Kafka 0.10版本下的数据消费配置 */
//1.定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](ArrowAssoc(BOOTSTRAP_SERVERS_CONFIG) ->"hadoop102:9092,hadoop103:9092,hadoop104:9092",GROUP_ID_CONFIG -> "5yw","key.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer"
)
//2.读取 Kafka 数据创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](sc,                                             		    LocationStrategies.PreferConsistent,                                    ConsumerStrategies.Subscribe[String, String](Set("5yw"), kafkaPara))
//3.将每条消息的 KV 取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
valueDStream.print()

三、DStream数据转换

​ DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及 各种 Window 相关的原语。注意:针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用

​ SparkStreaming是将一个流数据划分为一个个小时间段,每个时间段封装为一个RDD。无状态转化即不保存各采集周期的数据,各RDD间无关。有状态转化操作即保存采集周期的数据,RDD间相关。

——无状态转化操作

​ Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

​ 使用transform,一来是因为DStream本身也对map等函数进行了封装,但是有些功能尚不完善,所以我们使用transform将其转化为RDD进行操作;二来是针对某些伴随RDD的输入所进行的周期性操作。

//创建 DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
//转换为 RDD 操作
val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => 
{     //********************* 这里的代码会周期执行,即每进来一个RDD都会执行一次val words: RDD[String] = rdd.flatMap(_.split(" "))         val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value
})
//打印
wordAndCountDStream.print

​ 两数据流间实现join操作。

//1.从端口获取数据创建流
val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)
//2.将两个流转换为 KV 类型
val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))
//3.流的 JOIN
val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)

——有状态转化操作

  1. UpdateStateByKey()

​ UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。

​ 给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的DStream,其内部数据为(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对 应的(键,状态)对组成的。

​ updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功 能,需要做下面两步: 1. 定义状态,状态可以是一个任意的数据类型。 2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

    /*** 根据key对数据的状态进行更新* 传递两个参数* 第一个:表示相同的key的value数据* 第二个:表示缓冲区相同key的value数据,因为初始缓冲区可能没有数据,所以定义为Option[]** 在wordCount中,加入现在缓冲区已经有(word, 3) (hello, 4)* 第一个参数表示新接收到的RDD的value(1),第二个参数表示为(3) (4) (根据不同key进行对应计算)* */val value2 = word2.updateStateByKey((seq: Seq[Int], buf:Option[Int]) => {val newCount = buf.getOrElse(0) + seq.sumOption(newCount)})
  1. WindowOperations()

​ 当我们希望对多个采集周期的数据进行分析,可以设置窗口。Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Streaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

➢ 窗口时长:计算内容的时间范围

➢ 滑动步长:隔多久触发一次计算。

​ 允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation)。slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。

    val line2 = sc.socketTextStream("localhost", 8083) //监听8082端口val word2 = line2.flatMap(_.split(" ")).map((_, 1))//窗口大小必须是采集周期的整数倍val window = word2.window(Seconds(6), Seconds(3))	//每隔三秒对过去六秒的数据进行获取window.reduceByKey(_+_).print()

四、DStream输出

​ 输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。

方法作用
print()直接打印在控制台
saveAsTextFiles(prefix, [suffix])以text文件存储,每一批次文件名:prefix-Time_IN_MS[.suffix]
saveAsObjectFiles(prefix, [suffix])以 Java 对象序列化的方式将 Stream 中的数据保存为 SequenceFiles .
saveAsHadoopFiles(prefix, [suffix])将 Stream 中的数据保存为 Hadoop files
foreachRDD(rdd => {})参数func 应该实现将每一个 RDD 数据推送到外部系统,如将 RDD 存入文件或者通过网络将其写入数据库。注意:外部连接对象不应该为每一个RDD创建,参考SparkCore的foreachPartition方法

五、DStream关闭及数据恢复

//TODO 数据恢复
val sc = StreamingContext.getActiveOrCreate("cp", () => {val sparkconf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamning")val sc = new StreamingContext(sparkconf, Seconds(3))sc
})
sc.checkpoint("cp")
//TODO 关闭
sc.start()
new Thread(new Runnable {override def run(): Unit = {val state: StreamingContextState = sc.getState		//获取当前streamingif (true) {     //这里应该是任务是否完成的判断,比如数据库是否读取完成if (state == StreamingContextState.ACTIVE) {    //只有检测当前sparkstreaming是激活的,才需要执行关闭操作sc.stop(stopSparkContext = true, stopGracefully = true)   //当执行关闭操作时,接收操作先关闭,等待已接收数据都处理完成后,整个关闭System.exit(0)    //线程关闭}}}
}).start()
sc.awaitTermination()

具体代码可以参考:
https://github.com/Ostrich5yw/java4BigData

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/263431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[推荐]ORACLE SQL:经典查询练手第三篇(不懂装懂,永世饭桶!)

[推荐]ORACLE SQL&#xff1a;经典查询练手第三篇(不懂装懂&#xff0c;永世饭桶&#xff01;) [推荐]ORACLE SQL&#xff1a; 经典查询练手第三篇(不懂装懂&#xff0c;永世饭桶&#xff01;) ——通过知识共享树立个人品牌。 本文与大家共同讨论与分享ORACLE SQL的一些常用经…

DotNET多线程使用初探

最近几周一直在做DotNET WinForm开发&#xff0c;陆陆续续有些收获&#xff0c;希望能够有空好好整理整理。记下来以免以后又忘了。:-) 一、最简单的线程使用方法 新建一个C# Windows应用程序项目&#xff0c;在最前面的引用代码那增加一行using System.Threading;在界面上扔个…

大数据——Spark学习笔记(配置)

Spark运行环境 spark ui web http://hadoop102:8080 历史服务器 http://hadoop102:18080 一. 本地模式(Local)——单机运行 本地模式&#xff1a;运行 bin/spark-shell提交应用&#xff1a;运行 bin/spark-submit --class org.apache.spark.examples.SparkPi …

大数据——Hive学习笔记

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData

UML各种图

转载自&#xff1a; https://www.cnblogs.com/jiangds/p/6596595.html UML&#xff08;Unified Modeling Language&#xff09;是一种统一建模语言&#xff0c;为面向对象开发系统的产品进行说明、可视化、和编制文档的一种标准语言。下面将对UML的九种图包图的基本概念进行介绍…

大数据——Hive学习笔记(配置)

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData

HashMap(摘)

1.HashMap简介 HashMap基于哈希表的Map接口实现&#xff0c;是以key-value存储形式存在。&#xff08;除了不同步和允许使用 null 之外&#xff0c;HashMap 类与 Hashtable 大致相同。)HashMap 的实现不是同步的&#xff0c;这意味着它不是线程安全的。它的key、value都可以为n…

JavaScript复制数组

转载于:https://blog.51cto.com/5880861/1651346

面向对象基础(一)

我想在这个园子里面的新人&#xff0c;还是"掌门人"&#xff0c;都对"面向对象"这几字非常的耳熟了或者有一定的了解。 但当一个初学都在学习面向对象的时候&#xff0c;会遇到一些瓶颈和麻烦&#xff0c;认为面向对象非常的难以理解。 那么本章节主要是让…

有哪些简单粗暴的logo设计方法?

Logo设计在设计的过程中要考虑很多问题&#xff0c;但是如果时间周期比较短&#xff0c;又要求快速出方案的时候&#xff0c;可以走一些捷径。 在设计logo之前要去了解公司的主营业务、公司规模、公司的名字、公司的主要产品针对的用户群体、甲方的个人偏好、公司原有VIS...这些…

大数据——Zookeeper学习笔记

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData

大数据——Zookeeper学习笔记(配置)

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData

微软技术节(TechFest 2010)最前沿技术汇总

Twitter 替代 微博时代行将过渡&#xff0c;微媒时代即将到来! 不错&#xff0c;所谓的高官|明星|名人|红人就在这里哈www.gg3m.com! 马上关注鸽姆微媒吧&#xff0c;再不来你就要OUT勒~[导读]微软亚洲研究院的技术依然是这次技术节的重要组成部分&#xff0c;约有36个项目被选…

大数据——Hadoop学习笔记

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData

Inside C++ object Model--对象模型概述

在C中, "数据"和"处理数据的操作"是分开声明的, 语言本身并没有支持"数据和函数"之间的关联性. 这种称为"procedural", 由一组算法函数所驱动, 他们处理的是共同的外部数据. 而C, 则在程序风格, 更在程序的思考上有明显的差异, 它以A…

Microsoft Visual C++ Runtime Library Runtime Error的解决的方法

打开浏览器时&#xff0c;出现Microsoft Visual C Runtime Library Runtime Error错误&#xff0c;初步预计是软件冲突&#xff0c;可能有多种出错的方式&#xff0c;我的是浏览器自己主动关闭。 一、 有些时候&#xff0c;在你安装、执行某个软件&#xff0c;可能会得到这样一…

java之hibernate之基于外键的一对一单向关联映射

这篇讲解基于外键的一对一单向关联映射 1.考察如下信息&#xff0c;人和身份证之间是一个一对一的关系。表的设计 注意&#xff1a;基于外键的一对一关联的表结构和多对一的表结构是一致的&#xff0c;但是&#xff0c;外键是唯一的。 2.类的结构 Person.java public class Per…

intellij idea 热部署 spring jvm 版

2019独角兽企业重金招聘Python工程师标准>>> 配置 intellij idea tomcat 加入下面这个参数&#xff0c; 对应jar 到 mvnrepo 下载 。 -javaagent:D:\work\springloaded-1.2.3.RELEASE.jar -noverify 转载于:https://my.oschina.net/u/556878/blog/416563

大数据——Hadoop学习笔记(配置)

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData