【Spark精讲】RDD特性之数据本地化

首选运行位置

上图红框为RDD的特性五:每个RDD的每个分区都有一组首选运行位置,用于标识RDD的这个分区数据最好能够在哪台主机上运行。通过RDD的首选运行位置可以让RDD的某个分区的计算任务直接在指定的主机上运行,从而实现了移动计算而不是移动数据的目的减少了网络传输的开销,如Spark中HadoopRDD能够实现家装数据的任务在相应的数据节点上执行。

数据的本地化级别

package org.apache.spark.schedulerimport org.apache.spark.annotation.DeveloperApi@DeveloperApi
object TaskLocality extends Enumeration {// Process local is expected to be used ONLY within TaskSetManager for now.val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Valuetype TaskLocality = Valuedef isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {condition <= constraint}
}
  1. PROCESS_LOCAL:进程本地化,表示 task 要计算的数据在同一个 Executor 中。
  2. NODE_LOCAL:节点本地化,速度稍慢,因为数据需要在不同的进程之间传递或从文件中读取。分为两种情况,第一种:task 要计算的数据是在同一个 worker 的不同 Executor 进程中。第二种:task 要计算的数据是在同一个 worker 的磁盘上,或在 HDFS 上恰好有 block 在同一个节点上。如果 Spark 要计算的数据来源于 HDFS 上,那么最好的本地化级别就是 NODE_LOCAL。
  3. NO_PREF:没有最佳位置,数据从哪访问都一样快,不需要位置优先。比如 Spark SQL 从 Mysql 中读取数据。
  4. RACK_LOCAL:机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。情况一:task 计算的数据在 worker2 的 EXecutor 中。情况二:task 计算的数据在 work2 的磁盘上。
  5. ANY:跨机架,数据在非同一机架的网络上,速度最慢。

谁来负责数据本地化

val rdd1 = sc.textFile("hdfs://...") 
rdd1.cache()
rdd1.map.filter.count()

上面这段简单的代码,背后其实做什么很多事情。Driver 的 TaskScheduler 在发送 task 之前,首先应该拿到 rdd1 数据所在的位置,rdd1 封装了这个文件所对应的 block 的位置,DAGScheduler 通过调用 getPrerredLocations() 拿到 partition 所对应的数据的位置,TaskScheduler 根据这些位置来发送相应的 task。 

具体的解释:

DAGScheduler 切割Job,划分Stage, 通过调用 submitStage 来提交一个Stage 对应的 tasks,submitStage 会调用 submitMissingTasks, submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用 getPreferrdeLocations() 得到 partition 的优先位置,就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个task,该 task 优先位置与其对应的 partition 对应的优先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 会为每个 TaskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在调度和延迟调度 tasks 时发挥作用。

总的来说,Spark 中的数据本地化是由 DAGScheduler 和 TaskScheduler 共同负责的。

数据本地化执行流程

第一步:PROCESS_LOCAL

TaskScheduler 根据数据的位置向数据节点发送 task 任务。如果这个任务在 worker1 的 Executor 中等待了 3 秒。(默认的,可以通过spark.locality.wait 来设置),可以通过 SparkConf() 来修改,重试了 5 次之后,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 PROCESS_LOCAL 降到 NODE_LOCAL。

第二步:NODE_LOCAL

TaskScheduler 重新发送 task 到 worker1 中的 Executor2 中执行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重试了 5 次,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 NODE_LOCAL 降到 RACK_LOCAL。

第三步:RACK_LOCAL

TaskScheduler重新发送 task 到 worker2 中的 Executor1 中执行。

第四步:

当 task 分配完成之后,task 会通过所在的 worker 的 Executor 中的 BlockManager 来获取数据。如果 BlockManager 发现自己没有数据,那么它会调用 getRemote() 方法,通过 ConnectionManager 与原 task 所在节点的 BlockManager 中的 ConnectionManager先建立连接,然后通过TransferService(网络传输组件)获取数据,通过网络传输回task所在节点(这时候性能大幅下降,大量的网络IO占用资源),计算后的结果返回给Driver。这一步很像 shuffle 的文件寻址流程。

调优

TaskScheduler在发送task的时候,会根据数据所在的节点发送task,这时候的数据本地化的级别是最高的,如果这个task在这个Executor中等待了3秒,重试发射了5次还是依然无法执行,那么TaskScheduler就会认为这个Executor的计算资源满了,TaskScheduler会降低一级数据本地化的级别,重新发送task到其他的Executor中执行,如果还是依然无法执行,那么继续降低数据本地化的级别...

如果想让每一个 task 都能拿到最好的数据本地化级别,那么调优点就是等待时间加长。注意!如果过度调大等待时间,虽然为每一个 task 都拿到了最好的数据本地化级别,但是我们 job 执行的时间也会随之延长。

官方参数:Configuration - Spark 3.5.0 Documentation

spark.locality.wait3sHow long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.0.5.0
spark.locality.wait.nodespark.locality.waitCustomize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).0.8.0
spark.locality.wait.processspark.locality.waitCustomize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.0.8.0
spark.locality.wait.rackspark.locality.waitCustomize the locality wait for rack locality.0.8.0

代码中的设置方法 

new SparkConf.set("spark.locality.wait","100") //默认3秒

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/217642.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【matlab进阶学习-6】 读取log数据data.txt文件,并做处理,导出报告/表格/图表

原始文件 原始文件格式txt&#xff0c;每一行对应一个数据&#xff0c;数据之间由逗号分割开 对应意思 时刻&#xff0c;电压&#xff0c;电流&#xff0c;功率&#xff0c;容量&#xff0c;&#xff0c;电流&#xff0c;功率&#xff0c;&#xff0c;RTC时间&#xff0c;状态…

内网服务器部署maven私服简记

前言 很多企业希望创建自己的maven私服&#xff0c;但服务器无法和外网连通&#xff0c;所以这里介绍一套完整的内网部署nexus的解决方案。实现的方式也很简单&#xff0c;将下载好的nexus安装和项目所需的依赖仓库都上传到服务i去上去&#xff0c;通过脚本的方式实现批量导入…

CSS的三大特性(层叠性、继承性、优先级---------很重要)

CSS 有三个非常重要的三个特性&#xff1a;层叠性、继承性、优先级。 层叠性 场景&#xff1a;相同选择器给设置相同的样式&#xff0c;此时一个样式就会覆盖&#xff08;层叠&#xff09;另一个冲突的样式。层叠性主要解决样式冲突 的问题 原则&#xff1a;  样式冲突&am…

autojs-练手-视频号点赞(进阶版)

注释很详细&#xff0c;直接上代码 较初阶版新增内容 1. 简单但好用的ui界面 为方便大家参考&#xff0c;ui界面的模板单独拿出来了 ui界面模板 2. opencv图像识别 3. 需加载情况特殊处理&#xff08;防卡壳&#xff09; 4. 增加自动判断是否已点赞的情况 源码部分 // 启用…

HarmonyOS4.0从零开始的开发教程14Web组件的使用

HarmonyOS&#xff08;十二&#xff09;Web组件的使用 1 概述 相信大家都遇到过这样的场景&#xff0c;有时候我们点击应用的页面&#xff0c;会跳转到一个类似浏览器加载的页面&#xff0c;加载完成后&#xff0c;才显示这个页面的具体内容&#xff0c;这个加载和显示网页的…

智能优化算法应用:基于水循环算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于水循环算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于水循环算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.水循环算法4.实验参数设定5.算法结果6.参考文…

无需公网IP联机Minecraft,我的世界服务器本地搭建教程

目录 前言 1.Mcsmanager安装 2.创建Minecraft服务器 3.本地测试联机 4. 内网穿透 4.1 安装cpolar内网穿透 4.2 创建隧道映射内网端口 5.远程联机测试 6. 配置固定远程联机端口地址 6.1 保留一个固定TCP地址 6.2 配置固定TCP地址 7. 使用固定公网地址远程联机 8.总…

Vue 中 v-model 的修饰符

lazy 修饰符&#xff1a;将 v-model 改为失去焦点后更新数据。 number 修饰符&#xff1a;将 v-model 数据转为数字类型。 trim 修饰符&#xff1a;去除 v-model 数据中的首尾空格。 语法格式&#xff1a; // lazy 修饰符 <input v-model.lazy"数据"> // nu…

靠谱的车- 华为OD统一考试(C卷)

靠谱的车- 华为OD统一考试&#xff08;C卷&#xff09; OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 程序员小明打了一辆出租车去上班。出于职业敏感&#xff0c;他注意到这辆出租车的计费表有点问题&#xf…

【JNA与C++基本使用示例】

JNA中java与C使用注意事项和代码示例 JNA关系映射表使用案列注意代码示例C代码java代码 JNA关系映射表 使用案列 注意 JNA只支持C方式的dll使用C的char* 作为返回值时&#xff0c;需要返回的变量为malloc分配的地址C的strlen函数只获得除/0以外的字符串长度 代码示例 C代码…

基于PaddleNLP的深度学习对文本自动添加标点符号(一)

前言 目前以深度学习对文本自动添加标点符号研究很少&#xff0c;已知的开源项目并不多&#xff0c;详细的介绍就更少了&#xff0c;但对文本自动添加标点符号又在古文识别语音识别上有重大应用。 基于此&#xff0c;本文开始讲解基于PaddleNLP的深度学习对文本自动添加标点符号…

鸿蒙开发之状态管理@Prop和@Link

一、用法 在父子组件需要进行数据同步的时候&#xff0c;可以通过Prop和Link装饰器来做到。在父组件中用State装饰&#xff0c;在自组件中用Prop或Link装饰。 结论&#xff1a;Prop用于子组件只监听父组件的数据改变而改变&#xff0c;自己不对数据改变 Link用于子组件与父组…

Proxmox VE 安装 OpenWrt 配置旁路由教程

话不多说&#xff0c;本篇文章将记录如何在 Proxmox VE 环境通过虚拟机安装 OpenWrt 配置旁路由的过程&#xff0c;仅做参考。 PVE 创建虚拟机 名称随意&#xff0c;GuestOS 选择 Linux&#xff0c;不使用任何 iso 镜像。&#xff08;记住你的 VMID&#xff09; 清空将要创建…

机器学习---Adaboost算法

1. Adaboost算法介绍 Adaboost是一种迭代算法&#xff0c;其核心思想是针对同一个训练集训练不同的分类器&#xff08;弱分类器&#xff09;&#xff0c;然 后把这些弱分类器集合起来&#xff0c;构成一个更强的最终分类器&#xff08;强分类器&#xff09;。Adaboost算法本身…

Qt 线程

&#x1f4a1; 进度条显示拷贝进度&#xff08;verson 1&#xff09; 窗口上放置一个按钮和一个进度条部件&#xff0c;点击按钮&#xff0c;进行拷贝操作 —— 打开对话框选择源文件&#xff0c;然后再打开一个对话框 选择 目标文件存放位置和名称。拷贝过程中进度条显示当前…

十三、YARN资源分配调用

1、为什么要先学习YARN组件&#xff1f; 在Hadoop文件系统中&#xff0c;YARN作为Hadoop系统的第三大组件&#xff0c;其中&#xff0c;第二大组件MapReduce组件是基于YARN运行的&#xff0c;即没有YARN无法运行MapReduce程序&#xff0c;所以需要同时学习YARN。 2、YARN &…

Day58力扣打卡

打卡记录 下一个更大元素 IV&#xff08;单调栈 x2&#xff09; 链接 class Solution:def secondGreaterElement(self, nums: List[int]) -> List[int]:ans [-1] * len(nums)s []t []for i, x in enumerate(nums):while t and nums[t[-1]] < x:ans[t.pop()] x # t…

『npm』一条命令快速配置npm淘宝国内镜像

&#x1f4e3;读完这篇文章里你能收获到 一条命令快速切换至淘宝镜像恢复官方镜像 文章目录 一、设置淘宝镜像源二、恢复官方镜像源三、查看当前使用的镜像 一、设置淘宝镜像源 npm config set registry https://registry.npm.taobao.org服务器建议全局设置 sudo npm config…

科技提升安全,基于YOLOv6开发构建商超扶梯场景下行人安全行为姿态检测识别系统

在商超等人流量较为密集的场景下经常会报道出现一些行人在扶梯上摔倒、受伤等问题&#xff0c;随着AI技术的快速发展与不断普及&#xff0c;越来越多的商超、地铁等场景开始加装专用的安全检测预警系统&#xff0c;核心工作原理即使AI模型与摄像头图像视频流的实时计算&#xf…

没有明确的报错信息,阿里云国际版Windows服务器无法远程连接

在远程连接失败时&#xff0c;如果您没有收到系统返回的报错信息&#xff0c;并且ECS实例是运行中的状态&#xff0c;然后再根据以下步骤进行排查&#xff1a; 步骤一&#xff1a;使用阿里云Workbench工具测试远程登录 步骤二&#xff1a;检查是否有收到黑洞通知 步骤三&…