面试系列之《Spark》(持续更新...)

参考文档及示例代码均基于pyspark==3.1.2

  • 1.什么是RDD?
  • 2.job、stage、task如何划分?
  • 3.什么是宽窄依赖?
  • 4.spark有哪几种部署模式?
  • 5.spark中的算子分为哪些类型,举例说明。
  • 6.cache、persist、checkpoint的区别,及各自的使用场景?
  • 7.广播变量与累加器。
  • 8.reduceByKey与groupByKey的区别?
  • 9.spark数据倾斜及通用调优。
  • 10.map与flatMap区别?
  • 11.spark中的shuffle有哪几种方式?

1.什么是RDD?

RDD,弹性分布式数据集(Resilient Distributed Datasets),即一个分布于多个节点机器上的数据集合。为开发人员提供编程抽象,具有只读的特点。这里只读的意思是,当对RDD中的数据修改时,并不修改原RDD,而是返回一个新的RDD。注意RDD本身并不保存数据,只是定义了一组计算规则。
RDD中的弹性体现在:
1)容错性:包括基于血缘关系的容错和自动失败重试的容错。

  • 血缘关系的容错:RDD中一个分区的数据丢失,可以通过RDD间的血缘关系重新计算得到该分区的数据。单个节点的故障不影响其他节点的任务处理。
  • 自动失败重试的容错:包括task失败重试和stage失败重试,由spark自动支持。且stage失败重试时只重试任务失败的分区,而不是全部计算。

2)计算存储方面:内存和磁盘空间的自动切换和管理。包括计算过程中RDD的存储,及持久化时持久化级别的动态管理。

  • 计算过程中RDD的存储:当内存使用完毕时自动溢写磁盘,使得内存较小时也可以处理大数据量。
  • 持久化方面:开发者可以自定义选择持久化级别,包括持久化内存,持久化磁盘,持久化内存磁盘相结合的方式。

3)计算过程中可动态调整分区(repartition、coalesce)。

2.job、stage、task如何划分?

job:应用程序中每遇到一个action算子就会划分为一个job。
stage:一个job任务中从后往前划分,分区间每产生了shuffle也就是宽依赖则划分为一个stage,stage的划分体现了spark的pipeline思想,即数据在内存中尽可能的往后多计算,减少磁盘或者网络IO。
task:RDD中一个分区对应一个task。

3.什么是宽窄依赖?

根据分区之间是否产生shuffle来确定。
宽依赖:上游一个分区的数据被打散到下游的多个分区,1:N
窄依赖:上游一个分区的数据全部进入到下游的一个分区,可以是1:1,也可以是N:1

4.spark有哪几种部署模式?

1.Local:本地模式,运行在单个机器,一般用作测试环境。
2.Standalone:一个基于Master+Slaves的资源调度集群。spark任务提交给Master调度管理,是spark自带的一个调度系统。
3.Yarn:spark客户端直接连接yarn,不需要额外构建spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:driver程序的运行节点。yarn-client时driver运行在本地提交任务的客户端,yarn-cluster是driver运行在集群中随机的任一节点。
4.Mesos:比较少用,不了解。
5.K8s:spark后续高版本新增支持。

5.spark中的算子分为哪些类型,举例说明。

spark中算子类型分为两类:
1)转换算子(Transformation):惰性求值,需要action算子进行触发才会执行。返回一个新的RDD。不负责数据存储,只是定义了一个计算规则。

  • map:对RDD中的每个元素应用规则。
    filter:对RDD中的每个元素按规则过滤。
    groupByKey:将相同key的数据合并。
    glom:将RDD中的每个分区合并为一个列表。
    union:合并两个RDD。
    simple:抽样。
    注:关于持久化类算子,也有人叫控制算子(cache、persist、checkpoint),严格意义上也属于转换算子,需要动作算子才能触发。

2)动作算子(Action):触发spark任务执行,立即构建DAG有向无环图,不返回RDD,返回RDD的结果或者没有返回值。

  • collect:以数组形式获取RDD中所有元素。
    count:获取RDD中元素个数。
    first:获取RDD中的第一个元素,等价于take(1)。
    take:通过指定参数n获取RDD中前n个元素。
    top:通过指定参数n获取RDD中排序后的前n个元素。

更多RDD相关API参考官方文档:https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.html#rdd-apis

6.cache、persist、checkpoint的区别,及各自的使用场景?

共同点:1)都用来做持久化,避免多个action算子对同一个RDD的重复计算。2)都遵循spark的惰性执行策略,需要通过action算子触发执行。
区别:

  • cache:仅持久化到内存,MEMORY_ONLY级别。等价于persist的默认持久化级别。
  • persist:默认持久化到内存(MEMORY_ONLY),但同时支持开发者自定义存储级别,例如仅磁盘(DISK_ONLY),磁盘内存结合(MEMORY_AND_DISK)。
    更多的存储级别设置及使用场景参考:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#rdd-persistence
  • checkpoint:将数据持久化到节点指定路径中(sc.setCheckpointDir方法设置),如果执行模式是cluster则检查点路径必须为HDFS路径。该方法与上述两种方法最大的不同点在于会截断RDD的血缘关系,而上述两种方法不会截断血缘关系,只是起到了缓存数据避免重复计算的作用。checkpoint实际使用中有两点需要注意:1)checkpoint之前不要触发RDD的动作算子,否则会截断血缘关系,导致checkpoint重新计算时找不到血缘链条从而保存不到数据。2)checkpoint前最好将需要保存的RDD通过cache或者persist缓存一下,避免RDD的重复计算。

7.广播变量与累加器。

广播变量和累加器是spark中提供的两种共享变量,分别用来解决广播通信和任务结果汇总的两种业务场景问题。详细参考官方文档:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#shared-variables

1)广播变量

简而言之,就是在每个集群节点中缓存一份driver端定义的公共变量,且该被广播的变量在executor中只读。
当不使用广播变量的时候,spark任务中需要用到的公共变量会copy到每个task中,这种方式弊端一是重复存储占用内存资源,二是增加了IO操作。而使用广播变量,driver端定义的公共变量只会往每个集群中的worker节点中copy一份,由executor中的所有task共享。且该方法的底层实现涉及到了序列化与反序列化以及高效的广播算法,所以效率比较高。

demo:

from pyspark.sql import SparkSession"""
需求:从rdd中过滤掉singer中歌手的歌曲
"""
spark = SparkSession.builder \.master("local[*]") \.appName("broadcast_demo") \.config("spark.executor.instances", "4") \.config("spark.executor.cores", "2") \.config("spark.executor.memory", "1g") \.getOrCreate()
sc = spark.sparkContextrdd = sc.parallelize([("梁静茹", "向左转向右转"), ("梁静茹", "亲亲"), ("王诗安", "Home"), ("李宗盛", "山丘"), ("邵夷贝", "未来俱乐部")], 2)
print(f"过滤前:{rdd.collect()}")singer = ["梁静茹", "王诗安"]
# 设置广播变量并将singer广播到executor
bc = sc.broadcast(singer)# 根据广播变量过滤并输出过滤结果
rdd_filter = rdd.filter(lambda x: x[0] not in bc.value)
print(f"过滤后:{rdd_filter.collect()}")sc.stop()
spark.stop()

在这里插入图片描述

2)累加器

累加器,简要的概括,是一种分布式共享只写变量。在driver端定义,并被序列化到每个executor中,在使用时被反序列化。所有executor中的task持有一个累加器的副本进行累加操作。并将结果回传给driver进行汇总。spark原生支持数值型累加器,也支持开发人员自定义累计器类型。

demo:

from pyspark.sql import SparkSession"""
需求:统计rdd中属于singer中歌手的歌曲数量
"""
spark = SparkSession.builder \.master("local[*]") \.appName("accumulator_demo") \.config("spark.executor.instances", "4") \.config("spark.executor.cores", "2") \.config("spark.executor.memory", "1g") \.getOrCreate()
sc = spark.sparkContextrdd = sc.parallelize([("梁静茹", "向左转向右转"), ("梁静茹", "亲亲"), ("王诗安", "Home"), ("李宗盛", "山丘"), ("邵夷贝", "未来俱乐部")], 2)
singer = ["梁静茹", "王诗安"]# 初始化一个初值为0的累加器
acc = sc.accumulator(0)# 定义map函数,统计属于singer的歌曲数量
def map_fun(x, s):if x[0] in s:acc.add(1)# 使用collect算子触发执行map函数并输出结果
rdd.map(lambda x: map_fun(x, singer)).collect()
print(f"属于singer的歌曲数量:{acc.value}")sc.stop()
spark.stop()

在这里插入图片描述

8.reduceByKey与groupByKey的区别?

https://blog.csdn.net/atwdy/article/details/133155108

9.spark数据倾斜及通用调优。

10.map与flatMap区别?

map:对RDD中的每个元素应用规则,并返回一个新的元素。也就是结果RDD的元素数量与原始RDD元素数量相等。
flatMap:对RDD中每个元素应用规则,并返回一个集合,集合中的元素可以为0个或多个。在此基础之上,再对所有的集合进行flat平铺操作,可以理解为将各个集合元素合并到一起。

demo:

from pyspark.sql import SparkSessionspark = SparkSession.builder \.master("local[*]") \.appName("demo") \.config("spark.executor.instances", "4") \.config("spark.executor.cores", "2") \.config("spark.executor.memory", "1g") \.getOrCreate()
sc = spark.sparkContextrdd = sc.parallelize([2, 3, 4], 2)
rdd1 = rdd.map(lambda x: range(1, x))
rdd2 = rdd.flatMap(lambda x: range(1, x))print(f"map: {rdd1.collect()}")
print(f"flatMap: {rdd2.collect()}")sc.stop()
spark.stop()

在这里插入图片描述

11.spark中的shuffle有哪几种方式?

两种。早期的HashShuffle,和后期的SortShuffle。
HashShuffle(后续高版本已被SortShuffle取代):

  • 未优化:基于对下游分区个数hash取模实现,下游有多少个分区,上游每个task都会产生多少个小文件,带来的问题是小文件过多,增大磁盘和网络IO,拖慢执行效率。同时上游每个task维护了多个小文件缓冲区,增加内存压力。理论上的小文件个数 = map task数量 x 下游分区数量。
  • 优化后:HashShuffle的优化其实就是针对上游task产生的小文件的合并优化。未优化前,每个task维护各自的缓冲区并生成和下游分区数量相等的小文件,优化后,每个executor中属于同一个的core的task,会产生和下游分区数量相等的小文件并复用同一组小文件。所以理论上的小文件个数 = 上游core个数 x 下游分区数量。

SortShuffle:

  • 普通SortShuffle:上游的每个map task会不断地往磁盘溢写小文件(溢写前会进行排序),每次溢写产生一个小文件,最终将所有属于同一个task溢写的小文件merge为一个大文件,并且产生一个索引文件,下游的reduce task根据索引文件去读取属于自己分区的数据。即产生的小文件个数 = map task数量 x 2。
  • bypass机制:这种机制,可以理解为,在未优化的HashShuffle机制基础上,对同一个task产生的小文件进行了一个合并的功能,产生一个大文件,同时生成一个索引文件。这种机制相比普通SortShuffle省略了排序的过程。产生的文件个数 = map task数量 x 2。触发该机制的两个阈值条件:1)reduce task数量 < spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200。2)不是聚合类的shuffle算子。准确来说,不是map端预聚合的算子(eg:reduceByKey,因为为了聚合的高效,通常要求数据有序,而bypass机制并不对数据排序)。

12.spark为什么比MR快?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/697741.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++模板为什么不能声明和定义分离

首先我们要直到C程序运行需要进行的四个阶段。 预处理->编译->汇编->链接 编译&#xff1a;对语法语义分析&#xff0c;分析无误生成汇编&#xff0c;头文件不参加编译&#xff0c;多个源文件是分开单独编译的。 链接&#xff1a;将多个obj文件链接合成一个&#x…

Sora----打破虚实之间的最后一根枷锁----这扇门的背后是人类文明的晟阳还是最后的余晖

目录 一.Sora出道即巅峰 二.为何说Sora是该领域的巨头 三.Sora无敌的背后究竟有怎样先进的处理技术 1.Spacetime Latent Patches 潜变量时空碎片&#xff0c;建构视觉语言系统 2.扩散模型与Diffusion Transformer&#xff0c;组合成强大的信息提取器 3.DiT应用于潜变量时…

关于在分布式环境中RVN和使用场景的介绍4

简介 在前面的文档中&#xff0c;我们介绍了RVN的概念&#xff0c;通过RVN可以解决的某类问题和使用技巧&#xff0c;以及处理RVN的逻辑的具体实现。在本文中&#xff0c;我们将要介绍关于如何使用RVN解决另一种在分布式系统中常出现的问题。 问题 假设我们创建了一个servic…

pikachu靶场-CSRF

CSRF: 介绍&#xff1a; Cross-site request forgery简称为"CSRF”。 在CSF的攻击场景中攻击者会伪造一个请求&#xff08;这个请求一般是一个链接&#xff09; 然后欺骗目标用户进行点击&#xff0c;用户一旦点击了这个请求&#xff0c;整个攻击也就完成了&#xff0…

VSCode-更改系统默认路径

修改vscode中的默认扩展路径&#xff1a;"%USERPROFILE%\.vscode" 打开目录C:\用户\电脑用户名&#xff0c;将.vscode文件剪切至D:\VSCode文件夹下 用管理员身份打开cmd.exe命令界面输入mklink /D "%USERPROFILE%\.vscode" "D:\VSCode\.vscode\"…

同一个包下 golang run时报undefined

问题描述 今天在运行一个项目&#xff0c;一个包下有两个文件&#xff0c;分别是main.go和route&#xff0c;main函数在main.go文件中&#xff0c;main引用了route.go中的两个函数&#xff0c;SetupRoutes和SetupAdminRoutes go build 编译后&#xff0c;直接运行&#xff0c…

【C++私房菜】面向对象中的简单继承

文章目录 一、 继承基本概念二、派生类对象及派生类向基类的类型转换三、继承中的公有、私有和受保护的访问控制规则四、派生类的作用域五、继承中的静态成员 一、 继承基本概念 通过继承&#xff08;inheritance&#xff09;联系在一起的类构成一种层次关系。通常在层次关系的…

Leetcoder Day17| 二叉树 part06

语言&#xff1a;Java/C 654.最大二叉树 给定一个不含重复元素的整数数组。一个以此数组构建的最大二叉树定义如下&#xff1a; 二叉树的根是数组中的最大元素。左子树是通过数组中最大值左边部分构造出的最大二叉树。右子树是通过数组中最大值右边部分构造出的最大二叉树。 …

免费搭建个人网盘

免费搭建一个属于个人的网盘。 服务端 详情请参考原网站的服务端下载和安装虚拟磁盘Fuse4Ui可以支持把网盘内容挂载成系统的分区&#xff1b; 挂载工具效果图&#xff1a;应用端应用端的下载 效果图

短剧小程序系统,重塑视频观看体验的科技革命

随着科技的飞速发展&#xff0c;人们对于数字化内容的消费需求也在不断增长。在这个大背景下&#xff0c;短剧小程序作为一种新型的视频观看方式&#xff0c;正逐渐受到大众的青睐。本文将探讨短剧小程序的发展背景、特点以及市场前景&#xff0c;分析其在重塑视频观看体验方面…

如何使用Inno Setup制作Unity构建程序的Windows安装程序

1. 准备 &#xff08;1&#xff09;准备好Unity构建的程序集合 必须包括&#xff1a; Data文件夹&#xff08;xxx_Data&#xff09; Mono文件夹&#xff08;MonoBleedingEdge&#xff09; 打包的应用程序文件&#xff08;xxx.exe&#xff09; Unity播放器dll文件&#xff…

基于springboot+vue的大创管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

【selenium】执行 Javascript 脚本 滚动、元素的特殊操作等

某些特殊情况下&#xff0c;使用selenium的api无法操作页面元素&#xff0c;点击、滚动实现的某些功能&#xff0c;可以考虑通过执行js来完成。 为什么不用js写自动化&#xff1f;——selenium第一版是js写的&#xff0c;但js兼容性存在问题&#xff0c;所以引入webdriver 现在…

ad15 PCB3D模型导出到SOLIDWORKS

注意&#xff0c;工程文件目录不能用中文&#xff0c;否则导出的文件会不存在 将这个文件直接拖到 SOLIDWORKS 中 下一步很关键 显示出来了 另存为一个转配体就可以了

海思SD3403,SS928/926,hi3519dv500,hi3516dv500移植yolov7,yolov8(14)

自己挖了一个坑,准备做SS928/SD3403的Yolov8的移植,主要是后台私信太多人在问相关的问题。先别着急去写代码,因为在hi3516dv500下的移植还是比较顺利。之前在hi3519av100和hi3559av100系列时遇到过一些问题,所以没有继续去移植新的算法。 SS928架构乍一看和hi3559av100特别…

Ubuntu系统本地部署Inis博客结合内网穿透实现远程访问本地站点

文章目录 前言1. Inis博客网站搭建1.1. Inis博客网站下载和安装1.2 Inis博客网站测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;2.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 3. 公网访问测试总…

git 使用总结

文章目录 git merge 和 git rebasegit mergegit rebase总结 git merge 和 git rebase git merge git merge 最终效果说明&#xff1a; 假设有一个仓库情况如下&#xff0c;现需要进行 merge&#xff1a; merge 操作流程&#xff1a; merge 的回退操作&#xff1a; git reba…

在线进制转换工具

在线进制转换 - BTool在线工具软件&#xff0c;为开发者提供方便。 在线进制转换器提供了二进制&#xff0c;八进制&#xff0c;十进制&#xff0c;十六进制等相互转换功能。

K8S部署Java项目 pod报错 logs日志内容:no main manifest attribute, in app.jar

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

hash,以及数据结构——map容器

1.hash是什么&#xff1f; 定义&#xff1a;hash,一般翻译做散列、杂凑&#xff0c;或音译为哈希&#xff0c;是把任意长度的输入&#xff08;又叫做预映射pre-image&#xff09;通过散列算法变换成固定长度的输出&#xff0c; 该输出就是散列值。这种转换是一种压缩映射&…