Spark-RDD算子大全

Spark RDD(弹性分布式数据集)是Spark中的核心抽象,它代表一个不可变、分区的分布式数据集合。下面是一些常用的RDD算子:

转换算子:

  1. map(func):对RDD中的每个元素应用给定的函数,返回一个新的RDD。

  2. filter(func):对RDD中的每个元素应用给定的函数,返回满足条件的元素组成的新的RDD。

  3. flatMap(func):对RDD中的每个元素应用给定的函数并返回一个迭代器,将所有迭代器的元素组合成一个新的RDD。

  4. distinct():去除RDD中的重复元素,返回一个包含唯一元素的新的RDD。

  5. groupByKey():对具有相同键的元素进行分组,返回一个键值对的RDD。

  6. sortByKey():按照键对RDD中的元素进行排序,返回一个键值对的RDD。

  7. join(otherRDD):将两个RDD按照键进行连接操作,返回一个键值对的RDD。

  8. union(otherRDD):将两个RDD进行合并,返回一个包含两个RDD所有元素的新的RDD。

  9. aggregateByKey(zeroValue)(seqOp, combOp):对每个键的元素进行聚合操作,返回一个键值对的RDD。

行动算子:

  1. collect():将RDD中的所有元素以数组的形式返回到驱动程序。

  2. count():返回RDD中的元素数量。

  3. first():返回RDD中的第一个元素。

  4. take(n):返回RDD中的前n个元素。

  5. reduce(func):使用给定的函数对RDD中的元素进行归约操作,返回一个元素。

  6. foreach(func):对RDD中的每个元素应用给定的函数。

  7. saveAsTextFile(path):将RDD中的元素保存为文本文件。

  8. saveAsObjectFile(path):将RDD中的元素保存为序列化的对象文件。

进行shuffle操作的一些常见算子:

  1. groupByKey(): 将具有相同键的元素分组到一起,并创建一个键值对的RDD。这个操作会导致数据的重新洗牌,将具有相同键的数据移动到同一个分区。

  2. reduceByKey(): 通过对具有相同键的值进行reduce操作来合并数据,并创建一个键值对的RDD。这个操作也会导致数据的重新洗牌,将具有相同键的数据移动到同一个分区。

  3. sortByKey(): 根据键对RDD进行排序。这个操作需要将数据重新洗牌,将具有相同键的数据移动到同一个分区。

  4. join(): 在两个键值对的RDD之间执行内连接操作。这个操作会对两个RDD进行重新洗牌,并将具有相同键的数据移动到同一个分区。

  5. cogroup(): 将具有相同键的两个RDD的数据进行分组,并返回键值对的RDD。这个操作会对两个RDD进行重新洗牌,将具有相同键的数据移动到同一个分区。

  6. distinct(): 去除RDD中的重复元素,并返回一个新的RDD。这个操作需要将数据进行重新洗牌,以确保在整个数据集上去重。

groupByKey()和reduceByKey()区别:

如果只需要将具有相同键的值分组起来,而不进行聚合计算,可以使用groupByKey()

而如果需要对具有相同键的值进行聚合计算,并返回一个键值对的RDD,可以使用reduceByKey()

在性能方面,尽量使用reduceByKey()来减少数据的传输和处理开销。


1.map(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 对RDD中的每个元素进行平方操作
squaredRDD = rdd.map(lambda x: x**2)# 输出结果
print(squaredRDD.collect())  # [1, 4, 9, 16, 25]

2.filter(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 过滤RDD中的偶数元素
filteredRDD = rdd.filter(lambda x: x % 2 == 0)# 输出结果
print(filteredRDD.collect())  # [2, 4]

3.flatMap(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 对RDD中的每个元素进行重复操作
flatMapRDD = rdd.flatMap(lambda x: [x, x, x])# 输出结果
print(flatMapRDD.collect())  # [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5]

4.distinct():

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 2, 3, 4, 4, 5])# 去除RDD中的重复元素
distinctRDD = rdd.distinct()# 输出结果
print(distinctRDD.collect())  # [1, 2, 3, 4, 5]

5.reduce(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 对RDD中的元素求和
sum = rdd.reduce(lambda x, y: x + y)# 输出结果
print(sum)  # 15

6.groupByKey():

# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 'a'), (2, 'b'), (1, 'c'), (2, 'd')])# 按键进行分组
groupedRDD = rdd.groupByKey()# 输出结果
for key, values in groupedRDD.collect():print(key, list(values))
# 1 ['a', 'c']
# 2 ['b', 'd']

7.sortByKey():

# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 'c'), (2, 'b'), (3, 'a')])# 按键进行排序
sortedRDD = rdd.sortByKey()# 输出结果
print(sortedRDD.collect())  # [(1, 'c'), (2, 'b'), (3, 'a')]

8.join(otherRDD):

# 创建两个键值对的RDD
rdd1 = sparkContext.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sparkContext.parallelize([(1, 'c'), (2, 'd')])# 按键进行连接
joinedRDD = rdd1.join(rdd2)# 输出结果
print(joinedRDD.collect())  # [(1, ('a', 'c')), (2, ('b', 'd'))]

9.union(otherRDD):

# 创建两个RDD
rdd1 = sparkContext.parallelize([1, 2, 3])
rdd2 = sparkContext.parallelize([4, 5, 6])# 合并两个RDD
unionRDD = rdd1.union(rdd2)# 输出结果
print(unionRDD.collect())  # [1, 2, 3, 4, 5, 6]

10.aggregateByKey(zeroValue)(seqOp, combOp):

# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 2), (1, 4), (2, 3), (2, 5)])# 对每个键的元素进行求和操作
sumRDD = rdd.aggregateByKey(0, lambda x, y: x + y, lambda a, b: a + b)# 输出结果
print(sumRDD.collect())  # [(1, 6), (2, 8)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/634387.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue基于Spring Boot的中医在线学习课程购买服务管理系统

SpinrgBoot的主要优点有: 1、为所有spring开发提供了一个更快、更广泛的入门体验; 2、零配置; 3、集成了大量常用的第三方库的配置; 4、提供准备好的特性。当今,nodejs领域的开发者机会都在使用SpinrgBoot,在开发领域逐…

【探索C++容器:vector的使用和模拟实现】

【本节目标】 1.vector的介绍及使用 2.vector深度剖析及模拟实现 1.vector的介绍及使用 1.1 vector的介绍 vertor文档介绍 1. vector是表示可变大小数组的序列容器。2. 就像数组一样,vector也采用连续存储空间来存储元素。也就是意味着可以采用下标对vector的元…

Deepin/Ubuntu_查看磁盘空间大小

以下是Linux系统(Deepin、Ubuntu)查看磁盘空间大小的代码示例: 使用df命令: df -h使用du命令查看指定目录的磁盘使用情况: du -sh /path/to/directory使用lsblk命令: lsblk使用fdisk命令查看磁盘分区表…

轻松一刻 浅休息下哈

yum -y install epel-release yum install -y linux_logo cal 此命令以日历表的方式显示日期 curl http://wttr.in 此网站进行在屏幕上面显示天气情况

Linux下安装docker

1、查看系统版本 Docker支持64位版本的CentOS 7和CentOS 8及更高版本,它要求Linux内核版本不低于3.10。查看Linux版本的命令这里推荐两种:lsb_release -a或cat /etc/redhat-release。 显然,当前Linux系统为CentOS7。再查一下内核版本是否不低…

关于python环境变量相关的配置汇总(venv虚拟环境/conda环境/pip相关)

关于python环境变量相关的配置汇总(venv虚拟环境/conda环境/pip相关) 本文作者: slience_me 文章目录 关于python环境变量相关的配置汇总(venv虚拟环境/conda环境/pip相关)1. python环境配置相关1.1 系统环境1.2 Anaconda环境相关1.2.1 安装1.2.2 查看python环境 1.…

面试150-76(Leetcode114二叉树展开为链表)

代码: 0124 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode righ…

【软件测试常见Bug清单】

软件测试中,bug的类型有很多种,比如:代码错误、界面优化、设计缺陷、需求补充和用户体验等; 一般情况下,需求补充和设计缺陷比较好区分,但是代码错误、界面优化和用户体验区分不是很明显; 下面…

MySQL存储函数和存储过程练习题

一、创建表的要求 字段名 数据类型 主键 外键 非空 唯一 自增 id INT 是 否 是 是 否 name VARCHAR(50) 否 否 是 否 否 glass VARCHAR(50) 否 否 是 否 否 sch表内容 id name glass 1 xiaommg …

【GitHub项目推荐--名校课程资源】【转载】

先引用一段话,今天推荐的所有 GitHub 项目创立动机几乎都是这个。本文会盘点清华、北大、斯坦福、中国科学技术大学、上海交大等学校的课程资源。 01. 浙江大学课程共享计划 上图截屏中的话就是出自该项目,浙江大学搞了一个课程共享计划,其…

安装RabbitMQ sentos并挂载

1. usr/local/software/mq/data 创建data目录, mkdir data 2. 拉取镜像 docker pull rabbitmq 3.配置网络 docker network create --driver bridge --subnet172.18.12.0/16 --gateway172.18.1.1 wn_docker_net 4. 设置参数并创建挂载 docker run -it \ --name rabbitmq \…

一文说明白 MySQL 的 ACID 和 几种日志的关系

1、简介 我们对于MySQL 很熟悉,关于其特性都有一定的了解,但是关于一些具体的实现原理,有的小伙伴可能不太熟悉,而且这部分知识在我们互联网大厂面试中是经常涉及的,因此,本文将带你深入底层,顺…

力扣日记1.19-【二叉树篇】538. 把二叉搜索树转换为累加树

力扣日记:【二叉树篇】538. 把二叉搜索树转换为累加树 日期:2023.1.19 参考:代码随想录、力扣 ps:因为准备组会汇报又搁置了好久(其实就是懒逃避T^T),但这是最后一道二叉树啦啊啊啊!&#xff01…

递归、搜索与回溯算法(专题六:记忆化搜索)

目录 1. 什么是记忆化搜索(例子:斐波那契数) 1.1 解法一:递归 1.2 解法二:记忆化搜索 1.2.1 记忆化搜索比递归多了什么? 1.2.2 提出一个问题:什么时候要使用记忆化搜索呢? 1.3 …

第十三章 MySQL

第十三章 MySQL 下面是创建数据库操作 删除数据库 右上角选择要操作的数据库 如果关闭了这个控制台,下次如何找到它呢 也可以对其改名

Linux环境下,针对QT软件工程搭建C++Test单元测试环境的操作指南

文章目录 前言一、安装QT二、安装CTest三、使用QT生成.bdf文件四、创建CTest工程注意事项 前言 CTest是Parasoft公司出品的一款可以针对C/C源代码进行静态分析、单元测试、集成测试的测试工具。本文主要讲解如何在Linux环境下,搭建QT插件版的CTest测试环境。 一、…

java测简单案例定时器和netty心跳检

一,定时器的实现方式 在Java中,定时器可以通过多种方式实现,其中最常用的是使用java.util.Timer和java.util.TimerTask类。下面是一个简单的示例,演示如何使用这些类来创建一个定时器。 首先,我们需要创建一个继承自…

Android源码编译和刷机

目录 1. Android源码编译备注2. Android源码刷机1. Android源码编译 1. 下载对应设备版本的驱动。 https://source.android.com/setup/start/build-numbers 找到需要的android版本号和对应的设备驱动号(例:android-9.0.0_r46 PQ3A.190801.002)https://developers.google.com…

【b站咸虾米】chapter4_vue组件_新课uniapp零基础入门到项目打包(微信小程序/H5/vue/安卓apk)全掌握

课程地址:【新课uniapp零基础入门到项目打包(微信小程序/H5/vue/安卓apk)全掌握】 https://www.bilibili.com/video/BV1mT411K7nW/?p12&share_sourcecopy_web&vd_sourceb1cb921b73fe3808550eaf2224d1c155 四、vue组件 uni-app官网 …

如何解决态势感知中的“时隐时现”问题

解决态势感知中的“时隐时现”问题有以下几个方法: 1、确保所有关键的监控设备和传感器正常运行,能够及时和准确地检测到各种异常情况。 2、引入先进的技术手段。例如使用人工智能和机器学习算法来识别和分析大量的数据,快速发现异常和威胁&a…