Spark Streaming

Spark Streaming

  • Spark Streaming概念
  • Spark Streaming操作
    • 1 netcat传入数据
    • 2 DStream 创建
    • 3 自定义数据源
    • 4 接受kafka数据
    • DStream 转换
      • 1无状态的转换
      • 2有状态的转换
        • updateSateByKey
        • WindowOperations

Spark Streaming概念

Spark Streaming 用于流式数据的处理。
Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume 、Twitter 、ZeroMQ 和简单的 TCP 套接字等等。
数据输入后可以用 Spark 的高度抽象原语。如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
在这里插入图片描述
Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream 。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收 到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以 简单来将, DStream 就是对 RDD 在实时数据处理场景的一种封装。

在这里插入图片描述
在这里插入图片描述

Spark Streaming操作

1 netcat传入数据

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamWordCount {def main(args:Array[String])={//1.初始化 Spark 配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 SparkStreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//3.通过监控端口创建 DStream,读进来的数据为一行行val lineStreams = ssc.socketTextStream ("localhost", 9999)//将每一行数据做切分, 形成一个个单词val wordStreams = lineStreams.flatMap(_.split(" "))//将单词映射成元组(word,1)val wordAndOneStreams = wordStreams.map((_, 1))//将相同的单词次数做统计val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)//打印wordAndCountStreams.print()//启动 SparkStreamingContextssc.start ()ssc.awaitTermination ()}}

链接: 配置netcat
下载netcat,解压到英文路径下。
将文件路径添加到环境变量中。
启动netcat。
在这里插入图片描述
运行StreamWordCount 程序。

2 DStream 创建

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutableobject SparkStreaming02_Queue {def main(args: Array[String]) {//1.初始化 Spark 配置信息val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")//2.初始化 SparkStreamingContextval ssc = new StreamingContext(conf, Seconds(4))//3.创建 RDD 队列val rddQueue = new mutable.Queue[RDD[Int]]()//4.创建 QueueInputDStreamval inputStream = ssc.queueStream(rddQueue,oneAtATime = false)//5.处理队列中的 RDD 数据val mappedStream = inputStream.map ((_,1))val reducedStream = mappedStream.reduceByKey(_ + _)//6.打印结果reducedStream.print()//7.启动任务ssc.start()//8.循环创建并向 RDD 队列中放入 RDDfor (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}

3 自定义数据源

import java.util.Randomimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming03_DIY {def main(args: Array[String]) {//1.初始化 Spark 配置信息val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")val ssc = new StreamingContext(conf, Seconds(3))val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())messageDS.print()ssc.start()ssc.awaitTermination()}/* 自定义数据采集器1.继承Receiver,定义泛型,传递参数2.重写方法*/class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){private var flg =trueoverride def onStart(): Unit = {new Thread(new Runnable {override def run(): Unit = {while(flg){val message = "采集的数据为:" + new Random().nextInt(10).toStringstore(message)Thread.sleep(500)}}}).start()}override def onStop(): Unit = {flg=false;}}
}

4 接受kafka数据

import java.util.Randomimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming04_kafka {def main(args: Array[String]) {val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")val ssc = new StreamingContext(conf, Seconds(3))//3.定义 Kafka 参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->"hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" ->"org.apache.kafka.common.serialization.StringDeserializer")//4.读取 Kafka 数据创建 DStreamval kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe [String, String](Set("atguiguNew"), kafkaPara))kafkaDataDS.map(_.value()).print()ssc.start()ssc.awaitTermination()}}

DStream 转换

1无状态的转换

DStream 上的操作与 RDD 的类似,分为 Transformations (转换) 和 Output Operations (输 出)两种。

状态:DStream状态,每一次实时处理都要登录相关配置信息或是有一定初始状态。设置一个状态,这段时间在这个状态下设有一定的权限或记录着某种数值状态,方便后续处理。

//无状态数据操作,只对当前的采集周期内的数据进行处理
//在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
import java.util.Randomimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming05_State {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")val ssc = new StreamingContext(sparkConf, Seconds(3))//无状态数据操作,只对当前的采集周期内的数据进行处理//在某些场合下,需要保留数据统计结果(状态),实现数据的汇总val datas = ssc.socketTextStream("localhost",9999)val wordToOne = datas.map((_,1))val wordToCount = wordToOne.reduceByKey(_+_)wordToCount.print()ssc.start()ssc.awaitTermination()}
}

转换结构使用了reduceByKey,会直接出结果,不能和缓冲区的数据进行汇总。
val wordToCount = wordToOne.reduceByKey(+)

updateSateByKey:根据key对数据的状态进行更新
传递的参数中含有两个值
第一个值表示相同的key的value数据
第二个值表示缓冲区相同key的value数据

import java.util.Random
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming05_State {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming2")val ssc = new StreamingContext(sparkConf, Seconds(3))//无状态数据操作,只对当前的采集周期内的数据进行处理//在某些场合下,需要保留数据统计结果(状态),实现数据的汇总val datas = ssc.socketTextStream("localhost",9999)val wordToOne = datas.map((_,1))
//    val wordToCount = wordToOne.reduceByKey(_+_)//    updateSateByKey:根据key对数据的状态进行更新
//    传递的参数中含有两个值
//    第一个值表示相同的key的value数据
//    第二个值表示缓冲区相同key的value数据val state = wordToOne updateStateByKey ((seq:Seq[Int], buff:Option[Int] ) => {val newCount = buff.getOrElse(0) + seq.sumOption(newCount)})state.print()ssc.start()ssc.awaitTermination()}
}
23/10/10 15:26:41 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

这个错误是由于未设置 Spark Streaming 的检查点目录导致的。检查点目录用于存储 Spark Streaming 的元数据和中间状态信息,以便在故障恢复时保持一致性。
要解决这个问题,你需要在创建 StreamingContext 对象之前通过 checkpoint 方法设置检查点目录。

设置一个检查点就好了,填写对应的检查点路径。
ssc.checkpoint(“input”)

在这里插入图片描述

import java.util.Randomimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming05_State {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming2")val ssc = new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint("input")//无状态数据操作,只对当前的采集周期内的数据进行处理//在某些场合下,需要保留数据统计结果(状态),实现数据的汇总val datas = ssc.socketTextStream("localhost",9999)val wordToOne = datas.map((_,1))
//    val wordToCount = wordToOne.reduceByKey(_+_)//    updateSateByKey:根据key对数据的状态进行更新
//    传递的参数中含有两个值
//    第一个值表示相同的key的value数据
//    第二个值表示缓冲区相同key的value数据val state = wordToOne updateStateByKey ((seq:Seq[Int], buff:Option[Int] ) => {val newCount = buff.getOrElse(0) + seq.sumOption(newCount)})state.print()ssc.start()ssc.awaitTermination()}
}

Transform
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也
就是对 DStream 中的 RDD 应用转换。

使用Transform 的两个原因:
Transform 可以将底层RDD获取到后进行操作。
1.DStream功能不完善
2.需要RDD/代码周期性的执行

import java.util.Randomimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming06_State_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming2")val ssc = new StreamingContext(sparkConf, Seconds(3))val lines = ssc.socketTextStream("localhost",port=9999)//transform方法可以将底层RDD获取到后 进行操作val newDs: DStream[String] = lines.transform(rdd => {//code:Driver端,(周期性执行)rdd.map(str=>{//Code : Executor端str})})val newDs1: DStream[String] = lines.map(data=>{data})ssc.start()ssc.awaitTermination()}
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreaming06_State_Join {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming2")val ssc = new StreamingContext(sparkConf, Seconds(5))val data9999 = ssc.socketTextStream("localhost",port=9999)val data8888 = ssc.socketTextStream("localhost",port=8888)val map9999: DStream[(String, Int)] = data9999.map((_, 9))val map8888: DStream[(String, Int)] = data8888.map((_, 8))//join操作就是两个RDD的join操作val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)joinDS.print()ssc.start()ssc.awaitTermination()}
}

2有状态的转换

updateSateByKey

updateSateByKey:根据key对数据的状态进行更新
传递的参数中含有两个值
第一个值表示相同的key的value数据
第二个值表示缓冲区相同key的value数据

import java.util.Random
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming05_State {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming2")val ssc = new StreamingContext(sparkConf, Seconds(3))//无状态数据操作,只对当前的采集周期内的数据进行处理//在某些场合下,需要保留数据统计结果(状态),实现数据的汇总val datas = ssc.socketTextStream("localhost",9999)val wordToOne = datas.map((_,1))
//    val wordToCount = wordToOne.reduceByKey(_+_)//    updateSateByKey:根据key对数据的状态进行更新
//    传递的参数中含有两个值
//    第一个值表示相同的key的value数据
//    第二个值表示缓冲区相同key的value数据val state = wordToOne updateStateByKey ((seq:Seq[Int], buff:Option[Int] ) => {val newCount = buff.getOrElse(0) + seq.sumOption(newCount)})state.print()ssc.start()ssc.awaitTermination()}
}
WindowOperations
// An highlighted block
var foo = 'bar';

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/135563.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

临界资源,临界区,通信的干扰问题(互斥),信号量(本质,上下文切换问题,原子性,自身的安全性,操作)

目录 引入 概念 临界资源 临界区 干扰存在原因 互斥 信号量 引入 举例 概念 介绍 表示可用资源数 表示等待进程数 申请信号量 信号量的本质 全局变量? 共享内存? 不安全问题 -- 上下文切换 原子性 信号量自身的安全性 原子操作的意义 操作 引入 通信…

Collection集合 迭代器遍历Iterator 和集合增强For

迭代器遍历Iterator 标准写法: 增强For for(类型 名称 : 集合 ) 举例: 不仅可以集合也可以数组 底层仍然是iterator

Power Apps-库组件连接数据表

点击添加数据 可以选择Excel或SharePoint导入 选择右侧边栏中的网站&#xff0c;再选择想要连接的数据表 点击插入&#xff0c;选择布局中的某个库&#xff0c; 选中它可以点击上方的布局&#xff0c;选择想要的样式 右侧选择数据源中的表就将组件与数据表连接起来了 如果想修…

Vite创建React项目,另外一种更加简单的方法

在上一篇blog中一个一个安装依赖dependencies&#xff0c;有没有一步到位的方法呢&#xff0c;有! 参考《React 18 Design Patterns and Best Practices Design, build, and deploy production-ready web applications with React》4th 第一章倒数第二节Vite as a solution有个…

flutter生态一统甜夏 @Android @ios @windowse @macos @linux @Web

(愿景)G o o g l e 中 国flutter生态一统天下(IT) Web Android ios Windowse Macos Linux Google中国https://space.bilibili.com/64169458 https://pub-web.flutter-io.cn 构建 Flutter Web 应用 构建 Flutter Web 应用 - Flutter 中文文档 - Flutter 中文开发者网站 …

vue3+setup 解决:this.$refs引用子组件报错 is not a function

一、如果在父组件中以下四步都没问题的话&#xff0c;再看下面步骤 二、如果父组件引用的是index页面 请在 头部加上以下代码 &#xff08;如果是form页面请忽略这一步&#xff09; <template> <a-modalv-model:visible"visible"title"头部名称&…

SpringCloud 微服务全栈体系(十三)

第十一章 分布式搜索引擎 elasticsearch 二、索引库操作 索引库就类似数据库表&#xff0c;mapping 映射就类似表的结构。 我们要向 es 中存储数据&#xff0c;必须先创建“库”和“表”。 1. mapping 映射属性 mapping 是对索引库中文档的约束&#xff0c;常见的 mapping …

SpringDataJpa(二)

三、Spring Data JPA概述 Spring Data JPA 是 Spring 基于 ORM 框架、JPA 规范的基础上封装的一套JPA应用框架&#xff0c;可使开发者用极简的代码即可实现对数据库的访问和操作。它提供了包括增删改查等在内的常用功能&#xff0c;且易于扩展&#xff01;学习并使用 Spring D…

汽车标定技术(五)--基于模型开发如何生成完整的A2L文件(1)

1 数据对象的创建 CtrlH打开Model Explorer&#xff0c;在Base workspace中点击工具栏add&#xff0c;出现如下界面&#xff0c; 可以看到Simulink提供了多种数据类型 Matlab Variable&#xff1a;Simulink.Parameter&#xff1a;使用该数据对象表示工程应用中的标定量Simuli…

js:React中使用classnames实现按照条件将类名连接起来

参考文档 https://www.npmjs.com/package/classnameshttps://github.com/JedWatson/classnames 安装 npm install classnames示例 import classNames from "classnames";// 字符串合并 console.log(classNames("foo", "bar")); // foo bar//…

高性能网络编程 - The C10M problem

文章目录 Pre概述回顾C10K实现C10M的挑战思路总结 Pre 高性能网络编程 - The C10K problem 以及 网络编程技术角度的解决思路 概述 在接下来的10年里&#xff0c;因为IPv6协议下每个服务器的潜在连接数都是数以百万级的&#xff0c;单机服务器处理数百万的并发连接&#xff0…

基于单片机智能加湿器控制系统仿真设计

**单片机设计介绍&#xff0c; 698【毕业课设】基于单片机智能加湿器控制系统仿真设计 文章目录 一 概要系统组成总结 二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 单片机智能加湿器控制系统仿真设计介绍 单片机智能加湿器控制系统是一种利用微…

Jakarta-JVM篇

文章目录 一.前言1. 1 JVM-堆常用调参1.2 JVM-方法区常用参数1.3 JVM-codeCache 二.JVM内存结构三. 对象创建四. JVM垃圾回收算法4.1 可达性分析算法4.1.1 对象引用4.1.2 回收方法区. 4.2 分代回收4.3 标记清除4.4 标记复制4.5 标记整理 五.垃圾回收器5.1 根节点枚举5.2 安全点…

umi4 React项目使用icon集合

umi项目中使用icon集合。 icon集合&#xff1a;https://icones.js.org/ 测试使用这个ion .umirc.ts文件 icons:{autoInstall:{iconify-json/ion: true,//自动安装iconify-json/ion},include: [ion:social-windows-outline]&#xff0c;//要使用的必须把icon类名加到include中…

AndroidStudio 运行报错:Invalid keystore format

AndroidStudio 运行报错&#xff1a;Invalid keystore format 把这玩意儿删了重新打开Android Studio运行一下就好了&#xff01;&#xff01;&#xff01;

esxi 6.7下安装黑裙

esxi上创建一个黑裙系统的虚拟机&#xff0c;用来存资料 一、工具 硬件&#xff1a; 工控机&#xff1a;装有esxi6.7系统&#xff08;192.168.100.2&#xff09;&#xff0c;配置&#xff1a;3865U&#xff0c;16G内存&#xff0c;120Gmsata120sata硬盘&#xff0c;6个网口 主…

利用Ansible实现批量Linux服务器安全配置

1.摘要 在上一篇<<初步利用Ansible实现批量服务器自动化管理>>文章中, 我初步实现了通过编写清单和剧本来实现多台服务器的自动化管理,在本章节中, 我将利用Ansible的剧本来实现更实用、更复杂一点的功能, 主要功能包括三个:1.同时在三台服务器中增加IP访问控制,只…

杂货铺 | citespace的使用

安装教程 【CiteSpace保姆级教程1】文献综述怎么写&#xff1f; &#x1f4da;数据下载 1. 新建文件夹 2. 数据下载 知网高级检索 数据选中导出 &#xff1a;一次500 导出后重命名为download_xxx.txt&#xff0c;放到input文件里 3. 数据转换 把output里的数据复制到data里…

Qt实现自定义多选下拉列表

目录 前言1、 功能描述2、代码实现总结 前言 本文记录了一种通过继承 QComboBox 实现下拉列表多选功能的方法。效果如下图所示&#xff1a; 1、 功能描述 普通的下拉列表只支持选择一个选项&#xff0c;在软件开发过程中&#xff0c;经常会遇到下拉列表支持选择多个选项的需…

Mybatis-Plus同时使用逻辑删除和唯一索引的问题及解决办法

1 问题背景 在开发中&#xff0c;我们经常会有逻辑删除和唯一索引同时使用的情况。但当使用mybatis plus时&#xff0c;如果同时使用逻辑删除和唯一索引&#xff0c;会报数据重复Duplicate entry的问题。 举例来说&#xff0c;有表user&#xff0c;建立唯一索引&#xff08;u…