spark shuffle写操作——SortShuffleWriter


写入的简单流程:
1.生成ExternalSorter对象
2.将消息都是插入ExternalSorter对象中
3.获取到mapOutputWriter,将中间产生的临时文件合并到一个临时文件
4.生成最后的data文件和index文件
可以看到写入的重点类是ExternalSorter对象
image.png

ExternalSorter

基本功能:对(k,v)进行排序,中间可能存在合并操作,最后生成(k,c)。

  1. 使用partitioner对key进行分区
  2. 在每个分区中使用Comparator进行排序
  3. 输出一个单独的文件,每个分区对应这个文件中的一段范围。

如果禁用了合并操作,类型C必须等于V
这个类的工作流程如下:

  • 使用数据,反复填充内存缓冲区。如果是可以合并的数据,则使用PartitionedAppendOnlyMap;如果不合并,则使用PartitionedPairBuffer。在这些缓冲区中,我们首先按分区ID对元素进行排序,然后可能还会按键进行排序。为了避免对每个键多次调用分区器,我们将分区ID与每条记录一同存储。
  • 当每个缓冲区达到内存限制时,会将其spill到文件中。这个文件首先按分区ID排序,如果需要做聚合操作,其次可能按键或键的哈希码排序。对于每个文件,跟踪每个分区在内存中的对象数量,因此不必为每个元素都写出分区ID。
  • 当用户请求迭代器或文件输出时,溢写的文件会与任何剩余的内存数据一起被合并,使用上述定义的相同排序顺序(除非排序和聚合都被禁用)。如果需要按键进行聚合,我们或者从ordering参数中使用全序,或者读取具有相同哈希码的键并相互比较它们的相等性来合并值。
  • 用户在结束时应调用stop()方法来删除所有中间文件。

缓存buffer:PartitionedAppendOnlyMap、PartitionedPairBuffer
关键方法:insertAll、maybeSpillCollection、spill、writePartitionedMapOutput
image.png
image.png

PartitionedPairBuffer

capacity 容量
curSize 当前放入的数据量
data 数组,存储的数据,(k,v)占用数组的两个位置
image.png

insert

如果容量达到瓶颈就进行扩容。
先存key,再存value。再调用afterUpdate
image.png

afterUpdate

numUpdates数据插入/更新次数
nextSampleNum下一次采样的次数
更新numUpdates,如果达到采样次数,执行采样takeSample
image.png

takeSample

samples中只存两个样品数据,用来计算每次更新的差值。
采样的时候要移除多余的数据。更新下一次采样的数据量。
image.png

estimateSize

预估大小。
最后一个样品的lastSize+bytesPerUpdate*新增的更新次数。
image.png

resetSamples

重新进行采样。
image.png

growArray

扩容2倍容量,迁移数据,重启采样
image.png

partitionedDestructiveSortedIterator

生成比较器comparator,调用sort对缓存的数据进行排序。
image.png
sorter是使用TimSort进行排序的。
TimSort介绍: https://zhuanlan.zhihu.com/p/695042849
image.png

iterator

用pos计算剩余量。
data(2 * pos)为key,data(2 * pos+1) 为value
image.png

PartitionedAppendOnlyMap

存储数据用的数组data,里面的元素是key0, value0, key1, value1, key2, value2…
image.png

changeValue

PartitionedAppendOnlyMap插入数据不再是追加,而是有一个相同key合并值的过程。

  1. key是null,返回null,不进行存储
  2. key首次插入,更新data中的对应的kv值
  3. key非首次插入,更新data中合并的的新value
  4. key发生哈希冲突,就向后加1,直到不冲突

image.png

update

跟changeValue类似。
image.png

growTable

比较简单,就是容量扩大两倍,将旧的kv值重新计算hash插入到新的数组中,如果发生hash冲突就不断向后移动一位。
image.png

iterator

核心方法是nextValue,在nextValue中,遍历data数组的对应key值,要求不是null,表明这个位置是有值的。
如果有key为null,要求pos=-1且haveNullValue=true
image.png

partitionedDestructiveSortedIterator

调用destructiveSortedIterator方法
image.png

destructiveSortedIterator

data数组中元素是分散的,首先将数组中的元素都集中到数组的前面。后面就跟PartitionedPairBuffer的partitionedDestructiveSortedIterator方法一样使用TimeSort进行排序。
image.png

采样相关方法

跟上面的PartitionedPairBuffer的采样相关方法一样。

spill相关方法

入口方法是maybeSpillCollection
image.png

maybeSpillCollection

不论使用的数据结构是buffer还是map,都是计算消耗的容量,再调用maybeSpill方法,最后重新初始化化对应数据结构。可以想到maybeSpill中就将缓存的数据放到了本地。
image.png

maybeSpill

每32条数据就进行一次内存使用情况判断。如果当前使用内存超过了限制,就先申请新的内存,按照两倍的内存使用量申请,不一定申请到足量的内存。申请后还是内存使用超过了限制,就进行spill,调用spill方法,同时调用releaseMemory释放内存。
image.png

releaseMemory

image.png

spill

调用destructiveSortedWritablePartitionedIterator方法返回排好序的分区迭代器。
调用spillMemoryIteratorToDisk将数据溢写到磁盘上
最后将生成的文件记录到spills中
image.png

destructiveSortedWritablePartitionedIterator

调用对应数据结构的partitionedDestructiveSortedIterator方法返回排序的迭代器。
就是上面的PartitionedPairBuffer和PartitionedAppendOnlyMap的partitionedDestructiveSortedIterator方法。
image.png

spillMemoryIteratorToDisk

创建临时文件,生成对应的writer
image.png
遍历将数据写入的文件中,每10000条进行一次flush。
如果失败了,调用revertPartialWritesAndClose进行回滚。
image.png

revertPartialWritesAndClose

如果这次写入出现问题,使用这个方法。回滚写入,只保留截止到上一次写入的内容。
image.png

writePartitionedMapOutput

将排好序的缓存和文件合并成一个文件输出。
spills为空,即没有产生排序文件。将缓存中数据生成排好序的迭代器,遍历写入到文件中。
image.png
存在排好序的文件。则需要调用partitionedIterator方法将文件数据和缓存的数据进行合并,再遍历输出。
image.png

partitionedIterator

调用merge方法合并内存和文件数据
image.png

merge

merge的第一个参数是spilled文件,第二个参数是内存缓存的数据。
流程是遍历分区,取出对应分区的spilled文件中和缓存中的数据。
根据情况进行聚合或者排序等操作后输出合并后的排好序的文件。
image.png

mergeSort

使用堆排序,但是heap中存放的是已经排好序的iterator。
最小值就是heap中首个iterator中的第一个元素。
image.png

mergeWithAggregation

有总排序,这样相同的key会在一起。
调用mergeSort将iterators合并成一个排好序的iterator。
next方法就是遍历key出来全部的值,进行合并后输出,因为是全局有序,不需要遍历iterator全部数据。
image.png
没有总排序
跟上面流程类似,先得到合并的iterator,但是它不是全局有序的。存在不同的key在comparator比较下相等,如使用hash进行比较,因此存在 aaabaaa 这种情况的key分布。
在获取相同key对应的值的时候需要遍历iterator的使用comparator和equal进行比较数据,再进行合并。返回值是一个comparator相同有可能key不同的key组成的iterator
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/44278.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vant Ui 最新访问地址

Vant 4 - A lightweight, customizable Vue UI library for mobile web apps. 顺带一个顶部导航栏正常写法 先使用吸顶为0&#xff0c;然后再写nav-bar <van-sticky :offset-top"0"> <van-nav-bar class"top-title" title"村集体土地公示&q…

对为什么react需要时间分片,vue3不需要的浅学习

1、时间分片 时间分片指在让应用在cpu进行大量计算时也能与用户交互&#xff0c;但时间分片只能对大量cpu计算进行优化&#xff0c;无法优化复杂DOM操作&#xff0c;因为要确保用户正在操作的界面是最新。 web卡顿的场景&#xff1a; 1、cpu计算量不大&#xff0c;但dom操作…

人工智能算法工程师(中级)课程1-Opencv视觉处理之基本操作与代码详解

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能算法工程师(中级)课程1-Opencv视觉处理之基本操作与代码详解。OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉和机器学习软件库。它提供了各种视觉处理函数&am…

Redis为什么变慢了?一文讲透如何排查Redis性能问题

Redis 作为优秀的内存数据库&#xff0c;其拥有非常高的性能&#xff0c;单个实例的 OPS 能够达到 10W 左右。但也正因此如此&#xff0c;当我们在使用 Redis 时&#xff0c;如果发现操作延迟变大的情况&#xff0c;就会与我们的预期不符。 你也许或多或少地&#xff0c;也遇到…

以太网中的各种帧结构

帧结构&#xff08;Ethernet Frame Structure&#xff09;介绍 以太网信号帧结构&#xff08;Ethernet Signal Frame Structure&#xff09;&#xff0c;有被称为以太网帧结构&#xff0c;一般可以分为两类 —— 数据帧和管理帧。 按照 IEEE 802.3&#xff0c;ISO/IEC8803-3 …

短视频矩阵管理系统:如何提升内容质量,帮助企业获客?

在数字化营销蓬勃发展的今天&#xff0c;短视频已成为企业推广的重要阵地。然而&#xff0c;如何高效管理短视频内容&#xff0c;提升内容质量&#xff0c;进而帮助企业精准获客&#xff0c;成为企业亟待解决的问题。短视频矩阵管理系统应运而生&#xff0c;以其强大的功能和灵…

TCP/IP协议超时重传,以及应用层超时重传一文详解

很多人会有这样的疑问 TCP/IP协议内置了超时重传的功能&#xff0c;那为什么连接完全断开或超时时&#xff0c;应用层代码段还会进行重传处理呢&#xff1f; TCP协议的重传机制 客户端 服务器| ||---- 数据段1 --…

编程范式之并发编程

目录 前言1. 并发编程的定义2. 并发编程的特点2.1 任务交替执行2.2 状态共享与同步2.3 并行执行 3. 并发编程的适用场景3.1 高性能计算3.2 I/O 密集型应用3.3 实时系统 4. 并发编程的优点4.1 提高资源利用率4.2 缩短响应时间4.3 提高系统吞吐量 5. 并发编程的缺点5.1 编程复杂性…

硬盘模式vmd怎么改ahci_电脑vmd改ahci模式详细步骤

最近有很多网友问&#xff0c;我新买的电脑安装原版win10或win11找不到驱动器呀&#xff0c;进入第三方pe又找不到硬盘&#xff0c;找到硬盘安装后又出现安装蓝屏的情况&#xff0c;新机器怎么回事呀&#xff1f;这位网友内心有点崩溃&#xff0c;不知道啥原因。其实这些都是由…

初识c++(类与对象——上)

一、类的定义 1、类定义格式 • class为定义类的关键字&#xff0c;Stack为类的名字&#xff0c;{}中为类的主体&#xff0c;注意类定义结束时后面分号不能省 略。类体中内容称为类的成员&#xff1a;类中的变量称为类的属性或成员变量; 类中的函数称为类的方法或 者成员函…

损失函数 - Transformer教程

在人工智能和深度学习的领域&#xff0c;Transformer模型已经成为了非常流行的选择。而在Transformer模型的训练过程中&#xff0c;损失函数扮演了至关重要的角色。今天&#xff0c;我们就来深入探讨一下什么是损失函数&#xff0c;以及它在Transformer中的应用。 什么是损失函…

【Node.js安装教程】

Node.js安装教程 第一步&#xff1a;下载 下载链接&#xff1a;https://nodejs.org/zh-cn 第二步&#xff1a;安装 **方法一&#xff1a;**建议安装在默认路径 方法二&#xff1a;如果不是默认安装路径可能会出现一系列问题&#xff1a;这时可以选择卸载重装或者配置环境变量…

kotlin数据类型

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 Kotlin基本数值类型 基本数据类型包括 Byte、Short、Int、Long、Float、Double 整数类型 类型位宽最小值最大…

安卓微信8.0之后如何利用缓存找回的三天之前不可见的朋友圈图片

安卓微信8.0之后如何利用缓存找回的三天之前不可见的朋友圈图片 复习了下安卓程序的知识&#xff0c;我们会了解到&#xff0c;安卓程序清楚数据的时候有两个选项 一个是清除全部数据一个是清除缓存。 清除全部数据表示清除应用数据缓存。 对于安卓微信8.0之后而言&#xff0…

OTP防重放攻击

OTP本意是一次性口令&#xff0c;比如邮箱验证码&#xff0c;短信验证码&#xff0c;或者根据totp或者hotp生成的默认30秒一变的6位数字。 不过开发者要注意&#xff0c;必须要在验证成功后失效那个验证码&#xff0c;不然就会导致重放攻击。 对于邮箱验证码&#xff0c;服务器…

彻底开源,免费商用,上海AI实验室把大模型门槛打下来

终于&#xff0c;业内迎来了首个全链条大模型开源体系。 大模型领域&#xff0c;有人探索前沿技术&#xff0c;有人在加速落地&#xff0c;也有人正在推动整个社区进步。 就在近日&#xff0c;AI 社区迎来首个统一的全链条贯穿的大模型开源体系。 虽然社区有LLaMA等影响力较大…

从 ArcMap 迁移到 ArcGIS Pro

许多 ArcMap 用户正在因 ArcGIS Pro 所具有的现代 GIS 桌面工作流优势而向其迁移。 ArcGIS Pro 与其余 ArcGIS 平台紧密集成&#xff0c;使您可以更有效地共享和使用内容。 它还将 2D 和 3D 组合到一个应用程序中&#xff0c;使您可以在同一工程中使用多个地图和多个布局。 Arc…

【C++杂货铺】C++11新特性

目录 &#x1f308; 前言&#x1f308; &#x1f4c1; C11介绍 &#x1f4c1; 统一初始化列表 &#x1f4c1; 声明 &#x1f4c2; auto &#x1f4c2; decltype &#x1f4c2; 返回类型后置 &#x1f4c2; 范围for &#x1f4c2; 模板别名 &#x1f4c2; nullptr &#x1…

服务器使用PC作为代理访问外网

1、PC上启动代理&#xff0c;比如nginx 下载nginx&#xff1a;http://nginx.org/en/download.html 修改配置文件&#xff0c;在conf下&#xff1a; http {include mime.types;default_type application/octet-stream;sendfile on;keepalive_timeout 65;server…

六、 SpringBoot 配置⽂件 ★ ✔

六、 SpringBoot 配置⽂件 本节⽬标1. 配置⽂件作⽤2. 配置⽂件快速⼊⼿3. 配置⽂件的格式4. properties 配置⽂件说明4.1 properties 基本语法4.2 读取配置⽂件4.3 properties 缺点分析 5. yml 配置⽂件说明5.1 yml 基本语法5.2 yml 使⽤进阶5.2.1 yml 配置不同数据类型及 nul…