Pyspark下操作dataframe方法(3)

文章目录

  • Pyspark dataframe操作方式3
    • df.foreach 逐条执行
    • foreachPartition 按分区逐条执行
    • freqltems
    • groupBy 分组
    • head 获取指定数量开头
    • hint 查询优化
    • intersect 获取交集(去重)
    • isEmpty 判断dataframe是否为空
    • join 关联
    • limit 限定数量
    • mapInPandas 迭代处理
    • maplnArrow 迭代处理
    • fill 填充
    • orderBy 排序
    • persist 持久化缓存
    • printSchema 打印架构

Pyspark dataframe操作方式3

df.foreach 逐条执行

df.foreach() == df.rdd.foreach()

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
def func(row):print(row.name)# row对象进入func执行
df.foreach(func)
Alice
Bob

foreachPartition 按分区逐条执行

df.show()
+---+-----+
|age| name|
+---+-----+
| 14|  Tom|
| 23|Alice|
| 16|  Bob|
+---+-----+
def func(itr):for person in itr:print(person.name)df.foreachPartition(func)
Tom
Alice
Bob

freqltems

df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
|  1| 11|
|  1| 11|
|  3| 10|
|  4|  8|
|  4|  8|
+---+---+
df.freqItems(["c1", "c2"]).show()
+------------+------------+
|c1_freqItems|c2_freqItems|
+------------+------------+
|   [1, 3, 4]| [8, 10, 11]|
+------------+------------+

groupBy 分组

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  2|  Bob|
|  5|  Bob|
+---+-----+df.groupBy("name").agg({"age": "sum"}).show()
+-----+--------+
| name|sum(age)|
+-----+--------+
|  Bob|       9|
|Alice|       2|
+-----+--------+df.groupBy("name").agg({"age": "max"}).withColumnRenamed('max(age)','new_age').sort('new_age').show()
+-----+-------+
| name|new_age|
+-----+-------+
|Alice|      2|
|  Bob|      5|
+-----+-------+

head 获取指定数量开头

df.head(2)
[Row(age=2, name='Alice'), Row(age=2, name='Bob')]

hint 查询优化

处理大表join时,spark默认策略可能不是最优解,通过hint 可以设置join类型

其他hints: merge,shuffle,coalesce

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])                                                                                             
df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
df.join(df2, "name").explain()  
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]+- SortMergeJoin [name#1641], [name#1645], Inner:- Sort [name#1641 ASC NULLS FIRST], false, 0:  +- Exchange hashpartitioning(name#1641, 200), ENSURE_REQUIREMENTS, [plan_id=1916]:     +- Filter isnotnull(name#1641):        +- Scan ExistingRDD[age#1640L,name#1641]+- Sort [name#1645 ASC NULLS FIRST], false, 0+- Exchange hashpartitioning(name#1645, 200), ENSURE_REQUIREMENTS, [plan_id=1917]+- Filter isnotnull(name#1645)+- Scan ExistingRDD[height#1644L,name#1645]df.join(df2.hint("broadcast"), "name").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]+- BroadcastHashJoin [name#1641], [name#1645], Inner, BuildRight, false:- Filter isnotnull(name#1641):  +- Scan ExistingRDD[age#1640L,name#1641]+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=1946]+- Filter isnotnull(name#1645)+- Scan ExistingRDD[height#1644L,name#1645]

intersect 获取交集(去重)

df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
PyDev console: starting.
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
|  c|  4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+
df1.intersect(df2).show()+---+---+
| C1| C2|
+---+---+
|  b|  3|
|  a|  1|
+---+---+

intersectAll 获取交集(保留重复项)

df1.intersectAll(df2).show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+

isEmpty 判断dataframe是否为空

# 空返回True 非空返回False
df1.isEmpty()
False

join 关联

注意聚合方式可能会影响show出来的列

单列聚合

df2.show()
+------+----+
|height|name|
+------+----+
|    80| Tom|
|    85| Bob|
+------+----+
df4.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+
df4.join(df2,df4.name == df2.name,how='left').show()
+----+------+-----+------+----+
| age|height| name|height|name|
+----+------+-----+------+----+
|   5|  null|  Bob|    85| Bob|
|  10|    80|Alice|  null|null|
|null|  null|  Tom|    80| Tom|
|null|  null| null|  null|null|
+----+------+-----+------+----+
df4.join(df2,df4.name == df2.name).show()
+----+------+----+------+----+
| age|height|name|height|name|
+----+------+----+------+----+
|   5|  null| Bob|    85| Bob|
|null|  null| Tom|    80| Tom|
+----+------+----+------+----+# 会合并同列名
df4.join(df2,'name').show()
+-----+----+------+------+
| name| age|height|height|
+-----+----+------+------+
|Alice|  10|    80|    80|
|  Bob|   5|  null|    85|
|  Tom|null|  null|    80|
+-----+----+------+------+

多列聚合

df2.show()
+------+-----+
|height| name|
+------+-----+
|    80|  Tom|
|    85|  Bob|
|    80|Alice|
+------+-----+
df4.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+
df4.join(df2,[df4.name == df2.name,df4.age==df2.age]).show()
+---+------+-----+------+-----+
|age|height| name|height| name|
+---+------+-----+------+-----+
| 10|    80|Alice|    80|Alice|
+---+------+-----+------+-----+# 会合并同列名
df4.join(df2,['name','height']).show()
+-----+------+---+
| name|height|age|
+-----+------+---+
|Alice|    80| 10|
+-----+------+---+df4.join(df2,[df4.name == df2.name,df4.height==df2.height],how='left').show()
+----+------+-----+------+-----+
| age|height| name|height| name|
+----+------+-----+------+-----+
|  10|    80|Alice|    80|Alice|
|   5|  null|  Bob|  null| null|
|null|  null|  Tom|  null| null|
|null|  null| null|  null| null|
+----+------+-----+------+-----+df4.join(df2,'name').show()
+-----+----+------+------+
| name| age|height|height|
+-----+----+------+------+
|Alice|  10|    80|    80|
|  Bob|   5|  null|    85|
|  Tom|null|  null|    80|
+-----+----+------+------+
df4.join(df2,'name').select(df4.height).show()
+------+
|height|
+------+
|    80|
|  null|
|  null|
+------+
df4.join(df2,'name').select(df4.height,df2.height).show()
+------+------+
|height|height|
+------+------+
|    80|    80|
|  null|    85|
|  null|    80|
+------+------+

limit 限定数量

df = spark.createDataFrame( [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
df.limit(1).show()
+---+----+
|age|name|
+---+----+
| 14| Tom|
+---+----+
df.limit(0).show()
+---+----+
|age|name|
+---+----+
+---+----+

mapInPandas 迭代处理

使用pandas dataframe的迭代器

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):for pdf in iterator:print(pdf,type(pdf))yield pdf[pdf.id == 1]df.mapInPandas(filter_func, df.schema).show()  
# 进入filter_func变成了dataframe处理id  age
0   1   21 <class 'pandas.core.frame.DataFrame'>id  age
0   2   30 <class 'pandas.core.frame.DataFrame'>
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+

maplnArrow 迭代处理

该函数应采用pyarrow的迭代器

import pyarrow  
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):for batch in iterator:print(batch,type(batch))pdf = batch.to_pandas()print(pdf,type(pdf))yield pyarrow.RecordBatch.from_pandas(pdf[pdf.id == 1])df.mapInArrow(filter_func, df.schema).show()
pyarrow.RecordBatch
id: int64
age: int64 <class 'pyarrow.lib.RecordBatch'>id  age
0   1   21 <class 'pandas.core.frame.DataFrame'>
pyarrow.RecordBatch
id: int64
age: int64 <class 'pyarrow.lib.RecordBatch'>id  age
0   2   30 <class 'pandas.core.frame.DataFrame'>
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+

fill 填充

d1 = spark.sql("SELECT 1 AS c1, int(NULL) AS c2")
d1.show()
+---+----+
| c1|  c2|
+---+----+
|  1|null|
+---+----+
d1.na.fill(2).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

orderBy 排序

df.orderBy('age').show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  5|  Bob|
+---+-----+

persist 持久化缓存

from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.DISK_ONLY)

printSchema 打印架构

以树格式打印出架构

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  5|  Bob|
+---+-----+df.printSchema()
root|-- age: long (nullable = true)|-- name: string (nullable = true)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/54317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PaddleNLP本文分类及docker部署流程

本文记录使用PaddleNLP进行文本分类的全流程 参考&#xff1a;https://github.com/PaddlePaddle/PaddleNLP/tree/develop/legacy/applications/text_classification/multi_class 文章目录 1. 数据准备2. 模型训练2.1 准备关键库2.2 模型训练&#xff06;验证2.3 模型测试2.4 结…

分布式中间件-redis相关概念介绍

文章目录 什么是redis?示意图Redis的主要特点Redis的主要用途Redis的工作原理Redis的持久化与备份 redis 6.x新增特性多线程数据加载客户端缓存新的 RESP 3 协议支持ACL&#xff08;Access Control List&#xff09;功能新增数据类型性能改进配置文件的改进其他改进 redis数据…

前端vue中如何给reactive赋值

const deviceDatareactive({}) const getDeviceDetail (list)>{ if(list.length > 0){ for(let item of list){ if(item.id param.id){ Object.assign(deviceData,item) } } } }

02 基于STM32的按键控制继电器驱动电机

本专栏所有源资料都免费获取&#xff0c;没有任何隐形消费。 注意事项&#xff1a;STM32仿真会存在各种各样BUG&#xff0c;且尽量按照同样仿真版本使用。本专栏所有的仿真都采用PROTEUS8.15。 本文已经配置好STM32F103C8T6系列&#xff0c;在PROTUES仿真里&#xff0c;32单片…

Doker学习笔记--黑马

介绍&#xff1a;快速构建、运行、管理应用的工具 在不同的服务器上部署多个应用&#xff0c;但是往往不同应用之间会有冲突&#xff0c;因为它们所依赖的环境&#xff0c;函数库&#xff0c;配置都不一样&#xff0c;此时docker在运行时形成了一个隔离环境&#xff08;容器&am…

【C++篇】C++类与对象深度解析(三):类的默认成员函数详解

文章目录 【C篇】C类与对象深度解析&#xff08;三&#xff09;前言4. 运算符重载基本概念4.1 运算符重载的基本概念4.2 重载运算符的规则4.3 成员函数重载运算符4.4 运算符重载的优先级与结合性4.5 运算符重载中的限制与特殊情况4.5.1 不能创建新的操作符4.5.2 无法重载的运算…

李宏毅机器学习2023-HW13-Network Compression

文章目录 TaskLinkBaselineSimple BaselineMedium BaselineStrong BaselineBoss BaselineFitNet Knowledge DistillationRelational Knowledge Distillation (RKD)Distance Metric (DM) Knowledge Distillation Task 通过network compression完成图片分类&#xff0c;数据集跟…

QT 带箭头的控件QPolygon

由于对当前项目需要绘制一个箭头控件&#xff0c;所以使用了QPainter和QPolygon来进行绘制&#xff0c;原理就是计算填充&#xff0c;下面贴出代码和效果图 这里简单介绍下QPolygon QPolygon是继承自 QVector<QPoint>那么可以很简单的理解为&#xff0c;他就是一个点的…

Leetcode面试经典150题-138.随机链表的复制

题目比较简单&#xff0c;重点是理解思想&#xff0c;random不管&#xff0c;copy一定要放在next 而且里面的遍历过程不能省略 解法都在代码里&#xff0c;不懂就留言或者私信 /* // Definition for a Node. class Node {int val;Node next;Node random;public Node(int val…

springboot-创建连接池

操作数据库 代码开发步骤&#xff1a; pom.xml文件配置依赖properties文件配置连接数据库信息&#xff08;连接池用的是HikariDataSource&#xff09;数据库连接池开发 configurationproperties和value注解从properties文件中取值bean方法开发 service层代码操作数据库 步骤&am…

数据分析师的得力助手:vividime Desktop让数据分析变得更简单高效

在数据驱动决策的今天&#xff0c;数据分析已成为企业不可或缺的一部分。面对海量的数据和复杂的业务需求&#xff0c;一款高效、易用的报表工具显得尤为重要。本文将深入解析为何一款优秀的报表工具对于数据分析至关重要&#xff0c;并以市场上备受好评的免费BI工具——vividi…

集成学习详细介绍

以下内容整理于&#xff1a; 斯图尔特.罗素, 人工智能.现代方法 第四版(张博雅等译)机器学习_温州大学_中国大学MOOC(慕课)XGBoost原理介绍------个人理解版_xgboost原理介绍 个人理解-CSDN博客 集成学习(ensemble)&#xff1a;选择一个由一系列假设h1, h2, …, hn构成的集合…

YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点

改进前训练结果: 改进后的结果: 摘要 边界框回归(BBR)是目标检测中的核心任务之一,BBR损失函数显著影响其性能。然而,观察到现有基于IoU的损失函数存在不合理的惩罚因子,导致回归过程中锚框扩展,并显著减缓收敛速度。为了解决这个问题,深入分析了锚框扩展的原因。针…

【网络】详解HTTP协议的CGI机制和CGI进程

目录 引言 CGI机制模型 伪代码示例 个人主页&#xff1a;东洛的克莱斯韦克-CSDN博客 引言 CGI机制是HTTP协议提供的偏底层的一套机制&#xff0c;也是非常重要的机制——它让大量的业务进程和HTPP协议解耦。而CGI进程是业务层的&#xff0c;用来处理各种数据&#xff0c;比…

OpenCV结构分析与形状描述符(24)检测两个旋转矩形之间是否相交的一个函数rotatedRectangleIntersection()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 测两个旋转矩形之间是否存在交集。 如果存在交集&#xff0c;则还返回交集区域的顶点。 下面是一些交集配置的例子。斜线图案表示交集区域&#…

孙怡带你深度学习(2)--PyTorch框架认识

文章目录 PyTorch框架认识1. Tensor张量定义与特性创建方式 2. 下载数据集下载测试展现下载内容 3. 创建DataLoader&#xff08;数据加载器&#xff09;4. 选择处理器5. 神经网络模型构建模型 6. 训练数据训练集数据测试集数据 7. 提高模型学习率 总结 PyTorch框架认识 PyTorc…

如何在安卓設備上更換IP地址?

IP地址是設備在網路中的唯一標識&#xff0c;通過IP地址&#xff0c;網路能夠識別並與設備進行通信。本文將詳細介紹在安卓設備上更換IP地址的幾種方法。 在安卓設備上更換IP地址的方法 1. 使用Wi-Fi網路更換IP地址 最簡單的方法是通過Wi-Fi網路更換IP地址。步驟如下&#x…

NVIDIA最新AI论文介绍NEST:一种用于语音处理的快速高效自监督模型

语音处理专注于开发能够分析、解释和生成人类语音的系统。这些技术涵盖了多种应用&#xff0c;例如自动语音识别&#xff08;ASR&#xff09;、说话人验证、语音转文本翻译以及说话人分离。随着对虚拟助手、转录服务和多语言交流工具的依赖不断增加&#xff0c;高效准确的语音处…

Android的内核

Android的内核是基于Linux的长期支持版本的“Android通用内核(ACK)”。 Android作为一个广泛使用的操作系统&#xff0c;其根基在于内核的设计和功能。下面将深入探讨Android内核的各个方面&#xff0c;从其基本结构到与Linux内核的关系&#xff0c;再到内核的版本管理及在设备…

Vue2电商平台项目 (三) Search模块、面包屑(页面自己跳自己)、排序、分页器!

文章目录 一、Search模块1、Search模块的api2、Vuex保存数据3、组件获取vuex数据并渲染(1)、分析请求数据的数据结构(2)、getters简化数据、渲染页面 4、Search模块根据不同的参数获取数据(1)、 派发actions的操作封装为函数(2)、设置带给服务器的参数(3)、Object.assign整理参…