【Spark】Pyspark RDD

  • 1. RDD算子
    • 1.1 文件 <=> rdd对象
    • 1.2 map、foreach、mapPartitions、foreach Partitions
    • 1.3 flatMap 先map再解除嵌套
    • 1.4 reduceByKey、reduce、fold 分组聚合
    • 1.5 mapValue 二元组value进行map操作
    • 1.6 groupBy、groupByKey
    • 1.7 filter、distinct 过滤筛选
    • 1.8 union 合并
    • 1.9 join、leftOuterJoin、rightOuterJoin 连接
    • 1.10 intersection 交集
    • 1.11 sortBy、sortByKey 排序
    • 1.12 countByKey 统计key出现次数
    • 1.13 first、take、top、count 取元素
    • 1.14 takeOrdered 排序取前n个
    • 1.15 takeSample 随机抽取

from pyspark import SparkConf, SparkContextconf = SparkConf().setAppName('test')\.setMaster('local[*]')
sc = SparkContext(conf=conf)

1. RDD算子

1.1 文件 <=> rdd对象

# 集合对象 -> rdd (集合对象,分区数默认cpu核数)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print(rdd.glom().collect(), rdd.getNumPartitions())
# [[1, 2], [3, 4], [5, 6]] 3# 文件 -> rdd
rdd = sc.textFile("./data.csv")
print(rdd.collect())
# ['1, 2, 3, 4, 5, 6']# rdd -> 文件
rdd = sc.parallelize([1, 2, 3], 3)
rdd.saveAsTextFile('./output')
'''  
生成output文件夹
里面有按分区存储的多个文件
'''

1.2 map、foreach、mapPartitions、foreach Partitions

# map函数
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
rdd2 = rdd.map(lambda x: (x, 1))
print(rdd2.map(lambda x: x[0] + x[1]).collect())
# [2, 3, 4, 5, 6, 7]
# foreach 
# 同map,但无返回值,且不改变原元素
# 另有foreachPartitions
rdd = sc.parallelize([1, 2, 3])
rdd.foreach(lambda x: print(x))
# 1 3 2
rdd.foreach(lambda x: -x)
rdd.collect()
# [1, 2, 3]

# mapPartitions
'''  
map 一次调出一个元素进行计算,io次数多
mapPartitions 一次将一个分区的所有元素调出计算s
'''
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)def func(iter):# 相较于map时间复杂度没优化,空间复杂度优化res = list()for it in iter:res.append(it * 10)return resrdd.mapPartitions(func).collect()
# [10, 20, 30, 40, 50, 60]

1.3 flatMap 先map再解除嵌套

# flatMap 先执行map操作,再解除嵌套(降维 softmax前flatten)
rdd = sc.textFile("./data.csv")
print(rdd.collect())rdd.flatMap(lambda x: x.split(' ')).collect()

1.4 reduceByKey、reduce、fold 分组聚合

# reduceByKey 按照key分组,再对组内value完成聚合逻辑
# key-value型(二元元组)rdd
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
print(rdd.reduceByKey(lambda a, b: a + b).collect())
# [('b', 3), ('a', 3)]
# reduce 只聚合
# 不返回rdd 
rdd = sc.parallelize(range(1, 3))
print(rdd.reduce(lambda a, b: a + b))
# 3
print(sc.parallelize([('a', 1), ('a', 1)]).reduce(lambda a, b: a + b))
# ('a', 1, 'a', 1)
# fold 带初值的reduce
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print(rdd.fold(10, lambda a, b: a + b))
'''   
[[1, 2], [3, 4], [5, 6]]
10 + 1 + 2 = 13
10 + 3 + 4 = 17
10 + 5 + 6 = 21
10 + 13 + 17 + 21 = 61
> 61
'''

1.5 mapValue 二元组value进行map操作

# mapValues 对二元组内的value执行map操作, 没有分组操作
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
rdd.mapValues(lambda x: x * 10).collect()

1.6 groupBy、groupByKey

  • groupBy、groupByKey、reduceByKey区别
# groupBy 多元组皆可进行分组,可选择按哪一个值分组
# reduceByKey 分组后(ByKey)对value进行聚合(reduce),二元组第一个值为key
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])# 分组 按第一个值
rdd2 = rdd.groupBy(lambda x: x[0])
print(rdd2.collect())
'''
返回的是迭代器,需进一步转换
[('a', <pyspark.resultiterable.ResultIterable object at 0x106178370>), 
('b', <pyspark.resultiterable.ResultIterable object at 0x1060abe50>)]
'''
rdd3 = rdd2.map(lambda x: (x[0], list(x[1])))
print(rdd3.collect())
'''  
[('a', [('a', 1), ('a', 2)]), 
('b', [('b', 3), ('b', 4)])]
'''
# groupByKey
# 自动按照key分组,分组后没有聚合操作,只允许二元组
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])rdd2 = rdd.groupByKey()
rdd2.map(lambda x: (x[0], list(x[1]))).collect()
# [('a', [1, 2]), ('b', [3, 4])]

1.7 filter、distinct 过滤筛选

# filter 过滤器
# 过滤条件True则保留
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x > 3).collect()
# [4, 5]
# distinct 去重
rdd = sc.parallelize([1, 1, 1, 1, 2, 3, 'a', 'a'])
rdd.distinct().collect()
# [[1, 'a', 2, 3]

1.8 union 合并

# union 合并两个rdd
# 元素凑在一起,不考虑重复
rdd_a = sc.parallelize([1, 1, 2, 3])
rdd_b = sc.parallelize([2, 3, ('a', 1), ('b', 2)])rdd_a.union(rdd_b).collect()
# [1, 1, 2, 3, 2, 3, ('a', 1), ('b', 2)]

1.9 join、leftOuterJoin、rightOuterJoin 连接

# join JOIN操作
# 只用于二元组,相同key进行关联
rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])print(rdd_a.join(rdd_b).collect())
'''   
内连接 取交集
[('b', (3, 2)), 
('a', (1, 1)), 
('a', (2, 1))]
'''
print(rdd_a.leftOuterJoin(rdd_b).collect())
'''   
左连接 取交集和左边全部
[('b', (3, 2)), 
('a', (1, 1)), 
('a', (2, 1))]
'''
print(rdd_a.rightOuterJoin(rdd_b).collect())
'''   
右连接 取交集和右边全部
[('b', (3, 2)), 
('c', (None, 3)), 
('a', (1, 1)), 
('a', (2, 1))]
'''

1.10 intersection 交集

# intersection 取交集
# 区别于join,没有按key连接的操作
rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])rdd_a.intersection(rdd_b).collect()
# [('a', 1)]

1.11 sortBy、sortByKey 排序

# sortBy
# func 指定排序元素的方法
# ascending True生序,False降序
# numPartitions 用多少分区排序
rdd = sc.parallelize([[1, 2, 3], [7, 8, 9],[4, 5, 6]])
rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect()
'''  
[[1, 2, 3], 
[4, 5, 6], 
[7, 8, 9]]
'''
# sortByKey 针对kv型rdd
'''   
ascending True升序,False降序
numPartitions 全局有序要设为1,否则只能保证分区内有序
keyfunc 对key进行处理,再排序
'''
rdd = sc.parallelize([('a', 1), ('c', 2), ('B', 3)])
print(rdd.sortByKey(ascending=True, numPartitions=1).collect())
'''   
[('B', 3), ('a', 1), ('c', 2)]
'''
print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda k: str(k).lower()).collect())
'''  
[('a', 1), ('B', 3), ('c', 2)]
'''

1.12 countByKey 统计key出现次数

# countByKey 统计key出现次数,可多元元组
# 返回dict 不是rdd
rdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])
rdd.countByKey()

1.13 first、take、top、count 取元素

# first 取第一个元素
rdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])
print(rdd.first() )
# ('a', 1, 2)# take 取前n个元素
print(rdd.take(2))
# [('a', 1, 2), 'a']# count 返回元素个数
print(rdd.count())# top 降序排序取前n个
rdd = sc.parallelize([2, 4, 1, 6])
print(rdd.top(2))
# [6, 4]

1.14 takeOrdered 排序取前n个

# takeOrdered 排序取前n个
'''   
param1: n
param2: func取数前更改元素,不更改元素本身,
不传func,默认升序(取前n最小值)
func = lambda x: -x 变为降序,取前n最大值,和top相同
'''
rdd = sc.parallelize([2, 4, 1, 6])
rdd.takeOrdered(2) # [1, 2]
rdd.takeOrdered(2, lambda x: -x) # [6, 4]

1.15 takeSample 随机抽取

# takeSample 随机抽取元素
'''  
param1: True随机有放回抽样,Fasle不放回抽样 
param2: 抽样个数
param3: 随机数种子
'''
rdd = sc.parallelize([1])
rdd.takeSample(True, 2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/64009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu22安装和部署Kettle8.2

前提 kettle是纯java编写的etl开源工具&#xff0c;目前kettle7和kettle8都需要java8或者以上才能正常运行。所以运行kettle前先检查java环境是否正确配置&#xff0c;java版本是否是8或者以上。 kettle安装 1、创建kettle目录&#xff0c;并将kettle的zip包解压到kettle目…

linux开启端口

目录 1.查看防火墙状态 1.1 开启防火墙 1.2 再次查看防火墙状态 2.开启指定端口 3. 重启防火墙 4.重新加载防火墙 5.查看已经开启的端口 1.查看防火墙状态 firewall-cmd --state 如果返回的是 not running&#xff0c;那么需要先开启防火墙&#xff0c; 1.1 开启防火…

QT中对话框界面的实现以及事件处理机制(核心机制)

对话框 消息对话框、字体对话框、颜色对话框、文件对话框 消息对话框 消息对话框提供了一个模态的对话框&#xff0c;用来提示用户信息&#xff0c;或者询问用户问题并得到回答 基于属性版本的API 使用该类调用构造函数&#xff0c;构造一个类对象调用成员函数exec进入执行…

机器人中的数值优化(五)——信赖域方法

本系列文章主要是我在学习《数值优化》过程中的一些笔记和相关思考&#xff0c;主要的学习资料是深蓝学院的课程《机器人中的数值优化》和高立编著的《数值最优化方法》等&#xff0c;本系列文章篇数较多&#xff0c;不定期更新&#xff0c;上半部分介绍无约束优化&#xff0c;…

写的一款简易的热点词汇记录工具

项目需要对用户提交的附件、文章、搜索框内容等做热词分析。如下图&#xff1a; 公司有大数据团队。本着不麻烦别人就不麻烦别人的原则&#xff0c;写了一款简易的记录工具&#xff0c;原理也简单&#xff0c;手工在业务插入锚点&#xff0c;用分词器分好词&#xff0c;排掉字…

字符函数和字符串函数(2)

目录 memcpy memmove memcmp memcpy void * memcpy ( void * destination, const void * source, size_t num ); 1.函数memcpy从source的位置开始向后复制num个字节的数据到destination的内存位置。 2.这个函数在遇到 \0 的时候并不会停下来。 3.如果source和destination有…

Ansible-playbook循环学习

循环语句简介 我们在编写playbook的时候&#xff0c;不可避免的要执行一些重复性操作&#xff0c;比如指安装软件包&#xff0c;批量创建用户&#xff0c;操作某个目录下的所有文件等。正如我们所说&#xff0c;ansible一门简单的自动化语言&#xff0c;所以流程控制、循环语句…

Ceph入门到精通-LVS基础知识

LB集群: &#xff08;Load Balancing&#xff09;即负载均衡集群,其目的是为了提高访问的并发量及提升服务器的性能&#xff0c;其 实现方式分为硬件方式和软件方式。 硬件实现方式&#xff1a; 常用的有 F5公司的BIG-IP系列、A10公司的AX系列、Citrix公司的 NetScaler系列…

VisualStudio配置pybind11-Python调用C++方法

个人测试下来Debug生成的dll改pyd&#xff0c;py中import会报错gilstate->autoInterpreterState 如果遇到同样问题使用Release吧 目录 1.安装pybind11 1.pip&#xff1a; 2.github&#xff1a; 2.配置VS工程 2.在VC目录中的包含目录添加&#xff1a; 3.在VC目录中的库目录…

【Web系列二十四】使用JPA简化持久层接口开发

目录 环境配置 1、引入依赖 配置文件 代码编写 实体类创建 JPA常用注解 Service与ServiceImpl Service ServiceImpl Controller Dao 三种实现Dao功能方式 1.继承接口&#xff0c;使用默认接口实现 2.根据接口命名规则默认生成实现 3.自定义接口实现(类似MyBatis…

LeetCode-134-加油站-贪心思路

题目描述&#xff1a; 在一条环路上有 n 个加油站&#xff0c;其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车&#xff0c;从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发&#xff0c;开始时油箱为空。 给定两个整数…

汽车类 ±0.25°C SPI 温度传感器,TMP126EDBVRQ1、TMP126EDCKRQ1、TMP127EDBVRQ1引脚配置图

一、概述 TMP126-Q1 是一款精度为 0.25C 的数字温度传感器 &#xff0c; 支持的环境温度范围为 -55C 至 175C 。TMP126-Q1 具 有 14 位 &#xff08; 有符号 &#xff09; 温度分辨率(0.03125C/LSB)&#xff0c;并且可在 1.62V 至 5.5V 的电源电压范围内工作。TMP126-Q1 具有转…

同创永益入选首批“金融数字韧性与混沌工程实践试点机构”

8月16日下午&#xff0c;由北京国家金融科技认证中心、北京国家金融标准化研究院联合主办的“传递信任 服务发展”金融科技标准认证生态大会在太原成功举办。中国金融电子化集团有限公司党委书记、董事长周逢民&#xff0c;中国科学院院士冯登国&#xff0c;中国工商银行首席技…

Unity 粒子特效遮罩(ParticleMask)

1.需求&#xff1a; 游戏中粒子特效能实现非常好的效果&#xff0c;但是由于粒子特效是独立的系统&#xff0c;Unity自带的Mask普通的遮罩&#xff0c;遮不住粒子特效。 2.实现原理&#xff1a; 通过shader把超出范围的粒子纹理(Texture)&#xff0c;改成透明颜色&#xff0…

Python安装指南(Windows版)

安装python环境 官网下载地址&#xff1a; Download Python | Python.org 我选择3.10.4版本&#xff0c;当然你也可以选择其他版本 安装 安装完成&#xff0c;需要验证是否安装成功。 打开CMD窗口&#xff0c;输入python命令&#xff0c;如果进入如下python窗口则安装成功&…

redis-lua脚本-无参-比较2个数值

以下是演变的过程&#xff1a; eval " return haha " 0 eval " local res haha; return res; " 0 eval " local value1 redis.call(get,value1); local value2 redis.call(get,value2);return value1; " 0 eval " return 1 < 2;…

docker打包vue vite前端项目

打包vue vite 前端项目 1.打包时将测试删除 2.修改配置 3.打包项目 npm run build 显示成功&#xff08;黄的也不知道是啥&#xff09; 打包好的前端文件放入 4.配置 default.conf upstream wms-app {server 你自己的ip加端口 ;server 192.168.xx.xx:8080 ; } server { …

怎样将几个pdf合并?

在日常工作中&#xff0c;我们经常需要处理大量的PDF文件。有时候&#xff0c;我们需要将多个PDF文件合并成一个文件&#xff0c;以便于快速传输或方便查阅。虽然PDF文件本身不能进行编辑&#xff0c;但是借助专业的PDF编辑软件&#xff0c;我们可以轻松地实现将多个PDF文件合并…

动漫推荐。

声明&#xff1a; 1.观看来源&#xff1a;腾讯&#xff0c;bilibili&#xff0c;爱奇艺&#xff0c;优酷&#xff08;私信博主可获取其他观看途径&#xff09;。 2.以下动漫热度、时间不分先后&#xff0c;并且都是博主观看完的动漫&#xff0c;黄色标注表示热度较高动漫&…

C++Qt QSS要注意的坑

qss源自css&#xff0c;相当于css的一个子集&#xff0c;主要支持的是css2标准&#xff0c;很多网上的css3的标准的写法在qss这里是不生效的&#xff0c;所以不要大惊小怪。 qss也不是完全支持所有的css2&#xff0c;比如text-align官方文档就有说明&#xff0c;只支持 QPushB…