Spark Streaming Overview
Spark Streaming is used for processing streaming data.
Much like Spark's RDD-based model, Spark Streaming uses a discretized stream, called a DStream, as its core abstraction.
A DStream is the sequence of data received over time; internally, the data arriving in each batch interval is stored as an RDD.
Key features of Spark Streaming: it is easy to use, fault tolerant, and integrates easily with the rest of the Spark ecosystem.
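As a rough sketch of that last point (integration with the Spark ecosystem), the example below mixes a core-RDD join into a streaming job through transform. It is only an illustration under assumptions: the DStreamTransformDemo object name and the whitelist contents are hypothetical, and node01:9999 is assumed to be the same socket source used in the word-count example later in this section.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamTransformDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("transform-demo")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // A static RDD built with the ordinary Spark core API (a hypothetical whitelist of words)
    val whitelist = ssc.sparkContext.makeRDD(Seq(("spark", true), ("streaming", true)))

    // A DStream of (word, 1) pairs coming from a socket source (assumed node01:9999)
    val words = ssc.socketTextStream("node01", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))

    // transform exposes each batch as an RDD, so core RDD operations such as join
    // can be combined directly with streaming operations
    val whitelistedCounts = words
      .transform(rdd => rdd.join(whitelist).mapValues(_._1))
      .reduceByKey(_ + _)

    whitelistedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}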
Spark Streaming Architecture
Spark Streaming receives live input data, splits it into micro-batches at the configured batch interval, and hands each batch to the Spark engine, which processes it and emits the results in batches.
Hands-on with DStreams
Example: word count
Running in IDEA
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf: run locally with as many threads as cores
    // (at least 2 are needed: one for the receiver, one for processing), app name "streaming"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
    // Create a StreamingContext with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Receive a text stream from the given host and port
    val lineStreams = ssc.socketTextStream("node01", 9999)
    // Split each line into words
    val wordStreams = lineStreams.flatMap(_.split(" "))
    // Map each word to a (word, 1) pair
    val wordAndOneStreams = wordStreams.map((_, 1))
    // Sum the counts for each word
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    // Print the word counts of each batch
    wordAndCountStreams.print()
    // Start the streaming computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()
  }
}
On the virtual machine (node01), run nc -lk 9999 and type some lines of data.
Result: each 3-second batch prints the (word, count) pairs for the lines typed during that interval; for example, typing hello spark hello within one interval yields (hello,2) and (spark,1).
Explanation:
Operations on the data are performed at the granularity of RDDs: the data of each batch interval forms one RDD, and DStream operations translate into operations on these RDDs.
The actual computation on each batch is carried out by the Spark engine.
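A minimal sketch of these two points, again assuming the node01:9999 socket source used above (the ForeachRDDDemo object name is made up): foreachRDD hands the driver one RDD per batch interval, and the ordinary RDD operations applied to it are executed by the Spark engine.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("foreachRDD-demo")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("node01", 9999)

    // Every 3-second batch arrives as one RDD; the work below is ordinary RDD code
    // that the Spark engine runs once per batch
    lines.foreachRDD { (rdd, time) =>
      val wordCounts = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      println(s"Batch at $time: ${rdd.count()} lines, ${wordCounts.count()} distinct words")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}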
Creating DStreams
RDD queue
Example:
Create several RDDs in a loop and push them into a queue; use Spark Streaming to create a DStream from the queue and compute a word count over it.
Code
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RDDStream {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf: run locally with as many threads as cores, app name "RDDStream"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
    // Create a StreamingContext with a 4-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(4))
    // Mutable queue that will hold the RDDs
    val rddQueue = new mutable.Queue[RDD[Int]]()
    // Create an input stream from the queue; oneAtATime = false consumes all queued RDDs in each batch
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    // Map each element to an (element, 1) pair
    val mappedStream = inputStream.map((_, 1))
    // Sum the counts for each key
    val reducedStream = mappedStream.reduceByKey(_ + _)
    // Print the counts of each batch
    reducedStream.print()
    // Start the streaming computation
    ssc.start()
    // Push five RDDs into the queue, sleeping 2 seconds between them
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    // Wait for the computation to terminate
    ssc.awaitTermination()
  }
}
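A note on the design choice above: queueStream's oneAtATime flag defaults to true, in which case exactly one queued RDD is consumed per batch interval; with oneAtATime = false, as here, all RDDs sitting in the queue at the start of a batch are dequeued and processed together in that batch.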
Run result: every 4 seconds a batch prints (number, count) pairs for the values 1 to 300 contained in the RDDs consumed during that interval.
Custom data source
Define the custom data source: extend Receiver[String] with a storage level, start a receiving thread in onStart() (which must return without blocking), hand every received record to Spark with store(), and call restart() when the connection fails or closes so that Spark re-launches the receiver.
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.util.control.NonFatal

class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // Start a thread that reads from the socket; onStart must not block
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  // Connect to the socket, read lines and hand them to Spark via store()
  def receive(): Unit = {
    var socket: Socket = null
    var reader: BufferedReader = null
    try {
      socket = new Socket(host, port)
      reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var input: String = reader.readLine()
      while (!isStopped() && input != null) {
        store(input)
        input = reader.readLine()
      }
      // The connection was closed; ask Spark to restart the receiver so it reconnects
      restart("Restarting receiver")
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      // Always release the socket resources
      if (reader != null) {
        try {
          reader.close()
        } catch {
          case NonFatal(e) => println(s"Error closing reader: ${e.getMessage}")
        }
      }
      if (socket != null) {
        try {
          socket.close()
        } catch {
          case NonFatal(e) => println(s"Error closing socket: ${e.getMessage}")
        }
      }
    }
  }

  // The reading thread checks isStopped(), so nothing extra to do here
  override def onStop(): Unit = {}
}
Use the custom data source to collect data
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CustomReceiverStream {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf: run locally with as many threads as cores, app name "stream"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
    // Create a StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Create an input stream backed by the custom receiver
    val lineStream = ssc.receiverStream(new CustomerReceiver("node01", 9999))
    // Split each line into words
    val wordStream = lineStream.flatMap(_.split(" "))
    // Map each word to a (word, 1) pair
    val wordAndOneStream = wordStream.map((_, 1))
    // Sum the counts for each word
    val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
    // Print the word counts of each batch
    wordAndCountStream.print()
    // Start the streaming computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()
  }
}
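As with the earlier socket word count, this job can be fed by running nc -lk 9999 on node01 and typing lines of text; every 5-second batch then prints the word counts for the data received through the custom receiver.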