Hadoop+Spark大数据技术 实验8 Spark SQL结构化

9.2 创建DataFrame对象的方式

val dfUsers = spark.read.load("/usr/local/spark/examples/src/main/resources/users.parquet")

dfUsers: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

dfUsers.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

9.2.2 json文件创建DataFrame对象

val dfGrade = spark.read.format("json").load("file:/media/sf_download/grade.json")

dfGrade: org.apache.spark.sql.DataFrame = [Class: string, ID: string ... 3 more fields]

dfGrade.show(3)

+-----+---+----+-----+-----+
|Class| ID|Name|Scala|Spark|
+-----+---+----+-----+-----+
|    1|106|Ding|   92|   91|
|    2|242| Yan|   96|   90|
|    1|107|Feng|   84|   91|
+-----+---+----+-----+-----+
only showing top 3 rows

9.2.3 RDD创建DataFrame对象

val list = List(

("zhangsan" , "19") , ("B" , "29") , ("C" , "9")

)

val df = sc.parallelize(list).toDF("name","age")

list: List[(String, String)] = List((zhangsan,19), (B,29), (C,9))
df: org.apache.spark.sql.DataFrame = [name: string, age: string]

df.printSchema()

root|-- name: string (nullable = true)|-- age: string (nullable = true)

df.show()

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 19|
|       B| 29|
|       C|  9|
+--------+---+

9.2.4 SparkSession创建DataFrame对象

// 1.json创建Datarame对象

val dfGrade = spark.read.format("json").load("file:/media/sf_download/grade.json")

dfGrade.show(3)

+-----+---+----+-----+-----+
|Class| ID|Name|Scala|Spark|
+-----+---+----+-----+-----+
|    1|106|Ding|   92|   91|
|    2|242| Yan|   96|   90|
|    1|107|Feng|   84|   91|
+-----+---+----+-----+-----+
only showing top 3 rows
dfGrade: org.apache.spark.sql.DataFrame = [Class: string, ID: string ... 3 more fields]

Selection deleted

// 2.csv创建Datarame对象

val dfGrade2 = spark.read.option("header",true).csv("file:/media/sf_download/grade.json")

dfGrade2: org.apache.spark.sql.DataFrame = [{"ID":"106": string, "Name":"Ding": string ... 3 more fields]

dfGrade2.show(3)

+-----------+-------------+-----------+----------+-----------+
|{"ID":"106"|"Name":"Ding"|"Class":"1"|"Scala":92|"Spark":91}|
+-----------+-------------+-----------+----------+-----------+
|{"ID":"242"| "Name":"Yan"|"Class":"2"|"Scala":96|"Spark":90}|
|{"ID":"107"|"Name":"Feng"|"Class":"1"|"Scala":84|"Spark":91}|
|{"ID":"230"|"Name":"Wang"|"Class":"2"|"Scala":87|"Spark":91}|
+-----------+-------------+-----------+----------+-----------+
only showing top 3 rows

Selection deleted

// 3.Parquet创建Datarame对象

val dfGrade3 = spark.read.parquet("file:/usr/local/spark/examples/src/main/resources/users.parquet")

dfGrade3: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

dfGrade3.show(3)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

9.2.5 Seq创建DataFrame对象

val dfGrade4 = spark.createDataFrame(

Seq(

("A" , 20 ,98),

("B" , 19 ,93),

("C" , 21 ,92),

)

)toDF("Name" , "Age" , "Score")

dfGrade4: org.apache.spark.sql.DataFrame = [Name: string, Age: int ... 1 more field]

dfGrade4.show()

+----+---+-----+
|Name|Age|Score|
+----+---+-----+
|   A| 20|   98|
|   B| 19|   93|
|   C| 21|   92|
+----+---+-----+

9.3 DataFrame对象保存为不同格式

9.3.1 write.()保存DataFrame对象

DataFrame.write() 提供了一种方便的方式将 DataFrame 保存为各种格式。以下是几种常见格式的保存方法:

1. 保存为JSON格式

df.write.json("path/to/file.json")

2. 保存为Parquet文件

df.write.parquet("path/to/file.parquet")

3. 保存为CSV文件

df.write.csv("path/to/file.csv")

9.3.2 write.format()保存DataFrame对象

DataFrame.write.format() 提供了一种更灵活的方式来保存 DataFrame,可以通过指定格式名称来选择输出格式。以下是几种常见格式的保存方法:

1. 保存为JSON格式

df.write.format("json").save("path/to/file.json")

2. 保存为Parquet文件

df.write.format("parquet").save("path/to/file.parquet")

3. 保存为CSV文件

df.write.format("csv").save("path/to/file.csv")

9.3.3 先将DataFrame对象转化为RDD再保存文件

虽然可以直接使用 DataFrame 的 write 方法保存文件,但有时需要先将 DataFrame 转换为 RDD 再进行保存。这可能是因为需要对数据进行一些 RDD 特定的操作,或者需要使用 RDD 的保存方法。

rdd = df.rdd.map(lambda row: ",".join(str(x) for x in row))
rdd.saveAsTextFile("path/to/file.txt")

注意:

  • 上述代码将 DataFrame 的每一行转换为逗号分隔的字符串,并将结果保存为文本文件。
  • 可以根据需要修改代码以使用不同的分隔符或保存为其他格式。

9.4 DataFrame对象常用操作

9.4.1 展示数据

1. show()

// 显示前20行数据

gradedf.show()

 

// 显示前10行数据

gradedf.show(10)

 

// 不截断列宽显示数据

gradedf.show(truncate=False)

 

2. collect()

// 将 DataFrame 转换成 Dataset 或 RDD,返回 Array 对象

gradedf.collect()

 

3. collectAsList()

// 将 DataFrame 转换成 Dataset 或 RDD,返回 Java List 对象

gradedf.collectAsList()

 

4. printSchema()

// 打印 DataFrame 的模式(schema)

gradedf.printSchema()

 

5. count()

// 统计 DataFrame 中的行数

gradedf.count()

 

6. first()、head()、take()、takeAsList()

// 返回第一行数据

gradedf.first()

 

// 返回前3行数据

gradedf.head(3)

 

// 返回前5行数据

gradedf.take(5)

 

// 返回前5行数据,以 Java List 形式

gradedf.takeAsList(5)

 

7. distinct()

// 返回 DataFrame 中唯一的行数据

gradedf.distinct.show()

 

8. dropDuplicates()

// 删除 DataFrame 中重复的行数据

gradedf.dropDuplicates(Seq("Spark")).show()

9.4.2 筛选

1. where()
   - 根据条件过滤 DataFrame 中的行数据。
   - 示例: gradedf.where("Class = '1' and Spark = '91'").show()

2. filter()
   - 与 where() 功能相同,根据条件过滤 DataFrame 中的行数据。
   - 示例: gradedf.filter("Class = '1'").show()

3. select()
   - 选择 DataFrame 中的指定列。
   - 示例: gradedf.select("Name", "Class","Scala").show(3,false)

修改名称:gradedf.select(gradedf("Name").as("name")).show()

4. selectExpr()
   - 允许使用 SQL 表达式选择列。
   - 示例: gradedf.selectExpr("name", "name as names" ,"upper(Name)","Scala * 10").show(3)

5. col()
   - 获取 DataFrame 中指定列的引用。
   - 示例: gradedf.col("name")

6. apply()
   - 对 DataFrame 中的每一行应用函数。
   - 示例: def get_grade_level(grade): return "A" if grade > 90 else "B" 
   gradedf.select("name", "grade", "grade_level").apply(get_grade_level, "grade_level")

7. drop()
   - 从 DataFrame 中删除指定的列。
   - 示例: gradedf.drop("grade")

8. limit()
   - 限制返回的行数。
   - 示例: gradedf.limit(10)

9.4.3 排序

按ID排序

1. orderBy()、sort()

orderBy() 和 sort() 方法都可以用于对 DataFrame 进行排序,它们的功能相同。

// 按id升序排序

gradedf.orderBy("id").show()

gradedf.sort(gradedf("Class").desc,gradedf("Scala").asc).show(3)

// 按id降序排序

gradedf.orderBy(desc("id")).show()

gradedf.sort(desc("id")).show()

2. sortWithinPartitions()

示例:gradedf.sortWithinPartitions("id").show(5)

引申:sortWithinPartitions() 方法用于对 DataFrame 的每个分区内进行排序。

// 首先对 DataFrame 进行重新分区,使其包含两个分区

val partitionedDF = gradedf.repartition(2)

// 对每个分区内的 id 进行升序排序

partitionedDF.sortWithinPartitions("id").show()

需要注意的是,sortWithinPartitions() 方法不会改变 DataFrame 的分区数量,它只是对每个分区内部进行排序。

9.4.4 汇总与聚合

1. groupBy()

groupBy() 方法用于根据指定的列对 DataFrame 进行分组。

(1) 结合 count()

// 统计每个名字的学生人数
gradedf.groupBy("name").count().show()

(2) 结合 max()

// 找出每个课程学生的最高成绩
gradedf.groupBy("Class").max("Scala","Spark").show()

(3) 结合 min()

// 找出每个名字学生的最低成绩
gradedf.groupBy("name").min("grade").show()

(4) 结合 sum()

// 计算每个名字学生的总成绩
gradedf.groupBy("name").sum("grade").show()gradedf.groupBy("Class").sum("Scala","Spark").show()

(5) 结合 mean()

// 计算课程的平均成绩gradedf.groupBy("Class").sum("Scala","Spark").show()

2. agg()

agg() 方法允许对 DataFrame 应用多个聚合函数。

(1) 结合 countDistinct()

// 统计不重复的名字数量
gradedf.agg(countDistinct("name")).show()

(2) 结合 avg()

gradedf.agg(max("Spark"), avg("Scala")).show()

// 计算所有学生的平均成绩
gradedf.agg(avg("grade")).show()

(3) 结合 count()

// 统计学生总数
gradedf.agg(count("*")).show()

(4) 结合 first()

// 获取第一个学生的姓名
gradedf.agg(first("name")).show()

(5) 结合 last()

// 获取最后一个学生的姓名
gradedf.agg(last("name")).show()

(6) 结合 max()、min()

// 获取最高和最低成绩
gradedf.agg(max("grade"), min("grade")).show()

(7) 结合 mean()

// 计算所有学生的平均成绩
gradedf.agg(mean("grade")).show()

(8) 结合 sum()

// 计算所有学生的总成绩
gradedf.agg(sum("grade")).show()

(9) 结合 var_pop()、variance()

// 计算成绩的总体方差
gradedf.agg(var_pop("grade"), variance("grade")).show()

(10) 结合 covar_pop()

// 计算 id 和 grade 之间的总体协方差
gradedf.agg(covar_pop("id", "grade")).show()

(11) 结合 corr()

gradedf.agg(corr("Spark","Scala")).show()

9.4.5 统计

771468c1170e42e88ed99fd7cc2fc4d2.png

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/843061.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

李飞飞亲自撰文:大模型不存在主观感觉能力,多少亿参数都不行

近日,李飞飞连同斯坦福大学以人为本人工智能研究所 HAI 联合主任 John Etchemendy 教授联合撰写了一篇文章,文章对 AI 到底有没有感觉能力(sentient)进行了深入探讨。 「空间智能是人工智能拼图中的关键一环。」知名「AI 教母」李…

【数据结构】P1 数据结构是什么、算法怎样度量

1.1 基本概念与术语 数据: 数据是信息的载体,是所有能被计算机识别以及处理的符号。数据元素: 数据元素是数据基本单位,由若干 数据项 组成,数据项是构成数据元素最小的单位。 e . g . e.g. e.g. 数据元素如一条学生记…

Leetcode 2028

思路&#xff1a;1-6之间的的n个数组合起来要变成sum_t mean*(rolls.size()n) - sum(rolls) ; 那么可以先假设每个数都是sum_t / n 其中这个数必须要在1 - 6 之间否者无法分配。 然后可以得出n * (sum_t / n ) < sum ; 需要对余数mod进行调整&#xff0c;为了减少调整的次…

接口测试及接口测试常用的工具详解

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 首先&#xff0c;什么是接口呢&#xff1f; 接口一般来说有两种&#xff0c;一种是程序内部的接口&#xff0c;一种是系统对外的接口。 系统对外的接口&#xff1a;比如你要从别的网站或服务器上获取资源或信息…

深入了解 CSS 预处理器 Sass

今天我们来深入探讨一下 CSS 预处理器 Sass。我们将学习什么是 Sass,如何使用它,以及它是如何工作的。 什么是 Sass? Sass 是 syntactically awesome style sheets 的缩写,是一种 CSS 预处理器。它是 CSS 的扩展,为基础 CSS 增加了更多的功能和优雅。普通的 CSS 代码很容…

sklearn监督学习--k近邻算法

sklearn监督学习 一、分类与回归二、泛化、过拟合与欠拟合三、k近邻算法四、分析KNeighborsClassifier五、k近邻算法用于回归优点、缺点和参数 一、分类与回归 监督学习是最常用也是最成功的机器学习类型之一。监督机器学习问题主要有两种&#xff0c;分别叫做分类与回归。分类…

IDEA项目通过 tomcat运行报错: 404 请求的资源不可用

SpringBootVue项目 IDEA运行 Tomcat&#xff0c;自动打开网页报错 HTTP状态 404 - 未找到 类型 状态报告 消息 请求的资源[/WarehouseManagerApi_war/]不可用 描述 源服务器未能找到目标资源的表示或者是不愿公开一个已经存在的资源表示。 Apache Tomcat/9.0.89 但是运行访问 …

【接口测试_04课_Jsonpath断言、接口关联及加密处理】

一、Jasonpath的应用 JsonPath工具网站&#xff1a;JSONPath解析器 - 一个工具箱 - 好用的在线工具都在这里&#xff01; 1、JSONPath的手写与获取 手写JSONPath 1、 $ &#xff08;英文美元符号&#xff09;代表外层的{} . &#xff08;英文句号&#xff09;表示当前…

卷积神经网络-奥特曼识别

数据集 四种奥特曼图片_数据集-飞桨AI Studio星河社区 (baidu.com) 中间的隐藏层 已经使用参数的空间 Conv2D卷积层 ReLU激活层 MaxPool2D最大池化层 AdaptiveAvgPool2D自适应的平均池化 Linear全链接层 Dropout放置过拟合&#xff0c;随机丢弃神经元 -----------------…

echarts快速入门

之前只看过&#xff0c;没写过&#xff0c;来了个新需求了解下echarts功能 官网&#xff1a;https://echarts.apache.org/handbook/zh/get-started 官方参数文档&#xff1a;https://echarts.apache.org/zh/option.html#title 其实大部分问题&#xff0c;去官方参数文档里面都…

【项目教程】FFmpeg+SDL2实现视频播放器

一、前言 学习ffmpeg和sdl&#xff0c;并编写一个视频播放器&#xff0c;是一个很好的音视频开发项目。 虽然关于视频播放器的原理已经有很多人在博客中进行了讲解&#xff0c;但是很多人不提供视频和代码&#xff0c;这也是我写这篇博客的主要原因。 二、在视频播放器中&am…

【组合数学 放球问题 虚拟点 小于等于转小于】1621. 大小为 K 的不重叠线段的数目

本文涉及知识点 放球问题 组合数学汇总 本题难道分&#xff1a;2198 LeetCode1621. 大小为 K 的不重叠线段的数目 给你一维空间的 n 个点&#xff0c;其中第 i 个点&#xff08;编号从 0 到 n-1&#xff09;位于 x i 处&#xff0c;请你找到 恰好 k 个不重叠 线段且每个线段…

零拷贝(Zero Copy)

目录 零拷贝&#xff08;Zero Copy&#xff09; 1.什么是Zero Copy? 2.物理内存和虚拟内存 3.内核空间和用户空间 4.Linux的I/O读写方式 4.1 I/O中断原理 4.2 DMA传输原理 5.传统I/O方式 5.1传统读操作 5.2传统写操作 6.零拷贝 6.1.用户态直接IO 6.2.mmapwrite …

免费使用知网下载文献

第一步&#xff1a;输入网址&#xff1a;https://digi.library.hb.cn:8443/#/&#xff08;或搜索湖北省图书馆&#xff09; 第二步&#xff1a;点击登录按钮。 第三步&#xff1a;使用手机 支付宝 扫描页面左侧二维码。 第四步&#xff1a;手机点击“电子读者证注册”。&…

抖音 v27.8.0 内置增强模块,自动播放、无水印下载(可登录,助手增强版)

介绍 抖音应用作为全球领先的短视频平台&#xff0c;其内置功能允许用户将喜欢的内容保存至本地设备&#xff0c;但默认情况下&#xff0c;这些视频会带有抖音的水印。为了解决这一限制&#xff0c;该版本使用户能够直接保存不带水印的视频到手机中&#xff0c;无需使用任何第…

R25 型双极型晶体管 433功率放大器,集电极电流可达100mA

R25 型硅基微波双极型晶体管是一种常见的晶体管&#xff0c;主要用于高频电子放大线路中。常被用作放大器、开关、变频器等电子电路中的核心元件。在放大电路中&#xff0c;它可以将微弱的信号放大到足以驱动输出负载&#xff1b;在开关电路中&#xff0c;它可以实现电路的打开…

易查分小程序 学生成绩管理小程序

亲爱的老师们&#xff0c;是不是每次成绩公布后&#xff0c;家长们的连环夺命call让你头大&#xff1f;担心孩子们的成绩信息安全&#xff0c;又想快速分享给家长&#xff0c;这可咋整&#xff1f;别急&#xff0c;易查分小程序来帮忙啦&#xff01; 安全有保障 智能验证码&a…

基于tcp实现自定义应用层协议

认识协议 协议&#xff08;Protocol&#xff09; 是一种通信规则或标准&#xff0c;用于定义通信双方或多方之间如何交互和传输数据。在计算机网络和通信系统中&#xff0c;协议规定了通信实体之间信息交换的格式、顺序、定时以及有关同步等事宜的约定。简易来说协议就是通信…

【Linux】Linux下centos更换国内yum源

&#x1f331;博客主页&#xff1a;青竹雾色间 &#x1f331;系列专栏&#xff1a;Linux &#x1f618;博客制作不易欢迎各位&#x1f44d;点赞⭐收藏➕关注 目录 1. 备份旧的 YUM 源文件2. 下载国内的 YUM 源文件阿里云&#xff1a;网易&#xff1a; 3. 清理 YUM 缓存4. 更新…

scp问题:Permission denied, please try again.

我把scp归纳三种情况&#xff1a; 源端root——》目标端root 源端root——》目标端mysql&#xff08;任意&#xff09;用户 源端&#xff08;任意用户&#xff09;——》目标端root用户 在scp传输文件的时候需要指导目标端的用户密码&#xff0c;如root用户密码、mysql用户…