RDD算子介绍

1. RDD算子

RDD算子也叫RDD方法,主要分为两大类:转换和行动。转换,即一个RDD转换为另一个RDD,是功能的转换与补充,比如map,flatMap。行动,则是触发任务的执行,比如collect。所谓算子(Operator),就是通过操作改变问题的状态(来源于认知心理学)。RDD算子有Value类型,双Value类型和Key-Value类型。

2. map

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD : RDD[Int] = rdd.map(num=>num*2)
mapRDD.collect().foreach(println)
val rdd : RDD[String] = sc.textFile("data")
val mapRDD : RDD[String] = rdd.map(line => {val datas = line.split(" ")datas(3)
})
mapRDD.collect().foreach(println)

为观察map阶段的分区并行计算过程,添加如下打印

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD1 : RDD[Int] = rdd.map(num => {println(">>>>>>>>")num
})
val mapRDD2 : RDD[Int] = rdd.map(num => {println("######")num
})
mapRDD2.collect().foreach(println)

结果如下:

 

看不出什么规律,改为1个分区:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)
val mapRDD1 : RDD[Int] = rdd.map(num => {println(">>>>>>>>")num
})
val mapRDD2 : RDD[Int] = rdd.map(num => {println("######")num
})
mapRDD2.collect().foreach(println)

结果如下:

 

所以,RDD的计算对于分区内的数据是一个个执行的,即分区内数据的执行是有序的,但是分区间的数据执行是无序的。

3. mapPartitions

上述的map算子对于分区内的数据是一个个依次进行操作,可能存在性能问题,而mapPartitions算子是对于整个分区的数据整体进行操作,但是可能会占用大量空间(以空间换时间)。mapPartitions的参数是iter=>iter。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitions(iter => {println(">>>>>>>>")iter.map(_*2)
})
mapRDD.collect().foreach(println)

结果如下:

 

因为只有两个分区,所以打印两次">>>>>>>>"。使用mapPartitions获取每个分区的最大值:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitions(iter => {List(iter.max).iterator
})
mapRDD.collect().foreach(println)

这个功能是map算子所实现不了的,因为map算子并不能感知数据来源于分区,而mapPartitions可以以分区为单位进行数据处理(批处理操作)。

4. mapPatitionsWithIndex

mapPartitions虽然以分区为单位进行数据批处理,但是其实也感知不到分区是哪个分区,在一些需要知道分区号的场景下,需要用到mapPatitionsWithIndex。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {if (index == 1) {iter} else {Nil.iterator}
})
mapRDD.collect().foreach(println)

上述代码实现了保留第二个(索引为1)分区,结果如下:

 

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD : RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {iter.map(num => (index, num))
})
mapRDD.collect().foreach(println)

上述代码实现了查看每个数据在哪个分区,结果如下:

5. flatMap

flatMap做扁平化映射

val rdd : RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
val mapRDD : RDD[Int] = rdd.flatMap(list => list)
mapRDD.collect().foreach(println)
val rdd : RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
val mapRDD : RDD[String] = rdd.flatMap(s => s.split(" "))
mapRDD.collect().foreach(println)

结果如下:

 

将List(List(1,2), 3, List(4,5))进行扁平化操作(使用模式匹配):

val rdd : RDD[List[Int]] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
val mapRDD : RDD[Int] = rdd.flatMap(data => {data match {case list:List[] => listcase num => List(num)}
})
mapRDD.collect().foreach(println)

6. glom

glom操作有点类似于flatMap的逆操作,将分区内的数据转换为相同类型的内存数组,分区不变。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD : RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data=>data.mkString(","))

结果如下:

 

求各分区最大值之和:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD : RDD[Array[Int]] = rdd.glom()
val maxRDD : RDD[Int] = glomRDD.map(array => array.max)
println(maxRDD.collect().sum))

7. groupBy

按照指定的key进行分组

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val groupRDD : RDD[(Int, Iterable[Int])] = rdd.groupBy(num => num % 2)
groupRDD.collect().foreach(println)

结果如下:

 

按照首字母分组

val rdd : RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
val groupRDD = rdd.groupBy(s => s.charAt(0))
groupRDD.collect().foreach(println)

结果如下:

 

分组的过程可能会打乱数据,即数据可能会重新组合,原分区的数据被分到另一个分区了,即shuffle过程。极限情况下,数据可能被分到一个分区中。一个组的数据在一个分区中,但是一个分区不一定只有一个组。

8. filter

过滤偶数

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val filterRDD : RDD[Int] = rdd.filter(num % 2 == 0)
filterRDD.collect().foreach(println)

按照指定规则进行数据过滤,分区不变,过滤后,不同分区内的数据可能不均衡,即数据倾斜。 

过滤指定日期的数据:

val rdd : RDD[String] = sc.textFile("data")
val filterRDD : RDD[String] = rdd.filter(line => {val datas = line.split(" ")datas(3).startWith("17/05/2015")
})
filterRDD.collect().foreach(println)

9. sample

采样/抽取数据,用的一般不多,其中一个用途可能是解决数据倾斜问题。sample算子主要有三个参数,第一个是抽取的数放不放回去,第二个参数是概率,如果抽取不放回,则表示每个数被抽取的概率,如果抽取放回,则表示某个数可能的抽取次数(可能的次数而已),第三个参数是随机数算法种子(一般可不填,如果填了,可能会导致抽取结果固定)。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(false, 0.4, 1)
println(sampleRDD.collect().mkstring(","))

 结果如下:

多运行几次,发现结果不变,因为随机数算法种子固定了,如果不传,则默认使用系统时间(变化的)。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(false, 0.4)
println(sampleRDD.collect().mkstring(","))

此时结果就不固定,结果都不一定为4个数。

根据源码,如果抽取不放回,抽取算法为伯努利分布,如果抽取放回,则为泊松分布。

如果抽取放回,

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(true, 2)
println(sampleRDD.collect().mkstring(","))

 结果如下:

10. distinct

distinct算子用于去重

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
val distinctRDD : RDD[Int] = rdd.distinct()
distinctRDD.collect().foreach(println)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/714558.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac专用投屏工具AirServer 7.27 for Mac中文版2024最新图文教程

Mac专用投屏工具AirServer 7.27 for Mac中文版是一款适用于Mac的投屏工具,可以将Mac屏幕快速投影到其他设备上,如电视、投影仪、平板等。 Mac专用投屏工具AirServer 7.27 for Mac中文版具有优秀的兼容性,可以与各种设备配合使用。无论是iPhon…

基于springboot+vue的在线考试系统(源码+论文)

文章目录 目录 文章目录 前言 一、功能设计 二、功能页面 三、论文 前言 现在我国关于在线考试系统的发展以及专注于对无纸化考试的完善程度普遍不高,关于对考试的模式还大部分还停留在纸介质使用的基础上,这种教学模式已不能解决现在的时代所产生的考试…

【MySQL】数据库的操作

【MySQL】数据库的操作 目录 【MySQL】数据库的操作创建数据库数据库的编码集和校验集查看系统默认字符集以及校验规则查看数据库支持的字符集查看数据库支持的字符集校验规则校验规则对数据库的影响数据库的删除 数据库的备份和恢复备份还原不备份整个数据库,而是备…

YOLOv9改进|增加SPD-Conv无卷积步长或池化:用于低分辨率图像和小物体的新 CNN 模块

专栏介绍:YOLOv9改进系列 | 包含深度学习最新创新,主力高效涨点!!! 一、文章摘要 卷积神经网络(CNNs)在计算即使觉任务中如图像分类和目标检测等取得了显著的成功。然而,当图像分辨率较低或物体较小时&…

【LeetCode刷题】146. LRU 缓存

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类: LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中,则返回关键字的值,否则返回 -…

【InternLM 实战营笔记】浦语·灵笔的图文理解及创作部署、 Lagent 工具调用 Demo

浦语灵笔的图文理解及创作部署 浦语灵笔是基于书生浦语大语言模型研发的视觉-语言大模型,提供出色的图文理解和创作能力,结合了视觉和语言的先进技术,能够实现图像到文本、文本到图像的双向转换。使用浦语灵笔大模型可以轻松的创作一篇图文推…

进程间的通信 -- 共享内存

一 共享内存的概念 1. 1 共享内存的原理 之前我们学过管道通信,分为匿名管道和命名管道,匿名管道通过父子进程的属性继承原理来完成父子进程看到同一份资源的目的,而命名管道则是通过路径与文件名来唯一标识管道文件,来让不同的进…

typescript 的常用方式

文章目录 前言一、绑定props 默认值的方式:withDefaults1.vue2 的props设置默认值2.vue3 的props设置默认值(1) 不设置默认值的写法(2) 设置默认值的写法(分离模式)(3) 设置默认值的写法(组合模式) 二、定义一个二维数…

Matlab在同一张图中如何加入多个图例

根据代码最终画出的图片如下: 其实原理很简单,就是在一张figure中画多个坐标轴,每个坐标轴都有对应的图例,之后再将多余坐标轴隐藏,只保留一个即可。 代码如下: clear all; close all;dd_linewidth 1;a …

maven archetype 项目原型

拓展阅读 maven 包管理平台-01-maven 入门介绍 Maven、Gradle、Ant、Ivy、Bazel 和 SBT 的详细对比表格 maven 包管理平台-02-windows 安装配置 mac 安装配置 maven 包管理平台-03-maven project maven 项目的创建入门 maven 包管理平台-04-maven archetype 项目原型 ma…

Spring学习笔记(六)利用Spring的jdbc实现学生管理系统的用户登录功能

一、案例分析 本案例要求学生在控制台输入用户名密码,如果用户账号密码正确则显示用户所属班级,如果登录失败则显示登录失败。 (1)为了存储学生信息,需要创建一个数据库。 (2)为了程序连接数…

php源码 单色bmp图片取模工具 按任意方式取模 生成字节数组 自由编辑点阵

http://2.wjsou.com/BMP/index.html 想试试chatGPT4生成,还是要手工改 php 写一个网页界面上可以选择一张bmp图片,界面上就显示这张bmp图片, 点生成取模按钮,在图片下方会显示这张bmp图片的取模数据。 取模规则是按界面设置的&a…

Linux 的交换空间(swap)是什么?有什么用?

目录 swap是什么?swap有什么用?swap使用典型场景如何查看你的系统是否用到交换空间呢?查看系统中swap in/out的情况 swap是什么? swap就是磁盘上的一块区域。它和Windows系统中的交换文件作用类似,但是它是一段连续的…

03、MongoDB -- MongoDB 权限的设计

目录 MongoDB 权限的设计演示前准备:启动 mongodb 服务器 和 客户端 :1、启动单机模式的 mongodb 服务器2、启动 mongodb 的客户端 MongoDB 权限的设计1、MongoDB 的每个数据库都可以保存用户,不止admin数据库可以保存用户。2、保存用户的数据…

Linux 学习笔记(8)

八、 启动引导 1 、 Linux 的启动流程 1) BIOS 自检 2) 启动 GRUB/LILO 3) 运行 Linux kernel 并检测硬件 4) 挂载根文件系统 5) 运行 Linux 系统的第一个进程 init( 其 PID 永远为 1 ,是所有其它进程的父进程 ) 6) init 读取系统引导配置文件…

GD25Q32驱动

GD25Q32是一款基于SPI的Flash芯片,容量为32/84M bytes。它的引脚如下: 该芯片支持多种SPI操作方式,包括:Standard SPI(标准SPI)、Dual SPI(双线 SPI)和Quad SPI(四线 SPI) 。有关SPI的介绍可以参考: SPI通信原理-CSDN…

flutter 文字一行显示,超出换行

因为app有多语言,中文和其他语言长度不一致,可能导致英文会很长。 中文样式 英文样式 代码 Row(mainAxisAlignment: MainAxisAlignment.end,crossAxisAlignment: CrossAxisAlignment.end,children: [Visibility(visible: controller.info.fee ! null,ch…

探寻2024年国内热门低代码平台排行!| 功能特点一览

低代码开发是一项革命性的技术,主要目的是尽量避免程序研发的复杂性,让外行开发者也能加入到应用程序的搭建中。低代码平台的核心概念和构成部分通常包括用户界面和拖拽设计、预构件和模块、自动化工作内容与数据库集成和扩展应用,应用低代码…

U盘弹出提示“该设备正在使用中”:原因与解决方案

在日常使用U盘时,我们可能会遇到一个问题:当尝试安全弹出U盘时,系统提示“该设备正在使用中”。这种情况可能会让用户感到困惑,担心数据是否安全或是否会导致U盘损坏。本文旨在探讨这一现象背后的原因,并提供相应的解决…

【前端素材】推荐优质后台管理系统网页Stisla平台模板(附源码)

一、需求分析 1、系统定义 后台管理系统是一种用于管理和控制网站、应用程序或系统的管理界面。它通常被设计用来让网站或应用程序的管理员或运营人员管理内容、用户、数据以及其他相关功能。后台管理系统是一种用于管理网站、应用程序或系统的工具,通常由管理员使…