Spark核心组件解析:Executor、RDD与缓存优化

Spark核心组件解析:Executor、RDD与缓存优化

Spark Executor

Executor 是 Spark 中用于执行任务(task)的执行单元,运行在 worker 上,但并不等同于 worker。实际上,Executor 是一组计算资源(如 CPU 核心和内存)的集合,多个 executor 共享 worker 上的 CPU 和内存资源。

Executor 的功能

  • 任务执行:Executor 负责执行分配给它的任务,并返回结果到 driver 程序。

  • 缓存机制:如果应用程序调用了 cache() 或 persist() 函数,Executor 会通过 Block Manager 为RDD 提供缓存机制,优化重复计算。

  • 生命周期:Executor 存在于整个 Spark 应用的生命周期内。

Executor 的创建

Spark 在以下几种情况下创建 Executor:

  • 当资源管理器为 Standalone 或 YARN,且 CoarseGrainedExecutorBackend 进程接收到
    RegisteredExecutor 消息时;
  • 当使用 Mesos 资源管理器时,MesosExecutorBackend 进程注册时;
  • 在本地模式下,当 LocalEndpoint 被创建时。

创建成功后,日志会显示如下信息:

INFO Executor: Starting executor ID [executorId] on host [executorHostname]

心跳发送线程

Executor 会定期向 driver 发送心跳信号以确保连接活跃。心跳线程通常是一个调度线程池,利用 ScheduledThreadPoolExecutor 来维持任务的实时性。

执行任务

Executor 通过 launchTask 方法来执行任务。这个方法会创建一个 TaskRunner 线程,并在 Executor Task Launch Worker 线程池中执行任务。

private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")

Spark RDD (Resilient Distributed Dataset)

RDD 是 Spark 的基础数据结构,表示一个不可变的分布式数据集。RDD 在集群中的各个节点上并行计算,并且具有弹性(容错性)和分布式的特性。

RDD 的特性

  • 弹性:RDD 是容错的,丢失的数据可以通过其父 RDD 重新计算。
  • 分布式:RDD 的数据分布在集群的不同节点上,支持分布式计算。
  • 不可修改:RDD 一旦创建,其数据不可修改,这也保证了数据的一致性。
  • 分区:RDD 会被划分为多个分区,以便并行处理。

RDD 的创建方式

(1)并行化:可以通过 SparkContext.parallelize() 方法从一个数据集合创建 RDD。
(2)从外部存储:可以通过 SparkContext.textFile() 等方法从外部存储系统(如 HDFS)加载数据创建 RDD。
(3)从其他 RDD:通过 Spark 的 Transformation 操作从已有的 RDD 创建新的 RDD。

RDD 操作

RDD 支持两种类型的操作:

  • Transformation 操作(转换):如 map()、filter(),返回新的 RDD。
  • Action 操作(行动):如 count()、collect(),触发实际计算并返回结果。

RDD 的容错性

RDD 提供容错能力。当某个节点失败时,可以根据其父 RDD 的计算逻辑恢复丢失的数据。这是通过 DAG(有向无环图)和父 RDD 关系来实现的。

RDD 的持久化

RDD 可以使用 cache() 或 persist() 进行持久化存储,缓存的 RDD 会存储在内存中,若内存不足则溢写到磁盘,避免重复计算。

RDD 的局限性

缺少内置优化引擎:RDD 无法像 DataFrame 和 Dataset 一样利用 Spark 的 Catalyst 优化器进行自动优化。

性能问题:随着数据量增大,RDD 计算的性能可能下降,尤其是与 JVM 垃圾回收和序列化相关的开销。

存储问题:当内存不足时,RDD 会将数据溢写到磁盘,这会导致计算性能大幅下降。

创建 RDD 的例子

  1. 并行化创建 RDD:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = data.map(_ * 2)
result.collect()  // 返回 [2, 4, 6, 8, 10]
  1. 从外部存储创建 RDD:
val rdd = sc.textFile("hdfs://path/to/file")
  1. 从其他 RDD 创建 RDD:
val newRdd = oldRdd.filter(_ > 10)

Spark RDD 缓存机制

Spark RDD 缓存是一种优化技术,用于将中间计算结果存储在内存中,以便在后续操作中复用,从而减少重复计算,提高性能。RDD 缓存可以显著加速一些需要迭代计算的应用,特别是在机器学习和图计算等场景中。

持久化 RDD

持久化操作会将 RDD 的计算结果存储到内存中。这样,每次对 RDD 进行操作时,Spark 会直接使用内存中的数据,而不必重新计算。通过持久化,可以避免重复计算从而提高效率。

  • cache():cache() 是 persist() 的简化方法,默认将 RDD 数据存储在内存中,使用 MEMORY_ONLY
    存储级别。
  • persist():可以通过 persist() 方法选择不同的存储级别,例如 MEMORY_ONLY、DISK_ONLY 等。
  • unpersist():用于移除已缓存的数据,释放内存。

RDD 持久化存储级别

Spark 提供了多种存储级别,每种级别的存储方式不同,根据具体的需求选择合适的存储级别。

存储级别使用空间CPU时间是否在内存中是否在磁盘上备注
MEMORY_ONLY默认级别,数据未序列化,全部存储在内存中
MEMORY_ONLY_2数据存储 2 份
MEMORY_ONLY_SER数据序列化存储,占用更少内存
MEMORY_AND_DISK中等部分部分内存不够时,数据溢写到磁盘
MEMORY_AND_DISK_SER部分部分数据序列化,内存不够时溢写到磁盘
DISK_ONLY数据仅存储在磁盘中
OFF_HEAP----存储在堆外内存,目前为试验性选项

副本机制

带有 _2 后缀的存储级别表示在每个节点上缓存数据的副本。副本机制是为了提高容错性。如果某个节点的数据丢失,Spark 可以从其他节点的副本中恢复数据,而不必重新计算。

缓存策略的选择

  • MEMORY_ONLY:适用于内存足够大的场景,避免序列化和磁盘 I/O开销。性能较高,但如果内存不足可能会导致计算失败。
  • MEMORY_ONLY_SER:适用于内存较为紧张的场景,将数据进行序列化后保存在内存中,减少内存占用,但会增加序列化的开销。
  • MEMORY_AND_DISK:适用于内存不足的场景,数据无法完全存储在内存时会溢写到磁盘,确保数据不会丢失。
  • DISK_ONLY:适用于数据量极大的情况,全部数据存储在磁盘中,性能较低,但可以处理大规模数据。

如何使用 Spark RDD 缓存

缓存 RDD:

val rdd = sc.textFile("data.txt")
rdd.cache()  // 使用默认的 MEMORY_ONLY 存储级别

选择存储级别:

rdd.persist(StorageLevel.MEMORY_AND_DISK)  // 选择 MEMORY_AND_DISK 存储级别

清除缓存

rdd.unpersist()  // 移除缓存

Spark 键值对 RDD

Spark 通过 PairRDD 处理键值对类型的数据,提供了多种用于处理键值对数据的转换操作。

  1. 如何创建键值对 RDD
    通过 map 操作将普通 RDD 转换为键值对 RDD。
    Scala 示例:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))

Python 示例:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
  1. 常见的键值对操作
    reduceByKey(func):对具有相同键的值进行规约操作。
val pairs = sc.parallelize(List((1,2),(3,4),(3,6)))
val result = pairs.reduceByKey((a, b) => a + b)
println(result.collect().mkString(","))

groupByKey():对具有相同键的值进行分组。

val result = pairs.groupByKey()
println(result.collect().mkString(","))

mapValues():对值进行转换操作,但不改变键。

val result = pairs.mapValues(x => x + 1)
println(result.collect().mkString(","))

sortByKey():按键排序。

val sorted = pairs.sortByKey()
println(sorted.collect().mkString(","))
  1. 对两个 RDD 的操作
    join():连接两个 RDD,返回键相同的数据。
val other = sc.parallelize(List((3, 9)))
val joined = pairs.join(other)
println(joined.collect().mkString(","))

leftOuterJoin() 和 rightOuterJoin():左外连接和右外连接,分别确保第一个和第二个 RDD 中的键存在。

val leftJoined = pairs.leftOuterJoin(other)
val rightJoined = pairs.rightOuterJoin(other)
println(leftJoined.collect().mkString(","))
println(rightJoined.collect().mkString(","))

通过合理使用 Spark 的缓存和键值对 RDD 操作,可以显著提升大数据计算的效率,尤其是在迭代计算和需要频繁访问中间数据的场景下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/61756.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VH6501国产替代同星TH7011干扰仪?

文章目录 同星TH7011干扰仪VH6501有使用过TH7011的么,可以在评论区讨论一下~ 同星TH7011干扰仪 干货分享 | 一文详解同星CAN总线干扰仪的使用方法 VH6501

蓝桥杯每日真题 - 第23天

题目:(直线) 题目描述(12届 C&C B组C题) 解题思路: 题目理解: 在平面直角坐标系中,从给定的点集中确定唯一的直线。 两点确定一条直线,判断两条直线是否相同,可通过…

目录遍历漏洞-CVE-2021-41773

目录 简介 原理 例子 Apache路径穿越漏洞 环境搭建 漏洞原理 漏洞利用 简介 目录遍历漏洞(也称为路径遍历漏洞)是一种由于Web服务器或Web应用程序对用户输入的文件名称的安全性验证不足而导致的安全漏洞。 原理 目录遍历漏洞允许攻击者在未授权…

.NET9 - 新功能体验(三)

书接上回,我们继续来聊聊.NET9和C#13带来的新变化。 01、Linq新方法 CountBy 和 AggregateBy 引入了新的方法 CountBy 和 AggregateBy后,可以在不经过GroupBy 分配中间分组的情况下快速完成复杂的聚合操作,同时方法命名也非常直观&#xff0…

Android蓝牙架构,源文件目录/编译方式学习

Android 版本 发布时间 代号(Codename) Android 1.0 2008年9月23日 无 Android 1.1 2009年2月9日 Petit Four Android 1.5 2009年4月27日 Cupcake Android 1.6 2009年9月15日 Donut Android 2.0 2009年10月26日 Eclair Android 2.1 2…

YOLO-World解读:零基础学习开放世界模型

文章目录 一、摘要二、引言相关工作方法预训练公式模型架构可重新参数化的视觉-语言路径聚合网络(RepVL-PAN) 3.4 预训练方案 实验YOLO-World: 利用多样化数据集进行开放词汇对象检测的预训练方法YOLO-World: LVIS数据集上的零样本性能评估YOLO-World: 预…

信创改造 - TongRDS 安装方式之控制台安装【Window】

安装前准备 安装 jdk1.8 即可,并配上 环境变量 安装 1)解压缩 2)启动 进入安装路径的console\bin目录,在cmd命令行窗口运行console.bat 输入序号 1 如果想查看运行状态,可以重新执行 console.bat,然后输…

志愿者小程序源码社区网格志愿者服务小程序php

志愿者服务小程序源码开发方案:开发语言后端php,tp框架,前端是uniapp。 一 志愿者端-小程序: 申请成为志愿者,志愿者组织端进行审核。成为志愿者后,可以报名参加志愿者活动。 志愿者地图:可以…

Node.js的下载与安装(支持各种新旧版本)

目录 1、node官网 2、node软件下载 3、软件安装(完整版) 1、node官网 Node.js — Download Node.jshttps://nodejs.org/en/download/package-manager 2、node软件下载 按照下图进行选择node版本(真心推荐16/18,而是尽量是LTS…

对于相对速度的重新理解 - 2

回到先前说的,先令真空光速为标准光速, 光子的绝对速度 范围, 物质粒子的 范围, 这样的话,我们就可以根据 和 ,把速度分成3个段, 这样就可以出现速度和它的负值,也就是速度的矢量具…

大模型系列11-ray

大模型系列11-ray PlasmaPlasmaStore启动监听处理请求 ProcessMessagePlasmaCreateRequest请求PlasmaCreateRetryRequest请求PlasmaGetRequest请求PlasmaReleaseRequestPlasmaDeleteRequestPlasmaSealRequest ObjectLifecycleManagerGetObjectSealObject ObjectStoreRunnerPlas…

Java---反射机制

JAVA反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象, 都能够调用它的任意方法和属性;这种动态获取信息以及动态调用对象方法的功能称为java语言的反射机制。 在编译后产生…

Java 线程状态详解

1 引言 在 Java 多线程编程中,线程的状态是一个非常重要的概念。了解线程的状态及其转换过程,有助于我们更好地理解和控制线程的行为。本文将详细介绍 Java 线程的 6 种状态,并通过示例代码和图解来帮助读者更好地理解这些状态及其转换过程。…

AirScreen 安卓平板作为MacOS副屏

前言: 对笔记本续航有刚需,不得不选MacBook。 手机用的是mate40Pro,平板用的是matepad pro 12.6 干货: 参考网友的分享: https://www.bilibili.com/video/BV12A4y1d7zX/?spm_id_from333.337.search-card.all.click 【…

深度强化学习(RL)介绍

深度强化学习(RL)介绍 写到了一半,图待后补 一、强化学习概述 (一)与监督学习对比及定义 强化学习不同于监督学习,在一些任务中数据标注困难,但机器可通过环境反馈知道结果好坏。强化学习是机…

使用 Elasticsearch 构建食谱搜索(二)

这篇文章是之前的文章 “使用 Elasticsearch 构建食谱搜索(一)” 的续篇。在这篇文章中,我将详述如何使用本地 Elasticsearch 部署来完成对示例代码的运行。该项目演示了如何使用 Elastic 的 ELSER 实现语义搜索并将其结果与传统的词汇搜索进…

数据结构 【带环单链表】

在单链表中可能会存在一种情况,某一结点在经过几次转移之后回到了自己本身,这种情况就称之为带环链表。对于带环链表,我们不能轻易对其进行遍历,遍历可能会导致产生死循环。 带环链表的逻辑图如下所示:(这…

Vue 项目中如何使用FullCalendar 时间段选择插件(类似会议室预定、课程表)

本文中是基于VUEelementui项目中实现的前后端分离的前端功能部分: 插件的官方文档:FullCalendar 1.安装对应依赖(统一安装版本为6.15) npm install --save fullcalendar/core6.15 npm install --save fullcalendar/daygrid6.…

学习路之压力测试--jmeter安装教程

Jmeter安装 0、先安装jdk:这里是安装jdk-8u211-windows-x64 1、百度网盘上下载 jdk和jmeter 链接: https://pan.baidu.com/s/1qqqaQdNj1ABT1PnH4hfeCw?pwdkwrr 提取码: kwrr 复制这段内容后打开百度网盘手机App,操作更方便哦 官网:Apache JMeter - D…