大数据——SparkStreaming学习笔记

Spark

一、SparkStreaming

​ Spark Streaming 用于流式数据的处理(准实时,微序列)。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语,如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
在这里插入图片描述

  • DStream 离散化流,discretized stream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以 简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。
  • 背压机制(即 Spark Streaming Backpressure),根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。即协调处理数据与接收数据的速率。
  • 处理流程

​ Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
在这里插入图片描述

二、DStream创建

——RDD队列创建

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。

//1.初始化 Spark 配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(4))
//3.创建 RDD 队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)
}
ssc.awaitTermination()

——自定义数据源

​ 需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

/**
自定义数据源
*/
class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var flage = true//最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Sparkoverride def onStart(): Unit = {new Thread(new Runnable {override def run(): Unit = {while (flage) {val message = "采集的数据为:" + new Random().nextInt(10).toString()store(message)Thread.sleep(300)}}}).start()}override def onStop(): Unit =  {flage = false}
}val message = sc.receiverStream(new MyReceiver())
message.print()

——Kafka数据源

ReceiverAPI(早期版本):需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在 的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor 速度大于计算的 Executor 速度,会导致计算数据的节点内存溢出。

DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

/** Kafka 0.10版本下的数据消费配置 */
//1.定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](ArrowAssoc(BOOTSTRAP_SERVERS_CONFIG) ->"hadoop102:9092,hadoop103:9092,hadoop104:9092",GROUP_ID_CONFIG -> "5yw","key.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer"
)
//2.读取 Kafka 数据创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](sc,                                             		    LocationStrategies.PreferConsistent,                                    ConsumerStrategies.Subscribe[String, String](Set("5yw"), kafkaPara))
//3.将每条消息的 KV 取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
valueDStream.print()

三、DStream数据转换

​ DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及 各种 Window 相关的原语。注意:针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用

​ SparkStreaming是将一个流数据划分为一个个小时间段,每个时间段封装为一个RDD。无状态转化即不保存各采集周期的数据,各RDD间无关。有状态转化操作即保存采集周期的数据,RDD间相关。

——无状态转化操作

​ Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

​ 使用transform,一来是因为DStream本身也对map等函数进行了封装,但是有些功能尚不完善,所以我们使用transform将其转化为RDD进行操作;二来是针对某些伴随RDD的输入所进行的周期性操作。

//创建 DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
//转换为 RDD 操作
val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => 
{     //********************* 这里的代码会周期执行,即每进来一个RDD都会执行一次val words: RDD[String] = rdd.flatMap(_.split(" "))         val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value
})
//打印
wordAndCountDStream.print

​ 两数据流间实现join操作。

//1.从端口获取数据创建流
val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)
//2.将两个流转换为 KV 类型
val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))
//3.流的 JOIN
val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)

——有状态转化操作

  1. UpdateStateByKey()

​ UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。

​ 给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的DStream,其内部数据为(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对 应的(键,状态)对组成的。

​ updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功 能,需要做下面两步: 1. 定义状态,状态可以是一个任意的数据类型。 2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

    /*** 根据key对数据的状态进行更新* 传递两个参数* 第一个:表示相同的key的value数据* 第二个:表示缓冲区相同key的value数据,因为初始缓冲区可能没有数据,所以定义为Option[]** 在wordCount中,加入现在缓冲区已经有(word, 3) (hello, 4)* 第一个参数表示新接收到的RDD的value(1),第二个参数表示为(3) (4) (根据不同key进行对应计算)* */val value2 = word2.updateStateByKey((seq: Seq[Int], buf:Option[Int]) => {val newCount = buf.getOrElse(0) + seq.sumOption(newCount)})
  1. WindowOperations()

​ 当我们希望对多个采集周期的数据进行分析,可以设置窗口。Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Streaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

➢ 窗口时长:计算内容的时间范围

➢ 滑动步长:隔多久触发一次计算。

​ 允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation)。slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。

    val line2 = sc.socketTextStream("localhost", 8083) //监听8082端口val word2 = line2.flatMap(_.split(" ")).map((_, 1))//窗口大小必须是采集周期的整数倍val window = word2.window(Seconds(6), Seconds(3))	//每隔三秒对过去六秒的数据进行获取window.reduceByKey(_+_).print()

四、DStream输出

​ 输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。

方法作用
print()直接打印在控制台
saveAsTextFiles(prefix, [suffix])以text文件存储,每一批次文件名:prefix-Time_IN_MS[.suffix]
saveAsObjectFiles(prefix, [suffix])以 Java 对象序列化的方式将 Stream 中的数据保存为 SequenceFiles .
saveAsHadoopFiles(prefix, [suffix])将 Stream 中的数据保存为 Hadoop files
foreachRDD(rdd => {})参数func 应该实现将每一个 RDD 数据推送到外部系统,如将 RDD 存入文件或者通过网络将其写入数据库。注意:外部连接对象不应该为每一个RDD创建,参考SparkCore的foreachPartition方法

五、DStream关闭及数据恢复

//TODO 数据恢复
val sc = StreamingContext.getActiveOrCreate("cp", () => {val sparkconf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamning")val sc = new StreamingContext(sparkconf, Seconds(3))sc
})
sc.checkpoint("cp")
//TODO 关闭
sc.start()
new Thread(new Runnable {override def run(): Unit = {val state: StreamingContextState = sc.getState		//获取当前streamingif (true) {     //这里应该是任务是否完成的判断,比如数据库是否读取完成if (state == StreamingContextState.ACTIVE) {    //只有检测当前sparkstreaming是激活的,才需要执行关闭操作sc.stop(stopSparkContext = true, stopGracefully = true)   //当执行关闭操作时,接收操作先关闭,等待已接收数据都处理完成后,整个关闭System.exit(0)    //线程关闭}}}
}).start()
sc.awaitTermination()

具体代码可以参考:
https://github.com/Ostrich5yw/java4BigData

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/263431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[推荐]ORACLE SQL:经典查询练手第三篇(不懂装懂,永世饭桶!)

[推荐]ORACLE SQL&#xff1a;经典查询练手第三篇(不懂装懂&#xff0c;永世饭桶&#xff01;) [推荐]ORACLE SQL&#xff1a; 经典查询练手第三篇(不懂装懂&#xff0c;永世饭桶&#xff01;) ——通过知识共享树立个人品牌。 本文与大家共同讨论与分享ORACLE SQL的一些常用经…

ACM试题 - ASCII码排序 - Java中字符与对应ASCII码的转换

Java中字符转换对应ASCII码有两种方式&#xff1a; 第一种&#xff1a; char c a; byte b (byte)c; // b97 第二种&#xff1a; char c a; int b c; // b97 而一个ASCII码转换成相应字符则仅需强制转换&#xff1a; int a 97; char c (char)a; // ca ACM试…

DotNET多线程使用初探

最近几周一直在做DotNET WinForm开发&#xff0c;陆陆续续有些收获&#xff0c;希望能够有空好好整理整理。记下来以免以后又忘了。:-) 一、最简单的线程使用方法 新建一个C# Windows应用程序项目&#xff0c;在最前面的引用代码那增加一行using System.Threading;在界面上扔个…

Linux mount: Structure needs cleaning 错误解决方法

“mount: Structure needs cleaning”报错可以使用xfs_repair来修复&#xff0c;但是要注意 xfs_repair修复的分区中的文件都会丢失&#xff0c;即使是du能正常显示的文件也会丢失。 [rootyw-0-0 /]# mount -a mount: Structure needs cleaning[rootyw-0-0 /]# xfs_repair /dev…

大数据——Spark学习笔记(配置)

Spark运行环境 spark ui web http://hadoop102:8080 历史服务器 http://hadoop102:18080 一. 本地模式(Local)——单机运行 本地模式&#xff1a;运行 bin/spark-shell提交应用&#xff1a;运行 bin/spark-submit --class org.apache.spark.examples.SparkPi …

Linux软件安装——安装软件的命令

Linux软件安装——安装软件的命令 摘要&#xff1a;本文主要学习了如何在Linux系统中安装、更新、卸载软件。 rpm命令 rpm命令用来在Linux系统上进行软件的安装。 基本语法 安装命令&#xff1a; 1 rpm -ivh 包全名 如果没有安装则安装&#xff0c;如果已经安装则升级&#xff…

JavascriptHelp

阅读全文&#xff1a;http://www.cckan.net/forum.php?modviewthread&tid147usingSystem;usingSystem.Data;usingSystem.Configuration;usingSystem.Web;usingSystem.Web.Security;usingSystem.Web.UI;usingSystem.Web.UI.WebControls;usingSystem.Web.UI.WebControls.Web…

GoLang之方法与接口

GoLang之方法与接口 Go语言没有沿袭传统面向对象编程中的诸多概念&#xff0c;比如继承、虚函数、构造函数和析构函数、隐藏的this指针等。 方法 Go 语言中同时有函数和方法。方法就是一个包含了接受者&#xff08;receiver&#xff09;的函数&#xff0c;receiver可以是内置类…

孙继海化装

于输得太惨&#xff0c;中国球员怕回国被球迷打&#xff0c;孙继海决定化装成“乞丐”。化好之后遇到一个老太太&#xff0c;他想看看自己化装的效果&#xff0c;就给了老太太100块钱&#xff0c;问&#xff1a;“你知道我是谁吗&#xff1f;”老太太看了看说&#xff1a;“你是…

sql 时间函数

1. 当前系统日期、时间 select getdate() 2. dateadd 在向指定日期加上一段时间的基础上&#xff0c;返回新的 datetime 值 例如&#xff1a;向日期加上2天 select dateadd(day,2,’2004-10-15′) –返回&#xff1a;2004-10-17 00:00:00.000 3. datediff 返回跨两个指定日期的…

大数据——Hive学习笔记

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData

Eclipse 常用快捷键

Eclipse 常用快捷键 快捷键 描述 编辑 Ctrl1 快速修复&#xff08;最经典的快捷键,就不用多说了&#xff0c;可以解决很多问题&#xff0c;比如import类、try catch包围等&#xff09; CtrlShiftF 格式化当前代码 CtrlShiftM 添加类的import导入 CtrlShiftO 组织类的…

[Story]狗尾草花园

不知道为什么&#xff0c;很多年过去&#xff0c;我却依然记得这个故事。很小的时候&#xff0c;大概四、五岁吧&#xff0c;在河滩上和几个姐姐&#xff0c;一起玩过家家。他们大概十三&#xff0c;四岁吧&#xff0c;反正比我大好多。分成两组&#xff0c;我在我们一组当然是…

UML各种图

转载自&#xff1a; https://www.cnblogs.com/jiangds/p/6596595.html UML&#xff08;Unified Modeling Language&#xff09;是一种统一建模语言&#xff0c;为面向对象开发系统的产品进行说明、可视化、和编制文档的一种标准语言。下面将对UML的九种图包图的基本概念进行介绍…

Interactive Reflection Editing (SIGGRAPH ASIA 09)

讓使用者可以將reflection做editing 以達到使用者想要的結果INPUT: a 3d scene大部分是針對卡通等NPR的場景 對於reflection做editing利用shader也可以控制reflection這篇可以讓使用者做更動沒有使用BRDF做reflection的計算转载于:https://www.cnblogs.com/GameJan/archive/201…

大数据——Hive学习笔记(配置)

具体代码可以参考&#xff1a; https://github.com/Ostrich5yw/java4BigData

CSDN-markdown编辑器语法——字体、字号与颜色

Markdown是一种可以使用普通文本编辑器编写的标记语言&#xff0c;通过类似HTML的标记语法&#xff0c;它可以使普通文本内容具有一定的格式。但是它本身是不支持修改字体、字号与颜色等功能的&#xff01; CSDN-markdown编辑器是其衍生版本&#xff0c;扩展了Markdown的功能&a…

思科携手中兴掌握3G话语权 剑指华为3COM组合

思科携手中兴掌握3G话语权 剑指华为3COM组合 中国电信市场又迎来了一次中外巨头的牵手。昨日,思科系统公司和中兴通讯股份有限公司共同宣布,双方已签署了一份战略合作协议,将进行范围广泛的合作。这也是林正刚上任思科系统中国总裁后,祭出的首个“中国攻略”。 根据协议,两家公…

JS服务器端开发基础篇(Array.slice方法和splice方法)

Array.slice方法和splice方法在众多的JS数组中属于比较复杂的一个方法&#xff0c;而且容易记混。搜索网络上很多资料都没有发现系统的总结。特别归纳如下&#xff0c;不完全处还希望各位批评指正。一、slice方法格式&#xff1a;arrayObj.slice(start, [end])功能&#xff1a;…

HashMap(摘)

1.HashMap简介 HashMap基于哈希表的Map接口实现&#xff0c;是以key-value存储形式存在。&#xff08;除了不同步和允许使用 null 之外&#xff0c;HashMap 类与 Hashtable 大致相同。)HashMap 的实现不是同步的&#xff0c;这意味着它不是线程安全的。它的key、value都可以为n…