Spark RDD sortBy算子什么情况会触发shuffle

在 Spark 的 RDD 中,sortBy 是一个排序算子,虽然它在某些场景下可能看起来是分区内排序,但实际上在需要全局排序时会触发 Shuffle。这里我们分析其底层逻辑,结合源码和原理来解释为什么会有 Shuffle 的发生。


1. 为什么 sortBy 会触发 Shuffle?

关键点 1:全局有序性要求

sortBy 并非单纯的分区内排序。它的目标是按照用户指定的键对整个 RDD 的数据进行排序,这种操作需要保证全局顺序。为实现这一点,必须:

  • 对数据进行 重新分区(Repartition),确保每个分区中的数据按照全局范围内的排序键正确分布;
  • 每个分区内部再完成排序。

这些步骤不可避免地引入了 Shuffle,因为数据需要从一个分区转移到另一个分区以保证全局有序性。


关键点 2:底层调用 repartitionAndSortWithinPartitions

sortBy 的底层实现会调用 repartitionAndSortWithinPartitions 方法:

this.keyBy(f).repartitionAndSortWithinPartitions(new RangePartitioner(numPartitions, this, ascending))(ordInverse).values
  1. keyBy(f)

    • 将数据转化为 (key, value) 格式,key 是排序的关键字,value 是原始数据。
  2. RangePartitioner

    • 使用 RangePartitioner 将数据根据排序键重新分区(这一步需要 Shuffle)。
  3. repartitionAndSortWithinPartitions

    • 先 Shuffle 数据以保证每个分区内的 key 是按范围划分的;
    • 然后对每个分区内的数据进行排序。
Shuffle 的触发
  • 当目标分区数量与当前分区数量不一致时(用户指定分区数或默认分区数),会触发 Shuffle;
  • 即使目标分区数一致,只要需要保证全局有序,也需要重新分布数据来确保各分区内数据按键范围划分。

2. Shuffle 的作用

  • 全局排序:分区间重新分布数据,确保所有分区的排序键范围是连续的。
  • 负载均衡:通过 RangePartitioner 分布数据,避免某些分区过大或过小的问题。
  • 分区内排序:确保每个分区内部数据按键排序。

3. 源码分析

repartitionAndSortWithinPartitions 的核心逻辑如下:

def repartitionAndSortWithinPartitions(partitioner: Partitioner)(implicit ord: Ordering[K]): RDD[(K, V)] = withScope {val shuffled = new ShuffledRDD[K, V, V](this, partitioner)shuffled.setKeyOrdering(ord)new MapPartitionsRDD(shuffled, (context, pid, iter) => {val sorter = new ExternalSorter[K, V, V](context, Some(partitioner), Some(ord))sorter.insertAll(iter)context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)sorter.iterator})
}
  1. ShuffledRDD

    • 触发 Shuffle,将数据根据分区器重新分布。
  2. ExternalSorter

    • 对每个分区内的数据进行排序(如果数据超出内存,会使用磁盘作为临时存储)。

4. 举例说明 Shuffle 的发生

sortBy 的行为取决于传递的参数。为了实现分区内排序,你需要明确控制 sortBy 的参数设置。如果不显式指定目标分区数(numPartitions 参数),sortBy 默认不会触发 Shuffle,因此只会在分区内排序。

例子 1:带 Shuffle 的全局排序
val rdd = sc.parallelize(Seq(5, 2, 4, 3, 1), numSlices = 2)
val sortedRdd = rdd.sortBy(x => x, ascending = true, numPartitions = 3)// 指定目标分区数
println(sortedRdd.collect().mkString(", "))
  • 初始数据分区
    分区 1:[5, 2],分区 2:[4, 3, 1]
  • 重新分区和排序后
    分区 1:[1, 2],分区 2:[3, 4],分区 3:[5]
  • Shuffle 触发原因
    数据必须重新分布,确保分区键范围([1-2], [3-4], [5])。
  • 特点
    触发 Shuffle 操作,数据按照 RangePartitioner 进行分区。
    每个分区内局部排序后,实现全局排序。
例子 2:分区内排序(无 Shuffle)
val rdd = sc.parallelize(Seq(5, 2, 4, 3, 1), numSlices = 2) // 两个分区
// 如果只需要分区内排序,mapPartitions 提供了无 Shuffle 的选择。
val sorted = rdd.mapPartitions(partition => partition.toList.sorted.iterator)
sorted.collect().foreach(println)
  • 初始数据分区
    分区 1:[5, 2],分区 2:[4, 3, 1]
  • 排序后
    分区 1:[2, 5],分区 2:[1, 3, 4]
  • 无 Shuffle 原因
    数据仅在分区内排序,分区间顺序无全局保证。

5. 总结

  • sortBy 在需要全局排序时触发 Shuffle,这是为了重新分区以确保分区范围和分区内排序。
  • 如果只需要分区内排序,mapPartitions 提供了无 Shuffle 的选择。

注意事项

  • 全局排序带来的 Shuffle 会显著增加网络传输和计算成本。
  • 如无必要,尽量避免全局排序,优先考虑局部排序或 Top-N 算法以优化性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/61140.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ssm139选课排课系统的设计与开发+vue(论文+源码)_kaic

摘 要 互联网的普及,改变了人们正常的生活学习及消费习惯,而且也大大的节省了人们的时间,由于各种管理系统都再不断的增加,更方便了用户,也改良了很多的用户习惯。对于选课排课系统查询方面缺乏系统的管理方式&#x…

网络基础 - NAT 篇

一、全局 IP 地址(公网 IP 地址)和私有 IP 地址 RFC 1918 规定了用于组建局域网的私有 IP 地址: 10.0.0.0 ~ 10.255.255.255172.16.0.0 ~ 172.31.255.255192.168.0.0 ~ 192.168.255.255 包含在以上范围内的 IP 地址都属于私有 IP 地址,而在此之外的 I…

Springboot3.3.5 启动流程之 tomcat启动流程介绍

在文章 Springboot3.3.5 启动流程(源码分析) 中讲到 应用上下文(applicationContext)刷新(refresh)时使用模板方法 onRefresh 创建了 Web Server. 本文将详细介绍 ServletWebServer — Embedded tomcat 的启动流程。 首先&…

NPOI 实现Excel模板导出

记录一下使用NPOI实现定制的Excel导出模板&#xff0c;已下实现需求及主要逻辑 所需Json数据 对应参数 List<PurQuoteExportDataCrInput> listData [{"ItemName": "电缆VV3*162*10","Spec": "电缆VV3*162*10","Uom":…

DAY113代码审计-PHPTP框架微P系统漏审项目等

一、环境安装 导入数据 Debug 版本信息收集 一、不安全写法的sql注入&#xff08;拼接写法绕过预编译机制&#xff09; 1、Good.php的不安全写法 2、查找可以参数 3、找路由关系 application/index/controller/Goods.php http://172.19.1.236:8833/index.php/index/goods/aj…

Flink1.19编译并Standalone模式本地运行

1.首先下载源码 2.本地运行 新建local_conf和local_lib文件夹&#xff0c;并且将编译后的文件放入对应的目录 2.1 启动前参数配置 2.1.2 StandaloneSessionClusterEntrypoint启动参数修改 2.1.3 TaskManagerRunner启动参数修改 和StandaloneSessionClusterEntrypoint一样修改…

Ascend C算子性能优化实用技巧05——API使用优化

Ascend C是CANN针对算子开发场景推出的编程语言&#xff0c;原生支持C和C标准规范&#xff0c;兼具开发效率和运行性能。使用Ascend C&#xff0c;开发者可以基于昇腾AI硬件&#xff0c;高效的实现自定义的创新算法。 目前已经有越来越多的开发者使用Ascend C&#xff0c;我们…

计算机编程中的测试驱动开发(TDD)及其在提高代码质量中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 计算机编程中的测试驱动开发&#xff08;TDD&#xff09;及其在提高代码质量中的应用 计算机编程中的测试驱动开发&#xff08;T…

前后端交互之动态列

一. 情景 在做项目时&#xff0c;有时候后会遇到后端使用了聚合函数&#xff0c;导致生成的对象的属性数量或数量不固定&#xff0c;因此无法建立一个与之对应的对象来向前端传递数据&#xff0c;这时可以采用NameDataListVO向前端传递数据。 Data Builder AllArgsConstructo…

[笔记]L6599的极限工作条件考量

0.名词 OTP over tempature protect.OCP over current protectOVP over voltage protectBrownout Protection Undervoltage Protection可能需要考虑hysteresis response.因为要考虑一些高频干扰 1.基本的过流保护逻辑 参考&#xff1a;ST L6599 器件手册 LLC开关电源&#…

【Pikachu】XML外部实体注入实战

若天下不定&#xff0c;吾往&#xff1b;若世道不平&#xff0c;不回&#xff01; 1.XXE漏洞实战 首先写入一个合法的xml文档 <?xml version "1.0"?> <!DOCTYPE gfzq [<!ENTITY gfzq "gfzq"> ]> <name>&gfzq;</name&…

多模块集成swagger(knife4j-spring-boot-starter)

前言 单体项目、多模块单体项目、微服务项目&#xff0c;集成的方案大同小异&#xff0c;微服务会在网关做个聚合&#xff0c;后面再补充。 依赖版本 目前demo的版本如下&#xff1a; spring boot 2.7.3spring cloud 2021.0.4spring cloud alibaba 2021.0.4.0knife4j-sprin…

DataStream编程模型之数据源、数据转换、数据输出

Flink之DataStream数据源、数据转换、数据输出&#xff08;scala&#xff09; 0.前言–数据源 在进行数据转换之前&#xff0c;需要进行数据读取。 数据读取分为4大部分&#xff1a; &#xff08;1&#xff09;内置数据源&#xff1b; 又分为文件数据源&#xff1b; socket…

CSS盒子的定位>(上篇)#定位属性#相对定位-附练习

一、定位属性 1.定位方式 position属性可以选择4种不同类型的定位方式。 语法格式&#xff1a;position&#xff1a;relation | absolute | fixed参数&#xff1a;①relative生成相对定位的元素&#xff0c;相对于其正常位置进行定位。 ②absolute生成绝对定位的…

Redis/Codis性能瓶颈揭秘:网卡软中断的影响与优化

目录 现象回顾 问题剖析 现场分析 解决方案 总结与反思 1.调整中断亲和性&#xff08;IRQ Affinity&#xff09;&#xff1a; 2.RPS&#xff08;Receive Packet Steering&#xff09;和 RFS&#xff08;Receive Flow Steering&#xff09;&#xff1a; 近期&#xff0c;…

WordPress设置自动更新CSS版本号

WordPress 通常会在引用 CSS 文件时添加版本号参数&#xff08;?verx.x.x&#xff09;。如果版本号未更新&#xff0c;浏览器可能继续加载旧的文件。 解决方法&#xff1a;确保你在 functions.php 文件中正确加载了 CSS 文件&#xff0c;并动态更新版本号。例如在functions.p…

若依权限控制

springbootvue2项目中的权限控制(若依项目) 步骤: 1.登录管理员账号,为普通用户增加权限按钮 绿色部分为权限控制字符 2.在后端对应的方法上增加权限控制(这里以删除操作为例):PreAuthorize(“ss.hasPermi(‘area:store:remove’)”) 3.在前端对应的按钮上增加权限控制:v-ha…

【机器学习】如何配置anaconda环境(无脑版)

马上就要上机器学习的实验&#xff0c;这里想写一下我配置机器学习的anaconda环境的二三事 一、首先&#xff0c;下载安装包&#xff1a; Download Now | Anaconda 二、打开安装包&#xff0c;一直点NEXT进行安装 这里要记住你要下载安装的路径在哪&#xff0c;后续配置环境…

OceanBase 升级过程研究(4.2.1.6-4.2.1.8)

模拟业务 使用benchmark加载10仓数据模拟业务场景 升级方法 使用滚动升级方式来进行OB升级。该方法前提是OB集群必须满足官方规定的高可用架构(如果 Zone 个数小于 3&#xff0c;滚动升级时则无法构成多数派), 滚动升级的原理就是轮流完成每个ZONE的升级工作&#xff0c;由于…

微知-DOCA ARGP参数模块的相关接口和用法(config单元、params单元,argp pipe line,回调)

文章目录 1. 背景2. 设置参数的主要流程2.1 初始化2.2 注册某个params的处理方式以及回调函数2.4 定义好前面的params以及init指定config地点后start处理argv 3. 其他4. DOCA ARGP包相关4.1 主要接口4.2 DOCA ARGP的2个rpm包4.2.1 doca-sdk-argp-2.9.0072-1.el8.x86_64.rpm4.2.…