2024.1.8 Day04_SparkCore_homeWork

目录

1. 简述Spark持久化中缓存和checkpoint检查点的区别

2 . 如何使用缓存和检查点?

3 . 代码题

浏览器Nginx案例

先进行数据清洗,做后续需求用

1、需求一:点击最多的前10个网站域名

2、需求二:用户最喜欢点击的页面排序TOP10

3、需求三:统计每分钟用户搜索次数

学生系统案例

4. RDD依赖的分类

5. 简述DAG与Stage 形成过程 

DAG :  

Stage : 


1. 简述Spark持久化中缓存和checkpoint检查点的区别

1- 数据存储位置不同
    缓存: 存储在内存或者磁盘 或者 堆外内存中
    checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 数据生命周期:
    缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
    checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除

3- 血缘关系:
    缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
    checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
    
4- 主要作用不同:
    缓存: 提高Spark程序的运行效率
    checkpoint检查点: 提高Spark程序的容错性

2 . 如何使用缓存和检查点?

       将两种方案同时用在一个项目中, 先设置缓存,再设置检查点 ,  最后一同使用Action算子进行触发, 这样程序只会有一次IO操作, 如果先设置检查点的话,就会有2次IO操作;

         当在后续工程中读取数据的时候,优先从缓存中读取,如果缓存中没有数据, 再从检查点读取数据,并且会将数据缓存一份到内存中 ,后续直接从缓存中读取数据

3 . 代码题

浏览器Nginx案例

        

先进行数据清洗,做后续需求用

import os
from pyspark import SparkConf, SparkContext,StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldif __name__ == '__main__':
# 1- 创建SparkSession对象conf = SparkConf().setAppName('需求1').setMaster('local[*]')sc = SparkContext(conf=conf)
# 2- 数据输入init_rdd = sc.textFile('file:///export/data/2024.1.2_Spark/1.6_day04/SogouQ.sample')# 3- 数据处理filter_tmp_rdd = init_rdd.filter(lambda line:line.strip()!='')print('过滤空行的数据',filter_tmp_rdd.take(10))map_rdd = filter_tmp_rdd.map(lambda line:line.split())print('map出来的数据',map_rdd.take(10))len6_rdd = map_rdd.filter(lambda line:len(line)==6)print('字段数为6个的字段',len6_rdd.take(10))etl_rdd = len6_rdd.map(lambda list:(list[0],list[1],list[2][1:-1],list[3],list[4],list[5])  )print('转换成元组后的数据',etl_rdd.take(10))# 设置缓存etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()

1、需求一:点击最多的前10个网站域名

print('点击最多的前10个网站域名','-'*50)website_map_rdd = etl_rdd.map(lambda tup:(tup[5].split('/')[0],1))print('把网站域名切出来,变成(hello,1)的格式',website_map_rdd.take(10))website_reducekey_rdd = website_map_rdd.reduceByKey(lambda agg,curr:agg+curr)print('进行聚合',website_reducekey_rdd.take(10))sort_rdd =website_reducekey_rdd.sortBy(lambda tup:tup[1],ascending=False)print('进行降序排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()

2、需求二:用户最喜欢点击的页面排序TOP10

    print('用户最喜欢点击的页面排序TOP10','-'*100)top_10_order = etl_rdd.map(lambda tup:(tup[4],1))print('点击量排行',top_10_order.take(10))top_10_reducebykey = top_10_order.reduceByKey(lambda agg,curr:agg+curr)print('进行聚合',top_10_reducebykey.take(10))sortby_top10 = top_10_reducebykey.sortBy(lambda line:line[1],ascending=False)print('进行排序',sortby_top10.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()

3、需求三:统计每分钟用户搜索次数

    print('统计每分钟用户搜索次数','-'*50)search_map_rdd = etl_rdd.map(lambda tup:(tup[0][0:5],1))print('把网站域名切出来,变成(hello,1)的格式',search_map_rdd.take(10))search_reducekey_rdd = search_map_rdd.reduceByKey(lambda agg,curr:agg+curr)print('进行聚合',search_reducekey_rdd.take(10))sort_rdd =search_reducekey_rdd.sortBy(lambda tup:tup)print('按照时间进行排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()

学生系统案例

 数据准备

import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldif __name__ == '__main__':
# 1- 创建SparkSession对象conf = SparkConf().setAppName('学生案例').setMaster('local[*]')sc = SparkContext(conf=conf)
# 2- 数据输入init_rdd= sc.textFile('hdfs://node1:8020/input/day04_home_work.txt')# 3- 数据处理stu_rdd = init_rdd.map(lambda line:line.split(',')).cache()print('切分后的数据为',stu_rdd.collect())
# 1、需求一:该系总共有多少学生stu_cnt = stu_rdd.map(lambda line:line[0]).distinct().count()print(f'该系总共有{stu_cnt}个学生')
# 2、需求二:该系共开设了多少门课程subject_cnt = stu_rdd.map(lambda line:line[1]).distinct().count()print(f'该系共开设了{subject_cnt}门课程')
# 3、需求三:Tom同学的总成绩平均分是多少tom_score_sum = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:int(line[2])).sum()tom_subject_num = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:line[1]).distinct().count()tom_score_avg = tom_score_sum/tom_subject_numprint(f'Tom同学的总成绩平均分是{round(tom_score_avg,2)}')# 4、需求四:求每名同学的选修的课程门数
#     every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct().map(lambda tup: (tup[0], 1))\
#         .reduceByKey(lambda agg, curr: agg + curr).collect()every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct()print('学生与选修课,把一个学生重修一门选修课的情况去掉',every_student_course_num.collect())every_student_course_num2 = every_student_course_num\.map(lambda tup:(tup[0],1))\.reduceByKey(lambda agg,curr:agg+curr).collect()print('每个同学的选修课数',every_student_course_num2)
# 5、需求五:该系DataBase课程共有多少人选修subject_database = stu_rdd.filter(lambda line:line[1]=='DataBase').map(lambda line:line[0]).distinct().count()print(f'数据库有{subject_database}人选修')
# 6、需求六:各门课程的平均分是多少total_score = stu_rdd.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))print('各科总分为',total_score.collect())total_num = stu_rdd.map(lambda x: (x[1], 1)).groupByKey().map(lambda x: (x[0], sum(x[1])))print('各科的数量为',total_num.collect())#total_join =total_score.join(total_num)print('join后结果',total_join.collect())
# 各科总分为 [('DataBase', 170), ('Algorithm', 110), ('DataStructure', 140)]
# 各科的数量为 [('DataBase', 2), ('Algorithm', 2), ('DataStructure', 2)]
# 合表后为 [('DataBase', (170, 2)), ('DataStructure', (140, 2)), ('Algorithm', (110, 2))]total_avg =total_score.join(total_num).map(lambda x: (x[0], round(x[1][0] / x[1][1], 2))).collect()print('各科目的平均分为',total_avg)
# 4- 数据输出# 5- 释放资源sc.stop()

4. RDD依赖的分类

窄依赖:  父RDD分区与子RDD分区是一对一关系

宽依赖:  父RDD分区与子RDD分区是一对多关系

5. 简述DAG与Stage 形成过程 

DAG :  

1-Spark应用程序,遇到了Action算子以后,就会触发一个Job任务的产生。Job任务首先将它所依赖的全部算子加载到内存中,形成一个完整Stage

2-会根据算子间的依赖关系,从Action算子开始,从后往前进行回溯,如果算子间是窄依赖,就放到同一个Stage中;如果是宽依赖,就形成新的Stage。一直回溯完成。

Stage : 

1-Driver进程启动成功以后,底层基于PY4J创建SparkContext对象,在创建SparkContext对象的过程中,还会同时创建DAGScheduler(DAG调度器)和TaskScheduler(Task调度器)
    DAGScheduler: 对Job任务形成DAG有向无环图和划分Stage阶段
    TaskScheduler: 调度Task线程给到Executor进程进行执行

2-Spark应用程序遇到了一个Action算子以后,就会触发一个Job任务的产生。SparkContext对象将Job任务提交DAG调度器,对Job形成DAG有向无环图和划分Stage阶段。并且确定每个Stage阶段需要有多少个Task线程,将这些Task线程放置在TaskSet集合中。再将TaskSet集合给到Task调度器。

3-Task调度器接收到DAG调度器传递过来的TaskSet集合以后,将Task线程分配给到具体的Executor进行执行,底层是基于调度队列SchedulerBackend。Stage阶段是一个一个按顺序执行的,不能并行执行。

4-Executor进程开始执行具体的Task线程。后续过程就是Driver监控多个Executor的执行状态,直到Job任务执行完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615466.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一卡通水控电控开发踩过的坑

最近在做一个项目,是对接一卡通设备的。我一开始只拿到设备和3个文档开局。不知道从哪下手。一步一步踩坑过来。踩了很多没有必要的坑,写出来给有用的人吧。 读卡器怎么用? 有个读卡器,一开始什么软件也不提供。我都不知道是干嘛…

Jupyter Notebook

2017年左右在大学里都听说过Jupyter Notebook,并且也安装用了一段时间,后来不知道什么原因没有用了。估计是那时候写代码的时候多一些,因为它可以直接写代码并运行结果,现在不怎么写代码了。 介绍 后缀名为.ipynb的json格式文件…

《YOLO算法:基础+进阶+改进》报错解决 专栏答疑

前言:Hello大家好,我是小哥谈。《YOLO算法:基础进阶改进》专栏上线后,部分同学在学习过程中提出了一些问题,笔者相信这些问题其他同学也有可能遇到。为了让大家可以更好地学习本专栏内容,笔者特意推出了该篇…

SpringBoot集成Camunda

一&#xff1a;SpringBoot集成 1.1&#xff1a;pom.xml 因camunda集成SpringBoot对SpringBoot的版本和JDK的版本都有一定的要求&#xff0c;所以这里贴个完整的依赖。可以去官网找每个SpringBoot的版本对应的camunda版本。 <?xml version"1.0" encoding"…

前端入门教程:学完即可单独完成前端项目

目录 目录 1.HTML: 1.1概念 1.2结构 1.3常见的标签使用分类&#xff1a; 2.CSS: 2.1概念 2.2样式实践&#xff1a; 以下的举例都来自于博客&#xff1a; 2.3css选择器&#xff1a; 什么是css选择器&#xff1a; 举例如下&#xff1a; 2.4Demo 3.JavaScript&#…

算法34:贴纸拼词(力扣691题)

题目&#xff1a; 我们有 n 种不同的贴纸。每个贴纸上都有一个小写的英文单词。 您想要拼写出给定的字符串 target &#xff0c;方法是从收集的贴纸中切割单个字母并重新排列它们。如果你愿意&#xff0c;你可以多次使用每个贴纸&#xff0c;每个贴纸的数量是无限的。 返回你…

vivado IP Revision Control

2020.2 只需要git 管理 prj.xpr 和 prj.srcs/ https://china.xilinx.com/video/hardware/ip-revision-control.html https://www.xilinx.com/video/hardware/vivado-design-suite-revision-control.html

每日一题:LeetCode-LCR 007. 三数之和

每日一题系列&#xff08;day 18&#xff09; 前言&#xff1a; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f50e…

linux高级篇基础理论十一(GlusterFS)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️不能因为人生的道路坎坷,就使自己的身躯变得弯曲;不能因为生活的历程漫长,就使求索的 脚步迟缓。 ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xff1a;云计算技…

VS中动态库的创建和调用

VS中动态库的创建和调用 库 ​ 库是写好的现有的&#xff0c;成熟的&#xff0c;可以复用的代码。库的存在形式本质上来说库是一种可执行代码的二进制。 ​ 库有两种&#xff1a;静态库&#xff08;.a、.lib&#xff09;和动态库&#xff08;.so、.dll&#xff09;。所谓静态…

一键操作完整的部署项目流程

目录 一、常见的搭配 二、完整的部署项目流程具体步骤 2.1、安装jdk 安装jdk &#xff1a; 配置环境&#xff1a; 检查是否成功&#xff1a; 2.2、配置tomcat外部访问 下载解压软件 安装tomcat 测试tomcat安装是否成功 2.3、安装MySQL 安装vcc环境 命令输入步骤 安…

应用在热能表领域中的数字温度传感芯片

热能表&#xff0c;是适用于测量在热交换环路中&#xff0c;被称作载热液体的液体所吸收或转换热能的仪器&#xff0c;它由流量传感器、温度传感器和热能积算仪三部分组成。热量表&#xff08;热表&#xff09;又称热能表、热能积算仪&#xff0c;既能测量供热系统的供热量又能…

多模态推荐系统综述:二、特征交互 Fusion

二、Fusion 融合不同的多模态信息&#xff0c;与bridge相比&#xff0c;融合更关注项目之间的多模态内部关系。 它可以灵活地融合不同权重和焦点的多模态信息。 注意机制是应用最为广泛的特征融合。 2.1 粗粒度注意力。 一些模型应用注意力机制在粗粒度级别融合来自多种模式…

K8S集群重新初始化--详细过程

K8S集群重新初始化 0、当前环境1、master节点1.1、在master节点执行下面reset命令&#xff1a;1.2、手动清除配置信息&#xff0c;这一步很关键&#xff1a;1.3、重新引导集群1.4、创建配置目录&#xff0c;并复制权限配置文件到用户目录下&#xff1a;1.5 查看集群状态1.6 安装…

小红书私信组件功能解读,商家如何使用

今年八月&#xff0c;小红书私信组件上新了两大新功能。新功能的出现&#xff0c;无疑为商家与消费者的沟通建联&#xff0c;提供了新的可能。今天我们来针对小红书私信组件功能解读&#xff01; 一、小红书私信组件新功能 这次小红书私信组件上新的两大功能分别是&#xff0c;…

GEE查看SMAP的L3级土壤水分产品并导出为TIFF

SMAP的L3级产品&#xff0c;时间分辨率为每日&#xff0c;空间分辨率为9KM&#xff0c;到2023年12月2日停止提供。 查看逐日的土壤水分变化 // 设置感兴趣区域&#xff08;Region of Interest&#xff09; var roi ee.FeatureCollection(projects/a-flyllf0313/assets/dacha…

【Spring Cloud】微服务架构演变及微服务架构介绍

文章目录 系统架构演变单体应用架构垂直应用架构分布式架构SOA 架构微服务架构 微服务架构介绍微服务架构的常见问题微服务架构的常见概念服务治理服务调用服务网关服务容错链路追踪 微服务架构的常见解决方案ServiceCombSpringCloudSpring Cloud Alibaba 总结 欢迎来到阿Q社区…

探索web技术与低代码开发的融合应用

随着物联网、云计算和人工智能等技术的迅猛发展&#xff0c;现代软件开发正面临着日益增长的需求和复杂性。为了应对这一挑战&#xff0c;一种被称为低代码开发的快速、可视化开发方法逐渐崭露头角。本文将探讨低代码开发与web技术的融合应用&#xff0c;以及这种趋势对软件开发…

答题小程序源码系统:自带流量主广告位+视频激励广告 带完整的代码安装包以及搭建教程

随着互联网的迅速发展&#xff0c;各种应用程序层出不穷&#xff0c;而答题类小程序由于其独特的互动性和吸引力&#xff0c;成为了当前最热门的应用之一。答题小程序源码系统是一款基于微信小程序开发的源代码系统&#xff0c;它具有丰富的功能和灵活的定制性&#xff0c;可以…

git修改最新提交(commit)信息

一、修改最近一次commit信息 1、首先通过git log查看commit信息 2、使用命令git commit --amend进入命令命令模式&#xff0c;按i进入编辑模式&#xff0c;修改好commit信息后按Esc键退出编辑模式&#xff0c;然后输入:wq保存编辑信息&#xff08;注意使用英文输入法&#xf…