Spark Streaming编程基础

文章目录

  • 1. 流式词频统计
    • 1.1 Spark Streaming编程步骤
    • 1.2 流式词频统计项目
      • 1.2.1 创建项目
      • 1.2.2 添加项目依赖
      • 1.2.3 修改源目录
      • 1.2.4 添加scala-sdk库
      • 1.2.5 创建日志属性文件
    • 1.3 创建词频统计对象
    • 1.4 利用nc发送数据
    • 1.5 启动应用,查看结果
  • 2. 编程模型的基本概念
  • 3. 离散化数据流
  • 4. 基本数据源
  • 5. 基本DStream转换操作
  • 6. DStream输出操作

1. 流式词频统计

  • 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用 nc 工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。

1.1 Spark Streaming编程步骤

  1. 添加SparkStreaming相关依赖
  2. 获取程序入口接收数据
  3. 对数据进行业务处理
  4. 获取最终结果
  5. 启动程序等待程序执行结束

1.2 流式词频统计项目

1.2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述

1.2.2 添加项目依赖

  • pom.xml文件里添加依赖
    在这里插入图片描述
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
  • 刷新项目依赖
    在这里插入图片描述

1.2.3 修改源目录

  • java修改为scala
    在这里插入图片描述

  • pom.xml里设置源目录
    在这里插入图片描述

1.2.4 添加scala-sdk库

  • 在项目结构对话里添加
    在这里插入图片描述
  • 单击【Add to Modules】菜单项
    在这里插入图片描述
  • 单击【OK】按钮以后,就可以在scala里创建Scala Class
    在这里插入图片描述

1.2.5 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

1.3 创建词频统计对象

  • 创建net.huawei.streaming
    在这里插入图片描述
  • net.huawei.streaming包里创建SparkStreamingWordCount对象
    在这里插入图片描述
package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
  • 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(bigdata1:9999)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。

1.4 利用nc发送数据

  • bigdata1节点利用nc发送数据,执行命令:nc -lp 9999
    在这里插入图片描述

1.5 启动应用,查看结果

  • 启动SparkStreamingWordCount对象,在bigdata1节点上输入数据,在控制台查看词频统计结果
    在这里插入图片描述
  • 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用 updateStateByKeymapWithState 实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。

2. 编程模型的基本概念

3. 离散化数据流

4. 基本数据源

5. 基本DStream转换操作

6. DStream输出操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/68059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

git和idea重新安装后提交异常

场景&#xff1a;我重装了系统&#xff0c;idea装了2024.3版本的&#xff0c;git也重新装了&#xff0c;但是项目中还是有.git文件夹的&#xff0c;下载了idea的码云插件后&#xff0c;提交报错如下&#xff1a; 异常&#xff1a;Error updating changes: detected dubious ow…

Pix2Pix:图像到图像转换的条件生成对抗网络深度解析

在深度学习的广阔领域中&#xff0c;图像到图像的转换任务一直是研究者和开发者们关注的热点。Pix2Pix&#xff0c;作为一种基于条件生成对抗网络&#xff08;Conditional Generative Adversarial Networks, CGANs&#xff09;的模型&#xff0c;自2017年由Phillip Isola等人提…

OpenCV imread函数读取图像__实例详解

OpenCV imread函数读取图像__实例详解 本文目录&#xff1a; 零、时光宝盒 一、imread函数定义 二、imread函数支持的文件格式 三、imread函数flags参数详解 &#xff08;3.1&#xff09;、Flags-1时&#xff0c;样返回加载的图像&#xff08;使用alpha通道&#xff0c;否…

Spring 框架基础:IOC 与 AOP 原理剖析及面试要点

在上一篇中&#xff0c;我们深入探讨了 Java 反射机制&#xff0c;了解了它在运行时动态操作类和对象的强大能力。而今天&#xff0c;我们将进入 Spring 框架的世界。Spring 框架作为 Java 企业级开发中最流行的框架之一&#xff0c;极大地简化了企业级应用的开发过程。对于春招…

ssm基于HTML5的红酒信息分享系统

SSM基于HTML5的红酒信息分享系统是一个专注于红酒领域的综合性信息平台&#xff0c;旨在为红酒爱好者、从业者以及普通消费者提供一个便捷的交流与获取红酒相关信息的空间。 一、系统背景与意义 随着人们生活水平的提高和消费观念的转变&#xff0c;红酒作为一种高雅的饮品&a…

【python】subprocess.Popen执行adb shell指令进入linux系统后连续使用指令,出现cmd窗口阻塞问题

问题描述 subprocess.Popen执行adb shell指令进入linux系统后出现cmd窗口阻塞问题&#xff0c;需要手动关闭cmd才会继续执行其他指令。 解决方案 1、cmd指令后面加入exit\n关闭exe进程 2、subprocess.Popen()添加内置参数creationflagssubprocess.CREATE_NO_WINDOW隐藏窗口弹…

细说机器学习算法之过拟合与欠拟合

系列文章目录 第一章&#xff1a;Pyhton机器学习算法之KNN 第二章&#xff1a;Pyhton机器学习算法之K—Means 第三章&#xff1a;Pyhton机器学习算法之随机森林 第四章&#xff1a;Pyhton机器学习算法之线性回归 第五章&#xff1a;Pyhton机器学习算法之有监督学习与无监督…

深度学习中的通道(Channel)概念详解

1. 通道的基本概念 通道(Channel)是深度学习中的一个重要概念&#xff0c;它在不同场景下有不同的具体含义。理解通道概念对于理解深度学习模型的结构和工作原理至关重要。 2. 大语言模型中的通道 2.1 全连接层的通道概念 2.1.1 基本结构 输入&#xff1a;[batch_size, in…

Windows Docker Desktop安装及使用 Docker 运行 MySQL

Docker Desktop是Docker的官方桌面版&#xff0c;专为Mac和Windows用户设计&#xff0c;提供了一个简单易用的界面来管理和运行Docker容器。它集成了Docker引擎&#xff0c;为开发人员提供了一个快速、可靠、可扩展的方式来构建、运行和管理应用。DockerDesktop的优势在于&…

TTL 在 Redis 缓存中的作用

Redis TTL&#xff08;Time To Live&#xff09;与缓存的关系 TTL&#xff08;Time To Live&#xff0c;生存时间&#xff09;是 Redis 提供的一种自动过期机制&#xff0c;用于控制键值对的存活时间。当 TTL 到期后&#xff0c;Redis 会自动删除该键&#xff0c;避免长期占用…

人形机器人,自动驾驶“老炮”创业第二站

造一台人形机器人&#xff0c;或许正在成为2025年最炙手可热的事情。 从去年第四季度开始&#xff0c;伴随着大模型应用的深入&#xff0c;具身智能概念被点燃&#xff0c;其中最鲜明的一个特点是&#xff0c;大量自动驾驶大佬的转行加入。 随便说几个比较有分量的&#xff0…

MFC常用操作

1&#xff0c;获取STATIC控件的值 CString str; m_STATIC2.GetWindowText(str);//获取STATIC控件的值 MessageBox(str); 2.设置EDIT控件的值 m_EDIT2.SetWindowText(str); GetDlgItem(IDC_EDIT1)->SetWindowText("Leave"); 3.移动控件 m_EDIT2.SetWindowPos(…

Flutter_学习记录_基本组件的使用记录

1.TextWidge的常用属性 1.1TextAlign: 文本对齐属性 常用的样式有&#xff1a; TextAlign.center 居中TextAlign.left 左对齐TextAlign.right 有对齐 使用案例&#xff1a; body: Center(child: Text(开启 TextWidget 的旅程吧&#xff0c;珠珠, 开启 TextWidget 的旅程吧&a…

什么是COLLATE排序规则?

在当今数字化世界中&#xff0c;数据的整理、比较和排序是至关重要的。在数据库管理和编程语言中&#xff0c;我们经常需要对字符串进行排序&#xff0c;以展示或处理信息。为了实现这一点&#xff0c;各种系统和工具提供了排序规则&#xff0c;其中COLLATE排序规则就是其中的一…

打印输入单词字符数量统计直方图-C语言第二版

1. 编程要求 多年前写过一篇用 C 语言实现打印单词字符数量统计的直方图的文章, 现在看上去有些混乱, 对一些任务划分不清晰, 全部混在一起. 于是重写了这个编程题, 希望可以给初学者一些参考, 并且我分别用 C, C, Java, Python 四种语言完成了这道编程题, 有兴趣的可以看我另…

【QT】-explicit关键字

explicit explicit 是一个 C 关键字&#xff0c;用于修饰构造函数。它的作用是防止构造函数进行隐式转换。 为什么需要 explicit&#xff1f; 在没有 explicit 的情况下&#xff0c;构造函数可以用于隐式类型转换。这意味着&#xff0c;如果你有一个接受某种类型的参数的构造…

【C++模板】:如何判断自定义类型是否实现某个函数

一、引子 偶尔我们会面对这样的尴尬的场景&#xff0c;我们需要显示的去判断在某个自定义类型中&#xff0c;是否已经提供了我们期待的API接口&#xff0c;以避免产生“莫须有”的错误。阁下该如何破解此问题&#xff01; 这里&#xff0c;直接给出一种通用的方法&#xff0c;…

认识Django项目模版文件——Django学习日志(二)

1.默认文件介绍 └── djangoproject1/├── djangoproject1/│ ├── urls.py [URL和函数的对应关系]【常用文件】│ ├── settings.py [项目配置文件]【常用文件】│ ├── _init_.py│ ├── wsgi.py [接受网络请求] 【不要动】│ └──…

【JS逆向】前端加密对抗基础

目录 逆向基础断掉调试基本常见的加解密方式 encrypt-labs靶场搭建过程靶场基本教程AES加密key前端体现(固定key)AES服务端获取keyRSA非对称加密DES加密存在规律key明文加签sign加签key在服务端signAESRAS组合 逆向基础 断掉调试 通过浏览器站点控制台(F12)&#xff0c;进行断…

C# 多线程同步(Mutex | Semaphore)

Mutex: 用于保护临界区&#xff0c;确保同一时间只有一个线程能够访问共享资源&#xff1b; Semaphore: 允许同时有多个线程访问共享资源&#xff0c;但会限制并发访问的数量。 Mutex运行输出 Semaphore运行输出 namespace SyncThreadDemo {internal class Program{static stri…