大数据分析与应用实验任务十一

大数据分析与应用实验任务十一

实验目的

  • 通过实验掌握spark Streaming相关对象的创建方法;

  • 熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法;

  • 熟悉spark Streaming的转换操作,包括无状态和有状态转换。

  • 熟悉spark Streaming输出编程操作。

实验任务

一、DStream 操作概述
  1. 创建 StreamingContext 对象

    登录 Linux 系统后,启动 pyspark。进入 pyspark 以后,就已经获得了一个默认的 SparkConext 对象,也就是 sc。因此,可以采用如下方式来创建 StreamingContext 对象:

    from pyspark.streaming import StreamingContext 
    sscluozhongye = StreamingContext(sc, 1)
    

    image-20231207112253827

    如果是编写一个独立的 Spark Streaming 程序,而不是在 pyspark 中运行,则需要在代码文件中通过类似如下的方式创建 StreamingContext 对象:

    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 1)
    print("创建成功,lzy防伪")
    

    image-20231207112652285

二、基本输入源
  1. 文件流
  • 在 pyspark 中创建文件流

    首先,在 Linux 系统中打开第 1 个终端(为了便于区分多个终端,这里记作“数据源终端”),创建一个 logfile 目录,命令如下:

    cd /root/Desktop/luozhongye/
    mkdir streaming 
    cd streaming 
    mkdir logfile
    

    image-20231207112923323

    其次,在 Linux 系统中打开第二个终端(记作“流计算终端”),启动进入 pyspark,然后,依次输入如下语句:

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    

image-20231207113305405

  • 采用独立应用程序方式创建文件流

    #!/usr/bin/env python3 
    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    print("2023年12月7日lzy")
    

    保存该文件,并执行以下命令:

    cd /root/Desktop/luozhongye/streaming/logfile/ 
    spark-submit FileStreaming.py
    

image-20231207114014647

  1. 套接字流
  • 使用套接字流作为数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”,在NetworkWordCount.py 中输入如下内容:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()
    

    使用如下 nc 命令生成一个 Socket 服务器端:

    nc -lk 9999
    

    新建一个终端(记作“流计算终端”),执行如下代码启动流计算:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208002212790

  • 使用 Socket 编程实现自定义数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”,在 DataSourceSocket.py 中输入如下代码:

    #!/usr/bin/env python3 
    import socket# 生成 socket 对象
    server = socket.socket()
    # 绑定 ip 和端口
    server.bind(('localhost', 9999))
    # 监听绑定的端口
    server.listen(1)
    while 1:# 为了方便识别,打印一个“I’m waiting the connect...”print("I'm waiting the connect...")# 这里用两个值接收,因为连接上之后使用的是客户端发来请求的这个实例# 所以下面的传输要使用 conn 实例操作conn, addr = server.accept()# 打印连接成功print("Connect success! Connection is from %s " % addr[0])# 打印正在发送数据print('Sending data...')conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())conn.close()print('Connection is broken.')
    print("2023年12月7日lzy")
    

    执行如下命令启动 Socket 服务器端:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit DataSourceSocket.py
    

    新建一个终端(记作“流计算终端”),输入以下命令启动 NetworkWordCount 程序:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208003303167

  1. RDD 队列流

    Linux 系统中打开一个终端,新建一个代码文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”,输入以下代码:

    #!/usr/bin/env python3 
    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":print("")sc = SparkContext(appName="PythonStreamingQueueStream")ssc = StreamingContext(sc, 2)# 创建一个队列,通过该队列可以把 RDD 推给一个 RDD 队列流rddQueue = []for i in range(5):rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)# 创建一个 RDD 队列流inputStream = ssc.queueStream(rddQueue)mappedStream = inputStream.map(lambda x: (x % 10, 1))reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    下面执行如下命令运行该程序:

    cd /root/Desktop/luozhongye/streaming/rddqueue 
    /usr/local/spark/bin/spark-submit RDDQueueStream.py
    

image-20231208004439462

三、转换操作
  1. 滑动窗口转换操作

    对“套接字流”中的代码 NetworkWordCount.py 进行一个小的修改,得到新的代码文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”,其内容如下:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")ssc = StreamingContext(sc, 10)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/socket/checkpoint")lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()
    

为了测试程序的运行效果,首先新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

   cd /root/Desktop/luozhongye/streaming/socket/ nc -lk 9999

然后,再新建一个终端(记作“流计算终端”),运行客户端程序 WindowedNetworkWordCount.py,命令如下:

   cd /root/Desktop/luozhongye/streaming/socket/ /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999

在数据源终端内,连续输入 10 个“hadoop”,每个 hadoop 单独占一行(即每输入一个 hadoop就按回车键),再连续输入 10 个“spark”,每个 spark 单独占一行。这时,可以查看流计算终端内显示的词频动态统计结果,可以看到,随着时间的流逝,词频统计结果会发生动态变化。

image-20231208005821701

  1. updateStateByKey 操作

    在“/root/Desktop/luozhongye/streaming/stateful/”目录下新建一个代码文件 NetworkWordCountStateful.py,输入以下代码:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairsinitialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.pprint()ssc.start()ssc.awaitTermination()
    

    新建一个终端(记作“数据源终端”),执行如下命令启动 nc 程序:

    nc -lk 9999
    

    新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

    cd /root/Desktop/luozhongye/streaming/stateful 
    /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999
    

image-20231208010814959

四、把 DStream 输出到文本文件中

下面对之前已经得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代码进行简单的修改,把生成的词频统计结果写入文本文件中。

修改后得到的新代码文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的内容如下:

#!/usr/bin/env python3 
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.saveAsTextFiles("file:///root/Desktop/luozhongye/streaming/stateful/output")running_counts.pprint()ssc.start()ssc.awaitTermination()

新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

cd /root/Desktop/luozhongye/streaming/socket/ 
nc -lk 9999

新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

cd /root/Desktop/luozhongye/streaming/stateful 
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999

image-20231208012123002

实验心得

通过本次实验,我深入理解了Spark Streaming,包括创建StreamingContext、DStream等对象。同时,我了解了Spark Streaming对不同类型数据流的处理方式,如文件流、套接字流和RDD队列流。此外,我还熟悉了Spark Streaming的转换操作和输出编程操作,并掌握了map、flatMap、filter等方法。最后,我能够自定义输出方式和格式。总之,这次实验让我全面了解了Spark Streaming,对未来的学习和工作有很大的帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/214226.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 驱动开发需要掌握哪些编程语言和技术?

Linux 驱动开发需要掌握哪些编程语言和技术&#xff1f; 在开始前我有一些资料&#xff0c;是我根据自己从业十年经验&#xff0c;熬夜搞了几个通宵&#xff0c;精心整理了一份「Linux从专业入门到高级教程工具包」&#xff0c;点个关注&#xff0c;全部无偿共享给大家&#xf…

1. mycat入门

1、mycat介绍 Mycat 是一个开源的分布式数据库系统&#xff0c;但是由于真正的数据库需要存储引擎&#xff0c;而 Mycat 并没有存 储引擎&#xff0c;所以并不是完全意义的分布式数据库系统。MyCat是目前最流行的基于Java语言编写的数据库中间件&#xff0c;也可以理解为是数据…

鸿蒙HarmonyOS4.0 入门与实战

一、开发准备: 熟悉鸿蒙官网安装DevEco Studio熟悉鸿蒙官网 HarmonyOS应用开发官网 - 华为HarmonyOS打造全场景新服务 应用设计相关资源: 开发相关资源: 例如开发工具 DevEco Studio 的下载 应用发布: 开发文档:

LeetCode 1631. 最小体力消耗路径:广度优先搜索BFS

【LetMeFly】1631.最小体力消耗路径&#xff1a;广度优先搜索BFS 力扣题目链接&#xff1a;https://leetcode.cn/problems/path-with-minimum-effort/ 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights &#xff0c;其中 heights[row][col] 表示格子 (ro…

视频如何提取文字?这四个方法一键提取视频文案

视频如何提取文字&#xff1f;你用过哪些视频提取工具&#xff1f;视频转文字工具&#xff0c;又称为语音识别软件&#xff0c;是一款能够将视频中的语音或对话转化为文字的实用工具。它运用了尖端的声音识别和语言理解技术&#xff0c;能精准地捕捉视频中的音频&#xff0c;并…

弧形导轨的工作原理

弧形导轨是一种能够将物体沿着弧形轨道运动的装置&#xff0c;它由个弧形轨道和沿着轨道运动的物体组成&#xff0c;弧形导轨的工作原理是利用轨道的形状和物体的运动方式来实现运动&#xff0c;当物体处于轨道上时&#xff0c;它会受到轨道的引导&#xff0c;从而沿着轨道的弧…

Nginx正则表达式

目录 1.nginx常用的正则表达式 2.location location 大致可以分为三类 location 常用的匹配规则 location 优先级 location 示例说明 优先级总结 3.rewrite rewrite功能 rewrite跳转实现 rewrite执行顺序 语法格式 rewrite示例 实例1&#xff1a; 实例2&#xf…

【Python必做100题】之第六题(求圆的周长)

圆的周长公式&#xff1a;C 2 * pi * r 代码如下&#xff1a; pi 3.14 r float(input("请输入圆的半径&#xff1a;")) c 2 * pi *r print(f"圆的周长为{c}") 运行截图&#xff1a; 总结 1、圆周长的公式&#xff1a;C 2 * pi * r 2、输出结果注意…

mybatis-plus查询的字段和mysql关键字重名

先看一下这个 TableField("show") 这个注解表示当前属性对应在数据库的字段为show&#xff0c;但是show在mysql中为关键字&#xff0c;直接查询会导致语法错误 正确写法应该是 但写sql由和mybatis-plus理念相违背&#xff0c; 并且无法轻松创建对应方法&#xff0…

数据结构之----逻辑结构、物理结构

数据结构之----逻辑结构、物理结构 目前我们常见的数据结构分别有&#xff1a; 数组、链表、栈、队列、哈希表、树、堆、图 而它们可以从 逻辑结构和物理结构两个维度进行分类。 什么是逻辑结构&#xff1f; 逻辑结构是指数据元素之间的逻辑关系&#xff0c;而逻辑结构又分为…

HCIA-H12-811题目解析(5)

1、【单选题】 以下关于Hybrid端口说法正确的有&#xff1f; 2、【单选题】使用命令"vlan batch 10 20"和"valn batch 10 to 20"&#xff0c;分别能创建的vlan数量是&#xff1f;&#xff08;&#xff09; 3、【单选题】二层ACL的编号范围是&#xff1f;…

Java毕业设计 SSM SpringBoot 在线学习系统

Java毕业设计 SSM SpringBoot 在线学习系统 SSM SpringBoot 在线学习系统 功能介绍 首页 图片轮播 视频推荐 在线学习 学习介绍 评论 收藏 资料中心 资料详情 下载资料 话题讨论 文档发布 试题中心 系统公告 登录 注册学生 个人中心 试题记录 错题本 我的收藏 算法演示 结果分…

持续集成交付CICD:Jenkins使用GitLab共享库实现前后端项目Sonarqube

目录 一、实验 1.Jenkins使用GitLab共享库实现后端项目Sonarqube 2.优化GitLab共享库 3.Jenkins使用GitLab共享库实现前端项目Sonarqube 4.Jenkins通过插件方式进行优化 二、问题 1.sonar-scanner 未找到命令 2.npm 未找到命令 一、实验 1.Jenkins使用GitLab共享库实现…

Flink之迟到的数据

迟到数据的处理 推迟水位线推进: WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))设置窗口延迟关闭&#xff1a;.allowedLateness(Time.seconds(3))使用侧流接收迟到的数据: .sideOutputLateData(lateData) public class Flink12_LateDataC…

力扣编程题算法初阶之双指针算法+代码分析

目录 第一题&#xff1a;复写零 第二题&#xff1a;快乐数&#xff1a; 第三题&#xff1a;盛水最多的容器 第四题&#xff1a;有效三角形的个数 第一题&#xff1a;复写零 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 思路&#xff1a; 上期…

【SpringBoot教程】SpringBoot 统一异常处理(附核心工具类-ErrorInfoBuilder)

作者简介&#xff1a;大家好&#xff0c;我是撸代码的羊驼&#xff0c;前阿里巴巴架构师&#xff0c;现某互联网公司CTO 联系v&#xff1a;sulny_ann&#xff08;17362204968&#xff09;&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗…

曲线分板机主轴有何特点?如何选择合适的曲线分板机主轴?

在现代工业领域&#xff0c;分板机主轴作为重要的机械部件&#xff0c;其性能和质量对于生产效率和产品质量具有至关重要的影响。而在这其中&#xff0c;曲线分板机主轴则因为其独特的优势而被广泛应用于PCB电路板的切割和分板。面对市场上众多的曲线分板机主轴品牌&#xff0c…

前端知识(十三)——JavaScript监听按键,禁止F12,禁止右键,禁止保存网页【Ctrl+s】等操作

禁止右键 document.oncontextmenu new Function("event.returnValuefalse;") //禁用右键禁止按键 // 监听按键 document.onkeydown function () {// f12if (window.event && window.event.keyCode 123) {alert("F12被禁用");event.keyCode 0…

软件测试之缺陷管理

一、软件缺陷的基本概念 1、软件缺陷的基本概念主要分为&#xff1a;缺陷、故障、失效这三种。 &#xff08;1&#xff09;缺陷&#xff08;defect&#xff09;&#xff1a;存在于软件之中的偏差&#xff0c;可被激活&#xff0c;以静态的形式存在于软件内部&#xff0c;相当…

【隐马尔可夫模型】隐马尔可夫模型的观测序列概率计算算法及例题详解

【隐马尔可夫模型】用前向算法计算观测序列概率P&#xff08;O&#xff5c;λ&#xff09;​​​​​​​ 【隐马尔可夫模型】用后向算法计算观测序列概率P&#xff08;O&#xff5c;λ&#xff09; 隐马尔可夫模型是关于时序的概率模型&#xff0c;描述由一个隐藏的马尔可夫链…