Spark-Streaming有状态计算

一、上下文

《Spark-Streaming初识》中的NetworkWordCount示例只能统计每个微批下的单词的数量,那么如何才能统计从开始加载数据到当下的所有数量呢?下面我们就来通过官方例子学习下Spark-Streaming有状态计算。

二、官方例子

所属包:org.apache.spark.examples.streaming

object StatefulNetworkWordCount {def main(args: Array[String]): Unit = {if (args.length < 2) {System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")//创建微批为 1 秒的上下文val ssc = new StreamingContext(sparkConf, Seconds(1))//指定 checkpoint 目录ssc.checkpoint(".")// 用一个 List 初始化一个 RDDval initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))// 在目标ip:port上创建一个ReceiverInputDStream,并对分隔测试的输入流中的单词进行计数(例如由'nc'生成)val lines = ssc.socketTextStream(args(0), args(1).toInt)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))// 使用mapWithState更新累积计数这将给出一个由状态组成的DStream(即单词的累积计数)val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {val sum = one.getOrElse(0) + state.getOption.getOrElse(0)val output = (word, sum)state.update(sum)output}val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))stateDstream.print()ssc.start()ssc.awaitTermination()}
}

三、分析

1、构建SparkConf

它是Spark应用程序的配置,用于设置Spark的各种参数。支持链式设置

new SparkConf().setMaster("local").setAppName("My app")

 一旦SparkConf对象传递给Spark,用户就不能再对其进行修改。Spark不支持在运行时修改配置

2、构建StreamingContext

它是Spark Streaming功能的主要入口点,且提供了从各种输入源创建[[org.apache.spark.streaming.dstream.DStream]] 的方法。

创建和转换DStreams后,可以分别使用start()、stop()启动和停止流计算,awaitTermination()允许当前线程通过stop()或异常等待上下文的终止。

3、设置checkpoint

StreamingContext最终还是通过SparkContext来设置checkpoint,但其实都是为各自的checkpointDir设置checkpoint路径,在有状态计算中checkpoint是必须的。

所谓有状态计算就必须要把历史状态给存储下来,spark中使用使用checkpoint来实现这个存储,每个微批的数据的计算都要更新到历史状态中。

class SparkContext(config: SparkConf) extends Logging {private[spark] var checkpointDir: Option[String] = None}
class StreamingContext private[streaming] (_sc: SparkContext,_cp: Checkpoint,_batchDur: Duration) extends Logging {private[streaming] var checkpointDir: String = {if (isCheckpointPresent) {sc.setCheckpointDir(_cp.checkpointDir)_cp.checkpointDir} else {null}}}

4、初始化一个RDD

为什么要初始化一个RDD呢?我们看看下面是如何用到的。

5、创建一个ReceiverInputDStream

这里是从TCP源hostname:port创建输入流。使用TCP套接字接收数据,并使用给定的转换器将接收字节解释为对象

6、处理单词

从源码中可以看出会把这样的文本

hadoop spark flink kafka hadoop spark-streaming

处理成这样的格式

hadoop 1

spark 1

flink 1

kafka 1

hadoop 1

spark-streaming 1

6、使用mapWithState更新累积计数

该算子可以维护并更新每个key的状态。

这里用到一个新对象:StateSpec,且用到了它的两个方法,initialState和function

initialState:设置包含“mapWithState”将使用的初始状态的RDD`

function:设置实际的状态更新操作

//第1个参数:状态 key 的类别
//第2个参数:状态 value 的类别
//第3个参数:状态 数据 的类别
//第4个参数:状态 处理完要返回 的类别
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {// 使用state.exists()、state.get()、state.update()和state.remove()来管理状态,并返回必要的字符串
}

四、运行

运行Netcat

nc -lk 9999

新建一个窗口运行官方例子

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount cdh1 9999


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/64932.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 3 输入与输出指南

文章目录 1. 输入与 input()示例&#xff1a;提示&#xff1a; 2. 输出与 print()基本用法&#xff1a;格式化输出&#xff1a;使用 f-string&#xff08;推荐&#xff09;&#xff1a;使用 str.format()&#xff1a;使用占位符&#xff1a; print() 的关键参数&#xff1a; 3.…

【SQLi_Labs】Basic Challenges

什么是人生&#xff1f;人生就是永不休止的奋斗&#xff01; Less-1 尝试添加’注入&#xff0c;发现报错 这里我们就可以直接发现报错的地方&#xff0c;直接将后面注释&#xff0c;然后使用 1’ order by 3%23 //得到列数为3 //这里用-1是为了查询一个不存在的id,好让第一…

时间序列预测算法---LSTM

目录 一、前言1.1、深度学习时间序列一般是几维数据&#xff1f;每个维度的名字是什么&#xff1f;通常代表什么含义&#xff1f;1.2、为什么机器学习/深度学习算法无法处理时间序列数据?1.3、RNN(循环神经网络)处理时间序列数据的思路&#xff1f;1.4、RNN存在哪些问题? 二、…

leetcode题目(3)

目录 1.加一 2.二进制求和 3.x的平方根 4.爬楼梯 5.颜色分类 6.二叉树的中序遍历 1.加一 https://leetcode.cn/problems/plus-one/ class Solution { public:vector<int> plusOne(vector<int>& digits) {int n digits.size();for(int i n -1;i>0;-…

快速上手LangChain(三)构建检索增强生成(RAG)应用

文章目录 快速上手LangChain(三)构建检索增强生成(RAG)应用概述索引阿里嵌入模型 Embedding检索和生成RAG应用(demo:根据我的博客主页,分析一下我的技术栈)快速上手LangChain(三)构建检索增强生成(RAG)应用 langchain官方文档:https://python.langchain.ac.cn/do…

[cg] android studio 无法调试cpp问题

折腾了好久&#xff0c;native cpp库无法调试问题&#xff0c;原因 下面的Deploy 需要选Apk from app bundle!! 另外就是指定Debug type为Dual&#xff0c;并在Symbol Directories 指定native cpp的so路径 UE项目调试&#xff1a; 使用Android Studio调试虚幻引擎Android项目…

【Windows】powershell 设置执行策略(Execution Policy)禁止了脚本的运行

报错信息&#xff1a; 无法加载文件 C:\Users\11726\Documents\WindowsPowerShell\profile.ps1&#xff0c;因为在此系统上禁止运行脚本。有关详细信息&#xff0c;请参 阅 https:/go.microsoft.com/fwlink/?LinkID135170 中的 about_Execution_Policies。 所在位置 行:1 字符…

可编辑37页PPT |“数据湖”构建汽车集团数据中台

荐言分享&#xff1a;随着汽车行业智能化、网联化的快速发展&#xff0c;数据已成为车企经营决策、优化生产、整合供应链的核心资源。为了在激烈的市场竞争中占据先机&#xff0c;汽车集团亟需构建一个高效、可扩展的数据管理平台&#xff0c;以实现对海量数据的收集、存储、处…

【快速实践】类激活图(CAM,class activation map)可视化

类激活图可视化&#xff1a;有助于了解一张图像的哪一部分让卷积神经网络做出了最终的分类决策 对输入图像生成类激活热力图类激活热力图是与特定输出类别相关的二维分数网格&#xff1a;对任何输入图像的每个位置都要进行计算&#xff0c;它表示每个位置对该类别的重要程度 我…

ros2 py文件间函数调用

文章目录 写在前面的话生成python工程包命令运行python函数命令python工程包的目录结构目录结构&#xff08;细节&#xff09; 报错 1&#xff08; no module name ***&#xff09;错误示意 截图终端输出解决方法 报错 2&#xff08; AttributeError: *** object has no attrib…

Milvus×合邦电力:向量数据库如何提升15%电价预测精度

01. 全球能源市场化改革下的合邦电力 在全球能源转型和市场化改革的大背景下&#xff0c;电力交易市场正逐渐成为优化资源配置、提升系统效率的关键平台。电力交易通过市场化手段&#xff0c;促进了电力资源的有效分配&#xff0c;为电力行业的可持续发展提供了动力。 合邦电力…

OLED的显示

一、I2C I2C时序&#xff1a;时钟线SCL高电平下&#xff1a;SDA由高变低代表启动信号&#xff0c;开始发送数据&#xff1b;SCL高电平时&#xff0c;数据稳定&#xff0c;数据可以被读走&#xff0c;开始进行读操作&#xff0c;SCL低电平时&#xff0c;数据发生改变&#xff1…

VMware运维效率提升50%,RVTools管理更简单

RVTools 是一款专为 VMware 虚拟化环境量身打造的高效管理工具&#xff0c;基于 .NET 4.7.2 框架开发&#xff0c;并与 VMware vSphere Management SDK 8.0 和 CIS REST API 深度集成&#xff0c;能够全面呈现虚拟化平台的各项关键数据。该工具不仅能够详细列出虚拟机、CPU、内…

python +t kinter绘制彩虹和云朵

python t kinter绘制彩虹和云朵 彩虹&#xff0c;简称虹&#xff0c;是气象中的一种光学现象&#xff0c;当太阳光照射到半空中的水滴&#xff0c;光线被折射及反射&#xff0c;在天空上形成拱形的七彩光谱&#xff0c;由外圈至内圈呈红、橙、黄、绿、蓝、靛、紫七种颜色。事实…

Zabbix5.0版本(监控Nginx+PHP服务状态信息)

目录 1.监控Nginx服务状态信息 &#xff08;1&#xff09;通过Nginx监控模块&#xff0c;监控Nginx的7种状态 &#xff08;2&#xff09;开启Nginx状态模块 &#xff08;3&#xff09;配置监控项 &#xff08;4&#xff09;创建模板 &#xff08;5&#xff09;用默认键值…

Python入门教程 —— 字符串

字符串介绍 字符串可以理解为一段普通的文本内容,在python里,使用引号来表示一个字符串,不同的引号表示的效果会有区别。 字符串表示方式 a = "Im Tom" # 一对双引号 b = Tom said:"I am Tom" # 一对单引号c = Tom said:"I\m Tom" # 转义…

AcWing练习题:差

读取四个整数 A,B,C,D&#xff0c;并计算 (AB−CD)的值。 输入格式 输入共四行&#xff0c;第一行包含整数 A&#xff0c;第二行包含整数 B&#xff0c;第三行包含整数 C&#xff0c;第四行包含整数 D。 输出格式 输出格式为 DIFERENCA X&#xff0c;其中 X 为 (AB−CD) 的…

小程序添加购物车业务逻辑

数据库设计 DTO设计 实现步骤 1 判断当前加入购物车中的的商品是否已经存在了 2 如果已经存在 只需要将数量加一 3 如果不存在 插入一条购物车数据 4 判断加到本次购物车的是菜品还是套餐 Impl代码实现 Service public class ShoppingCartServiceImpl implements Shoppin…

如何在谷歌浏览器中使用自定义搜索快捷方式

在数字时代&#xff0c;浏览器已经成为我们日常生活中不可或缺的一部分。作为最常用的浏览器之一&#xff0c;谷歌浏览器凭借其简洁的界面和强大的功能深受用户喜爱。本文将详细介绍如何自定义谷歌浏览器的快捷工具栏&#xff0c;帮助你更高效地使用这一工具。 一、如何找到谷歌…

Python 3 与 Python 2 的主要区别

文章目录 1. 语法与关键字print 函数整数除法 2. 字符串处理默认字符串类型字符串格式化 3. 输入函数4. 迭代器和生成器range 函数map, filter, zip 5. 标准库变化urllib 模块configparser 模块 6. 异常处理7. 移除的功能8. 其他重要改进数据库操作多线程与并发类型注解 9. 总结…