RDD算子介绍

1. RDD算子

RDD算子也叫RDD方法,主要分为两大类:转换和行动。转换,即一个RDD转换为另一个RDD,是功能的转换与补充,比如map,flatMap。行动,则是触发任务的执行,比如collect。所谓算子(Operator),就是通过操作改变问题的状态(来源于认知心理学)。RDD算子有Value类型,双Value类型和Key-Value类型。

2. map

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD : RDD[Int] = rdd.map(num=>num*2)
mapRDD.collect().foreach(println)
val rdd : RDD[String] = sc.textFile("data")
val mapRDD : RDD[String] = rdd.map(line => {val datas = line.split(" ")datas(3)
})
mapRDD.collect().foreach(println)

为观察map阶段的分区并行计算过程,添加如下打印

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD1 : RDD[Int] = rdd.map(num => {println(">>>>>>>>")num
})
val mapRDD2 : RDD[Int] = rdd.map(num => {println("######")num
})
mapRDD2.collect().foreach(println)

结果如下:

 

看不出什么规律,改为1个分区:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)
val mapRDD1 : RDD[Int] = rdd.map(num => {println(">>>>>>>>")num
})
val mapRDD2 : RDD[Int] = rdd.map(num => {println("######")num
})
mapRDD2.collect().foreach(println)

结果如下:

 

所以,RDD的计算对于分区内的数据是一个个执行的,即分区内数据的执行是有序的,但是分区间的数据执行是无序的。

3. mapPartitions

上述的map算子对于分区内的数据是一个个依次进行操作,可能存在性能问题,而mapPartitions算子是对于整个分区的数据整体进行操作,但是可能会占用大量空间(以空间换时间)。mapPartitions的参数是iter=>iter。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitions(iter => {println(">>>>>>>>")iter.map(_*2)
})
mapRDD.collect().foreach(println)

结果如下:

 

因为只有两个分区,所以打印两次">>>>>>>>"。使用mapPartitions获取每个分区的最大值:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitions(iter => {List(iter.max).iterator
})
mapRDD.collect().foreach(println)

这个功能是map算子所实现不了的,因为map算子并不能感知数据来源于分区,而mapPartitions可以以分区为单位进行数据处理(批处理操作)。

4. mapPatitionsWithIndex

mapPartitions虽然以分区为单位进行数据批处理,但是其实也感知不到分区是哪个分区,在一些需要知道分区号的场景下,需要用到mapPatitionsWithIndex。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {if (index == 1) {iter} else {Nil.iterator}
})
mapRDD.collect().foreach(println)

上述代码实现了保留第二个(索引为1)分区,结果如下:

 

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD : RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {iter.map(num => (index, num))
})
mapRDD.collect().foreach(println)

上述代码实现了查看每个数据在哪个分区,结果如下:

5. flatMap

flatMap做扁平化映射

val rdd : RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
val mapRDD : RDD[Int] = rdd.flatMap(list => list)
mapRDD.collect().foreach(println)
val rdd : RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
val mapRDD : RDD[String] = rdd.flatMap(s => s.split(" "))
mapRDD.collect().foreach(println)

结果如下:

 

将List(List(1,2), 3, List(4,5))进行扁平化操作(使用模式匹配):

val rdd : RDD[List[Int]] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
val mapRDD : RDD[Int] = rdd.flatMap(data => {data match {case list:List[] => listcase num => List(num)}
})
mapRDD.collect().foreach(println)

6. glom

glom操作有点类似于flatMap的逆操作,将分区内的数据转换为相同类型的内存数组,分区不变。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD : RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data=>data.mkString(","))

结果如下:

 

求各分区最大值之和:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD : RDD[Array[Int]] = rdd.glom()
val maxRDD : RDD[Int] = glomRDD.map(array => array.max)
println(maxRDD.collect().sum))

7. groupBy

按照指定的key进行分组

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val groupRDD : RDD[(Int, Iterable[Int])] = rdd.groupBy(num => num % 2)
groupRDD.collect().foreach(println)

结果如下:

 

按照首字母分组

val rdd : RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
val groupRDD = rdd.groupBy(s => s.charAt(0))
groupRDD.collect().foreach(println)

结果如下:

 

分组的过程可能会打乱数据,即数据可能会重新组合,原分区的数据被分到另一个分区了,即shuffle过程。极限情况下,数据可能被分到一个分区中。一个组的数据在一个分区中,但是一个分区不一定只有一个组。

8. filter

过滤偶数

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val filterRDD : RDD[Int] = rdd.filter(num % 2 == 0)
filterRDD.collect().foreach(println)

按照指定规则进行数据过滤,分区不变,过滤后,不同分区内的数据可能不均衡,即数据倾斜。 

过滤指定日期的数据:

val rdd : RDD[String] = sc.textFile("data")
val filterRDD : RDD[String] = rdd.filter(line => {val datas = line.split(" ")datas(3).startWith("17/05/2015")
})
filterRDD.collect().foreach(println)

9. sample

采样/抽取数据,用的一般不多,其中一个用途可能是解决数据倾斜问题。sample算子主要有三个参数,第一个是抽取的数放不放回去,第二个参数是概率,如果抽取不放回,则表示每个数被抽取的概率,如果抽取放回,则表示某个数可能的抽取次数(可能的次数而已),第三个参数是随机数算法种子(一般可不填,如果填了,可能会导致抽取结果固定)。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(false, 0.4, 1)
println(sampleRDD.collect().mkstring(","))

 结果如下:

多运行几次,发现结果不变,因为随机数算法种子固定了,如果不传,则默认使用系统时间(变化的)。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(false, 0.4)
println(sampleRDD.collect().mkstring(","))

此时结果就不固定,结果都不一定为4个数。

根据源码,如果抽取不放回,抽取算法为伯努利分布,如果抽取放回,则为泊松分布。

如果抽取放回,

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(true, 2)
println(sampleRDD.collect().mkstring(","))

 结果如下:

10. distinct

distinct算子用于去重

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
val distinctRDD : RDD[Int] = rdd.distinct()
distinctRDD.collect().foreach(println)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/714558.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1551.是数组中所有元素相等的最小操作数

存在一个长度为 n 的数组 arr &#xff0c;其中 arr[i] (2 * i) 1 &#xff08; 0 < i < n &#xff09;。 一次操作中&#xff0c;你可以选出两个下标&#xff0c;记作 x 和 y &#xff08; 0 < x, y < n &#xff09;并使 arr[x] 减去 1 、arr[y] 加上 1 &…

Mac专用投屏工具AirServer 7.27 for Mac中文版2024最新图文教程

Mac专用投屏工具AirServer 7.27 for Mac中文版是一款适用于Mac的投屏工具&#xff0c;可以将Mac屏幕快速投影到其他设备上&#xff0c;如电视、投影仪、平板等。 Mac专用投屏工具AirServer 7.27 for Mac中文版具有优秀的兼容性&#xff0c;可以与各种设备配合使用。无论是iPhon…

基于springboot+vue的在线考试系统(源码+论文)

文章目录 目录 文章目录 前言 一、功能设计 二、功能页面 三、论文 前言 现在我国关于在线考试系统的发展以及专注于对无纸化考试的完善程度普遍不高&#xff0c;关于对考试的模式还大部分还停留在纸介质使用的基础上&#xff0c;这种教学模式已不能解决现在的时代所产生的考试…

【MySQL】数据库的操作

【MySQL】数据库的操作 目录 【MySQL】数据库的操作创建数据库数据库的编码集和校验集查看系统默认字符集以及校验规则查看数据库支持的字符集查看数据库支持的字符集校验规则校验规则对数据库的影响数据库的删除 数据库的备份和恢复备份还原不备份整个数据库&#xff0c;而是备…

YOLOv9改进|增加SPD-Conv无卷积步长或池化:用于低分辨率图像和小物体的新 CNN 模块

专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;主力高效涨点&#xff01;&#xff01;&#xff01; 一、文章摘要 卷积神经网络(CNNs)在计算即使觉任务中如图像分类和目标检测等取得了显著的成功。然而&#xff0c;当图像分辨率较低或物体较小时&…

【LeetCode刷题】146. LRU 缓存

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中&#xff0c;则返回关键字的值&#xff0c;否则返回 -…

全量知识系统问题及SmartChat给出的答复 之9 三套工具之4语法解析器 之2

Q23. 一个语言的语法简约规则 这些规则显示show 在一个给定单词&#xff08;a given word&#xff09;的右边或左边可能出现的单词的类别。句型的多样性variety不是复杂文法&#xff08;a complex grammar&#xff09;的结果&#xff0c;而是简单语法&#xff08;a simple gra…

【InternLM 实战营笔记】浦语·灵笔的图文理解及创作部署、 Lagent 工具调用 Demo

浦语灵笔的图文理解及创作部署 浦语灵笔是基于书生浦语大语言模型研发的视觉-语言大模型&#xff0c;提供出色的图文理解和创作能力&#xff0c;结合了视觉和语言的先进技术&#xff0c;能够实现图像到文本、文本到图像的双向转换。使用浦语灵笔大模型可以轻松的创作一篇图文推…

进程间的通信 -- 共享内存

一 共享内存的概念 1. 1 共享内存的原理 之前我们学过管道通信&#xff0c;分为匿名管道和命名管道&#xff0c;匿名管道通过父子进程的属性继承原理来完成父子进程看到同一份资源的目的&#xff0c;而命名管道则是通过路径与文件名来唯一标识管道文件&#xff0c;来让不同的进…

学习Android的第二十一天

目录 Android ProgressDialog (进度条对话框) 例子 Android DatePickerDialog 日期选择对话框 例子 Android TimePickerDialog 时间选择对话框 Android PopupWindow 悬浮框 构造函数 方法 例子 官方文档 Android OptionMenu 选项菜单 例子 官方文档 Android Progr…

Java实战:Spring Boot中各类参数校验机制

引言 在开发Web应用程序时&#xff0c;对客户端传入的参数进行有效校验是保证系统安全性和稳定性的重要环节。Spring Boot作为一个现代化的Java开发框架&#xff0c;提供了多种参数校验的方法和工具&#xff0c;以满足不同场景下的需求。本文将深入探讨Spring Boot中实现各种参…

typescript 的常用方式

文章目录 前言一、绑定props 默认值的方式&#xff1a;withDefaults1.vue2 的props设置默认值2.vue3 的props设置默认值(1) 不设置默认值的写法(2) 设置默认值的写法&#xff08;分离模式&#xff09;(3) 设置默认值的写法&#xff08;组合模式&#xff09; 二、定义一个二维数…

Matlab在同一张图中如何加入多个图例

根据代码最终画出的图片如下&#xff1a; 其实原理很简单&#xff0c;就是在一张figure中画多个坐标轴&#xff0c;每个坐标轴都有对应的图例&#xff0c;之后再将多余坐标轴隐藏&#xff0c;只保留一个即可。 代码如下&#xff1a; clear all; close all;dd_linewidth 1;a …

maven archetype 项目原型

拓展阅读 maven 包管理平台-01-maven 入门介绍 Maven、Gradle、Ant、Ivy、Bazel 和 SBT 的详细对比表格 maven 包管理平台-02-windows 安装配置 mac 安装配置 maven 包管理平台-03-maven project maven 项目的创建入门 maven 包管理平台-04-maven archetype 项目原型 ma…

Spring学习笔记(六)利用Spring的jdbc实现学生管理系统的用户登录功能

一、案例分析 本案例要求学生在控制台输入用户名密码&#xff0c;如果用户账号密码正确则显示用户所属班级&#xff0c;如果登录失败则显示登录失败。 &#xff08;1&#xff09;为了存储学生信息&#xff0c;需要创建一个数据库。 &#xff08;2&#xff09;为了程序连接数…

洛谷P1927防护伞

题目描述 据说 20122012 的灾难和太阳黑子的爆发有关。于是地球防卫小队决定制造一个特殊防护伞&#xff0c;挡住太阳黑子爆发的区域&#xff0c;减少其对地球的影响。由于太阳相对于地球来说实在是太大了&#xff0c;我们可以把太阳表面看作一个平面&#xff0c;中心定为(0,0…

C 基本语法

我们已经看过 C 程序的基本结构&#xff0c;这将有助于我们理解 C 语言的其他基本的构建块。 C 的令牌&#xff08;Token&#xff09; C 程序由各种令牌组成&#xff0c;令牌可以是关键字、标识符、常量、字符串值&#xff0c;或者是一个符号。例如&#xff0c;下面的 C 语句…

30天自制操作系统(第23天)

23.1 编写malloc 参考第22天的内容&#xff0c;在绘制窗口前先分配了150*50个字节大小的内存&#xff0c;所以导致该文件经编译后有7.6k左右&#xff0c;能否在其中使用指针呢&#xff1f;当需要开辟空间时&#xff0c;移动指针即可。在之前的章节中也有函数memman_alloc函数可…

php源码 单色bmp图片取模工具 按任意方式取模 生成字节数组 自由编辑点阵

http://2.wjsou.com/BMP/index.html 想试试chatGPT4生成&#xff0c;还是要手工改 php 写一个网页界面上可以选择一张bmp图片&#xff0c;界面上就显示这张bmp图片&#xff0c; 点生成取模按钮&#xff0c;在图片下方会显示这张bmp图片的取模数据。 取模规则是按界面设置的&a…

Linux 的交换空间(swap)是什么?有什么用?

目录 swap是什么&#xff1f;swap有什么用&#xff1f;swap使用典型场景如何查看你的系统是否用到交换空间呢&#xff1f;查看系统中swap in/out的情况 swap是什么&#xff1f; swap就是磁盘上的一块区域。它和Windows系统中的交换文件作用类似&#xff0c;但是它是一段连续的…