Spark Streaming编程基础

文章目录

  • 1. 流式词频统计
    • 1.1 Spark Streaming编程步骤
    • 1.2 流式词频统计项目
      • 1.2.1 创建项目
      • 1.2.2 添加项目依赖
      • 1.2.3 修改源目录
      • 1.2.4 添加scala-sdk库
      • 1.2.5 创建日志属性文件
    • 1.3 创建词频统计对象
    • 1.4 利用nc发送数据
    • 1.5 启动应用,查看结果
  • 2. 编程模型的基本概念
  • 3. 离散化数据流
  • 4. 基本数据源
  • 5. 基本DStream转换操作
  • 6. DStream输出操作

1. 流式词频统计

  • 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用 nc 工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。

1.1 Spark Streaming编程步骤

  1. 添加SparkStreaming相关依赖
  2. 获取程序入口接收数据
  3. 对数据进行业务处理
  4. 获取最终结果
  5. 启动程序等待程序执行结束

1.2 流式词频统计项目

1.2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述

1.2.2 添加项目依赖

  • pom.xml文件里添加依赖
    在这里插入图片描述
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
  • 刷新项目依赖
    在这里插入图片描述

1.2.3 修改源目录

  • java修改为scala
    在这里插入图片描述

  • pom.xml里设置源目录
    在这里插入图片描述

1.2.4 添加scala-sdk库

  • 在项目结构对话里添加
    在这里插入图片描述
  • 单击【Add to Modules】菜单项
    在这里插入图片描述
  • 单击【OK】按钮以后,就可以在scala里创建Scala Class
    在这里插入图片描述

1.2.5 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

1.3 创建词频统计对象

  • 创建net.huawei.streaming
    在这里插入图片描述
  • net.huawei.streaming包里创建SparkStreamingWordCount对象
    在这里插入图片描述
package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
  • 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(bigdata1:9999)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。

1.4 利用nc发送数据

  • bigdata1节点利用nc发送数据,执行命令:nc -lp 9999
    在这里插入图片描述

1.5 启动应用,查看结果

  • 启动SparkStreamingWordCount对象,在bigdata1节点上输入数据,在控制台查看词频统计结果
    在这里插入图片描述
  • 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用 updateStateByKeymapWithState 实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。

2. 编程模型的基本概念

3. 离散化数据流

4. 基本数据源

5. 基本DStream转换操作

6. DStream输出操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/68059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV imread函数读取图像__实例详解

OpenCV imread函数读取图像__实例详解 本文目录&#xff1a; 零、时光宝盒 一、imread函数定义 二、imread函数支持的文件格式 三、imread函数flags参数详解 &#xff08;3.1&#xff09;、Flags-1时&#xff0c;样返回加载的图像&#xff08;使用alpha通道&#xff0c;否…

ssm基于HTML5的红酒信息分享系统

SSM基于HTML5的红酒信息分享系统是一个专注于红酒领域的综合性信息平台&#xff0c;旨在为红酒爱好者、从业者以及普通消费者提供一个便捷的交流与获取红酒相关信息的空间。 一、系统背景与意义 随着人们生活水平的提高和消费观念的转变&#xff0c;红酒作为一种高雅的饮品&a…

Windows Docker Desktop安装及使用 Docker 运行 MySQL

Docker Desktop是Docker的官方桌面版&#xff0c;专为Mac和Windows用户设计&#xff0c;提供了一个简单易用的界面来管理和运行Docker容器。它集成了Docker引擎&#xff0c;为开发人员提供了一个快速、可靠、可扩展的方式来构建、运行和管理应用。DockerDesktop的优势在于&…

人形机器人,自动驾驶“老炮”创业第二站

造一台人形机器人&#xff0c;或许正在成为2025年最炙手可热的事情。 从去年第四季度开始&#xff0c;伴随着大模型应用的深入&#xff0c;具身智能概念被点燃&#xff0c;其中最鲜明的一个特点是&#xff0c;大量自动驾驶大佬的转行加入。 随便说几个比较有分量的&#xff0…

Flutter_学习记录_基本组件的使用记录

1.TextWidge的常用属性 1.1TextAlign: 文本对齐属性 常用的样式有&#xff1a; TextAlign.center 居中TextAlign.left 左对齐TextAlign.right 有对齐 使用案例&#xff1a; body: Center(child: Text(开启 TextWidget 的旅程吧&#xff0c;珠珠, 开启 TextWidget 的旅程吧&a…

什么是COLLATE排序规则?

在当今数字化世界中&#xff0c;数据的整理、比较和排序是至关重要的。在数据库管理和编程语言中&#xff0c;我们经常需要对字符串进行排序&#xff0c;以展示或处理信息。为了实现这一点&#xff0c;各种系统和工具提供了排序规则&#xff0c;其中COLLATE排序规则就是其中的一…

打印输入单词字符数量统计直方图-C语言第二版

1. 编程要求 多年前写过一篇用 C 语言实现打印单词字符数量统计的直方图的文章, 现在看上去有些混乱, 对一些任务划分不清晰, 全部混在一起. 于是重写了这个编程题, 希望可以给初学者一些参考, 并且我分别用 C, C, Java, Python 四种语言完成了这道编程题, 有兴趣的可以看我另…

认识Django项目模版文件——Django学习日志(二)

1.默认文件介绍 └── djangoproject1/├── djangoproject1/│ ├── urls.py [URL和函数的对应关系]【常用文件】│ ├── settings.py [项目配置文件]【常用文件】│ ├── _init_.py│ ├── wsgi.py [接受网络请求] 【不要动】│ └──…

【JS逆向】前端加密对抗基础

目录 逆向基础断掉调试基本常见的加解密方式 encrypt-labs靶场搭建过程靶场基本教程AES加密key前端体现(固定key)AES服务端获取keyRSA非对称加密DES加密存在规律key明文加签sign加签key在服务端signAESRAS组合 逆向基础 断掉调试 通过浏览器站点控制台(F12)&#xff0c;进行断…

C# 多线程同步(Mutex | Semaphore)

Mutex: 用于保护临界区&#xff0c;确保同一时间只有一个线程能够访问共享资源&#xff1b; Semaphore: 允许同时有多个线程访问共享资源&#xff0c;但会限制并发访问的数量。 Mutex运行输出 Semaphore运行输出 namespace SyncThreadDemo {internal class Program{static stri…

复位信号的同步与释放(同步复位、异步复位、异步复位同步释放)

文章目录 背景前言一、复位信号的同步与释放1.1 同步复位1.1.1 综述1.1.2 优缺点 1.2 recovery time和removal time1.3 异步复位1.3.1 综述1.3.2 优缺点 1.4 同步复位 与 异步复位1.5 异步复位、同步释放1.5.1 总述1.5.2 机理1.5.3 复位网络 二、思考与补充2.1 复…

git远程仓库如何修改

1.需要做的事情&#xff1a;把git的远程仓库修改掉&#xff0c;在git创建一个自己的仓库 如果你是私有化的话&#xff0c;可以生成一个自己token令牌也可以。到时候push的时候会让你登录你就可以输入你的token令牌和用户名。 2.查看当前仓库的远程地址是不是自己的 &#xff…

mysql 学习3 SQL语句--整体概述。SQL通用语法;DDL创建数据库,查看数据库,删除数据库,使用数据库;

SQL通用语法 SQL语句分类 DDL data definition language : 用来创建数据库&#xff0c;创建表&#xff0c;创建表中的字段&#xff0c;创建索引。因此成为 数据定义语言 DML data manipulation language 有了数据库和表以及字段后&#xff0c;那么我们就需要给这个表中 添加数…

【xcode 16.2】升级xcode后mac端flutter版的sentry报错

sentry_flutter 7.11.0 报错 3 errors in SentryCrashMonitor_CPPException with the errors No type named terminate_handler in namespace std (line 60) and No member named set_terminate in namespace std 替换sentry_flutter版本为&#xff1a; 8.3.0 从而保证oc的…

【回忆迷宫——处理方法+DFS】

题目 代码 #include <bits/stdc.h> using namespace std; const int N 250; int g[N][N]; bool vis[N][N]; int dx[4] {0, 0, -1, 1}; int dy[4] {-1, 1, 0, 0}; int nx 999, ny 999, mx, my; int x 101, y 101; //0墙 (1空地 2远方) bool jud(int x, int y) {if…

wireshark工具简介

目录 1 wireshark介绍 2 wireshark抓包流程 2.1 选择网卡 2.2 停止抓包 2.3 保存数据 3 wireshark过滤器设置 3.1 显示过滤器的设置 3.2 抓包过滤器 4 wireshark的封包列表与封包详情 4.1 封包列表 4.2 封包详情 参考文献 1 wireshark介绍 wireshark是非常流行的网络…

⽤vector数组实现树的存储(孩⼦表示法)c++

在我们遇到的算法题中&#xff0c; ⼀般给出的树结构都是有编号的&#xff0c;这样会简化我们之后存储树的操作 &#xff0c;⼀般提供两个信息&#xff1b; 结点的个数 n;n-1条x结点与y结点相连的边 题⽬描述: ⼀共9个结点셈 1号结点为根节点&#xff0c;接下来8⾏&#xff…

C语言-内存管理

1、malloc()函数 用于动态分配一块指定大小的内存&#xff0c;并返回指向这块内存的指针。如果分配失败&#xff0c; 返回 NULL。 int* ptr (int*)malloc(sizeof(int) * 10); // 分配一个包含 10 个整数的内存 if (ptr NULL) {printf("Memory allocation failed!\n&q…

蓝桥杯lesson3---string的使用

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” string的概念 string字符串是一种更加高级的封装&#xff0c;string字符串中包含了大量的方法&#xff0c;这些方法使得字符串的操作变得更加简单&#xff0c;string的使用&…

进制之间转换

「 一、十进制 二进制 」 1.十进制转二进制&#xff1a;一直除以2直到商为0&#xff0c;再反向取余数。 例&#xff1a;13&#xff08;十进制&#xff09;转1101&#xff08;二进制&#xff09; 2.二进制转十进制:最后一位数开始是2^0&#xff0c;然后一直按照指数递增的方式…