摸鱼大数据——Spark SQL——DataFrame详解一

1.DataFrame基本介绍

 DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息​表结构描述信息(元数据Schema): StructType对象字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空行: Row对象列: Column对象,包含字段名称和字段值​在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息

如何构建表结构信息数据:

2.DataFrame的构建方式

 方式1: 使用SparkSession的createDataFrame(data,schema)函数创建data参数1.基于List列表数据进行创建2.基于RDD弹性分布式数据集进行创建3.基于pandas的DataFrame数据进行创建schema参数1: 字符串格式一 :“字段名1 字段类型,字段名2 字段类型”格式二(推荐):“字段名1:字段类型,字段名2:字段类型”2: List格式: ["字段名1","字段名2"]  3: DataType(推荐,用的最多)格式一:schema=StructType().add('字段名1',字段类型).add('字段名2',字段类型)格式二:schema=StructType([StructField('字段名1',类型),StructField('字段名1',类型)])方式2: 使用DataFrame的toDF(colNames)函数创建DataFrame的toDF方法是一个在Apache Spark的DataFrame API中用来创建一个新的DataFrame的方法。这个方法可以将一个RDD转换为DataFrame,或者将一个已存在的DataFrame转换为另一个DataFrame。在Python中,你可以使用toDF方法来指定列的名字。如果你不指定列的名字,那么默认的列的名字会是_1, _2等等。 格式: rdd.toDF([列名])​​方式3: 使用SparkSession的read()函数创建在 Spark 中,SparkSession 的 read 是用于读取数据的入口点之一,它提供了各种方法来读取不同格式的数据并将其加载到 Spark 中进行处理。统一API格式: spark.read.format('text|csv|json|parquet|orc|...')  : 读取外部文件的方式.option('k','v')   : 选项  可以设置相关的参数 (可选).schema(StructType | String)  :  设置表的结构信息.load('加载数据路径')  : 读取外部文件的路径, 支持 HDFS 也支持本地简写API格式:注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写格式: spark.read.文件读取方式()​注意: parquet:是Spark中常用的一种列式存储文件格式和Hive中的ORC差不多, 他俩都是列存储格式​

2.1 createDataFrame()创建

场景:一般用在开发和测试中。因为只能处理少量的数据

2.1.1 基于列表
 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()​# 2.创建DF对象data = [(1, '张三', 18), (2, '李四', 28), (3, '王五', 38)]df1 = spark.createDataFrame(data,schema=['id','name','age'])# 展示数据df1.show()# 查看结构信息df1.printSchema()​print('---------------------------------------------------------')df2 = spark.createDataFrame(data,schema='id int,name string,age int')# 展示数据df2.show()# 查看结构信息df2.printSchema()​print('---------------------------------------------------------')df3 = spark.createDataFrame(data,schema='id:int,name:string,age:int')# 展示数据df3.show()​# 查看结构信息df3.printSchema()​# 3.关闭资源spark.stop()
2.1.2 基于RDD普通方式

场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。

Schema选择StructType对象来定义DataFrame的“表结构”转换RDD

 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField​os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()sc = spark.sparkContext​​# 2.读取生成rddtextRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')print(type(textRDD)) # <class 'pyspark.rdd.RDD'>etlRDD = textRDD.map(lambda line:line.split(',')).map(lambda l:(l[0],l[1]))# 3.定义schema结构信息schema1 = StructType().add('name',StringType(),True).add('age',StringType(),True)schema2 = StructType([StructField('name',StringType(),True),StructField('age',StringType(),True)])schema3 = ['name','age']schema4 = 'name string,age string'schema5 = 'name:string,age:string'# 4.创建DF对象dfpeople = spark.createDataFrame(etlRDD,schema5)# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView('peoples')r = spark.sql('select * from peoples')r.show()​​# 7.关闭资源sc.stop()spark.stop()​
2.1.3 基于RDD反射方式

Schema使用反射方法来推断Schema模式Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,从而推断数据类型。

 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Row​os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()sc = spark.sparkContext​​# 2.读取生成rdd# 3.定义schema结构信息textRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')etlRDD_schema = textRDD.map(lambda line:line.split(',')).map(lambda l:Row(name=l[0],age=l[1]))​# 4.创建DF对象dfpeople = spark.createDataFrame(etlRDD_schema)​# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView('peoples')r = spark.sql('select * from peoples')r.show()​​​# 7.关闭资源sc.stop()spark.stop()
2.2 toDF()创建

schema模式编码在字符串中,toDF参数用于指定列的名字。如果你不指定列的名字,那么默认的列的名字会是_1, _2等等。

 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Row​os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()sc = spark.sparkContext​​# 2.读取生成rdd# 3.定义schema结构信息textRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')etlRDD = textRDD.map(lambda line:line.split(','))​# 4.创建DF对象dfpeople = etlRDD.toDF(['name','age'])​# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView('peoples')r = spark.sql('select * from peoples')r.show()​# 7.关闭资源sc.stop()spark.stop()

2.3 read读取外部文件

复杂API

 统一API格式: spark.read.format('text|csv|json|parquet|orc|avro|jdbc|.....') # 读取外部文件的方式.option('k','v') # 选项  可以设置相关的参数 (可选).schema(StructType | String) #  设置表的结构信息.load('加载数据路径') # 读取外部文件的路径, 支持 HDFS 也支持本地

简写API

 请注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写格式: spark.read.读取方式()例如: df = spark.read.csv(path='file:///export/data/_03_spark_sql/data/stu.txt',header=True,sep=' ',inferSchema=True,encoding='utf-8',)
2.3.1 Text方式读取
 text方式读取文件:1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理2- 默认生成的列名叫value,数据类型string3- 只能够在schema中修改字段value的名称,其他任何内容不能修改
 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Row​os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()​# 2.读取数据# 注意: 读取text文件默认只有1列,且列名交value,可以通过schema修改df = spark.read\.format('text')\.schema('info string')\.load('file:///export/data/spark_project/spark_sql/data/data1.txt')​​# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView('peoples')r = spark.sql('select * from peoples')r.show()​​# 6.关闭资源spark.stop()

2.3.2 CSV方式读取
 csv格式读取外部文件:1- 复杂API和简写API都必须掌握2- 相关参数作用说明:2.1- path:指定读取的文件路径。支持HDFS和本地文件路径2.2- schema:手动指定元数据信息2.3- sep:指定字段间的分隔符2.4- encoding:指定文件的编码方式2.5- header:指定文件中的第一行是否是字段名称2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确
 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Row​os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()​# 2.读取数据# 注意: csv文件可以识别多个列,可以使用schema指定列名,类型# 原始方式# df = spark.read\#     .format('csv')\#     .schema('name string,age int')\#     .option('sep',',')\#     .option('encoding','utf8')\#     .option('header',False)\#     .load('file:///export/data/spark_project/spark_sql/data/data1.txt')# 简化方式df = spark.read.csv(schema='name string,age int',sep=',',encoding='utf8',header=False,path='file:///export/data/spark_project/spark_sql/data/data1.txt')​# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView('peoples')r = spark.sql('select * from peoples')r.show()​​# 7.关闭资源spark.stop()​

2.3.3 JSON方式读取
 json读取数据:1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔

json的数据内容

 {'id': 1,'name': '张三','age': 20}{'id': 2,'name': '李四','age': 23,'address': '北京'}{'id': 3,'name': '王五','age': 25}{'id': 4,'name': '赵六','age': 29}

代码实现

 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Row​os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()​# 2.读取数据# 注意: json的key和schema指定的字段名不一致,会用null补充,如果没有数据也是用null补充# 简化方式df = spark.read.json(schema='id int,name string,age int,address string',encoding='utf8',path='file:///export/data/spark_project/spark_sql/data/data2.txt')​# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView('peoples')r = spark.sql('select * from peoples')r.show()​# 关闭资源spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/42236.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Rejetto HFS 服务器存在严重漏洞受到攻击

AhnLab 报告称 &#xff0c;黑客正在针对旧版本的 Rejetto HTTP 文件服务器 (HFS) 注入恶意软件和加密货币挖矿程序。 然而&#xff0c;由于存在错误&#xff0c; Rejetto 警告用户不要使用 2.3 至 2.4 版本。 2.3m 版本在个人、小型团队、教育机构和测试网络文件共享的开发…

力扣 第 134 场双周赛 解题报告 | 珂学家

前言 题解 T1/T3是环形的处理技巧&#xff0c;这边可以double数组(更准确地讲&#xff0c;添加一个合适的小尾巴). T4是典题&#xff0c;前不久周赛刚考过&#xff0c;是一道结论题&#xff0c;也可以借助数据结构处理。 T1. 交替组 I 和T3一起讲 T2. 与敌人战斗后的最大分数…

生产调度:flowshop问题数学建模

接上一篇文章&#xff0c;在了解生产调度问题的背景和基本概念之后&#xff0c;我想先从比较基础的 flowshop和 jobshop 数学模型入手&#xff0c;理解实际调度过程中的问题求解思路。这一篇文章主要面向 flowshop 问题进行数学建模&#xff0c;对于这类比较经典的问题&#xf…

2007-2022年 国内各上市公司绿色化转型数据.(Excel文件、dta文件、参考文献、计算方法与说明)

上市公司绿色化转型数据为研究者提供了评估企业在生态文明建设、循环经济和绿色管理方面表现的重要视角。以下是对中国各上市公司绿色化转型数据的介绍&#xff1a; 数据简介 定义&#xff1a;上市公司绿色化转型是指企业在发展模式上向可持续发展转变&#xff0c;实现资源节…

成人高考报名条件及收费标准详解

成人高考报名条件及收费标准详解 您想通过成人高考改变自己的命运&#xff0c;但不知道报名条件和收费标准&#xff1f;本文将为您详细介绍成人高考报名条件和收费标准&#xff0c;并为您提供专业的成人教育服务。 深圳成人高考www.shenzhixun.com 成人高考报名条件 成人高考…

CH11_JS的多重循环

第11章&#xff1a;Javascript的多重循环 本章目标 掌握二重循环的使用 掌握二重循环的控制语句的使用 课程回顾 循环控制有那几种方式 讲解内容 1. 回顾练习 需求说明 某次程序大赛&#xff0c;AI2101班有4名学员参加&#xff0c;学员的成绩由用户输入&#xff0c;计算…

那你真的了解方法调用吗?

方法调用是不是很熟悉&#xff1f;那你真的了解它吗&#xff1f;今天就让我们来盘一下它。 首先大家要明确一个概念&#xff0c;此处的方法调用并不是方法中的代码被执行&#xff0c;而是要确定被调用方法的版本&#xff0c;即最终会调用哪一个方法。 之前我们了解到&#xff…

Android C++系列:Linux Socket编程(三)CS模型示例

1. TCP通信 下图是基于TCP协议的客户端/服务器程序的一般流程: 服务器调用socket()、bind()、listen()完成初始化后,调用accept()阻塞等待,处于 监听端口的状态,客户端调用socket()初始化后,调用connect()发出SYN段并阻塞等待服 务器应答,服务器应答一个SYN-ACK段,客户…

Blazor SPA 的本质是什么以及服务器端渲染如何与 Blazor 的新 Web 应用程序配合使用

Blazor 通常被称为单页应用程序 (SPA) 框架。当我第一次开始使用 Blazor 时&#xff0c;我对 SPA 的含义、组件如何为 SPA 架构做出贡献以及所有这些如何与交互性联系在一起感到困惑。 今天&#xff0c;我将解答大家可能关心的三个问题&#xff1a; 什么是 SPA&#xff1f;了…

【高中数学/基本不等式】当x是正实数时,求函数f(x)=x/(1+x^2)的最大值?

【问题】 当x是正实数时&#xff0c;求函数f(x)x/(1x^2)的最大值&#xff1f; 【解答】 解&#xff1a; f(x)x/(1x^2)1/(x1/x))<1/2倍根号下(x*1/x)1/2 所以函数在[0,∞)的区域最大值为0.5 【函数图像】 f(x)x/(1x^2)是奇函数&#xff0c;没有断点&#xff0c;是可以…

文心一言最常用的20条指令及指令说明,含增强指令

下面是20条文心一言的指令及其说明&#xff0c;每条指令尽量简洁明了&#xff0c;以便在有限的字数内提供尽可能多的信息。以下是这些指令及其说明&#xff1a; 1. 查询天气 指令&#xff1a;今天北京的天气怎么样&#xff1f;说明&#xff1a;此指令用于查询特定城市&#xf…

HarmonyOS Next系列之Echarts图表组件(折线图、柱状图、饼图等)实现(八)

系列文章目录 HarmonyOS Next 系列之省市区弹窗选择器实现&#xff08;一&#xff09; HarmonyOS Next 系列之验证码输入组件实现&#xff08;二&#xff09; HarmonyOS Next 系列之底部标签栏TabBar实现&#xff08;三&#xff09; HarmonyOS Next 系列之HTTP请求封装和Token…

Redis高级篇之最佳实践

Redis高级篇之最佳实践 今日内容 Redis键值设计批处理优化服务端优化集群最佳实践 1、Redis键值设计 1.1、优雅的key结构 Redis的Key虽然可以自定义&#xff0c;但最好遵循下面的几个最佳实践约定&#xff1a; 遵循基本格式&#xff1a;[业务名称]:[数据名]:[id]长度不超过…

PCIe 规范核心知识线介绍

0&#xff0c;总体Topology x86 处理器系统中 PCIe的拓扑结构&#xff1a; PCIe Switch的总体结构 1&#xff0c;PCIe 枚举 BIOS 负责枚举与分派配置设备的 BusID[7:0] : DeviceID[4:0] : FunctionID[2:0]; cpu先识别 Host-PCI-Bridge&#xff0c;其下是Bus0&#xff1b; 在…

树莓派学习笔记18:IIC驱动_PCA9685(16路舵机驱动模块)误发

今日继续学习树莓派4B 4G:(Raspberry Pi,简称RPi或RasPi) 本人所用树莓派4B 装载的系统与版本如下: 版本可用命令 (lsb_release -a) 查询: ​ Python 版本3.7.3: ​ IIC驱动_PCA9685(16路舵机驱动模块) 文章提供测试代码讲解,整体代码贴出、测试效果图 目录 开启树莓…

什么是 HTTP POST 请求?初学者指南与示范

在现代网络开发领域&#xff0c;理解并应用 HTTP 请求 方法是基本的要求&#xff0c;其中 "POST" 方法扮演着关键角色。 理解 POST 方法 POST 方法属于 HTTP 协议的一部分&#xff0c;主旨在于向服务器发送数据以执行资源的创建或更新。它与 GET 方法区分开来&…

尚硅谷 一 JS简介

一 JS简介 1.1 JS起源 Javascript是一种由Netscape(网景)的LiveScript发展而来的原型化继承的面向对象的动态类型的区分大小写的客户端脚本语言&#xff0c;主要目的是为了解决服务器端语言&#xff0c;遗留的速度问题&#xff0c;为客户提供更流畅的浏览效果。当时服务端需要…

vue3+vue-router+vite 实现动态路由

文章中出现的代码是演示版本&#xff0c;仅供参考&#xff0c;实际的业务需求会更加复杂 什么是动态路由 什么场景会用到动态路由 举一个最常见的例子&#xff0c;比如说我们要开发一个后台管理系统&#xff0c;一般来说后台管理系统都会分角色登录&#xff0c;这个时候也就涉…

第4章 课程发布:模块需求分析,课程预览(模板引擎 静态页面),课程审核,课程发布(分布式事务,页面静态化:熔断降级),课程搜索(es索引)

1 模块需求分析 1.1 模块介绍 课程信息编辑完毕即可发布课程&#xff0c;发布课程相当于一个确认操作&#xff0c;课程发布后学习者在网站可以搜索到课程&#xff0c;然后查看课程的详细信息&#xff0c;进一步选课、支付、在线学习。 下边是课程编辑与发布的整体流程&#…

一.2.(1)双极型晶体三极管的结构、工作原理、特性曲线及主要参数

1.双极型晶体三极管的结构 学会区分P管和N管&#xff0c;会绘制符号 2.工作原理 无论是PNP 还是NPN&#xff0c;本质上放大时&#xff0c;都是发射结正偏&#xff0c;集电极反偏。&#xff08;可以简单理解为pn为二极管&#xff0c;每个三极管都有两个二极管&#xff09; 其中电…