pyspark学习_dataframe常用操作_02

#回顾01常用操作
from pyspark import SparkSession,DataFramespark = SparkSession.builder.getOrCreate()peopleDF = spark.read.json("people.json")peopleDF.printSchema()#显示DataFrame的模式信息
peopleDF.show()#显示DataFrame的数据信息
peopleDF.foreach(print)#foreach方法参数为function,如print,每条数据输入function进行结果输出
def test_foreach(df:DataFrame):if df['age']>20:print(df)else:pass
peopleDF.foreach(test_foreach)
#以下操作返回新的dataframe
peopleDF.select(peopleDF['name'])#查询DataFrame部分列    
peopleDF.filter(peopleDF['age']>30)#按条件筛选数据         
peopleDF.groupBy(peopleDF['age']).count()#按列对数据进行统计   
peopleDF.sort(peopleDF['age'].asc(),peopleDF['name'].desc())#按age升序,如果一致,按name降序
peopleDF.select(peopleDF['name'].alias('username'))#修改列名
peopleDF.withColumn('test_col',peopleDF['age'])#新增列
peopleDF.replace('Bob','tom',subset=['name'])#替换信息,如果name列字段为Bob,替换为tom
2.11 withCoulumnRenamed
#withColumnRenamed:修改现有列名名称
"""
para1:existing:str   现有列名str类型
para2:new:str        修改后列名str类型
return:dataframe     返回dataframe
"""
peopleDF.withColumnRenamed("age","age_new").show()输出如下:
+-------+----+
|age_new|name|
+-------+----+
|     12| tom|
|     22|jack|
|     33| Bob|
+-------+----+
2.12  join
#join:关联,类似于sql里面的join函数
"""
para1:other:DataFrame   另一个DataFrame
para2:on:               类似于sql中的on,两个dataframe的关联字段
how:                    left/right/inner/full
"""
peopleDF1 = peopleDF
peopleDF.join(peopleDF1,on = (peopleDF1['age']==peopleDF['age']) & (peopleDF1['name'] == peopleDF['name']),how='inner').show()输出如下:
+---+----+---+----+
|age|name|age|name|
+---+----+---+----+
| 12| tom| 12| tom|
| 22|jack| 22|jack|
| 33| Bob| 33| Bob|
+---+----+---+----+
2.13 count
#count:计算数量--->返回int
peopleDF.count()  #返回dataframe的数量输出如下:
3
 2.14 drop
#drop:删除dataframe的列,一次删除一列
"""
para:列名 str
return:dataframe
"""
peopleDF.drop('name').show()输出如下:
+---+
|age|
+---+
| 12|
| 22|
| 33|
+---+
2.15 take
#take:获取dataframe的前N行数据
"""
para1:num   int类型
return  list(Row)
""" 
print(peopleDF.take(2)) #获取peopleDF前2行数据输出如下:
[Row(age=12, name='tom'), Row(age=22, name='jack')]解析Row数据
people_list = peopleDF.take(2)
for people in people_list:print(people['age'])
2.16 distinct
#distinct:输出dataframe不同的行数据  返回新的dataframepeopleDF.distinct().show()输出如下:
+---+----+
|age|name|
+---+----+
| 33| Bob|
| 12| tom|
| 22|jack|
+---+----+
2.17 union
#union:上下拼接两个不同的dataframe,要求两个dataframe有相同的列数
"""
para1: other:DataFrame 其他的DataFrame
return: DataFrame
"""
peopleDF.union(peopleDF).show()
+---+----+
|age|name|
+---+----+
| 12| tom|
| 22|jack|
| 33| Bob|
| 12| tom|
| 22|jack|
| 33| Bob|
+---+----+
2.18 first
#first:获取dataframe中第一个数据
print(peopleDF.first()) #返回Row输出如下:
Row(age=12, name='tom')
2.19 createOrReplaceTempView
#createOrReplaceTempView:注册临时表
"""
para:name str类型,注册后表名
return:None
"""
peopleDF.createOrReplaceTempView("people")
result = spark.sql("select * from people")
result.show()输出如下:
+---+----+
|age|name|
+---+----+
| 12| tom|
| 22|jack|
| 33| Bob|
+---+----+
2.20 repartition
#repartition:返回由给定分区表达式分区的新的DataFrame
"""
para1:numPartitions int:指定分区的数量,如果未指定,则默认分区数
para2:cols str或者Column  指定分区列
return:dataframe
"""
print('修改分区前:{}'.format(peopleDF.rdd.getNumPartitions()))  # 修改分区前分区数
peopleDF_new = peopleDF.repartition(3, 'name')
print('修改分区后:{}'.format(peopleDF_new.rdd.getNumPartitions()))  # 修改分区后分区数输出如下:
修改分区前:1
修改分区后:3
 2.21 rdd
#rdd:将dataframe类型数据转为RDD类型数据
peopleRDD = peopleDF.rdd
print(type(peopleRDD))输出如下:
<class 'pyspark.rdd.RDD'>
2.22 toDF
#toDF:将RDD类型数据转换为DataFrame类型数据
"""
para1:schema:列名,由各个列名组成的list
para2:sampleRatio:采样率,用于推测各个列的数据类型,默认前100个数据
"""
peopleRdd = peopleDF.rdd
peopleRdd.toDF(schema=['name','age'],sampleRatio=0.5).printSchema()输出如下:
root|-- age: long (nullable = true)|-- name: string (nullable = true)
2.23 collect
#collect:将DataFrame类型的数据转为list,同时会从远程集群拉去数据到driver端
print(peopleDF.collect())#输出如下:
[Row(age=12, name='tom'), Row(age=22, name='jack'), Row(age=33, name='Bob')]
2.24 persist/unpersist
#persist:dataframe数据持久化   unpersist:数据释放持久化  cache:数据持久化,调用了persist
#is_cached:属性,是否缓存
"""
persist参数:
storageLevel:持久化策略 (useDisk,useMemory,useOffHeap,deserialized,replication=1)
默认持久化级别是:Memory_and_disk (True,True,False,False,1)
"""
persist_peopleDF = peopleDF.persist()
print(persist_peopleDF.is_cached)persist_peopleDF_1 = persist_peopleDF.unpersist()
print(persist_peopleDF_1.is_cached)输出如下:
True
False
2.25 fillna
#fillna:如果列为空,填充数据
"""
para1:value:要填充的值
para2:subset:列名,可以为多列
return:dataframe  返回新的dataframe
"""
修改原有数据如下:
{"name":"tom","age":12,"year":2012}
{"name":"jack"}
{"name":"Bob","age":33}peopleDF.fillna(value=0,subset=['age','year']).show()输出如下:
+---+----+----+
|age|name|year|
+---+----+----+
| 12| tom|2012|
|  0|jack|   0|
| 33| Bob|   0|
+---+----+----+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/648888.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

unity 网络地址加载图片

/// <summary> /// 网络地址加载图片 /// </summary> /// <param name"url">地址</param> /// <param name"raw">图片显示位置</param> public void loadImage(string url,RawImage raw) { …

【Godot4自学手册】第六节实现人物的挥剑操作

同学们好&#xff01;本节学习一下人物挥剑操作。 一、将鼠标左键单击设为输入映射 单击项目选择项目设置&#xff0c;在添加新动作填写sword&#xff0c;然后点击添加。在动作列表中,单击sword后面的加号&#xff0c;在弹出对话框中单击鼠标左键&#xff0c;最后单击确定&am…

Flink Checkpoint 超时问题和解决办法

第一种、计算量大&#xff0c;CPU密集性&#xff0c;导致TM内线程一直在processElement&#xff0c;而没有时间做CP【过滤掉部分数据&#xff1b;增大并行度】 代表性作业为算法指标-用户偏好的计算&#xff0c;需要对用户在商城的曝光、点击、订单、出价、上下滑等所有事件进…

Linux--基础开发工具篇(1)(yum)

1.Linux 软件包管理器 yum 1.1yum是什么&#xff1f;什么是软件包&#xff1f; yum是什么&#xff1f; yum是一个软件下载安装管理的一个客户端&#xff0c;就如小米应用商店&#xff0c;华为应用商城。 Linux中软件包可能有依赖关系--yum会帮助我们解决依赖关系的问题。 什么是…

算法训练营Day51(动态规划12)

309.最佳买卖股票时机含冷冻期 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 提醒 本题加了一个冷冻期&#xff0c;状态就多了&#xff0c;有点难度&#xff0c;要把各个状态分清&#xff0c;思路才能清晰 注意初始化dp[0][0]、dp[0][1]、dp[0][2]…

使用OpenCV实现一个简单的实时人脸跟踪

简介&#xff1a; 这个项目将通过使用OpenCV库来进行实时人脸跟踪。实时人脸跟踪是一项在实际应用中非常有用的技术&#xff0c;如视频通话、智能监控等。我们将使用OpenCV中的VideoCapture()函数来读取视频流&#xff0c;并使用之前加载的Haar特征级联分类器来进行人脸跟踪。 …

GPTs 英语老师 现在不能发布为Averyone了 翻译 时态 结构 例句 一清二楚

https://chat.openai.com/g/g-l3S5WDgP7-english-teacher The crowd began to shout, but the drunk was unaware of the danger. 翻译为中文: 人群开始大声喊叫&#xff0c;但那个醉酒的人没有意识到危险。 分析时态和句子语法: 时态&#xff1a;这个句子使用的是过去时。语法…

【python】一文带你了解什么是dataclass?

为什么需要dataclass数据类 在Python 3.7&#xff08;PEP 557&#xff09;后引入一个新功能是装饰器dataclass&#xff0c;它通过自动生成特殊方法&#xff08;如__init__() 和 __repr__() ...等魔术方法 &#xff09;来简化数据类的创建。 数据类和普通类一样&#xff0c;但设…

muduo库的模拟实现——muduo库的介绍

文章目录 一、muduo库介绍二、背景知识1.epoll2.Reactor模式 三、功能模块划分1.工具部分2.Reactor部分3.TCPServer部分 一、muduo库介绍 muduo库是在Linux环境下使用C实现的一个多Reactor多线程的高性能网络服务器&#xff0c;作者陈硕&#xff0c;他还出了一本书《Linux多线…

基于差分进化算法(Differential Evolution Algorithm,DE)的移动边缘计算的任务卸载与资源调度研究(提供MATLAB代码)

一、优化模型介绍 移动边缘计算的任务卸载与资源调度是指在移动设备和边缘服务器之间&#xff0c;将部分计算任务从移动设备卸载到边缘服务器&#xff0c;并合理分配资源以提高系统性能和降低能耗。 在本文所研究的区块链网络中&#xff0c;优化的变量为&#xff1a;挖矿决策&…

热门应用滥用苹果 iPhone 推送通知,暗中窃取用户数据

移动研究人员 Tommy Mysk 近日揭露&#xff0c;部分热门应用利用 iPhone 推送通知功能秘密发送用户数据&#xff0c;这引发了用户隐私安全担忧。 许多 iOS 应用程序正在使用由推送通知触发的后台进程来收集设备的用户数据&#xff0c;从而有可能创建用于跟踪的指纹档案。 Mys…

Azure AI - 沉浸式阅读器,阅读障碍用户福音

目录 一、什么是沉浸式阅读器将内容划分开来提高可读性显示常用字词的图片突出显示语音的各个部分朗读内容实时翻译内容将单词拆分为音节 二、沉浸式阅读器如何工作&#xff1f;环境准备创建 Web 应用项目设置身份验证配置身份验证值安装标识客户端 NuGet 包更新控制器以获取令…

《ORANGE’S:一个操作系统的实现》读书笔记(三十八)尾声(三)

这篇文章是尾声的第三部分&#xff0c;也是《ORANGE’S&#xff1a;一个操作系统的实现》读书笔记的最后一篇文章&#xff0c;本篇文章记录如何将我们开发的OS安装到真实的计算机&#xff08;建议在虚拟机中进行&#xff09;。 将OS安装到真实的计算机 其实安装到真实的硬盘和…

JS slice() 方法总结

在JavaScript中&#xff0c;有一种数组方法叫做slice()&#xff0c;它基于给定的起始和结束位置&#xff0c;创建一个新的数组副本。该方法能够将数组的一部分切成另一个数组。 语法 array.slice(start, end) start: 可选参数&#xff0c;表示切片起始位置的索引。如果没有指…

《Linux C编程实战》笔记:信号的屏蔽

在《Linux C编程实战》笔记&#xff1a;信号的捕捉和处理-CSDN博客的sigaction的sa_mask成员&#xff0c;它的类型就是一个信号集&#xff0c;下面我们来介绍它 信号集 信号的总数目达64个&#xff0c;所以不能用一个整数表示它们的集合&#xff0c;int类型通常是4字节32位&a…

CKA考试练习题

一&#xff1a;为部署管道创建一个新的 ClusterRole 并将其绑定到范围为特定 namespace 的特定 ServiceAccount 要求&#xff1a;创建一个名字为 deployment-clusterrole 且仅允许创建以下&#xff08;Deployment&#xff0c;StatefulSet &#xff0c;DaemonSet&#xff09;资源…

Linux | makefile简单教程 | Makefile的工作原理

前言 在学习完了Linux的基本操作之后&#xff0c;我们知道在linux中编写代码&#xff0c;编译代码都是要手动gcc命令&#xff0c;来执行这串代码的。 但是我们难道在以后运行代码的时候&#xff0c;难道都要自己敲gcc命令嘛&#xff1f;这是不是有点太烦了&#xff1f; 在vs中…

力扣646. 最长数对链

动态规划 思路&#xff1a; 思路与 力扣354. 俄罗斯套娃信封问题 类似将序列进行排序&#xff0c;然后假设 dp[i] 为第 i 个元素的最长数对链个数&#xff1b;则其状态转移方程&#xff1a; 第 i 个元素之前的某一个元素&#xff08;假设是下标是 j&#xff09;&#xff0c;如…

SPEC CPU 2017 Qemu RISCV

SPEC CPU 2017 Qemu RISCV 以下是 SPEC CPU 2017 的官方描述, 据说在 1.1.9 版本之后支持 RISCV SPEC CPU 2017 may be updated from time to time. To update your copy, use runcpu --update. History: v1.1.9, Nov-2022: Add RISC-V Linux toolset; update sysinfo.v1.1.8, …

KVM部署Alibaba Cloud Linux操作系统

下载镜像文件 下载链接&#xff1a;https://mirrors.aliyun.com/alinux/image/?spma2c4g.11186623.0.0.79ed5af6pehv54 下载文件&#xff1a;aliyun_3_x64_20G_nocloud_alibase_20230727.qcow2 部署KVM虚拟化环境 yum -y install qemu libvirt rr-testsuite systemctl star…