RDD算子介绍(二)

1. coalesce

用于缩减分区,减少分区个数,减少任务调度成本。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

分区数可以减少,但是减少后的分区里的数据分布并不一定是均匀分布的,比如以下场景:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

结果现实1和2在一个分区,3 4 5 6四个数在第二个分区。因为coalesce算子默认不会打乱分区的数据进行重新组合的。原来1和2,3和4,5和6分别在三个分区里,如果缩减分区之后1 2 3在一个分区,4 5 6在一个分区,意味着将原来的3和4所在的分区里的数据打乱重新组合了。所以缩减分区后,应该将5和6所在的分区里的数据移到其他分区中去,即3 4 5 6最终在一个分区了。

coalesce算子可能会导致数据倾斜。如果想要数据均衡,需要进行shuffle处理,coalesce算子第二个参数就表示是否shuffle处理,默认是false,改为true即可,但是数据不一定有规律,这就是shuffle的效果。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2, true)
newRDD.saveAsTextFile("output")

结果显示1 4 5在一个分区,2 3 6在一个分区。

2. repartition

coalesce算子也可以增加分区,但是第二个参数须为true,但用得更多的是repartition算子。 

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val newRDD = rdd.repartition(3)
newRDD.saveAsTextFile("output")

repartition算子源码中还是调用了coalesce算子(第二个参数为true)。

3. sortBy

见名知义,就是排序。

val rdd : RDD[Int] = sc.makeRDD(List(1, 4, 2, 3, 6, 5), 2)
val newRDD = rdd.sortBy(num=>num)
newRDD.saveAsTextFile("output")

数据重新排序,但是分区数不变,存在shuffle过程。 默认是升序排序,第二个参数传false,表示降序排序。

4. intersection、union、subtract、zip

val rdd1 : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 : RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))val rdd3 = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))val rdd4 = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))val rdd5 = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))val rdd6 : RDD[(Int, Int)]= rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))

 

 注意事项:

1)交集不去重

2)如果两个rdd的数据类型不同,不能做交集、并集、差集操作,但拉链可以

3)拉链操作的两个rdd的分区数需要一致,且分区中的数据数量也要一致

5. partitionBy

只有数据类型为key-value类型的rdd,才有partitionBy操作。partitionBy本身不是RDD的方法,是通过隐式转化得到的PairRDDFunctions的方法。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD = rdd.map((_, 1))val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
newRDD.saveAsTextFile("output")

这里默认使用HashPartitioner,即按照key的哈希值对分区数取模得到分区号,后续会介绍自定义分区器。

 6. reduceByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Int)] = rdd.reduceByKey((x:Int, y:Int) => {x + y})
newRDD.collect().foreach(println)

reduceByKey将相同key的值进行聚合,具体来说是两两聚合。 但是上例中"b"只有一个,是不会做两两聚合计算的。

7. groupByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[Int])] = rdd.groupByKey()
newRDD.collect().foreach(println)

groupByKey根据相同key的值进行分组,形成一个可迭代的集合。这与groupBy类似,但是区别是groupBy的可迭代集合不是原有value的集合,而是原来每个元素(即tuple)的集合:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[(String, Int)])] = rdd.groupBy(_._1)

reduceByKey和groupByKey的区别:

1) reduceByKey相比于groupByKey不仅做了分组,还做了聚合计算

2)groupByKey会将数据打乱重新组合,即存在shuffle操作。既然存在shuffle操作,如果后续还有map等转换操作,原来一个分区的数据处理完之后还需要等待其他分区的数据处理完,因为shuffle后的分区的数据可能不止来源于原来的一个分区。这种等待可能很耗时,并且占用大量内存,因此需要进行落盘操作。简而言之,shuffle操作必须有落盘处理,不能在内存中进行数据等待,否则可能会导致内存溢出,因此性能也不高

3)reduceByKey也会有shuffle,也会有落盘操作,但是在落盘之前,会对原来每个分区内的数据事先进行分组并聚合计算(预聚合,combine)。这样落盘的数据量少了,磁盘IO也少了,性能也提高了

8. aggregateByKey

reduceByKey会进行预聚合,这是分区内的聚合,然后shuffle操作打乱数据,进行分区间的聚合,此时分区内和分区间的聚合规则是一样的。如果分区内和分区间的聚合计算规则不一样,那就要使用aggregateByKey算子。例如,分区内求最大值,分区间求和:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

aggregateByKey有两个参数列表,第一个参数列表表示初始值(aggregateByKey的最终计算结果与这个初始值类型是相同的),用于和第一个key的value进行分区内计算,第二个参数列表的两个参数分别表示分区内和分区间的计算规则。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(5)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

 

计算相同key的value的平均值:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)val newRDD : RDD[(String,(Int, Int))] = rdd.aggregateByKey((0, 0))((t, v) => (t._1 + v, t._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))val result = newRDD.mapValue {case (val, num) => {val / num}
}result.collect().foreach(println)

9. foldByKey

如果分区内和分区间的计算规则一样,可以使用foldByKey算子。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.foldByKey(0)(_+_)
newRDD.collect().foreach(println)

 10. combineByKey

aggreagteByKey的初始值在一些场景其实很难确定,但如果初始值是相同key的第一个value或者其适当转换,就更为合理。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)val newRDD : RDD[(String,(Int, Int))] = rdd.combineByKey(v => (v, 1))((t : (Int, Int), v) => (t._1 + v, t._2 + 1), (t1 : (Int, Int), t2 : (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2))val result = newRDD.mapValue {case (val, num) => {val / num}
}result.collect().foreach(println)

 

wordCount的多种实现方式(假设已经得到所有的(单词, 1)的tuple):

rdd.reduceByKey(_+_)
rdd.aggregateByKey(0)(_+_, _+_)
rdd.foldByKey(0)(_+_)
rdd.comnbineByKey(v=>v)((x:Int, y:Int)=>x+y, (x:Int, y:Int)=>x+y)

观察源码,发现他们底层调用的都是combineByKey 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/736304.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

02-app端文章查看,静态化freemarker,分布式文件系统minIO-黑马头条

app端文章查看,静态化freemarker,分布式文件系统minIO 1)文章列表加载 1.1)需求分析 文章布局展示 1.2)表结构分析 ap_article 文章基本信息表 ap_article_config 文章配置表 ap_article_content 文章内容表 三张表关系分析 1.3)导入文章数据库 1.3.1)导入数据…

ROS2从入门到精通0-2:ROS2简介、对比ROS1与详细安装流程

目录 0 专栏介绍1 什么是机器人操作系统?2 ROS的发展历程3 ROS2与ROS1的区别4 ROS2安装4.1 基本安装4.2 测试ROS24.2.1 测试一:发布者与订阅者4.2.2 测试二:海龟仿真器 5 常见问题 0 专栏介绍 本专栏旨在通过对ROS2的系统学习,掌…

信息系统项目管理师--成本管理

项⽬成本管理重点关注完成项⽬活动所需资源的成本,但同时也考虑项⽬决策对项⽬产品、服务或成果的使⽤成本、维护成本和⽀持成本的影响。不同的⼲系⼈会在不同的时间,⽤不同的⽅法 测算项⽬成本。 就某些项⽬,特别是⼩项⽬⽽⾔,成…

VSCode报错:/bin/sh: python: command not found

背景 以前都是直接用txt写python,然后直接命令行运行。 这次涉及的代码较多,决定用编译器。 写好的一段python点击运行报错! 问题描述 因为我本地安装的是python3,但是vscode用的是另一个路径的python,所以找不到 解决…

视觉语言处理:用Transformer桥接视觉与语言

引言 人工智能研究的前沿领域见证了显著的交叉融合。将计算机视觉和自然语言处理的领域融合,问题随之而来:AI能否直接从其视觉表现,即从原始像素中辨识和理解语言?在这篇博客中,我试图探究AI从图像中直接理解自然语言的…

在 Android 上恢复已删除文件的 5 种简单方法

您可能会因为意外删除、未完成的 Android 更新、手机意外关机等原因而丢失 Android 上的重要数据。新技术的发展使许多手机功能或程序能够从内部恢复丢失的数据。 在 Android 上恢复已删除文件的 5 种简单方法 然而恢复成功率的不确定性也成为人们克服数据丢失困境的重要考虑因…

Android14音频进阶:AudioTrack与AudioFlinger创建数据通道(五十八)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒体系统工程师系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只…

[Spring] IoC 控制反转和DI依赖注入和Spring中的实现以及常见面试题

目录 1. 什么是Spring 2.什么是IoC容器 3.通过实例来深入了解IoC容器的作用 3.1造一量可以定义车辆轮胎尺寸的车出现的问题 3.2解决方法 3.3IoC优势 4.DI介绍 5.Spring中的IoC和DI的实现 5.1.存对象 5.1.2 类注解 5.1.3 方法注解 5.2取对像 (依赖注入) 5.2.1.属性…

FPGA高端项目:FPGA基于GS2971的SDI视频接收+HLS多路视频融合叠加,提供1套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐本博已有的 SDI 编解码方案本方案的SDI接收转HDMI输出应用本方案的SDI接收图像缩放应用本方案的SDI接收纯verilog图像缩放纯verilog多路视频拼接应用本方案的SDI接收HLS图像缩放Video Mixer多路视频拼接应用本方案的SDI接收OSD动态字符叠加…

护眼台灯怎么选比较好?明基、爱德华、书客护眼台灯硬核PK测评

现在不管是学生党学习阅读,还是办公族加班工作,都离不开一盏光源舒适的台灯,然而如今的台灯市场水实在太深的,各种网红、劣质产品混杂在其中,这类台灯往往采用劣质电源,其电源品质较差,导致输出…

VUE3 显示Echarts百度地图

本次实现最终效果 技术基础以及环境要求 vue3 echarts 百度地图API 要求1: VUE3 环境搭建:https://blog.csdn.net/LQ_001/article/details/136293795 要求2: VUE3 echatrs 环境搭建:https://blog.csdn.net/LQ_001/article/details/1363…

Ps:画笔工具

画笔工具 Brush Tool是 Photoshop 中最常用的工具,可广泛地用于绘画与修饰工作之中。 快捷键:B ◆ ◆ ◆ 常用操作方法与技巧 1、熟练掌握画笔工具的操作对于使用其他工具也非常有益,因为 Photoshop 中许多与笔刷相关的工具有类似的选项和操…

多层菜单的实现方案(含HierarchicalDataTemplate使用)

1、递归 下面是Winform的递归添加菜单栏数据,数据设置好父子id方便递归使用 在TreeView的控件窗口加载时,调用递归加载菜单 private void LoadTvMenu(){this.nodeList objService.GetAllMenu(); // 通过Service得到全部数据// 创建一个根节点this.t…

SQL中如何添加数据

SQL中如何添加数据 一、SQL中如何添加数据(方法汇总)二、SQL中如何添加数据(方法详细解说)1. 使用SQL脚本(推荐)1.1 在表中插入1.1.1 **第一种形式**1.1.2 **第二种形式**SQL INSERT INTO 语法示例SQL INSE…

【实战项目】网络编程:在Linux环境下基于opencv和socket的人脸识别系统--C++实现

🌞前言 这里我们会实现一个项目:在linux操作系统下基于OpenCV和Socket的人脸识别系统。 目录 🌞前言 🌞一、项目介绍 🌞二、项目分工 🌞三、项目难题 🌞四、实现细节 🌼4.1 关…

读算法的陷阱:超级平台、算法垄断与场景欺骗笔记06_共谋(下)

1. 博弈论 1.1. 当市场竞争对手之间普遍存在着误解和不信任情绪时,从长远来看,他们一半时间是在合作,另一半时间则是在背叛承诺 1.2. 当一方越了解对手,或者说可以更好地掌握对方的战略性行为时,他才可能找到展开合作…

软件设计不是CRUD(14):低耦合模块设计理论——行为抽象与设计模式(上)

是不是看到“设计模式”四个字,各位读者就觉得后续内容要开始讲一些假大空的内容了?各位读者是不是有这样的感受,就是单纯讲设计模式的内容,网络上能找到很多资料,但是看过这些资料后读者很难将设计模式运用到实际的工作中。甚至出现了一种声音:设计模式是没有用的,应用…

C++:vector底层剖析

文章目录 前言成员变量成员函数vector ()size_t size()size_t capacity()iterator begin()和const_iterator begin()constiterator end()和const_iterator end()const~vector()void push_back(const&T val)vector<T>(const vector<T>& v)vector<T>&a…

前端解决跨域问题( 6种方法 )

本专栏是汇集了一些HTML常常被遗忘的知识&#xff0c;这里算是温故而知新&#xff0c;往往这些零碎的知识点&#xff0c;在你开发中能起到炸惊效果。我们每个人都没有过目不忘&#xff0c;过久不忘的本事&#xff0c;就让这一点点知识慢慢渗透你的脑海。 本专栏的风格是力求简洁…

美团2025春招第一次笔试题

第四题 题目描述 塔子哥拿到了一个大小为的数组&#xff0c;她希望删除一个区间后&#xff0c;使得剩余所有元素的乘积未尾至少有k个0。塔子哥想知道&#xff0c;一共有多少种不同的删除方案? 输入描述 第一行输入两个正整数 n,k 第二行输入n个正整数 a_i&#xff0c;代表…