Spark(22):SparkStreaming之DStream创建

目录

0. 相关文章链接

1. RDD队列

1.1. 用法及说明

1.2. 案例实操

2. 自定义数据源

2.1. 用法和说明

2.2. 案例实操

3. Kafka数据源

3.1. 版本选型

3.2. Kafka 0-8 Receiver 模式(当前3.x版本不适用)

3.3. Kafka 0-8 Direct 模式(当前3.x版本不适用)

3.4. Kafka 0-10 Direct 模式(3.x版本中使用此模式)


0. 相关文章链接

 Spark文章汇总 

1. RDD队列

1.1. 用法及说明

        测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。 

1.2. 案例实操

  • 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算WordCount
  • 编写代码:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject StreamTest {def main(args: Array[String]): Unit = {//1.初始化Spark配置信息val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamTest")//2.初始化SparkStreamingContextval ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))//3.创建RDD队列val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()//4.创建QueueInputDStreamval inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)//5.处理队列中的RDD数据val mappedStream: DStream[(Int, Int)] = inputStream.map(((_: Int), 1))val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey((_: Int) + _)//6.打印结果reducedStream.print()//7.启动任务ssc.start()//8.循环创建并向RDD队列中放入RDDfor (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}}
  • 数据展示:
-------------------------------------------
Time: 1689147795000 ms
-------------------------------------------
(84,1)
(96,1)
(120,1)
(180,1)
(276,1)
(156,1)
(216,1)
(300,1)
(48,1)
(240,1)
...-------------------------------------------
Time: 1689147798000 ms
-------------------------------------------
(84,2)
(96,2)
(120,2)
(180,2)
(276,2)
(156,2)
(216,2)
(300,2)
(48,2)
(240,2)
...-------------------------------------------
Time: 1689147801000 ms
-------------------------------------------
(84,1)
(96,1)
(120,1)
(180,1)
(276,1)
(156,1)
(216,1)
(300,1)
(48,1)
(240,1)
...-------------------------------------------
Time: 1689147804000 ms
-------------------------------------------
(84,1)
(96,1)
(120,1)
(180,1)
(276,1)
(156,1)
(216,1)
(300,1)
(48,1)
(240,1)
...-------------------------------------------
Time: 1689147807000 ms
--------------------------------------------------------------------------------------
Time: 1689147810000 ms
-------------------------------------------

2. 自定义数据源

2.1. 用法和说明

需要继承 Receiver,并实现 onStart、 onStop 方法来自定义数据源采集。

2.2. 案例实操

  • 需求:自定义数据源,实现监控某个端口号,获取该端口号内容。
  • 自定义数据源
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsetsclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {//最初启动的时候,调用该方法,作用为:读数据并将数据发送给Sparkoverride def onStart(): Unit = {new Thread("Socket Receiver") {override def run() {receive()}}.start()}//读数据并将数据发送给Sparkdef receive(): Unit = {//创建一个Socketval socket: Socket = new Socket(host, port)//定义一个变量,用来接收端口传过来的数据var input: String = null//创建一个BufferedReader用于读取端口传来的数据val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))//读取数据input = reader.readLine()//当receiver没有关闭并且输入数据不为空,则循环发送数据给Sparkwhile (!isStopped() && input != null) {store(input)input = reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart("restart")}override def onStop(): Unit = {}}
  • 使用自定义的数据源采集数据
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit = {//1.初始化Spark配置信息val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamTest")//2.初始化SparkStreamingContextval ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))//3.创建自定义receiver的Streamingval lineStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomerReceiver("localhost", 9999))//4.将每一行数据做切分,形成一个个单词val wordStream: DStream[String] = lineStream.flatMap(_.split(" "))//5.将单词映射成元组(word,1)val wordAndOneStream: DStream[(String, Int)] = wordStream.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStream: DStream[(String, Int)] = wordAndOneStream.reduceByKey(_ + _)//7.打印wordAndCountStream.print()//8.启动SparkStreamingContextssc.start()ssc.awaitTermination()}}
  • 展示结果
-------------------------------------------
Time: 1689148212000 ms
--------------------------------------------------------------------------------------
Time: 1689148215000 ms
-------------------------------------------
(abc,2)
(hello,1)-------------------------------------------
Time: 1689148218000 ms
-------------------------------------------

3. Kafka数据源

3.1. 版本选型

  • ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor 速度大于计算的 Executor 速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。
  • DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。 

3.2. Kafka 0-8 Receiver 模式(当前3.x版本不适用)

  • 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台
  • 导入依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.4.5</version>
</dependency>
  • 编写代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit = {//1.初始化Spark配置信息val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamTest")//2.初始化SparkStreamingContextval ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))//3.读取Kafka数据创建DStream(基于Receive方式)val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,"bigdata01:2181,bigdata02:2181,bigdata03:2181","StreamTest",Map[String, Int]("test" -> 1))//4.计算WordCountkafkaDStream.map {case (_, value) => (value, 1)}.reduceByKey(_ + _).print()//5.开启任务ssc.start()ssc.awaitTermination()}}

3.3. Kafka 0-8 Direct 模式(当前3.x版本不适用)

  • 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台
  • 导入依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.4.5</version>
</dependency>
  • 编写代码(自动维护 offset):
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit = {//1.初始化Spark配置信息val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamTest")//2.初始化SparkStreamingContext,并设置CKval ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint("./checkpoint")//3.定义Kafka参数val kafkaPara: Map[String, String] = Map[String, String](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",ConsumerConfig.GROUP_ID_CONFIG -> "StreamTest")//4.读取Kafka数据val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaPara,Set("test"))//5.计算WordCountkafkaDStream.map((_: (String, String))._2).flatMap((_: String).split(" ")).map(((_: String), 1)).reduceByKey((_: Int) + (_: Int)).print()//6. 开启任务ssc.start()ssc.awaitTermination()}}
  • 编写代码(手动维护 offset):
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit = {//1.初始化Spark配置信息val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamTest")//2.初始化SparkStreamingContext,并设置CKval ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint("./checkpoint")//3.定义Kafka参数val kafkaPara: Map[String, String] = Map[String, String](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",ConsumerConfig.GROUP_ID_CONFIG -> "StreamTest")//4.获取上一次启动最后保留的Offset=>getOffset(MySQL)val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](TopicAndPartition("test", 0) -> 20)//5.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc,kafkaPara,fromOffsets,(m: MessageAndMetadata[String, String]) => m.message())//6.获取当前消费数据的offset信息,并保存到创建的数组里// 注意://      使用的方法为,通过transform算子将这个批次的数据转换成RDD,然后使用asInstanceOf方法将RDD转换成HasOffsetRanges,即可以获取offsetRanges//      transform算子的用法是,将这个批次的DStream转换成RDD,但是transform是转换算子,所以如果没有使用行动算子,那其内部的内容不会进行运算var offsetRanges: Array[OffsetRange] = Array.emptykafkaDStream.transform {rdd: RDD[String] => {offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor (offset <- offsetRanges) {println(s"${offset.topic}:${offset.partition}:${offset.fromOffset}:${offset.untilOffset}")}rdd.flatMap((_: String).split(" ")).map(((_: String), 1)).reduceByKey((_: Int) + (_: Int))}}//7.打印Offset信息kafkaDStream.foreachRDD {rdd: RDD[String] => {for (offset <- rdd.asInstanceOf[HasOffsetRanges].offsetRanges) {println(s"${offset.topic}:${offset.partition}:${offset.fromOffset}:${offset.untilOffset}")}}}//8.开启任务ssc.start()ssc.awaitTermination()}}

3.4. Kafka 0-10 Direct 模式(3.x版本中使用此模式)

  • 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台
  • 导入依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>${spark.version}</version><scope>provided</scope>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version><scope>provided</scope>
</dependency>
  • 编写代码:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit = {//1.初始化Spark配置信息val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamTest")//2.初始化SparkStreamingContext,并设置CKval ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint("./checkpoint")//3.定义Kafka参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "bigdata01:9092,bigdata01:9092,bigdata01:9092",ConsumerConfig.GROUP_ID_CONFIG -> "StreamTest",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer")//4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara))//5.将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map((record: ConsumerRecord[String, String]) => record.value())//6.计算WordCountvalueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()//7.开启任务ssc.start()ssc.awaitTermination()}}
  • 可以通过命令行查看Kafka对应Topic的消费进度:
bin/kafka-consumer-groups.sh --describe --bootstrap-server bigdata01:9092 --group testTopic

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/3884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

离线安装docker

目录 1、下载docker 安装包 2、上传docker 到服务器目录/opt/ 3、解压docker-19.03.9.tgz 4、解压的docker文件夹全部移动至/usr/bin目录 5、将docker注册为系统服务 6、重启生效 6.1、重新加载配置文件 6.2、启动Docker服务 6.3、查看启动状态 6.4、 设置docker为开…

CS 144 Lab Two -- TCPReceiver

CS 144 Lab Two -- TCPReceiver TCPReceiver 简述索引转换TCPReceiver 实现 测试 对应课程视频: 【计算机网络】 斯坦福大学CS144课程 Lab Two 对应的PDF: Lab Checkpoint 2: the TCP receiver TCPReceiver 简述 在 Lab2&#xff0c;我们将实现一个 TCPReceiver&#xff0c;用…

如何在Ubuntu上安装OpenneBula

OpenNebula是一个开源云计算平台&#xff0c;允许我们在完全虚拟化云中组合和管理VMware和KVM虚拟机 第1步&#xff1a;安装MariaDB数据库服务器 OpenNebula还需要一个数据库服务器来存储其内容。 安装MariaDB&#xff1a; 1 2 sudo apt update sudo apt install mariadb-s…

mac idea 常用快捷键

mac idea 常用快捷键 代码生成&#xff1a; Command N&#xff1a;在代码编辑界面生成setter、getter等代码。当光标在左侧的工程结构时&#xff0c;使用Command N可以创建新的类、包等。 删除和复制&#xff1a; Command Delete&#xff1a;删除当前行。Command D&#…

SringCloud集成Redis工具类

1、pom文件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>2、redis工具类 Service(value "redisService") Slf4j public class RedisServi…

【深度学习笔记】训练 / 验证 / 测试集

本专栏是网易云课堂人工智能课程《神经网络与深度学习》的学习笔记&#xff0c;视频由网易云课堂与 deeplearning.ai 联合出品&#xff0c;主讲人是吴恩达 Andrew Ng 教授。感兴趣的网友可以观看网易云课堂的视频进行深入学习&#xff0c;视频的链接如下&#xff1a; 神经网络和…

Bootstrap编写一个兼容主流浏览器的受众巨幕式风格页面

Bootstrap编写一个兼容主流浏览器的受众巨幕式风格页面 虽然说IE6除了部分要求苛刻的需求以外已经被可以不考虑了&#xff0c;但是WIN7自带的浏览器IE8还是需要支持的。 本文这个方法主要的优点&#xff0c;个人觉得就是准备少&#xff0c;不需要上网寻找大量的图片做素材&…

CSS ::file-selector-button伪元素修改input上传文件按钮的样式

默认样式 修改后的样式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdev…

CNN学习记录

目录 CNN基础知识——卷积(Convolution)、填充(Padding)、步长(Stride): 卷积的三种模式:full, same, valid:

【MySQL】查询进阶

查询进阶 数据库约束约束类型NULL , DEFAULT , UNIQUE 约束主键约束外键约束 聚合查询聚合函数group by子句HAVING 联合查询内连接外连接自连接子查询单行子查询多行子查询 数据库约束 约束类型 NOT NULL #表示某行不能储存空值 UNIQUE #保证每一行必须有唯一的值 DEFAULT #规…

CSS科技感四角边框

实现效果:使用before和after就可以实现,代码量不多,长度颜色都可以自己调整 <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>Title</title><style>*{margin:0;padding:0;}html,body{…

PostgreSQL MVCC的弊端

数据库有很多种&#xff08;截至 2023 年 4 月有 897 个&#xff09;。面对如此多的数据库&#xff0c;很难知道该选择什么&#xff01;但有一个有趣的现象&#xff0c;互联网集体决定新应用程序的默认选择。在 2000 年代&#xff0c;传统观点选择 MySQL 是因为像 Google 和 Fa…

vue3+vite+pinia+vue-router+ol项目创建及配置

一、vite (一)、定义 vite官网 (二)、操作步骤 注意&#xff1a;两种方式创建目录结构一致 方式一&#xff1a;vite创建脚手架命令&#xff1a; 命令行&#xff1a;npm create vitelatest 然后选择 方式二&#xff1a;命令行直接声明带上vue 二、pinia (一)、定义 定义&#xf…

快速批量改名文件!随机字母命名,让文件名更有创意!

想要让文件名更加有创意和个性化吗&#xff1f;不妨尝试使用随机字母来批量改名文件&#xff01;无论是照片、文档还是其他文件&#xff0c;只需要简单的几个步骤&#xff0c;您就可以为它们赋予一个独特的随机字母命名。这不仅可以帮助您整理文件&#xff0c;还能增加一些乐趣…

非50欧系统阻抗的S参数测试

1. S参数依赖于系统阻抗 S参数的定义需要约定一个系统阻抗。同一个微波电路&#xff0c;在不同系统阻抗下的S参数是不同的。例如&#xff0c;50欧电阻在50欧系统阻抗下的S11为零&#xff0c;是没有反射的匹配状态&#xff1b;但50欧电阻在75欧系统阻抗下的S11不为零&#xff0…

WebRTC系列--WebRTC音频支持RedFEC的修改

文章目录 1. FEC流程综述1.1 offer中sdp的red1.2 setRemoteSdp中创建red编码流程2. 编码端2.1 编码缓存数据2.2 RED协议简介3. 解码端WebRTC在最新的代码中对opus的编码的包,默认使用red的方式进行一次冗余处理;冗余包在解码端的处理在其他版本中都有支持;这篇文章讲分两部分…

Orange:一个基于 Python 的数据挖掘可视化平台

本篇介绍一个适合初学者入门的机器学习工具。 Orange 简介 Orange 是一个开源的数据挖掘和机器学习软件。Orange 基于 Python 和 C/C 开发&#xff0c;提供了一系列的数据探索、可视化、预处理以及建模组件。 Orange 拥有漂亮直观的交互式用户界面&#xff0c;非常适合新手进…

计算机网络 day11 tcpdump - 传输层 - netstat - socket - nc - TCP/UDP头部

目录 故障排查 tcpdump抓包工具 传输层&#xff08;TCP和UDP协议&#xff09; 传输层的作用 应用程序和端口号有什么关系&#xff1f; 传输层端对端连接实现拓扑图 如何查看自己的linux机器开放了哪些端口&#xff1f; 1、netstat(network status 网络的状态) netsta…

【ceph】存储池pg个数如何设置

存储池pg个数如何设置 参考官方文档说明&#xff1a;https://old.ceph.com/pgcalc/参数说明TargePGs per OSD&#xff1a;每个OSD的pg数OSD#存储池包含osd个数%Data存储池写入数据占总OSD容量百分比Size存储池冗余数

安全生产简记

文章目录 面向失败的设计冗余设计避免单点故障宏观多活架构服务能力与依赖调用自我保护为失败准备预案精细化监控体系自动化运维管控故障与攻防演练锤炼容灾应急能力最佳实践面向失败的设计 什么样的失败?硬件问题软件Bug配置变更错误系统恶化外部攻击依赖库问题依赖服务问题…