dataframe 如何选中某列的一行_PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)

b04ac683ce25e6aa732b49d39bea472e.png

作者:Pinar Ersoy

翻译:孙韬淳

校对:陈振东

本文约2500字,建议阅读10分钟

本文通过介绍Apache Spark在Python中的应用来讲解如何利用PySpark包执行常用函数来进行数据处理工作。

bab82f5eab15c9b3630a19a78af63e73.png

Apache Spark是一个对开发者提供完备的库和API的集群计算系统,并且支持多种语言,包括Java,Python,R和Scala。SparkSQL相当于Apache Spark的一个模块,在DataFrame API的帮助下可用来处理非结构化数据。

通过名为PySpark的Spark Python API,Python实现了处理结构化数据的Spark编程模型。

这篇文章的目标是展示如何通过PySpark运行Spark并执行常用函数。

Python编程语言要求一个安装好的IDE。最简单的方式是通过Anaconda使用Python,因其安装了足够的IDE包,并附带了其他重要的包。

1、下载Anaconda并安装PySpark

通过这个链接,你可以下载Anaconda。你可以在Windows,macOS和Linux操作系统以及64位/32位图形安装程序类型间选择。我们推荐安装Python的最新版本。

51adf68862c5437ee29b189530db5e1b.png

Anaconda的安装页面(https://www.anaconda.com/distribution/)

下载好合适的Anaconda版本后,点击它来进行安装,安装步骤在Anaconda Documentation中有详细的说明。

安装完成时,Anaconda导航主页(Navigator Homepage)会打开。因为只是使用Python,仅需点击“Notebook”模块中的“Launch”按钮。

e96035a7877371656947562d7ccc9a10.png

Anaconda导航主页

为了能在Anaconda中使用Spark,请遵循以下软件包安装步骤。

第一步:从你的电脑打开“Anaconda Prompt”终端。

第二步:在Anaconda Prompt终端中输入“conda install pyspark”并回车来安装PySpark包。

第三步:在Anaconda Prompt终端中输入“conda install pyarrow”并回车来安装PyArrow包。

当PySpark和PyArrow包安装完成后,仅需关闭终端,回到Jupyter Notebook,并在你代码的最顶部导入要求的包。

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions
import *from pyspark.sql.types
import *from datetime import date, timedelta, datetime
import time

2、初始化SparkSession

首先需要初始化一个Spark会话(SparkSession)。通过SparkSession帮助可以创建DataFrame,并以表格的形式注册。其次,可以执行SQL表格,缓存表格,可以阅读parquet/json/csv/avro数据格式的文档。

sc = SparkSession.builder.appName("PysparkExample")    
.config ("spark.sql.shuffle.partitions", "50")    
.config("spark.driver.maxResultSize","5g")    
.config ("spark.sql.execution.arrow.enabled", "true")    
.getOrCreate()

想了解SparkSession每个参数的详细解释,请访问pyspark.sql.SparkSession。

3、创建数据框架

一个DataFrame可被认为是一个每列有标题的分布式列表集合,与关系数据库的一个表格类似。在这篇文章中,处理数据集时我们将会使用在PySpark API中的DataFrame操作。

你可以从https://www.kaggle.com/cmenca/new-york-times-hardcover-fiction-best-sellers中下载Kaggle数据集。

3.1、从Spark数据源开始

DataFrame可以通过读txt,csv,json和parquet文件格式来创建。在本文的例子中,我们将使用.json格式的文件,你也可以使用如下列举的相关读取函数来寻找并读取text,csv,parquet文件格式。

#Creates a spark data frame called as raw_data.
#JSON
dataframe = sc.read.json('dataset/nyt2.json')
#TXT FILES#
dataframe_txt = sc.read.text('text_data.txt')
#CSV FILES#
dataframe_csv = sc.read.csv('csv_data.csv')
#PARQUET FILES#
dataframe_parquet = sc.read.load('parquet_data.parquet')

4、重复值

表格中的重复值可以使用dropDuplicates()函数来消除。

dataframe = sc.read.json('dataset/nyt2.json')
dataframe.show(10)
1322b650c7f145b6d6b8aab1e8ee4c28.png

使用dropDuplicates()函数后,我们可观察到重复值已从数据集中被移除。

dataframe_dropdup = dataframe.dropDuplicates() dataframe_dropdup.show(10)
1636ce01311312611ed07b854d0c127c.png

5、查询

查询操作可被用于多种目的,比如用“select”选择列中子集,用“when”添加条件,用“like”筛选列内容。接下来将举例一些最常用的操作。完整的查询操作列表请看Apache Spark文档。

5.1、“Select”操作

可以通过属性(“author”)或索引(dataframe[‘author’])来获取列。

#Show all entries in title column
dataframe.select("author").show(10)
#Show all entries in title, author, rank, price columns
dataframe.select("author", "title", "rank", "price").show(10)
da9543b451cf9897ec097b50862a2f22.png

第一个结果表格展示了“author”列的查询结果,第二个结果表格展示多列查询。

5.2、“When”操作

在第一个例子中,“title”列被选中并添加了一个“when”条件。

# Show title and assign 0 or 1 depending on title
dataframe.select("title",when(dataframe.title != 'ODD HOURS',
1).otherwise(0)).show(10)
26fee319f9a0fd1e16e2c630bd3fd157.png

展示特定条件下的10行数据

在第二个例子中,应用“isin”操作而不是“when”,它也可用于定义一些针对行的条件。

# Show rows with specified authors if in the given options
dataframe [dataframe.author.isin("John Sandford",
"Emily Giffin")].show(5)
780ef72ff155644fe56af2be07d8601d.png

5行特定条件下的结果集

5.3、“Like”操作

在“Like”函数括号中,%操作符用来筛选出所有含有单词“THE”的标题。如果我们寻求的这个条件是精确匹配的,则不应使用%算符。

# Show author and title is TRUE if title has " THE " word in titles
dataframe.select("author", "title",
dataframe.title.like("% THE %")).show(15)
950586fad59b8f83edaec428aa1abf7e.png

title列中含有单词“THE”的判断结果集

5.4、“startswith”-“endswith”

StartsWith指定从括号中特定的单词/内容的位置开始扫描。类似的,EndsWith指定了到某处单词/内容结束。两个函数都是区分大小写的。

dataframe.select("author", "title",
dataframe.title.startswith("THE")).show(5)
dataframe.select("author", "title",
dataframe.title.endswith("NT")).show(5)
cbb296e175b59f23cca62743caead2c8.png

对5行数据进行startsWith操作和endsWith操作的结果。

5.5、“substring”操作

Substring的功能是将具体索引中间的文本提取出来。在接下来的例子中,文本从索引号(1,3),(3,6)和(1,6)间被提取出来。

dataframe.select(dataframe.author.substr(1
, 3).alias("title")).show(5)
dataframe.select(dataframe.author.substr(3
, 6).alias("title")).show(5)
dataframe.select(dataframe.author.substr(1
, 6).alias("title")).show(5)
4d3fb5905538a002e11bc71ab1baa509.png

分别显示子字符串为(1,3),(3,6),(1,6)的结果

6、增加,修改和删除列

在DataFrame API中同样有数据处理函数。接下来,你可以找到增加/修改/删除列操作的例子。

6.1、增加列

# Lit() is required while we are creating columns with exact
values.
dataframe = dataframe.withColumn('new_column',
F.lit('This is a new column'))
display(dataframe)
4d27044d05825109e00b33afd1d2c229.png

在数据集结尾已添加新列

6.2、修改列

对于新版DataFrame API,withColumnRenamed()函数通过两个参数使用。

# Update column 'amazon_product_url' with 'URL'
dataframe = dataframe.withColumnRenamed('amazon_product_url', 'URL')
dataframe.show(5)
6de88065057640adabc79dc9e2123a76.png

“Amazon_Product_URL”列名修改为“URL”

6.3、删除列

列的删除可通过两种方式实现:在drop()函数中添加一个组列名,或在drop函数中指出具体的列。两个例子展示如下。

dataframe_remove = dataframe.drop("publisher",
"published_date").show(5)
dataframe_remove2=dataframe 
.drop(dataframe.publisher).drop(dataframe.published_date).show(5)
ad1de0c1341e7967ad7a713de81690cf.png

“publisher”和“published_date”列用两种不同的方法移除。

7、数据审阅

存在几种类型的函数来进行数据审阅。接下来,你可以找到一些常用函数。想了解更多则需访问Apache Spark doc。

# Returns dataframe column names and data types
dataframe.dtypes
# Displays the content of dataframe
dataframe.show()
# Return first n rows
dataframe.head()
# Returns first row
dataframe.first()
# Return first n rows
dataframe.take(5)
# Computes summary statistics
dataframe.describe().show()
# Returns columns of dataframe
dataframe.columns
# Counts the number of rows in dataframe
dataframe.count()
# Counts the number of distinct rows in dataframe
dataframe.distinct().count()
# Prints plans including physical and logical
dataframe.explain(4)

8、“GroupBy”操作

通过GroupBy()函数,将数据列根据指定函数进行聚合。

# Group by author, count the books of the authors in the groups
dataframe.groupBy("author").count().show(10)
29421d196da4901cb679826204c97a8c.png

作者被以出版书籍的数量分组

9、“Filter”操作

通过使用filter()函数,在函数内添加条件参数应用筛选。这个函数区分大小写。

# Filtering entries of title
# Only keeps records having value 'THE HOST'
dataframe.filter(dataframe["title"] == 'THE HOST').show(5)
ee3e5b11461e463646122bf9a71adc1d.png

标题列经筛选后仅存在有“THE HOST”的内容,并显示5个结果。

10、缺失和替换值

对每个数据集,经常需要在数据预处理阶段将已存在的值替换,丢弃不必要的列,并填充缺失值。pyspark.sql.DataFrameNaFunction库帮助我们在这一方面处理数据。举例如下。

# Replacing null values
dataframe.na.fill()
dataFrame.fillna()
dataFrameNaFunctions.fill()
# Returning new dataframe restricting rows with null valuesdataframe.na.drop()
dataFrame.dropna()
dataFrameNaFunctions.drop()
# Return new dataframe replacing one value with another
dataframe.na.replace(5, 15)
dataFrame.replace()
dataFrameNaFunctions.replace()

11、重分区

在RDD(弹性分布数据集)中增加或减少现有分区的级别是可行的。使用repartition(self,numPartitions)可以实现分区增加,这使得新的RDD获得相同/更高的分区数。分区缩减可以用coalesce(self, numPartitions, shuffle=False)函数进行处理,这使得新的RDD有一个减少了的分区数(它是一个确定的值)。请访问Apache Spark doc获得更多信息。

# Dataframe with 10 partitions
dataframe.repartition(10).rdd.getNumPartitions()
# Dataframe with 1 partition
dataframe.coalesce(1).rdd.getNumPartitions()

12、嵌入式运行SQL查询

原始SQL查询也可通过在我们SparkSession中的“sql”操作来使用,这种SQL查询的运行是嵌入式的,返回一个DataFrame格式的结果集。请访问Apache Spark doc获得更详细的信息。

# Registering a table
dataframe.registerTempTable("df")
sc.sql("select * from df").show(3)
sc.sql("select                
CASE WHEN description LIKE '%love%' THEN 'Love_Theme'                
WHEN description LIKE '%hate%' THEN 'Hate_Theme'                
WHEN description LIKE '%happy%' THEN 'Happiness_Theme'                
WHEN description LIKE '%anger%' THEN 'Anger_Theme'                
WHEN description LIKE '%horror%' THEN 'Horror_Theme'                
WHEN description LIKE '%death%' THEN 'Criminal_Theme'                
WHEN description LIKE '%detective%' THEN 'Mystery_Theme'                
ELSE 'Other_Themes'                END Themes        
from df").groupBy('Themes').count().show()

13、输出

13.1、数据结构

DataFrame API以RDD作为基础,把SQL查询语句转换为低层的RDD函数。通过使用.rdd操作,一个数据框架可被转换为RDD,也可以把Spark Dataframe转换为RDD和Pandas格式的字符串同样可行。

# Converting dataframe into an RDD
rdd_convert = dataframe.rdd
# Converting dataframe into a RDD of string
dataframe.toJSON().first()
# Obtaining contents of df as Pandas
dataFramedataframe.toPandas()
2a4452f2af8637daa934fe9244fcd351.png

不同数据结构的结果

13.2、写并保存在文件中

任何像数据框架一样可以加载进入我们代码的数据源类型都可以被轻易转换和保存在其他类型文件中,包括.parquet和.json。请访问Apache Spark doc寻求更多保存、加载、写函数的细节。

# Write & Save File in .parquet format
dataframe.select("author", "title", "rank", "description") 
.write 
.save("Rankings_Descriptions.parquet")
f2fa64b96dca5e49b8b097afebc58012.png

当.write.save()函数被处理时,可看到Parquet文件已创建。

# Write & Save File in .json format
dataframe.select("author", "title") 
.write 
.save("Authors_Titles.json",format="json")
9cd00c3405699bf916a90078de8dff57.png

当.write.save()函数被处理时,可看到JSON文件已创建。

13.3、停止SparkSession

Spark会话可以通过运行stop()函数被停止,如下。

# End Spark Session
sc.stop()

代码和Jupyter Notebook可以在我的GitHub上找到。

欢迎提问和评论!

参考文献:

1. http://spark.apache.org/docs/latest/

2. https://docs.anaconda.com/anaconda/

原文标题:

PySpark and SparkSQL Basics

How to implement Spark with Python Programming

原文链接:

https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

编辑:于腾凯

校对:洪舒越

译者简介

65b5d86233fd76135ba86ee4e2331af7.png

孙韬淳,首都师范大学大四在读,主修遥感科学与技术。目前专注于基本知识的掌握和提升,期望在未来有机会探索数据科学在地学应用的众多可能性。爱好之一为翻译创作,在业余时间加入到THU数据派平台的翻译志愿者小组,希望能和大家一起交流分享,共同进步。

—完—

关注清华-青岛数据科学研究院官方微信公众平台“ THU数据派 ”及姊妹号“ 数据派THU ”获取更多讲座福利及优质内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513885.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重磅官宣:Nacos2.0 发布,性能提升 10 倍

简介: 继 Nacos 1.0 发布以来,Nacos 迅速被成千上万家企业采用,并构建起强大的生态。但是随着用户深入使用,逐渐暴露一些性能问题,因此我们启动了 Nacos 2.0 的隔代产品设计,时隔半年我们终于将其全部实现&…

deepin linux 2014 硬盘安装教程,Linux Deepin的硬盘安装

于是装了,问题来了,即使执行了sudo umount -l /cdrom,也不能直接把原来的个人文件分区挂成/home,因为liveCD的内容还放在这个上面(想不通,我只是挂载,又不格式化,怎么就行了?新氧好像…

一站式云原生智能告警运维平台——SLS新版告警发布!

简介: 本文介绍什么是云原生可观测性需求以及告警限制,介绍一站式云原生智能告警运维平台——SLS新版告警。 前言 本篇是SLS新版告警系列宣传与培训的第一篇,后续我们会推出20系列直播与实战培训视频,敬请关注。 系列目录&#…

ansible 修改文件变量_基于ansible的批量配置生成

背景网络运维,我们有很多时间是在准备配置的路上,咱们之前也讲过,从脑海中或者是从自己的宝藏笔记中找出模板,一顿操作猛如虎,Ctrl C Ctrl V。这个过程是十分危险的,因为人不是机器,肯定会出错&…

Fluid — 云原生环境下的高效“数据物流系统”

简介: 为了解决大数据、AI 等数据密集型应用在云原生计算存储分离场景下,存在的数据访问延时高、联合分析难、多维管理杂等痛点问题,南京大学 PASALab、阿里巴巴、Alluxio 在 2020 年 9 月份联合发起了开源项目 Fluid。Fluid 本质上是一个云原…

普通大学生的 Java 开发能力到什么水平才能进大厂?

对于刚开始接触编程的同学来说,选择一门合适的编程语言非常重要。Java一直以来都是被广泛使用的语言,其服务端开发占比90%,83次在TIOBE排行第一,是很多程序员的首选语言。在发展前景方面,Java的就业范围很广&#xff0…

delphi7 如何判定dbgrid两行重复_教你如何在服装上加入好看的毛线刺绣花边

有时候一件旧衣服我们给它添加一些新的元素,会产生意外惊喜的效果。今天,我们就来学学如何用毛线在衣服上刺绣花边,让衣服变得更加漂亮的方法吧!在本教程中,我们使用了一些零头羊毛线,11号木针和用于刺绣的…

阿里的 RocketMQ 如何让双十一峰值之下 0 故障?

简介: 2020 年双十一交易峰值达到 58.3 W 笔/秒,消息中间件 RocketMQ 继续数年 0 故障丝般顺滑地完美支持了整个集团大促的各类业务平稳。 作者 | 愈安 来源 | 阿里巴巴云原生公众号 2020 年双十一交易峰值达到 58.3 W 笔/秒,消息中间件 Roc…

英特尔携手谷歌云加速最新虚拟机;谷歌云平台下调抽成比例;Hitachi Vantara推出全新云成本优化服务...

NEWS新闻回顾GitLab 将上市随着收入持续增长和亏损扩大,DevOps 宠儿 GitLab 终于申请了首次公开募股 (IPO)。GitLab 一直致力于成为开发人员的一站式商店,与 GitHub(2018 年被微软以 75 亿美元高价收购)和其他 DevOps 公司竞争。在…

eq linux_音乐家和音乐爱好者的开放硬件 | Linux 中国

从 3D 打印乐器到无线播放声音的设备,有很多通过开放硬件项目来奏乐的方法。-- Michael Weinberg这个世界到处都是很棒的开源音乐播放器,但为什么只是将开源用在播放音乐上呢?你还可以使用开源硬件奏乐。本文中描述的所有工具都是经过了开源硬…

“匿名句柄” 是一切皆文件背后功臣……

作者 | 奇伢 来源 | 奇伢云存储匿名 fd 的样子?我们经常在 /proc/${pid}/fd/ 下面能看到 anon_inode : 前缀的句柄,如下:rootubuntu:~/temp# ll /proc/5398/fdlr-x- 1 x x 64 Aug 4 9:9 8 -> anon_inode:inotify lrwx- 1 x x 64 Aug …

Flink集成Iceberg在同程艺龙的实践

简介: 本文由同城艺龙大数据开发工程师张军分享,主要介绍同城艺龙 Flink 集成 Iceberg 的生产实践。 本文由同城艺龙大数据开发工程师张军分享,主要介绍同城艺龙 Flink 集成 Iiceberg 的生产实践。内容包括: 背景及痛点Flink Ice…

阿里巴巴开源容器镜像加速技术

简介: 近日阿里巴巴开源了其云原生容器镜像加速技术,其推出的overlaybd镜像格式,相比于传统的分层tar包文件格式,实现了基于网络的按需读取,从而使得容器可以快速启动。 近日阿里巴巴开源了其云原生容器镜像加速技术&…

Unity重写Inspector简化分组配置文件

Unity重写Inspector简化分组配置文件 重写Inspector创建分组管理配置文件创建修改参数参数对应类工程在我的资源中名为CreateConfig,免费下载 重写Inspector创建分组管理配置文件 创建 修改参数 参数对应类 using UnityEngine;public class GameConfig : Scriptab…

985大学的高材生只会写代码片段,丢人吗?

很多同学在学习编程的时候都会遇到各种各样的难题,比如:没有合适的资料、学习过于碎片化、资料的质量层次不齐、看了很多视频自己动手时却还是不会、接触不到完整项目、无法检测自己的编程水平是不是企业所认可的……最近,小郭和小解同学也遇…

快手基于RocketMQ的在线消息系统建设实践

简介: 快手需要建设一个主要面向在线业务的消息系统作为 Kafka 的补充,低延迟、高并发、高可用、高可靠的分布式消息中间件 RocketMQ 正是我们所需的。 作者:黄理 黄理,10多年软件开发和架构经验,热衷于代码和性能优…

基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台

简介: 本文将对 RocketMQ-Exporter 的设计实现做一个简单的介绍,读者可通过本文了解到 RocketMQ-Exporter 的实现过程,以及通过 RocketMQ-Exporter 来搭建自己的 RocketMQ 监控系统。RocketMQ 在线可交互教程现已登录知行动手实验室&#xff…

c语言结构体函数平面向量加法公式,插值 拟合 符号变量与符号表达式 微积分 解方程 向量运算...

7.1.1 分段线性插值所谓分段线性插值就是通过插值点用折线段连接起来逼近原曲线,这也是计算机绘制图形的基本原理。实现分段线性插值不需编制函数程序,MATLAB自身提供了内部函数interp1其主要用法如下:interp1(x,y,xi) 一维插值◆ yiinterp1(…

Redis 很屌,不懂使用规范就糟蹋了

作者 | 码哥 来源 | 码哥字节❝这可能是最中肯的 Redis 使用规范了一网友昨天和我说,公司凌晨 12 点之后,网站用户量暴增,出现了一个技术故障,用户无法下单,当时老大火冒三丈!经过查找发现 Redis 报 C…

python统计字符在文件中出现的次数_一文搞定统计字符串中某字符出现的频次

下面是统计字符串中某字符出现的次数的方法 方法1: 这个方法相当简单,零基础自学编程,代码写成这样能满足需求,但它逐个逐个计数,比较笨拙。rlt {} for i in content: if i in rlt.keys(): rlt[i] 1 else: rlt[i] 1…