Spark算子(RDD)超细致讲解

SPARK算子(RDD)超细致讲解

map,flatmap,sortBykey, reduceBykey,groupBykey,Mapvalues,filter,distinct,sortBy,groupBy共10个转换算子

(一)转换算子

1、map

from pyspark import SparkContext# 创建SparkContext对象
sc = SparkContext()# 生成rdd
data = [1, 2, 3, 4]
rdd = sc.parallelize(data)# 对rdd进行计算
# 转化算子map使用
# 将处理数据函数当成参数传递给map
# 定义函数只需要一个接受参数
def func(x):return x + 1def func2(x):return str(x)
# 转化算子执行后会返回新的rdd
rdd_map = rdd.map(func)
rdd_map2 = rdd.map(func2)
rdd_map3 = rdd_map2.map(lambda x: [x])# 对rdd数据结果展示
# 使用rdd的触发算子,collect获取是所有的rdd元素数据
res = rdd_map.collect()
print(res)res2 = rdd_map2.collect()
print(res2)res3 = rdd_map3.collect()
print(res3)

2、flatmap

from pyspark import SparkContext# 创建SparkContext对象
sc = SparkContext()# 生成rdd
data = [[1, 2], [3, 4]]
data2 = ['a,b,c','d,f,g']  # 将数据转为['a','b','c','d','f','g']
rdd = sc.parallelize(data)
rdd2 = sc.parallelize(data2)# rdd计算
# flatMap算子使用  将rdd元素中的列表数依次遍历取出对应的值放入新的rdd [1,2,3,4]
# 传递一个函数,函数接受一个参数
rdd_flatMap = rdd.flatMap(lambda x: x)rdd_map = rdd2.map(lambda x:x.split(','))
rdd_flatMap2 = rdd_map.flatMap(lambda x:x)# 输出展示数据
# 使用执行算子
res  = rdd_flatMap.collect()
print(res)res2  = rdd_map.collect()
print(res2)res3 = rdd_flatMap2.collect()
print(res3)

3、kv数据结构(sortBykey, reduceBykey,groupBykey,Mapvalues)

# 对kv数据进行处理
from pyspark import SparkContext# 创建SparkContext对象
sc = SparkContext()
# RDD中的kv结构数据   (k,v)
data = [('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)]
# 将python数据转化为RDD数据类型
rdd = sc.parallelize(data)# 排序 根据key排序
# ascending 指定升序还是降序   默认为true是升序  从小到大
rdd_sortByKey = rdd.sortByKey(ascending=False)  # todo 无需传递处理方法# 根据key聚合计算  将相同key的数据放在一起,然后根据指定的计算方法对分组内的数据进行聚合计算
rdd_reduceByKey = rdd.reduceByKey(lambda x, y: x + y)  # todo 需要传递处理方法# 分组  只按照key分组 而且无法直接获取到值内容,需要mapValues
rdd_groupByKey = rdd.groupByKey()  # todo 无需传递处理方法# 获取kv数据中v值
# x接受value值
rdd_mapValues = rdd_groupByKey.mapValues(lambda x: list(x))  # todo 需要传递处理方法# 展示数据
res = rdd_sortByKey.collect()
print(res)res2 = rdd_reduceByKey.collect()
print(res2)res3 = rdd_groupByKey.collect()
print(res3)res4 = rdd_mapValues.collect()
print(res4)

4、复习()

data =[hadoop,flink,spark,hive,hive,spark,python,java,python,itcast,itheima] #自己转一下字符串
# rdd计算
# 对读取到的rdd中的每行数据,先进行切割获取每个单词的数据
# rdd_map = rdd.map(lambda x: x.split(','))
#  todo  这段代码给我的感觉就是先进行数据处理再执行flatmap函数。
rdd_flatMap= rdd.flatMap(lambda x: x.split(','))# 将单词数据转化为k-v结构数据   [(k,v),(k1,v1)]   给每个单词的value一个初始值1
rdd_map_kv = rdd_flatMap.map(lambda x:(x,1))# 对kv数据进行聚合计算  hive:[1,1]  求和  求平均数  求最大值  求最小值
rdd_reduceByKey =  rdd_map_kv.reduceByKey(lambda x,y:x+y)  # 现将相同key值的数据放在一起,然后对相同key值内的进行累加# 展示数据
res = rdd.collect()
print(res)# res2 = rdd_map.collect()
# print(res2)res3 = rdd_flatMap.collect()
print(res3)res4 = rdd_map_kv.collect()
print(res4)res5 = rdd_reduceByKey.collect()
print(res5)

5、过滤算子(filter)

# 数据条件过滤  类似 sql中的where
# 导入sparkcontext
from pyspark import SparkContext# 创建SparkContext对象
sc = SparkContext()# 生成rdd数据
data = [1, 2, 3, 4, 5, 6]
rdd = sc.parallelize(data)# 对rdd数据进行过滤
# filter 接受函数,要求一个函数参数
# lambda x: x > 3   x接受rdd中的每个元素数据    x>3 是过滤条件   过滤条件的数据内容和Python的 if书写内容一样
# 符合条件的数据会放入一个新的rdd
rdd_filter1 = rdd.filter(lambda x: x > 3)
rdd_filter2 = rdd.filter(lambda x: x in [2, 3, 5])
rdd_filter3 = rdd.filter(lambda x: x % 2 == 0)# 展示数据
res = rdd_filter1.collect()
print(res)res = rdd_filter2.collect()
print(res)res = rdd_filter3.collect()
print(res)

6、去重和排序算子(distinct,sortby注意和sortBYkey的区别)

# rdd数据去重
# 导入sparkcontext
from pyspark import SparkContext# 创建SparkContext对象
sc = SparkContext()# 生成rdd数据
data = [1, 1, 3, 3, 5, 6]
rdd = sc.parallelize(data)# 去重
rdd_distinct =  rdd.distinct()# 获取数据
res =  rdd_distinct.collect()
print(res)
# todo 其它都是传递计算函数,但是这个传递的不是排序函数,而是指定根据谁来排序
# rdd数据排序
# 导入sparkcontext
from pyspark import SparkContext# 创建SparkContext对象
sc = SparkContext()# 生成rdd数据
data = [6, 1, 5, 3]
data_kv = [('a',2),('b',3),('c',1)]
rdd = sc.parallelize(data)
rdd_kv = sc.parallelize(data_kv)# 数据排序
# 传递函数  需要定义一个接收参数
# lambda x:x  第一个参数x 接收rdd中的元素,    第二计算x  指定后会按照该x值排序
# ascending 指定排序规则  默认是True 升序
rdd_sortBy =  rdd.sortBy(lambda x:x,ascending=False)
# 对于kv结构数据  可以通过下标指定按照哪个数据排序
rdd_sortBy2 =  rdd_kv.sortBy(lambda x:x[1],ascending=False)# 查看结果
res = rdd_sortBy.collect()
print(res)res2 = rdd_sortBy2.collect()
print(res2)

7、分组算子(groupBy注意和groupBYkey的区别,同时都需要MapValues)

# rdd分组
# 导入sparkcontext
from pyspark import SparkContext# 创建SparkContext对象
sc = SparkContext()# 生成rdd数据
data = [6, 1, 5, 3, 10, 22, 35, 17]
rdd = sc.parallelize(data)# 分组
# 传递函数   函数需要一个接收参数
# lambda x: x % 3   x接收rdd中的每个元素   x%3  对x进行计算,余数相同的数据会放在一起
rdd_groupBy = rdd.groupBy(lambda x: x % 3)
# mapValues 会获取kv中的value数据
# lambda x:list(x)   x接收value值数据
rdd_mapValues = rdd_groupBy.mapValues(lambda x: list(x))def func(x):if len(str(x)) ==1:return '1'elif  len(str(x)) ==2:return 'two'  # 返回分组的key值 指定分组名rdd_groupBy2 = rdd.groupBy(func)
rdd_mapValues2 = rdd_groupBy2.mapValues(lambda x: list(x))# 查看结果
res = rdd_groupBy.collect()
print(res)res2 = rdd_mapValues.collect()
print(res2)res3 = rdd_groupBy2.collect()
print(res3)res4 = rdd_mapValues2.collect()
print(res4)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/621127.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信息系统中的需求分析

软件需求是指用户对新系统在功能、行为、性能、设计约束等方面的期望。根据IEEE的软件工程标准词汇表,软件需求是指用户解决问题或达到目标所需的条件或能力,是系统或系统部件要满足合同、标准、规范或其他正式规定文档所需具有的条件或能力,…

Fastjson库将JSONObject转换为实体类

嗨,大家好,欢迎来到程序猿漠然公众号,我是漠然。 在Java编程中,使用Fastjson库将JSONObject转换为实体类的过程是一种常见的数据处理操作。Fastjson是一个由阿里巴巴开发的高性能JSON处理库,它简化了JSON数据的读写操作…

传感数据分析——傅里叶滤波:理论与公式

传感数据分析——傅里叶滤波:理论与公式 引言 在传感数据分析领域,傅里叶滤波是一种重要的信号处理技术,被广泛应用于各种领域,如通信、图像处理、音频处理以及生物医学等。本文将简单探讨傅里叶滤波的理论基础和相关公式&#…

Azure Machine Learning - 视频AI技术

Azure AI 视频索引器是构建在 Azure 媒体服务和 Azure AI 服务(如人脸检测、翻译器、Azure AI 视觉和语音)基础之上的一个云应用程序,是 Azure AI 服务的一部分。 有了 Azure 视频索引器,就可以使用 Azure AI 视频索引器视频和音频…

向伟人学习反焦虑,在逆境中崛起

第一、乐观的精神。 伟人在长期以来的读书、思考和实践,突破了思想认知限制,并最终在更高的思维层面上,建立起了强大的精神信念感。 在危险环境中表示绝望的人, 在黑暗中看不见光明的人, 只是懦夫与机会主义者。 —— …

蓝桥杯练习题(八)

📑前言 本文主要是【算法】——蓝桥杯练习题(八)的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是听风与他🥇 ☁️博客首页:CSDN主页听风与他 …

[晓理紫]每日论文推送(有中文摘要,源码或项目地址)--机器人、视觉相关

专属领域论文订阅 VX关注{晓理紫},每日更新论文,如感兴趣,请转发给有需要的同学,谢谢支持 二维码 分类: 大语言模型LLM视觉模型VLM扩散模型视觉导航具身智能,机器人强化学习开放词汇,检测分割 晓理紫今日论…

二叉树的中序遍历【二叉树】【递归】

Problem: 94. 二叉树的中序遍历 文章目录 思路 & 解题方法复杂度Code 思路 & 解题方法 二叉树简单递归。 复杂度 时间复杂度: O ( n ) O(n) O(n) 空间复杂度: O ( n ) O(n) O(n) Code # Definition for a binary tree node. # class TreeNode: # def __init__(se…

Java重修第五天—面向对象3

通过学习本篇文章可以掌握如下知识 1、多态; 2、抽象类; 3、接口。 之前已经学过了继承,static等基础知识,这篇文章我们就开始深入了解面向对象多态、抽象类和接口的学习。 多态 多态是在继承/实现情况下的一种现象&#xf…

【汇编要笑着学】汇编模块化编程 | call和ret调用指令 | jmp跳转指令 | inc自加指令

Ⅰ.汇编模块化编程 0x00 一个简单的例子 我们了解模块化编程前先给出一个例子,方便大家快速了解。 SECTION MBR vstart0x7c00 ; 起始地址编译在0x7c00mov ax,cs mov ds,ax mov es,axmov ss,axmov fs,axmov sp,0x7c00 ; 上面这些都没什…

【工作日语】二、IT用语

バグ 缺陷ボタン 按钮チックする 检查クリアする 清除クリックする 点击クライアント 客户机コーディング 编码コマンド 命令コメント 注释コンパイルする 编译コンピュータ 计算机コントロール 控制カーソル 光标データ 数据データベース 数据库デバッグ 调试ドキュメント 文…

camtasia studio2024免费版如何下载?怎么录屏?

camtasia studio怎么录屏?Camtasia Studio是一款专门录制屏幕动作的工具,它能在任何颜色模式下轻松地记录屏幕动作,包括影像、音效、鼠标移动轨迹、解说声音等等。一般情况下,用户使用camtasia studio进行录屏时,需要注…

【进程调度】基于优先级的轮转调度C++实现算法

一、简介 1.1 背景 在计算机科学领域,进程调度是操作系统中一个关键的组成部分,它负责协调系统中各个进程的执行顺序,以最大程度地提高系统资源利用率。在这篇博客中,将深入探讨基于优先级的轮转调度算法,该算法结合…

Vue3-customRef的使用

读取数据前,需要先track() 告诉Vue数据msg很重要,你要对msg进行持续关注,一旦msg变化就去更新 修改数据后,需要trigger()收尾 通知Vue一下数据msg变化了 自定义ref如何防抖 hooks中…

UniApp调试支付宝沙箱(安卓)

先看下这里完整的交互的图:小程序文档 - 支付宝文档中心 一、打包 不管怎样,先打个包先。可以直接使用云端证书、云端打包,只需要指定包名即可。 二、在支付宝开放平台创建应用 这个参考官方的过程就可以了,只要有刚才打的包&…

Fastadmin上传图片服务端压缩图片,实测13.45M压缩为29.91K

先前条件:第一步安装compose,已安装忽略。 先上截图看效果 一、在fastadmin的根目录里面输入命令安装think-image composer require topthink/think-image二、找到公共上传类,application/common/library/Upload.php,在最下面…

Redis系列之使用Lua脚本

什么是lua脚本? lua语言是一个轻量级的脚本语言,可以嵌入其他语言中使用,调用宿主语言的功能。lua语法简单,小巧,源码一共才200多K,本身不会有太强的功能,很多的语言也支持lua语言,…

TensorRT(C++)基础代码解析

TensorRT(C)基础代码解析 文章目录 TensorRT(C)基础代码解析前言一、TensorRT工作流程二、C API2.1 构建阶段2.1.1 创建builder2.1.2 创建网络定义2.1.3 定义网络结构2.1.4 定义网络输入输出2.1.5 配置参数2.1.6 生成Engine2.1.7 保存为模型文件2.1.8 释放资源 2.2 运行期2.2.1…

【elastic search】详解elastic search集群

目录 1.与集群有关的一些概念 2.集群搭建 3.集群搭建 4.kibana链接集群 5.选举流程 6.请求流程 7.master的作用 1.与集群有关的一些概念 数据分片: 数据分片(shard),单台服务器的存储容量是有限的,把一份数据…

git提交记录全部删除

目录 问题描述 解决方案 结果 问题描述 新复制的项目具有特比多的提交记录我想给他清除,因为不清楚过多历史也就导致包特别大下载和提交等方面都不是很快 解决方案 查看代码clone网址; 打开远程仓库,选择要去除历史记代码分支&#xff08…