Spark
一、SparkStreaming
Spark Streaming 用于流式数据的处理(准实时,微序列)。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语,如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
- DStream 离散化流,discretized stream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以 简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。
- 背压机制(即 Spark Streaming Backpressure),根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。即协调处理数据与接收数据的速率。
- 处理流程
Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
二、DStream创建
——RDD队列创建
测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。
//1.初始化 Spark 配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(4))
//3.创建 RDD 队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)
}
ssc.awaitTermination()
——自定义数据源
需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
/**
自定义数据源
*/
class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var flage = true//最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Sparkoverride def onStart(): Unit = {new Thread(new Runnable {override def run(): Unit = {while (flage) {val message = "采集的数据为:" + new Random().nextInt(10).toString()store(message)Thread.sleep(300)}}}).start()}override def onStop(): Unit = {flage = false}
}val message = sc.receiverStream(new MyReceiver())
message.print()
——Kafka数据源
ReceiverAPI(早期版本):需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在 的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor 速度大于计算的 Executor 速度,会导致计算数据的节点内存溢出。
DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。
/** Kafka 0.10版本下的数据消费配置 */
//1.定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](ArrowAssoc(BOOTSTRAP_SERVERS_CONFIG) ->"hadoop102:9092,hadoop103:9092,hadoop104:9092",GROUP_ID_CONFIG -> "5yw","key.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer"
)
//2.读取 Kafka 数据创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](sc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("5yw"), kafkaPara))
//3.将每条消息的 KV 取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
valueDStream.print()
三、DStream数据转换
DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及 各种 Window 相关的原语。注意:针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用
SparkStreaming是将一个流数据划分为一个个小时间段,每个时间段封装为一个RDD。无状态转化即不保存各采集周期的数据,各RDD间无关。有状态转化操作即保存采集周期的数据,RDD间相关。
——无状态转化操作
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。
使用transform,一来是因为DStream本身也对map等函数进行了封装,但是有些功能尚不完善,所以我们使用transform将其转化为RDD进行操作;二来是针对某些伴随RDD的输入所进行的周期性操作。
//创建 DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
//转换为 RDD 操作
val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd =>
{ //********************* 这里的代码会周期执行,即每进来一个RDD都会执行一次val words: RDD[String] = rdd.flatMap(_.split(" ")) val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value
})
//打印
wordAndCountDStream.print
两数据流间实现join操作。
//1.从端口获取数据创建流
val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)
//2.将两个流转换为 KV 类型
val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))
//3.流的 JOIN
val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)
——有状态转化操作
- UpdateStateByKey()
UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。
给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的DStream,其内部数据为(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对 应的(键,状态)对组成的。
updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功 能,需要做下面两步: 1. 定义状态,状态可以是一个任意的数据类型。 2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
/*** 根据key对数据的状态进行更新* 传递两个参数* 第一个:表示相同的key的value数据* 第二个:表示缓冲区相同key的value数据,因为初始缓冲区可能没有数据,所以定义为Option[]** 在wordCount中,加入现在缓冲区已经有(word, 3) (hello, 4)* 第一个参数表示新接收到的RDD的value(1),第二个参数表示为(3) (4) (根据不同key进行对应计算)* */val value2 = word2.updateStateByKey((seq: Seq[Int], buf:Option[Int]) => {val newCount = buf.getOrElse(0) + seq.sumOption(newCount)})
- WindowOperations()
当我们希望对多个采集周期的数据进行分析,可以设置窗口。Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Streaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
➢ 窗口时长:计算内容的时间范围
➢ 滑动步长:隔多久触发一次计算。
允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation)。slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。
val line2 = sc.socketTextStream("localhost", 8083) //监听8082端口val word2 = line2.flatMap(_.split(" ")).map((_, 1))//窗口大小必须是采集周期的整数倍val window = word2.window(Seconds(6), Seconds(3)) //每隔三秒对过去六秒的数据进行获取window.reduceByKey(_+_).print()
四、DStream输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。
方法 | 作用 |
---|---|
print() | 直接打印在控制台 |
saveAsTextFiles(prefix, [suffix]) | 以text文件存储,每一批次文件名:prefix-Time_IN_MS[.suffix] |
saveAsObjectFiles(prefix, [suffix]) | 以 Java 对象序列化的方式将 Stream 中的数据保存为 SequenceFiles . |
saveAsHadoopFiles(prefix, [suffix]) | 将 Stream 中的数据保存为 Hadoop files |
foreachRDD(rdd => {}) | 参数func 应该实现将每一个 RDD 数据推送到外部系统,如将 RDD 存入文件或者通过网络将其写入数据库。注意:外部连接对象不应该为每一个RDD创建,参考SparkCore的foreachPartition方法 |
五、DStream关闭及数据恢复
//TODO 数据恢复
val sc = StreamingContext.getActiveOrCreate("cp", () => {val sparkconf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamning")val sc = new StreamingContext(sparkconf, Seconds(3))sc
})
sc.checkpoint("cp")
//TODO 关闭
sc.start()
new Thread(new Runnable {override def run(): Unit = {val state: StreamingContextState = sc.getState //获取当前streamingif (true) { //这里应该是任务是否完成的判断,比如数据库是否读取完成if (state == StreamingContextState.ACTIVE) { //只有检测当前sparkstreaming是激活的,才需要执行关闭操作sc.stop(stopSparkContext = true, stopGracefully = true) //当执行关闭操作时,接收操作先关闭,等待已接收数据都处理完成后,整个关闭System.exit(0) //线程关闭}}}
}).start()
sc.awaitTermination()
具体代码可以参考:
https://github.com/Ostrich5yw/java4BigData