Spark:DataFrame介绍及使用

1. DataFrame详解

DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。

  • Row 类型 表示一行数据
    • DataFrame就算是多行构成
# 导入行类Row
from pyspark.sql import Row# 创建行数据
r1 = Row(1, '张三', 20)# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
  • schema表信息
    • 定义DataFrame中的表的字段名和字段类型。
# 导入数据类型
from pyspark.sql.types import *# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值  默认是True,允许为空
schema_type = StructType().\add('id',IntegerType()).\add('name',StringType()).\add('age',IntegerType(),False)

2. DataFrame创建

创建datafram数据需要使用一个sparksession的类创建,SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext。

2.1 基本创建

#DataFrame 的基本创建
#Row就是行数据定义的类
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *#行数据创建
r1 = Row(1,"刘向阳",23,'男')
print(r1)#行数据下标取值
print(r1[0])
print(r1[1])#创建行数据时可以指定字段名
r2 = Row(id=2,name='李四',age=20,gender='女')
print(r2)
#使用字段名取值
print(r2['name'])# 定义元数据
schema = (StructType().add('id', IntegerType()).add('username', StringType()).add('age', IntegerType()).add('gender', StringType()))
print(schema)# 将元数据和行数据放在一起合成DataFrame
ss = SparkSession.builder.getOrCreate()# 调用创建df的方法
df = ss.createDataFrame([r1,r2],schema=schema)# 查看df中数据
df.show()#查看元数据信息
df.printSchema()

运行结果:
在这里插入图片描述

2.2 RDD和DF之间的转化

  • rdd的二维数据转化为DataFrame
    • rdd.toDF()
      在这里插入图片描述
# rdd 和 dataframe的转化
from pyspark.sql import SparkSession#创建SparkSession对象
ss = SparkSession.builder.getOrCreate()#基于ss对象获取sparkContext
sc = ss.sparkContext#创建rdd , 要使用二维列表指定每行数据
rdd = sc.parallelize([[1,'张三',20,'男'],[2,'李四',20,'男']])#将rdd转为df
df = rdd.toDF(schema='id int,name string,age int,gender string')#df数据查看
df.show()
df.printSchema()#df可以转rdd
res = df.rdd.collect()
print(res)rdd2 = df.rdd.map(lambda x:x['name'])res2 = rdd2.collect()
print(res2)

运行结果:
在这里插入图片描述

2.3 pandas和spark之间转化

  • spark的df转为pandas的df
    • toPandas
#pandas 和 spark的dataframe转化
from pyspark.sql import SparkSession
import pandas as pdss = SparkSession.builder.getOrCreate()#创建pandas的df
df_pd = pd.DataFrame({'id':[1,2,3,4],'name':['张三','李四','王五','赵六'],'age':[1,2,3,4],'gender':['男','女','女','女']}
)
#查看数据
print(df_pd)#取值
name = df_pd['name'][0]
print(name)
# 将pandas中的df转为spark的df
df_spark = ss.createDataFrame(df_pd)#查看
df_spark.show()#取值
row = df_spark.limit(1).first()
print(row['name'])#将spark的df重新转为pandas的df
df_pandas = df_spark.toPandas()
print(df_pandas)

运行结果:
在这里插入图片描述

2.4 读取文件数据转为df

通过read方法读取数据转为df

  • ss.read
#读取文件转为df
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取不同文件数据转为df
# txt文件
df = ss.read.text('hdfs://node1:8020/data/students.txt')
df.show()# json 文件
df_json = ss.read.json('hdfs://node1:8020/data/baike_qa_valid.json')
df_json.show()#orc文件
df_orc = ss.read.orc('hdfs://node1:8020/data/users.orc')
df_orc.show()#去取csv文件
#header或csv文件中的第一行作为表头字段数据
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv')
df_csv.show()

3. DataFrame基本使用

3.1 SQL语句

使用sparksession提供的sql方法,编写sql语句执行

#使用sql操作dataframe结构化数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',')#使用sql操作df数据
#将df指定一个临时表名
df_csv.createTempView('stu')#编写sql字符串语句,支持hivesql语法
sql_str ="""
select * from stu 
"""#执行sql语句,执行结果返回一个新的df
df_res = ss.sql(sql_str)
df_csv.show()
df_res.show()

3.2 DSL方法

DSL方法是df提供的数据操作函数
使用方式:

  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()

spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据。
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df

#使用DSL方法操作dataframe
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1/data/students.csv', header=True,sep=',')#使用DSL方法对df数据进行操作
df2 = df_csv.select('id','name')#查看结果
df2.show()#第二种指定字段的方式
df3 = df_csv.select(df_csv.age,df_csv.gender)#给字段起别名
df4 = df_csv.select(df_csv.age.alias('new_age'),df_csv.gender)
df4.show()#修改字段类型
df_csv.printSchema()
df5 = df_csv.select(df_csv.age.cast('int'),df_csv.gender)
df5.printSchema()#where 的数据过滤
age = 20
df6 = df_csv.where(f'age > {age}')
df6.show()#过滤年龄大于20并且性别为女性的学生信息
df7 = df_csv.where(f'age > 20 and gender = "女" ')
df7.show()#使用第二种字段判断方式
df8 = df_csv.where(df_csv.age == age)
df8.show()#分组聚合计算
df9 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age')
df9.show()#分组后过滤where 聚合计算时只能一次计算一个聚合数据
df10 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age').where('sum(age) > 80')
df10.show()#排序
df11 = df_csv.orderBy('age')  #默认排序
df11.show()df12 = df_csv.orderBy('age',ascending=False)  #降序
df12.show()#分页
df13 = df_csv.limit(5)
df13.show()#转为rdd
res = df_csv.rdd.collect()[5:10]
print(res)
df_new = ss.createDataFrame(res)
df_new.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/56253.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++笔记之原子操作

C++笔记之原子操作 code review! 文章目录 C++笔记之原子操作1.初始化2.赋值3.取值4.赋给另一个原子类型5.`exchange`6.`compare_exchange_weak` 和 `compare_exchange_strong`使用场景7.注意事项在 C++ 中,原子类型提供了对共享变量的无锁操作,确保多线程环境下的安全。以下…

AI的风终于吹到到了短剧,也把财富的风吹到了家门口!

近年来,AI技术在短剧领域的创新应用,给整个行业带来了全新的变革。以快手平台为例,一部以**《山海经》为背景的短剧“李行舟”在今年7月13日上线后引发热议。** 这部短剧讲述了少年李行舟在大海中与古代神灵和各种异兽搏斗的故事。与传统影视…

A0001.主机访问虚拟机中的共享文件完事教程

1. 先在虚拟机中创建一个共享文件夹 2. 在虚拟机的windows系统中查看ip地址 3. 检查网络是否连通 4. 访问虚拟机 5. 登录帐号密码

【JavaEE】——Udp翻译器的实现(回显服务器)

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 一:引入 1:基本概念 二:UDP socket API使用 1:socke…

正点原子讲解SPI学习,驱动编程NOR FLASH实战

配置SPI传输速度时,需要先失能SPI,__HAL_SPI_DISABLE,然后操作SPI_CR1中的波特率设置位,再使能SPI, NM25Q128驱动步骤 myspi.c #include "./BSP/MYSPI/myspi.h"SPI_HandleTypeDef g_spi1_handler; /* SPI句柄 */void spi1_init(void) {g_spi…

使用Hugging Face中的BERT进行标题分类

使用Hugging Face中的BERT进行标题分类 前言相关介绍出处基本原理优点缺点 前提条件实验环境BERT进行标题分类准备数据集读取数据集划分数据集设置相关参数创建自己DataSet对象计算准确率定义预训练模型定义优化器训练模型保存模型测试模型 参考文献 前言 由于本人水平有限&…

动态规划-简单多状态dp问题——面试题17.16.按摩师

多状态问题的核心就是每个位置不止有一个状态,因此需要多个dp表表示不同状态对应位置的值,然后根据题目考虑特定情况写出状态转移方程即可 1.题目解析 题目来源:面试题17.16.按摩师——力扣 测试用例 2.算法原理 1.状态表示 这里与路径问…

【CSS in Depth 2 精译_047】7.2 CSS 响应式设计中的媒体查询原则(上):深入理解媒体查询的类型

当前内容所在位置(可进入专栏查看其他译好的章节内容) 【第七章 响应式设计】(概述) 7.1 移动端优先设计原则(上篇) 7.1.1 创建移动端菜单(下篇)7.1.2 给视口添加 meta 标签&#xf…

MATLAB - 机器人机械臂设计轨迹规划器

系列文章目录 前言 本示例介绍了一种设计抓取和轨迹规划器的方法,该规划器可用于垃圾箱拣选系统。 在机器人技术中,垃圾箱拣选包括使用机械手从垃圾箱中取出物品。智能垃圾箱拣选是这一过程的高级版本,具有更强的自主性。使用摄像系统感知部件,规划器生成与场景相适应的无碰…

NASA:ARCTAS 区域的二级 FIRSTLOOK 气溶胶产品子集。 它包含气溶胶光学深度和粒子类型,以及相关的大气数据

目录 简介 信息 代码 引用 网址推荐 知识星球 机器学习 MISR L2 FIRSTLOOK Aerosol Product subset for the ARCTAS region V001 简介 这是 ARCTAS 区域的二级 FIRSTLOOK 气溶胶产品子集。 它包含气溶胶光学深度和粒子类型,以及相关的大气数据,…

关于摩托车一键启动无钥匙进入、智能科技创新

摩托车一键启动无钥匙进入功能 一、工作原理 摩托车的一键启动无钥匙进入功能采用了世界最先进的RFID无线射频技术和最先进的车辆身份编码识别系统,率先应用小型化、小功率射频天线的开发方案,并成功融合了遥控系统和无钥匙系统,沿用了传统…

在 MTT GPU 上使用 llama.cpp 推理

大语言模型因其出色的自然语言理解和生成能力而迅速被广泛使用,llama.cpp 大幅降低了进行大语言模型推理的门槛,MTT GPU 同样也是 llama.cpp 支持的运行平台,能够充分利用硬件的性能来助力用户的大语言模型应用。 本文主要介绍了如何在摩尔线…

出处不详 取数游戏

目录 取数游戏题目描述背景输入输出数据范围 题解解法优化 打赏 取数游戏 题目描述 背景 两人将 n n n个正整数围成一个圆环,规则如下: 第一名玩家随意选取数字;第二名玩家从与第一名玩家相邻的两个数字中选择一个;而后依次在…

用Arduino单片机制作一个简单的音乐播放器

Arduino单片机上有多个数字IO针脚,可以输出数字信号,用于驱动发声器件,从而让它发出想要的声音。蜂鸣器是一种常见的发声器件,通电后可以发出声音。因此,单片机可以通过数字输出控制蜂鸣器发出指定的声音。另外&#x…

【尚硅谷】FreeRTOS学笔记(更新中更新时间2024.10.12)

在网上看到的一段很形象的描述,放在这里给大家娱乐一下。 裸机开发:n个人拉屎,先进去一个拉完,下一个再来。看门狗:如果有人拉完屎还占着,茅坑刷视频,把他拖出去中断系统:n个人拉屎&…

Python | Leetcode Python题解之第477题汉明距离总和

题目: 题解: class Solution:def totalHammingDistance(self, nums: List[int]) -> int:n len(nums)ans 0for i in range(30):c sum(((val >> i) & 1) for val in nums)ans c * (n - c)return ans

数通--3

一、动态路由 内部 路由器之间要互联互通,必须遵循相同的协议 企业内部用 IGP,企业之间用BGP RIP(已淘汰,不考) 距离就是长短,矢量就是方向,即路由的出接口 一台路由器 A 配好RIP,…

C++面试速通宝典——25

473. HTTP如何减少重定向请求 重定向请求: ‌‌‌‌  服务器上的一个资源可能由于迁移、维护等原因从url1移至url2后,而客户端不知情,他还是继续请求url1,这时服务器不能粗暴地返回错误,而是通过302响应码和Locati…

鸿蒙--商品列表

这里主要利用的是 List 组件 相关概念 Scroll:可滚动的容器组件,当子组件的布局尺寸超过父组件的视口时,内容可以滚动。List:列表包

Appium Device Farm安装教程

环境要求:Appium version ≥ 2.4.X 安装appium npm install -g appium2.11.3 如果安装提示如下问题 npm error code EEXIST npm error syscall rename npm error path /Users/wan/.npm/_cacache/tmp/d5787519 npm error dest /Users/wan/.npm/_cacache/content-…