Pyspark下操作dataframe方法(1)

文章目录

  • Pyspark dataframe
    • 创建DataFrame
      • 使用Row对象
      • 使用元组与scheam
      • 使用字典与scheam
      • 注意
    • agg 聚合操作
    • alias 设置别名
      • 字段设置别名
      • 设置dataframe别名
    • cache 缓存
    • checkpoint RDD持久化到外部存储
    • coalesce 设置dataframe分区数量
    • collect 拉取数据
    • columns 获取dataframe列

Pyspark dataframe

创建DataFrame

from pyspark.sql import  SparkSession,Row
from pyspark.sql.types import *def init_spark():spark  = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \.config('hive.metastore.uris', 'thrift://hadoop01:9083') \.config('spark.master', "local[2]") \.enableHiveSupport().getOrCreate()return spark
spark = init_spark()# 设置字段类型
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("gender", StringType(), True),
])

使用Row对象

cs = Row('name','age','id','gender')
row_list = [ cs('ldsx','12','1','男'),cs('test1','20','1','女'),cs('test2','26','1','男'),cs('test3','19','1','女'),cs('test4','51','1','女'),cs('test5','13','1','男')]
data = spark.createDataFrame(row_list)
data.show()+-----+---+---+---+
| name|age| id|gender|
+-----+---+---+---+
| ldsx| 12|  1| 男|
|test1| 20|  1| 女|
|test2| 26|  1| 男|
|test3| 19|  1| 女|
|test4| 51|  1| 女|
|test5| 13|  1| 男|
+-----+---+---+---+
data.printSchema()
root|-- name: string (nullable = true)|-- age: string (nullable = true)|-- id: string (nullable = true)|-- gender: string (nullable = true)

使用元组与scheam

park.createDataFrame([('ldsx1','12','1','男'),('ldsx2','12','1','男')],schema).show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|ldsx1| 12|  1|    男|
|ldsx2| 12|  1|    男|
+-----+---+---+------+

使用字典与scheam

spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}]).show()
+---+------+---+----+
|age|gender| id|name|
+---+------+---+----+
| 12|    女|  1|ldsx|
+---+------+---+----+

注意

scheam设置优先级高于row设置,dict设置的key

schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("测试", StringType(), True),
])
spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}],schema).show()
+----+---+---+----+
|name|age| id|测试|
+----+---+---+----+
|ldsx| 12|  1|null|
+----+---+---+----+

agg 聚合操作

在 PySpark 中,agg(aggregate)函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。可以结合groupby使用。

from pyspark.sql import functions as sf
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.agg({'age':'max'}).show()
+--------+
|max(age)|
+--------+
|      51|
+--------+
data.agg({'age':'max','gender':"max"}).show()
+-----------+--------+
|max(gender)|max(age)|
+-----------+--------+
|         男|      51|
+-----------+--------+data.agg(sf.min(data.age)).show()
+--------+
|min(age)|
+--------+
|      12|
+--------+
data.agg(sf.min(data.age),sf.min(data.name)).show()
+--------+---------+
|min(age)|min(name)|
+--------+---------+
|      12|     ldsx|
+--------+---------+

结合groupby使用

data.groupBy('gender').agg(sf.min('age')).show()+------+--------+
|gender|min(age)|
+------+--------+
|    女|      19|
|    男|      12|
+------+--------+
data.groupBy('gender').agg(sf.min('age'),sf.max('name')).show()
+------+--------+---------+
|gender|min(age)|max(name)|
+------+--------+---------+
|    女|      19|    test4|
|    男|      12|    test5|
+------+--------+---------+

alias 设置别名

字段设置别名

#字段设置别名
data.select(data['name'].alias('rename_name')).show()
+-----------+
|rename_name|
+-----------+
|       ldsx|
|      test1|
|      test2|
|      test3|
|      test4|
|      test5|
+-----------+

设置dataframe别名

d1 = data.alias('ldsx1')
d2 = data2.alias('ldsx2')
d1.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
d2.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|测试1| 12|  1|    男|
|测试2| 20|  1|    男|
+-----+---+---+------+d3 = d1.join(d2,col('ldsx1.gender')==col('ldsx2.gender'),'inner')
d3.show()
+-----+---+---+------+-----+---+---+------+
| name|age| id|gender| name|age| id|gender|
+-----+---+---+------+-----+---+---+------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|
|test2| 26|  1|    男|测试1| 12|  1|    男|
|test2| 26|  1|    男|测试2| 20|  1|    男|
|test5| 13|  1|    男|测试1| 12|  1|    男|
|test5| 13|  1|    男|测试2| 20|  1|    男|
+-----+---+---+------+-----+---+---+------+d3[['name']].show()
#报错提示
pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`ldsx1`.`name`, `ldsx2`.`name`].
# 使用别名前缀获取
d3[['ldsx1.name']].show()
+-----+
| name|
+-----+
| ldsx|
| ldsx|
|test2|
|test2|
|test5|
|test5|
+-----+
>>> d3[['ldsx2.name']].show()
+-----+
| name|
+-----+
|测试1|
|测试2|
|测试1|
|测试2|
|测试1|
|测试2|
+-----+
d3.select('ldsx1.name','ldsx2.name').show()
+-----+-----+
| name| name|
+-----+-----+
| ldsx|测试1|
| ldsx|测试2|
|test2|测试1|
|test2|测试2|
|test5|测试1|
|test5|测试2|
+-----+-----+

cache 缓存

dataframe缓存默认缓存级别MEMORY_AND_DISK_DESER

df.cache()
# 查看逻辑计划和物理计划
df.explain()

checkpoint RDD持久化到外部存储

Checkpoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用Checkpoint比较合适,或者数据量很大的时候,采用Checkpoint比较合适。如果数据量小,或者RDD重新计算也是非常快的,直接使用缓存即可。
CheckPoint支持写入HDFS。CheckPoint被认为是安全的

sc = spark.sparkContext
# 设置检查存储目录
sc.setCheckpointDir('hdfs:///ldsx_checkpoint')
d3.count()
# 保存会在hdfs上进行存储
d3.checkpoint()
# 从hdfs读取
d3.count()

在这里插入图片描述

coalesce 设置dataframe分区数量

# 设置dataframe分区数量
d3 = d3.coalesce(3)
# 获取分区数量
d3.rdd.getNumPartitions()

collect 拉取数据

当任务提交到集群的时候collect()操作是用来将所有结点中的数据收集到dirver节点,数据量很大慎用防止dirver炸掉。

d3.collect()
[Row(name='ldsx', age='12', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='ldsx', age='12', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试2', age='20', id='1', gender='男')]

columns 获取dataframe列

>>> d3.columns
['name', 'age', 'id', 'gender', 'name', 'age', 'id', 'gender']d3.withColumn('ldsx1.name_1',col('ldsx1.name')).show()
+-----+---+---+------+-----+---+---+------+------------+
| name|age| id|gender| name|age| id|gender|ldsx1.name_1|
+-----+---+---+------+-----+---+---+------+------------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|        ldsx|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|        ldsx|
|test2| 26|  1|    男|测试1| 12|  1|    男|       test2|
|test2| 26|  1|    男|测试2| 20|  1|    男|       test2|
|test5| 13|  1|    男|测试1| 12|  1|    男|       test5|
|test5| 13|  1|    男|测试2| 20|  1|    男|       test5|
+-----+---+---+------+-----+---+---+------+------------+# 重命名列名
d3.withColumnRenamed('ldsx1.name_1',col('ldsx1.name')).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/52616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[数据集][目标检测]智慧农业草莓叶子病虫害检测数据集VOC+YOLO格式4040张9类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):4040 标注数量(xml文件个数):4040 标注数量(txt文件个数):4040 标注…

TCP/IP - Transport Layer

目录 1. UDP1.1. 数据格式1.2. 端口号2. TCP2.1. 数据格式2.2. TCP建立:三次握手2.3. TCP断开:四次挥手2.x. TCP与UDP的区别3. ICMP3.1. ICMP帧格式回见TCP/IP 对TCP/IP协议簇传输层作介绍 1. UDP UDP(User Datagram Protocol,用户数据报协议)是一种面向无连接的传输层协…

【小沐学OpenGL】Ubuntu环境下glew的安装和使用

文章目录 1、简介1.1 OpenGL简介1.2 glew简介 2、安装glew2.1 命令安装glew2.2 直接代码安装glew2.3 cmake代码安装glew 3、测试glew3.1 测试glewfreeglut3.2 测试glewglfw 结语 1、简介 1.1 OpenGL简介 Linux 系统中的 OpenGL 是一个跨语言、跨平台的应用程序编程接口&#…

多态的概念

多态 所谓的多态其实就是多种形态,它又被分为编译时多态(静态多态) 和 运行时多态(动态多态)。 静态的多态其实就是之前的模版和函数重载,今天我们主要讲动态的多态。所谓的动态多态其实就是相同的函数,完成不同的功能。 这就实现了明明都是…

《数字图像处理(面向新工科的电工电子信息基础课程系列教材)》P84

更正卷积与相关微课中互相关运算动画中的索引。 1-D correlation rectwave 禹晶、肖创柏、廖庆敏《数字图像处理(面向新工科的电工电子信息基础课程系列教材)》 禹晶、肖创柏、廖庆敏《数字图像处理》资源二维码

【Linux实践】实验一:Linux系统安装与启动

【Linux实践】实验一:Linux系统安装与启动 实验目的实验内容实验步骤及结果1. 下载VMware2. 下载 Linux 操作系统3. 在VMware中安装Ubuntu系统4. 配置Ubuntu系统5. 关机 实验目的 1.掌握Linux系统的安装过程和简单配置方法。 2.掌握与Linux相关的多操作系统的安装方…

【Leetcode算法面试题】-1. 两数之和

文章目录 算法练习题目思路参考答案算法1算法2算法3 算法练习 面试经常会遇到算法题目,今天开启算法专栏,常用算法解析 题目 ** 给定一个整数数组 nums 和一个整数目标值 target,请你在该数组中找出 和为目标值 target 的那 两个 整数&…

算法练习题25——合并多项式

题目描述 给定两个多项式,要求对它们进行合并。每个多项式的形式为若干项的集合,每项包含一个系数和一个指数。你需要将两个多项式按照指数相同的项合并,合并后的多项式要求按指数从小到大的顺序输出,输出格式为:各项…

为拖延症量身定制的AI工具,让Kimi做我的《每日信息整理助手》

AI不止对传统行业带来巨大的改变,对日常生活也便利了不少,现在这个时代获取信息的方式太简单了。 我们每天都会接受大量的信息,难免一天下来会忘记很多事情,有时候突然想起了一个点子,有时候突然有一件急事、一件待办事…

深入理解FastAPI的response_model:自动化数据验证与文档生成

使用 FastAPI 的 response_model 参数 在构建 RESTful API 时,确保数据的一致性和正确性是非常重要的。FastAPI 提供了强大的工具来帮助开发者实现这一目标。其中一个关键特性是 response_model 参数,它允许开发者定义期望的响应格式,并自动…

背诵——系统设计

系统设计 概要设计 概要设计又称为系统总体结构设计,它是系统开发过程中很关键的一步,其主要任务是将系统的功能需求分配给软件模块,确定每个模块的功能和调用关系,形成软件的模块结构图,即系统结构图。在概要设计中…

windows 显示进程地址空间

windows 显示进程地址空间 windows 显示进程地址空间 文章目录 windows 显示进程地址空间显示进程地址空间 显示进程地址空间 /* 3-ProcessInfo.cpp 显示进程地址空间 */#include "..\\CommonFiles\\CmnHdr.h" #include "..\\CommonFiles\\Toolhelp.h"#i…

17 个被动和主动遥感之间的区别

摘要: 遥感是指通过使用连接到卫星的传感器记录有关地球表面信息的行为。遥感在收集大面积信息、表征地球上的自然特征、观察和监测地球和物体随时间的变化以及 利用这些信息进行处理和分析方面发挥着至关重要的作用。在遥感中,太阳是终极能源,对照明非常有用。卫星具有成像传…

LCS—最长公共子序列

最长公共子序列问题就是求出两个字符串的LCS长度,是一道非常经典的面试题目,因为它的解法是典型的二维动态规划。 比如输入 str1 "babcde", str2 "acbe",算法应该输出3,因为 str1 和 str2 的最长公共子序列…

金属铬厂商分析:前十强厂商占有大约64.0%的市场份额

金属铬是一种灰色、有光泽、硬而脆的过渡金属。铬是不锈钢的主要添加剂,可增加耐腐蚀性。 据QYResearch调研团队最新报告“全球金属铬市场报告2024-2030”显示,预计2030年全球金属铬市场规模将达到11.8亿美元,未来几年年复合增长率CAGR为6.5%…

Spring Cloud之三 网关 Gateway

1:Intellij 新建项目 spring-cloud-gateway 2:pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLoca…

React 项目中,如何实现有效的内存管理和防止内存泄漏?

在 React 项目中&#xff0c;实现有效的内存管理和防止内存泄漏是确保应用性能和用户体验的关键。以下是一些具体的实践技巧和示例&#xff1a; 使用 useEffect 清理副作用&#xff1a; 在 useEffect 中返回一个清理函数&#xff0c;确保在组件卸载时清除事件监听器、定时器等资…

CnCrypt(磁盘加密工具绿色版是一款功能强大磁盘加密工具,供大家学习研究参考

CnCrypt(磁盘加密工具)特点 加密单个分区或整个硬盘,所有加密都是以分区为基础的 提供两级方案,以应对被强迫说出密码的情况(如抢劫。隐藏分区(覆盖式密码术,steganography)无法探测到CnCrypt 加密分区(加密数据会被认为是随机数据)。 CnCrypt(磁盘加密工具)特色 1、加密U…

【编程基础知识】什么是数据库事务

事务&#xff08;Transaction&#xff09;是数据库管理系统中的一个基本概念&#xff0c;用于确保数据库操作的原子性&#xff08;Atomicity&#xff09;、一致性&#xff08;Consistency&#xff09;、隔离性&#xff08;Isolation&#xff09;和持久性&#xff08;Durability…

【C++】C++ STL 探索:List使用与背后底层逻辑

C语法相关知识点可以通过点击以下链接进行学习一起加油&#xff01;命名空间缺省参数与函数重载C相关特性类和对象-上篇类和对象-中篇类和对象-下篇日期类C/C内存管理模板初阶String使用String模拟实现Vector使用及其模拟实现 本文将通过模拟实现List&#xff0c;从多个角度深入…