Spark常用RDD算子:transformation转换算子以及action触发算子

文章目录

  • 1. 算子(方法)介绍
  • 2. 常用transformation算子
    • 2.1 map
  • 2.2 flatMap
    • 2.3 filter
    • 2.4 distinct
    • 2.6 groupBy
    • 2.7 sortBy()
    • 2.8 k-v数据[(k,v),(k1,v1)]
  • 3. 常用action算子

1. 算子(方法)介绍

rdd中封装了各种算子方便进行计算,主要分为两类:

  • transformation 转换算子
    • 对RDD数据进行转化得到新的RDD,定义了一个线程任务。
    • 常见:map、filter、flatMap、reduceByKey、groupByKey、sortByKey
  • action 触发算子
    • 触发计算任务,让计算任务进行执行,得到结果。
    • 触发线程执行的。
    • 常见:foreach、first、count、reduce、saveAsTextFile、collect、take

RDD的转换算子大部分都是从RDD中读取元素数据(RDD中每条数据),具体计算需要开发人员编写函数传递到RDD算子中。
RDD的执行算子则大部分是用来获取数据,collect方法就是触发算子。

注意

  • 转换算子是lazy模式,一般不会触发job和task的运行,返回值一定是RDD。
  • 执行算子,会触发job和task的运行,返回值一定不是RDD。

2. 常用transformation算子

2.1 map

  • RDD.map(lambda 参数:参数计算)
  • 参数接受每个元素数据
# Map算子使用
# map算子主要使用长场景,一个转化rdd中每个元素的数据类型,拼接rdd中的元素数据,对rdd中的元素进行需求处理
# 需求,处理hdfs中的学生数据,单独获取每个学生的信息
from pyspark import SparkContextsc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))
# 3-从rdd2中获取姓名数据
rdd3 = rdd2.map(lambda x:x[1])# lambda 函数能进行简单的数据计算,如果遇到复杂数据计算时,就需要使用自定义函数
# 获取年龄数据,并且转化年龄数据为int类型,将年龄和性别合并一起保存成元组  (男,20) (女,21)
def func(x):# 1-先切割数据data_split = x.split(',')# 2-转化数据类型age = int(data_split[2])# 3-拼接性别和年龄data_tuple = (data_split[3],age)return data_tuple
# 将函数的名字传递到map中,不要加括号
rdd4 = rdd.map(func)# 触发执行算子,查看读取的数据
res = rdd.collect()
print(res)res2 = rdd2.collect()
print(res2)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.2 flatMap

  • 处理的是二维嵌套列表数据[[1,‘张三’],[2,‘李四’],[3,‘王五’]] --> [1, ‘张三’, 2, ‘李四’, 3, ‘王五’]
  • rdd.flatMap(lambda 参数:[参数计算])
#flatmap算子使用
# 主要使用场景是对二维嵌套的数据降维操作 [[1,'张三'],[2,'李四'],[3,'王五']] --> [1, '张三', 2, '李四', 3, '王五']
from pyspark import SparkContext
sc = SparkContext()#生成rdd
rdd = sc.parallelize([[1,'张三'],[2,'李四'],[3,'王五']])
#使用flatmap算子进行转化
rdd2 = rdd.flatMap(lambda x: x)#查看数据
res = rdd2.collect()
print(res)

运行结果:
在这里插入图片描述

2.3 filter

  • rdd.filter(lambda 参数:参数条件过滤)
  • 条件过滤的书写和Python中if判断的一样
# RDD数据过滤
# 需求:过滤年龄大于20岁的信息
from pyspark import SparkContextsc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))
#使用fliter方法进行数据过滤
rdd3 = rdd2.filter(lambda x: int(x[2]) > 20)
rdd4 = rdd2.filter(lambda x:x[3]=='男')# 查看数据
res = rdd2.collect()
print(res)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.4 distinct

  • 不需要lambda rdd.distinct
# distinct  去重算子
# rdd中有重复数据时,可以进行去重
from pyspark import SparkContextsc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))
# 3-从rdd2中获取性别数据
rdd3 = rdd2.map(lambda x: x[3])#对rdd3中的数据去重
rdd4 = rdd3.distinct()#查看数据
res = rdd3.collect()
print(res)res1 = rdd4.collect()
print(res1)

运行结果:
在这里插入图片描述

2.6 groupBy

  • rdd.groupBy(lambda 参数:根据参数编写分组条件)
  • mapValues(list)
# groupBy分组
# 按照不同性别进行分组
# 原理: 就是对需要分组的数据进行hash取余数 ,余数相同会放入同一组
from pyspark import SparkContextsc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))
# 3- 对性别进行分组
rdd3 = rdd2.groupBy(lambda x: hash(x[3]) % 2)
#查看分组的数据内容 mapValues 取出分组后的数据值,对数据值转为列表即可
rdd4 = rdd3.mapValues(lambda x:list(x))# 查看数据内容
res = rdd2.collect()
print(res)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.7 sortBy()

  • rdd.sortBy(lambda x:x,ascending=False)
#RDD的数据排序
from pyspark import SparkContext
sc = SparkContext()# 生成rdd数据
# 非k,v数据
rdd = sc.parallelize([4,7,3,2,8])
#在spark中使用元组表示k,v数据
rdd2 = sc.parallelize([('张三',90),('李四',70),('王五',99)])# 数据排序
rdd3 = rdd.sortBy(lambda x: x)
rdd4 = rdd.sortBy(lambda x: x,ascending=False)#k,V数据排序
rdd5 = rdd2.sortBy(lambda x: x[1],ascending=False)
rdd6 = rdd2.sortBy(lambda x: x[1])#查看结果
res = rdd3.collect()
print(res)res2 = rdd4.collect()
print(res2)res3 = rdd5.collect()
print(res3)res4 = rdd6.collect()
print(res4)

运行结果:
在这里插入图片描述

2.8 k-v数据[(k,v),(k1,v1)]

  • groupByKey()
    • rdd.groupByKey()
  • reduceByKey()
    • rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)
  • sortByKey()
    • rdd.sortByKey()
#k,v结构数据处理
from pyspark import SparkContext
sc = SparkContext()
#k,v分组
# 1. 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2. 使用转化算子进行数据处理
rdd2 = rdd.map(lambda x: x.split(','))
#将数据转为k,v结构,然后进行分组,把分组的字段作为key值
rdd3 = rdd2.map(lambda x: (x[3], x))
# 使用groupBykey方法,按key进行分组
rdd4 = rdd3.groupByKey().mapValues(lambda x: list(x))#k,v数据计算
#统计不同性别的年龄总和 (求和  平均数  最大值  最小值  数量)
#将需要计算的数据转为k,v结构  分组的字段是key值  聚合数据是value值
rdd5 = rdd2.map(lambda x: (x[3],int(x[2])))
# 使用reduceBykey方法进行聚合计算  会将相同key值的数据先合并,然后在聚合计算
# 聚合计算的算子,lambda x,y 需要结合两个参数
rdd6 = rdd5.reduceByKey(lambda x, y: x+y)rdd7 = rdd5.groupByKey().mapValues(lambda x: sum(list(x))/len(list(x)))
rdd8 = rdd5.groupByKey().mapValues(lambda x: max(list(x)))res = rdd2.collect()
print(res)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)res5 = rdd5.collect()
print(res5)res6 = rdd6.collect()
print(res6)res7 = rdd7.collect()
print(res7)res8 = rdd8.collect()
print(res8)

运行结果:

3. 常用action算子

  • collecct()取出RDD中所有值
    • rdd.collect()
  • reduce() 非k-v类型数据累加[1,2,3,4,6]
    • rdd.reduce(lambda 参数1,参数2:两个参数计算)
  • count() 统计RDD元素个数
    • rdd.count()
  • take() 取出指定数量值
    • rdd.take(数量)
# action算子使用
# 触发转化算子执行
from pyspark import SparkContextsc = SparkContext()# 生成rdd
rdd = sc.parallelize([1,2,3,4])rdd_kv = sc.parallelize([('a',2),('b',3)])# 进行转化处理# 使用action
# 获取rdd中所有元素数据,转为列表展示
res = rdd.collect()
print(res)
# 指定取出的数据数量
res2 = rdd.take(3)
print(res2)
# 对非kv数据计算
# 求和
res3 = rdd.reduce(lambda x,y:x+y)
print(res3)
# 求数量
res4 = rdd.count()
print(res4)
# 求最大值
res5 = rdd.max()
print(res5)res6 = rdd.mean()
print(res6)# 将kv数据转为字典输出
res7 = rdd_kv.collectAsMap()
print(res7)# 将rdd结果保存到hdfs 指定目录路径,指定的目录不能存在
rdd_kv.saveAsTextFile('hdfs://node1:8020/data/result')

运行结果:
在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/881674.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux网络编程】网络基础 | Socket 编程基础

🌈个人主页: 南桥几晴秋 🌈C专栏: 南桥谈C 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据…

【动手学深度学习】6.3 填充与步幅(个人向笔记)

卷积的输出形状取决于输入形状和卷积核的形状在应用连续的卷积后,我们最终得到的输出大小远小于输入大小,这是由于卷积核的宽度和高度通常大于1导致的比如,一个 240 240 240240 240240像素的图像,经过10层 5 5 55 55的卷积后&am…

自然语言处理问答系统:技术进展、应用与挑战

自然语言处理(NLP)问答系统是人工智能领域的一个重要分支,它通过理解和分析用户的提问,从大量的文本数据中提取相关信息,并以自然语言的形式回答用户的问题。随着深度学习技术的发展,尤其是预训练语言模型&…

MATLAB智能优化算法-学习笔记(4)——灰狼优化算法求解旅行商问题【过程+代码】

灰狼优化算法(Grey Wolf Optimizer, GWO)是一种基于灰狼社会行为的元启发式算法,主要模拟灰狼群体的捕猎行为(包括围攻、追捕、搜寻猎物等过程)。多旅行商问题(Multi-Traveling Salesman Problem, mTSP)是旅行商问题(TSP)的扩展,它涉及多个旅行商(车辆)从一个起点城…

深度学习:循环神经网络—RNN的原理

传统神经网络存在的问题? 无法训练出具有顺序的数据。模型搭建时没有考虑数据上下之间的关系。 RNN神经网络 RNN(Recurrent Neural Network,循环神经网络)是一种专门用于处理序列数据的神经网络。在处理序列输入时具有记忆性…

动态规划的优化与高级应用

姊妹篇: 动态规划基础与经典问题-CSDN博客 贪心算法:原理、应用与优化_最优解-CSDN博客​​​​​​贪心算法:原理、应用与优化_最优解-CSDN博客 一、动态规划的优化策 动态规划在提高时间效率的同时,往往会占用较多的空间。因…

【汇编语言】寄存器(CPU工作原理)(七)—— 查看CPU和内存,用机器指令和汇编指令编程

文章目录 前言1. 预备知识:Debug的使用1.1 什么是Debug?1.2 我们用到的Debug功能1.3 进入Debug1.3.1 对于16位或者32位机器的进入方式1.3.2 对于64位机器的进入方式 1.4 R命令1.5 D命令1.6 E命令1.7 U命令1.8 T命令1.9 A命令 2. 总结3. 实操练习结语 前言…

grpc的python使用

RPC 什么是 RPC ? RPC(Remote Procedure Call)远程过程调用,是一种计算机通信协议,允许一个程序(客户端)通过网络向另一个程序(服务器)请求服务,而无需了解…

笔试算法总结

文章目录 题目1题目2题目3题目4 题目1 使用 StringBuilder 模拟栈的行为&#xff0c;通过判断相邻2个字符是否相同&#xff0c;如果相同就进行删除 public class Main {public static String fun(String s) {if (s null || s.length() < 1) return s;StringBuilder builde…

前端开发基础NodeJS+NPM基本使用(零基础入门)

文章目录 1、Nodejs基础1.1、NodeJs简介1.2、下载安装文件1.3、安装NodeJS1.4、验证安装2、Node.js 创建第一个应用2.1、说明2.2、创建服务脚本2.3、执行运行代码2.4、测试访问3、npm 基本使用3.1、测试安装3.2、配置淘宝npm镜像3.3.1、本地安装3.3.2、全局安装3.4、查看安装信…

【网络】详解TCP协议的流量控制和拥塞控制

【网络】详解TCP协议的流量控制和拥塞控制 一. 流量控制模型窗口探测 二. 拥塞控制模型 总结 一. 流量控制 流量控制主要考虑的是接收方的处理速度。 接收端处理数据的速度是有限的.。如果发送端发的太快, 导致接收端的缓冲区被打满, 这个时候如果发送端继续发送, 就会造成丢包…

IP地址如何支持远程办公?

由于当今社会经济的飞速发展&#xff0c;各个方向的业务都不免接触到跨省、跨市以及跨国办公的需要&#xff0c;随之而来的远程操作的不方便&#xff0c;加载缓慢&#xff0c;传输文件时间过长等困难&#xff0c;如何在万里之外实现远程办公呢&#xff1f;我们以以下几点进行阐…

【GaussDB】产品简介

产品定位 GaussDB 200是一款具备分析及混合负载能力的分布式数据库&#xff0c;支持x86和Kunpeng硬件架构&#xff0c;支持行存储与列存储&#xff0c;提供PB(Petabyte)级数据分析能力、多模分析能力和实时处理能力&#xff0c;用于数据仓库、数据集市、实时分析、实时决策和混…

3DCAT实时云渲染赋能2024广东旅博会智慧文旅元宇宙体验馆上线!

广东国际旅游产业博览会&#xff08;以下简称“旅博会”&#xff09;是广东省倾力打造的省级展会品牌&#xff0c;自2009年独立成展至今已成功举办十五届。2024广东旅博会于9月13—15日在广州中国进出口商品交易会展馆A区举办&#xff0c;线上旅博会“智慧文旅元宇宙体验馆”于…

力扣21~30题

21题&#xff08;简单&#xff09;&#xff1a; 分析&#xff1a; 按要求照做就好了&#xff0c;这种链表基本操作适合用c写&#xff0c;python用起来真的很奇怪 python代码&#xff1a; # Definition for singly-linked list. # class ListNode: # def __init__(self, v…

Parallels Desktop意外退出,Parallels Desktop安装软件很卡闪退怎么办?

Parallels Desktop是目前很优秀的虚拟机软件&#xff0c;操作简单&#xff0c;兼容性强而且安装也非常方便&#xff0c;备受苹果用户的喜爱和满意。然而&#xff0c;部分用户在使用Parallels Desktop的时候&#xff0c;会遇到意外退出或终端关机的情况&#xff0c;这不仅会影响…

利用 Llama 3.1模型 + Dify开源LLM应用开发平台,在你的Windows环境中搭建一套AI工作流

文章目录 1. 什么是Ollama&#xff1f;2. 什么是Dify&#xff1f;3. 下载Ollama4. 安装Ollama5. Ollama Model library模型库6. 本地部署Llama 3.1模型7. 安装Docker Desktop8. 使用Docker-Compose部署Dify9. 注册Dify账号10. 集成本地部署的 Llama 3.1模型11. 集成智谱AI大模型…

图像分类-demo(Lenet),tensorflow和Alexnet

目录 demo(Lenet) 代码实现基本步骤&#xff1a; TensorFlow 一、核心概念 二、主要特点 三、简单实现 参数: 模型编译 模型训练 模型评估 Alexnet model.py train.py predict.py demo(Lenet) PyTorch提供了一个名为“torchvision”的附加库&#xff0c;其中包含…

修复PDF打印速度慢

详细问题&#xff1a; 当您尝试将 PDF 文件打印到本地或网络打印机时&#xff0c;打印需要很长时间&#xff0c;因为发送打印作业后&#xff0c;打印机开始打印的速度非常慢&#xff0c;在打印任务中可以看到打印传输的数据在缓慢增长。 从其他程序打印时也不会出现打印速度慢…