PySpark大数据处理详细教程

在这里插入图片描述
欢迎各位数据爱好者!今天,我很高兴与您分享我的最新博客,专注于探索 PySpark DataFrame 的强大功能。无论您是刚入门的数据分析师,还是寻求深入了解大数据技术的专业人士,这里都有丰富的知识和实用的技巧等着您。让我们一起潜入 PySpark 的世界,解锁数据处理和分析的无限可能!

基础操作

基础操作涵盖了数据的创建、加载、查看、选择、过滤、转换、聚合、排序、合并和导出等基本操作。

1.数据创建和加载

# 读取 CSV 文件
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)# 读取 HIVE 表
hive_sql = f"select * from {DATABASE}.{TABLE_NAME} {CONDITION}"
df = spark.sql(hive_sql)# 读取 Parquet 文件
parquet_file = "path/to/parquet/file"
df = spark.read.parquet(parquet_file)

2.数据查看和检查

df.show(2,truncate=False)
df.printSchema()

3.查看分位数

quantiles = df.approxQuantile("salary", [0.25, 0.5, 0.75], 0)
# col:要计算分位数的列名,为字符串类型。
# probabilities:一个介于 0 和 1 之间的数字列表,表示要计算的分位数。例如,0.5 表示中位数。
# relativeError:相对误差。这是一个非负浮点数,用于控制计算精度。
# 值为 0 表示计算精确的分位数(可能非常耗时)。
# 随着该值的增加,计算速度会提高,但精度会降低。例如,如果 relativeError 为 0.01,则计算结果与真实分位数的差距在真实分位数的 1% 范围内。

4.数据选择和过滤

df.select("column1").show()
df.filter(df["column1"] > 100).show()# 或者
df.filter(F.col("column1") > 100).show()
5.数据转换和操作
df.withColumn("new_column", F.col("column1").cast("int"))).show()df.withColumn("new_column", df["column1"] + F.lit(100)).show()
df.withColumn("new_column", F.col("column1") + F.lit(100)).show()df.drop("column1").show()

6.数据聚合和分组

df.groupBy("column1").count().show()df.groupBy("column1")agg.(F.count(F.col("id"))).show()

7.排序和排名取TopN

df.orderBy(df["column1"].desc()).show()
df.orderBy(F.col("column1").desc()).show()

8.数据合并和连接

df1.join(df2, df1["column"] == df2["column"]).show()# 或者
from functools import reduce
from pyspark.sql import DataFrame
dataframes = [df1,df2,df3]
union_df = reduce(DataFrame.union, dataframes)

9.缺失值和异常值处理

df.na.fill({"column1": 0}).show()

10.数据转换和类型转换

df.withColumn("column_casted", df["column1"].cast("int")).show()

11.数据导出和写入

# 存储 DataFrame 为CSV
df.write.csv("path/to/output.csv")
# 存储 DataFrame 为HIVE
df.write.format("orc").mode("overwrite").saveAsTable(f"test.sample")
# 存储 DataFrame 为 Parquet 文件
output_path = "path/to/output/directory"
df.write.parquet(output_path)

高级操作

高级操作包括更复杂的数据处理技术、特征工程、文本处理和高级 SQL 查询。

1.数据分区和优化

df.repartition(10).write.parquet("path/to/output")

2.数据探索和分析

df.describe().show()
# 或者
df.summary().show())

3.复杂数据类型处理

from pyspark.sql.functions import explode
df.withColumn("exploded_col", explode(df["array_col"])).show()

4.特征工程

from pyspark.ml.feature import StringIndexer# 创建StringIndexer对象,指定输入列名为"category",输出列名为"category_index"
indexer = StringIndexer(inputCol="category", outputCol="category_index")# 使用StringIndexer对象对DataFrame进行转换,将"category"列转换为"category_index"列
df_indexed = indexer.fit(df).transform(df)

5.文本数据处理

from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df_words = tokenizer.transform(df)

6.高级 SQL 查询

df.createOrReplaceTempView("table")
spark.sql("SELECT * FROM table WHERE column1 > 100").show()

进阶操作

进阶操作涵盖了性能调优、与其他数据源的集成和数据流处理,这些通常需要更深入的理解和经验。

1.性能调优和监控

df.explain()

2.与其他数据源集成

df_jdbc = spark.read \.format("jdbc") \.option("url", "jdbc:mysql://your-db-url") \.option("dbtable", "tablename") \.option("user", "username") \.option("password", "password") \.load()

3.数据流处理

df_stream = spark.readStream \.schema(df_schema) \.option("maxFilesPerTrigger", 1) \.json("/path/to/directory/")

4.使用 Structured Streaming

stream_query = df_stream.writeStream \.outputMode("append") \.format("console") \.start()
stream_query.awaitTermination()

这些示例提供了对 PySpark 操作的广泛了解,从基础到进阶,涵盖了数据处理和分析的多个方面。对于更复杂的场景和高级功能,强烈建议查阅 PySpark 的官方文档和相关教程。


将会在后续过程中,逐步完善PySpark处理DataFrame的方法~~~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/224500.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA快捷键注释代码设置不从行开头开始

我们平时在用IDEA开发项目时会发现,快捷键注释的//总是在代码的行开头上面,如下图所示: 这样就显得代码很不美观,那如何才能使注释//贴紧代码呢?需要在IDEA中进行如下配置: 点击Apply之后就可以了&#xff…

Win11 TensorRT环境部署

一、CUDA和CUDNN安装 cuda和cudnn网上有很多安装教程,这里列举了一些,就不详细说了,具体链接如下: csdn.net - CUDA安装教程(超详细) 原创 zhihu.com - 深度学习之CUDACUDNN详细安装教程 tencent.com - C…

numpy.memmap 用法与注意事项

当处理大数组时,内存可能不够用。numpy 提供了一个函数 np.memmap() 让我们可以处理大数组。memmap memory mapped np.memmap() 可以读取大磁盘文件中的一小段到内存,所以它占内存较小 参数说明: import numpy as np from tempfile impo…

Gateway和spring-boot-starter-web的恩怨情仇

为什么取这个题目,其实与我踩到的坑有关,说起来这个坑非常神奇,这里面就涉及到Gateway和spring-boot-starter-web底层所依赖的技术不兼容的问题。 一、背景 SpringCloud 版本 ---- Finchley.SR2 SpringBoot 版本 ---- 2.0.6.RELEASE 如果同…

Bootstrap在弹框Povoper中显示图片

项目开发需要实现这个效果&#xff0c;当鼠标划过这个按钮的时候&#xff0c;会显示出指定的图片出来 HTML代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"…

高德地图+Vue中使用出现的问题

最近在做高德地图的逆向地理编码API出现了问题 按着官方的方式写代码运行时出现了问题&#xff0c;随后问了技术人员。 添加之后成功运行

“分割“安卓用户,对标iOS;鸿蒙将携手程序员的春天

近期关于“华为于明年推出不兼容安卓的鸿蒙版本”的消息传出&#xff0c;引起了业界的热议关注。自从2019年8月&#xff0c;美国制裁下&#xff0c;华为不再能够获得谷歌安卓操作系统相关付费服务&#xff0c;如此情况下&#xff0c;华为“备胎”鸿蒙操作系统一夜转正。华为鸿蒙…

IDEA新建jdk8 spring boot项目

今天新建spring boot项目发现JDK版本最低可选17。 但是目前用的最多的还是JDK8啊。 解决办法 Server URL中设置&#xff1a; https://start.aliyun.com/设置完成后&#xff0c;又可以愉快的用jdk8创建项目了。 参考 https://blog.csdn.net/imbzz/article/details/13469117…

新能源汽车生产污废水需要哪些工艺及设备

新能源汽车的快速发展带来了许多环境问题&#xff0c;其中之一就是生产过程中产生的污废水。由于新能源汽车的生产过程与传统汽车有所不同&#xff0c;因此需要采用特定的工艺和设备来处理和处理这些废水。 首先&#xff0c;新能源汽车生产过程中产生的污废水主要来自洗涤和冷却…

Certbot实现 HTTPS 免费证书(Let‘s Encrypt)自动续期

Certbot实现 HTTPS 自动续期 以前阿里云支持申请一年的免费https证书&#xff0c;那每年我们手动更新证书并没什么大问题&#xff0c;但现在阿里云的免费证书仅支持3个月&#xff0c;这意味着每三个月都要要申请一下证书显得非常麻烦。 下面我们使用Certbot实现ssl证书的自动…

后端打印不了trace等级的日志?-SpringBoot日志打印-Slf4j

在调用log变量的方法来输出日志时&#xff0c;有以上5个级别对应的方法&#xff0c;从不太重要&#xff0c;到非常重要 调用不同的方法&#xff0c;就会输出不同级别的日志。 trace&#xff1a;跟踪信息debug&#xff1a;调试信息info&#xff1a;一般信息warn&#xff1a;警告…

Java基础语法面试题

注释 什么Java注释 定义&#xff1a;用于解释说明程序的文字 分类 单行注释 格式&#xff1a; // 注释文字 多行注释 格式&#xff1a; /* 注释文字 /文档注释 格式&#xff1a;/* 注释文字 */ 作用 在程序中&#xff0c;尤其是复杂的程序中&#xff0c;适当地加入注释可…

结构体概念及应用

1.结构体类型的概念 在C语言中提供了很多基本的数据类型&#xff0c;但在实际开发中&#xff0c;无法满足程序中各种复杂数据的要求。有时需要将不同类型的数据组合成一个有机的整体&#xff0c;一边引用。例如&#xff1a; numnamesexagescore001lemonF18 90 在图中列举了…

VRRP协议详解

目录 一、基础概念 1、概念 2、VRRP的基本结构 状态机 二、VRRP主备备份工作过程 1、备份工作过程 2、VRRP的负载分担工作 三、实验 一、基础概念 1、概念 VRRP能够在不改变组网的情况下&#xff0c;将多台路由器虚拟成一个虚拟路由器&#xff0c;通过配置虚拟路由器的I…

【STM32入门】4.1中断基本知识

1.中断概览 在开展红外传感器遮挡计次的实验之前&#xff0c;有必要系统性的了解“中断”的基本知识. 中断是指&#xff1a;在主程序运行过程中&#xff0c;出现了特定的中断触发条件&#xff08;中断源&#xff09;&#xff0c;使得CPU暂停当前正在运行的程序&#xff0c;转…

Oracle MongoDB

听课的时候第一次碰到&#xff0c;可以了解一下吧&#xff0c;就直接开了墨者学院的靶场 #oracle数据库 Oracle数据库注入全方位利用 - 先知社区 这篇写的真的很好 1.判断注入点 当时找了半天没找到 看样子是找到了&#xff0c;测试一下看看 id1 and 11 时没有报错 2.判断字段…

JMeter下载与安装

文章目录 前言一、安装java环境&#xff08;JDK下载与安装&#xff09;二、JMeter下载三、JMeter安装1.解压缩2.配置环境变量 四、JMeter启动&#xff08;启动成功则代表JMeter安装成功&#xff09;五、JMeter汉化&#xff08;将JMeter修改成中文&#xff09;1.方法一&#xff…

深圳锐科达IP网络广播系统

深圳锐科达IP网络广播系统 网络音频广播系统是一种基于TCP/IP网络的纯数字音频广播系统。该网络音频广播系统在物理结构上与标准IP网络完全集成。它不仅真正实现了基于TCP/IP网络的数字音频的广播、直播和点播&#xff0c;而且利用TCP/IP网络的优势&#xff0c;突破了传统模拟广…

DeepStream--调试Gstreamer

DeepStream是基于Gstreamer开发的。有时候需要在Gstreamer加日志&#xff0c;比如想在rtpjitterbuffer里加日志。 首先&#xff0c;执行gst-inspect-1.0 rtpjitterbuffer命令。 从结果中可以看到&#xff0c;rtpjitterbuffer插件的源码是gst-plugins-good&#xff0c;版本是1…

Tomcat的结构分析和请求处理原理解析

目录 Tomcat服务器&#xff1f;Tomcat结构处理请求流程Tomcat作用其他的web服务器 Tomcat服务器&#xff1f; 我们经常开口闭口“服务器”、“服务器”的&#xff0c;其实“服务器”是个很容易引发歧义的概念 其实&#xff0c;Tomcat服务器 Web服务器 Servlet/JSP容器&#…