RDD算子介绍(二)

1. coalesce

用于缩减分区,减少分区个数,减少任务调度成本。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

分区数可以减少,但是减少后的分区里的数据分布并不一定是均匀分布的,比如以下场景:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

结果现实1和2在一个分区,3 4 5 6四个数在第二个分区。因为coalesce算子默认不会打乱分区的数据进行重新组合的。原来1和2,3和4,5和6分别在三个分区里,如果缩减分区之后1 2 3在一个分区,4 5 6在一个分区,意味着将原来的3和4所在的分区里的数据打乱重新组合了。所以缩减分区后,应该将5和6所在的分区里的数据移到其他分区中去,即3 4 5 6最终在一个分区了。

coalesce算子可能会导致数据倾斜。如果想要数据均衡,需要进行shuffle处理,coalesce算子第二个参数就表示是否shuffle处理,默认是false,改为true即可,但是数据不一定有规律,这就是shuffle的效果。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2, true)
newRDD.saveAsTextFile("output")

结果显示1 4 5在一个分区,2 3 6在一个分区。

2. repartition

coalesce算子也可以增加分区,但是第二个参数须为true,但用得更多的是repartition算子。 

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val newRDD = rdd.repartition(3)
newRDD.saveAsTextFile("output")

repartition算子源码中还是调用了coalesce算子(第二个参数为true)。

3. sortBy

见名知义,就是排序。

val rdd : RDD[Int] = sc.makeRDD(List(1, 4, 2, 3, 6, 5), 2)
val newRDD = rdd.sortBy(num=>num)
newRDD.saveAsTextFile("output")

数据重新排序,但是分区数不变,存在shuffle过程。 默认是升序排序,第二个参数传false,表示降序排序。

4. intersection、union、subtract、zip

val rdd1 : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 : RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))val rdd3 = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))val rdd4 = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))val rdd5 = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))val rdd6 : RDD[(Int, Int)]= rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))

 

 注意事项:

1)交集不去重

2)如果两个rdd的数据类型不同,不能做交集、并集、差集操作,但拉链可以

3)拉链操作的两个rdd的分区数需要一致,且分区中的数据数量也要一致

5. partitionBy

只有数据类型为key-value类型的rdd,才有partitionBy操作。partitionBy本身不是RDD的方法,是通过隐式转化得到的PairRDDFunctions的方法。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD = rdd.map((_, 1))val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
newRDD.saveAsTextFile("output")

这里默认使用HashPartitioner,即按照key的哈希值对分区数取模得到分区号,后续会介绍自定义分区器。

 6. reduceByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Int)] = rdd.reduceByKey((x:Int, y:Int) => {x + y})
newRDD.collect().foreach(println)

reduceByKey将相同key的值进行聚合,具体来说是两两聚合。 但是上例中"b"只有一个,是不会做两两聚合计算的。

7. groupByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[Int])] = rdd.groupByKey()
newRDD.collect().foreach(println)

groupByKey根据相同key的值进行分组,形成一个可迭代的集合。这与groupBy类似,但是区别是groupBy的可迭代集合不是原有value的集合,而是原来每个元素(即tuple)的集合:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[(String, Int)])] = rdd.groupBy(_._1)

reduceByKey和groupByKey的区别:

1) reduceByKey相比于groupByKey不仅做了分组,还做了聚合计算

2)groupByKey会将数据打乱重新组合,即存在shuffle操作。既然存在shuffle操作,如果后续还有map等转换操作,原来一个分区的数据处理完之后还需要等待其他分区的数据处理完,因为shuffle后的分区的数据可能不止来源于原来的一个分区。这种等待可能很耗时,并且占用大量内存,因此需要进行落盘操作。简而言之,shuffle操作必须有落盘处理,不能在内存中进行数据等待,否则可能会导致内存溢出,因此性能也不高

3)reduceByKey也会有shuffle,也会有落盘操作,但是在落盘之前,会对原来每个分区内的数据事先进行分组并聚合计算(预聚合,combine)。这样落盘的数据量少了,磁盘IO也少了,性能也提高了

8. aggregateByKey

reduceByKey会进行预聚合,这是分区内的聚合,然后shuffle操作打乱数据,进行分区间的聚合,此时分区内和分区间的聚合规则是一样的。如果分区内和分区间的聚合计算规则不一样,那就要使用aggregateByKey算子。例如,分区内求最大值,分区间求和:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

aggregateByKey有两个参数列表,第一个参数列表表示初始值(aggregateByKey的最终计算结果与这个初始值类型是相同的),用于和第一个key的value进行分区内计算,第二个参数列表的两个参数分别表示分区内和分区间的计算规则。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(5)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

 

计算相同key的value的平均值:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)val newRDD : RDD[(String,(Int, Int))] = rdd.aggregateByKey((0, 0))((t, v) => (t._1 + v, t._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))val result = newRDD.mapValue {case (val, num) => {val / num}
}result.collect().foreach(println)

9. foldByKey

如果分区内和分区间的计算规则一样,可以使用foldByKey算子。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.foldByKey(0)(_+_)
newRDD.collect().foreach(println)

 10. combineByKey

aggreagteByKey的初始值在一些场景其实很难确定,但如果初始值是相同key的第一个value或者其适当转换,就更为合理。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)val newRDD : RDD[(String,(Int, Int))] = rdd.combineByKey(v => (v, 1))((t : (Int, Int), v) => (t._1 + v, t._2 + 1), (t1 : (Int, Int), t2 : (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2))val result = newRDD.mapValue {case (val, num) => {val / num}
}result.collect().foreach(println)

 

wordCount的多种实现方式(假设已经得到所有的(单词, 1)的tuple):

rdd.reduceByKey(_+_)
rdd.aggregateByKey(0)(_+_, _+_)
rdd.foldByKey(0)(_+_)
rdd.comnbineByKey(v=>v)((x:Int, y:Int)=>x+y, (x:Int, y:Int)=>x+y)

观察源码,发现他们底层调用的都是combineByKey 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/736304.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营Day41 ||leetCode 0-1背包问题 || 416. 分割等和子集

0-1背包问题 dp[i][j]的含义:从下标为[0-i]的物品里任意取,放进容量为j的背包,价值总和最大是多少。 那么可以有两个方向推出来dp[i][j], 不放物品i:由dp[i - 1][j]推出,即背包容量为j,里面不放…

02-app端文章查看,静态化freemarker,分布式文件系统minIO-黑马头条

app端文章查看,静态化freemarker,分布式文件系统minIO 1)文章列表加载 1.1)需求分析 文章布局展示 1.2)表结构分析 ap_article 文章基本信息表 ap_article_config 文章配置表 ap_article_content 文章内容表 三张表关系分析 1.3)导入文章数据库 1.3.1)导入数据…

详解@Configuration

简介 Configuration注解用于标识一个类作为Spring的配置类,从而允许使用纯Java代码的方式来定义Bean和Bean之间的依赖关系。 Configuration注解是Spring框架中非常核心的一个功能,它提供了一种替代XML配置文件的方法。通过该注解,开发人员可…

ROS2从入门到精通0-2:ROS2简介、对比ROS1与详细安装流程

目录 0 专栏介绍1 什么是机器人操作系统?2 ROS的发展历程3 ROS2与ROS1的区别4 ROS2安装4.1 基本安装4.2 测试ROS24.2.1 测试一:发布者与订阅者4.2.2 测试二:海龟仿真器 5 常见问题 0 专栏介绍 本专栏旨在通过对ROS2的系统学习,掌…

信息系统项目管理师--成本管理

项⽬成本管理重点关注完成项⽬活动所需资源的成本,但同时也考虑项⽬决策对项⽬产品、服务或成果的使⽤成本、维护成本和⽀持成本的影响。不同的⼲系⼈会在不同的时间,⽤不同的⽅法 测算项⽬成本。 就某些项⽬,特别是⼩项⽬⽽⾔,成…

FocusVisualStyle通常是键盘焦点样式

设置了Button的FocusVisualSytle但是死活没有效果&#xff0c;查了一下这个是键盘焦点样式&#xff0c;摁下Tab键了才能让Button有焦点 <ButtonWidth"100"Height"30"FocusVisualStyle"{DynamicResource MyFocusVisual}">按钮1 </Butto…

VSCode报错:/bin/sh: python: command not found

背景 以前都是直接用txt写python&#xff0c;然后直接命令行运行。 这次涉及的代码较多&#xff0c;决定用编译器。 写好的一段python点击运行报错&#xff01; 问题描述 因为我本地安装的是python3&#xff0c;但是vscode用的是另一个路径的python&#xff0c;所以找不到 解决…

_note_04_02

1&#xff1a;输出1-20之间的奇数。 package _09142023.homework_04;import java.util.Scanner;/**** 输出1-20之间的奇数。**/ public class _07_1 {public static void main(String[] args) {System.out.println("1到20之间的奇数&#xff1a;");for (int i 1; i…

视觉语言处理:用Transformer桥接视觉与语言

引言 人工智能研究的前沿领域见证了显著的交叉融合。将计算机视觉和自然语言处理的领域融合&#xff0c;问题随之而来&#xff1a;AI能否直接从其视觉表现&#xff0c;即从原始像素中辨识和理解语言&#xff1f;在这篇博客中&#xff0c;我试图探究AI从图像中直接理解自然语言的…

在 Android 上恢复已删除文件的 5 种简单方法

您可能会因为意外删除、未完成的 Android 更新、手机意外关机等原因而丢失 Android 上的重要数据。新技术的发展使许多手机功能或程序能够从内部恢复丢失的数据。 在 Android 上恢复已删除文件的 5 种简单方法 然而恢复成功率的不确定性也成为人们克服数据丢失困境的重要考虑因…

Android14音频进阶:AudioTrack与AudioFlinger创建数据通道(五十八)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒体系统工程师系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只…

子类和父类不在同一包中的继承性

不同包中的继承性 继承父类的protected 、public 成员变量 继承父类的 protected 、public 方法&#xff1b; 继承的成员或方法的 。 public class People { int age,leg 2,hand 2; public void showPeopleMess() { System.out.printf("%d岁&#xff0c;%d只脚,%d只手…

[Spring] IoC 控制反转和DI依赖注入和Spring中的实现以及常见面试题

目录 1. 什么是Spring 2.什么是IoC容器 3.通过实例来深入了解IoC容器的作用 3.1造一量可以定义车辆轮胎尺寸的车出现的问题 3.2解决方法 3.3IoC优势 4.DI介绍 5.Spring中的IoC和DI的实现 5.1.存对象 5.1.2 类注解 5.1.3 方法注解 5.2取对像 (依赖注入) 5.2.1.属性…

FPGA高端项目:FPGA基于GS2971的SDI视频接收+HLS多路视频融合叠加,提供1套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐本博已有的 SDI 编解码方案本方案的SDI接收转HDMI输出应用本方案的SDI接收图像缩放应用本方案的SDI接收纯verilog图像缩放纯verilog多路视频拼接应用本方案的SDI接收HLS图像缩放Video Mixer多路视频拼接应用本方案的SDI接收OSD动态字符叠加…

护眼台灯怎么选比较好?明基、爱德华、书客护眼台灯硬核PK测评

现在不管是学生党学习阅读&#xff0c;还是办公族加班工作&#xff0c;都离不开一盏光源舒适的台灯&#xff0c;然而如今的台灯市场水实在太深的&#xff0c;各种网红、劣质产品混杂在其中&#xff0c;这类台灯往往采用劣质电源&#xff0c;其电源品质较差&#xff0c;导致输出…

开发指南002-前后端信息交互规范-返回值定义

public enum IOResultEnum {SUCCESS(88888888, "IOResult_88888888"),//操作成功ERROR(99999999, "IOResult_99999999"), //操作失败EXCEPTION(11111111, "IOResult_11111111"),//操作异常AUTHORIZATE_FAIL(9000,"IOResult_9000"),//没…

VUE3 显示Echarts百度地图

本次实现最终效果 技术基础以及环境要求 vue3 echarts 百度地图API 要求1&#xff1a; VUE3 环境搭建&#xff1a;https://blog.csdn.net/LQ_001/article/details/136293795 要求2&#xff1a; VUE3 echatrs 环境搭建:https://blog.csdn.net/LQ_001/article/details/1363…

Ps:画笔工具

画笔工具 Brush Tool是 Photoshop 中最常用的工具&#xff0c;可广泛地用于绘画与修饰工作之中。 快捷键&#xff1a;B ◆ ◆ ◆ 常用操作方法与技巧 1、熟练掌握画笔工具的操作对于使用其他工具也非常有益&#xff0c;因为 Photoshop 中许多与笔刷相关的工具有类似的选项和操…

docker一键安装debian/ubuntu桌面环境LXDE+VNC+Firefox

好处不用说&#xff0c;可以在linux服务器版本使用chrome和firefox浏览器&#xff0c;步骤如下&#xff1a; 1.拉取镜像 镜像大概有1.3G左右 docker pull dorowu/ubuntu-desktop-lxde-vnc 使用docker images 查看镜像id 2.运行容器 docker run -dit --name ubuntu -p 6080…

多层菜单的实现方案(含HierarchicalDataTemplate使用)

1、递归 下面是Winform的递归添加菜单栏数据&#xff0c;数据设置好父子id方便递归使用 在TreeView的控件窗口加载时&#xff0c;调用递归加载菜单 private void LoadTvMenu(){this.nodeList objService.GetAllMenu(); // 通过Service得到全部数据// 创建一个根节点this.t…