Spark SQL大数据分析快速上手-DataFrame应用体验

【图书介绍】《Spark SQL大数据分析快速上手》-CSDN博客

《Spark SQL大数据分析快速上手》【摘要 书评 试读】- 京东图书

大数据与数据分析_夏天又到了的博客-CSDN博客

本节主要介绍如何使用DataFrame进行编程。

4.1.1  SparkSession

在旧版本中,Spark SQL提供两种SQL查询起始点:一个叫作SQLContext,用于Spark自己提供的SQL查询;一个叫作HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合。因此,在SQLContext和HiveContext上可用的API,在SparkSession上同样可以使用。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

当我们使用Spark Shell的时候,Spark会自动创建一个叫作spark的SparkSession,就像以前可以自动获取一个sc来表示SparkContext一样,如图4-1所示。

图4-1  自动创建SparkSession

4.1.2  DataFrame应用

Spark SQL的DataFrame API允许我们使用DataFrame而不必去注册临时表或者生成SQL表达式。DataFrame API既有转换操作,也有行动操作;DataSet API则提供了更加函数式的API。

1. 创建DataFrame

有了SparkSession之后,可以通过以下3种方式来创建DataFrame:

  • 通过Spark的数据源来创建。
  • 通过已知的RDD来创建。
  • 通过查询一个Hive表来创建。

Spark支持的数据源如图4-2所示。

图4-2  Spark支持的数据源

通过Spark数据源创建DataFrame的代码如下:

// 读取 JSON 文件
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]// 展示结果
scala> df.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

其中,employees.json文件内容如下:

{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
2. DataFrame语法风格

1)SQL语法风格

SQL语法风格是指我们查询数据的时候可以使用SQL语句。这种SQL语句风格的查询必须有临时视图或者全局视图来辅助。

创建视图的数据来源于people.json,其内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

创建临时视图的代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/ src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.createOrReplaceTempView("people")scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

创建全局视图的代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.createGlobalTempView("people")scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> res31.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2)DSL语法风格

DataFrame提供一个特定领域语言(domain-specific language,DSL)去管理结构化的数据。可以在Scala、Java、Python和R中使用DSL。使用DSL语法风格就不必创建临时视图了。

(1)查看schema信息,示例代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

(2)使用DSL查询,示例代码如下:

只查询name列数据:

scala> df.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|

查询name和age列数据:

scala> df.select("name", "age").show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

查询name和age + 1的数据:

scala> df.select($"name", $"age" + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

查询age大于20的数据:

scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

按照age分组,查看数据条数:

scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
3. RDD和DataFrame的交互

1)从RDD到DataFrame

涉及RDD、DataFrame、DataSet之间的操作时,需要进行导入,即import spark.implicits._。这里的spark不是包名,而是表示SparkSession的那个对象,所以必须先创建SparkSession对象再导入;implicits是一个内部对象。

首先创建一个RDD:

scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24

然后进行转换,转换有3种方法:手动转换、通过样例类反射转换和通过API的方式转换。

(1)手动转换。

示例代码如下:

scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26// 转换为DataFrame的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

(2)通过样例类反射转换。

首先创建样例类:

scala> case class People(name :String, age: Int)
defined class People

然后使用样例把 RDD 转换成DataFrame:

scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28scala> rdd2.toDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

(3)通过API的方式转换。

通过API方式转换不能在spark命令行下进行,需要编写完整的Scala程序代码,示例代码  如下:

代码4-1  DataFrameDemo.scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object DataFrameDemo {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Word Count").getOrCreate()val sc: SparkContext = spark.sparkContextval rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))// 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))// 创建 StructType 类型val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))val df: DataFrame = spark.createDataFrame(rowRdd, types)df.show}
}

2)从DataFrame到RDD

直接调用DataFrame的rdd方法就能完成转换。示例代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58534.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SSM中maven

一&#xff1a;maven的分模块开发 maven分模块就是在多人操作一个项目时将maven模块导入依赖&#xff0c;注意仓库里面没有资源坐标&#xff0c;需要使用install操作下载。 二&#xff1a;maven的依赖管理 pom文件中直接写的依赖叫做直接依赖&#xff0c;直接依赖中用到的依…

25中海油笔试测评春招秋招校招暑期实习社招笔试入职测评行测题型微测网题型分享

中海油笔试一般采用线上机考的形式。考试时间为 120 分钟&#xff0c;满分 100 分。笔试内容主要包括思想素质测评和通用能力测评两个科目。以下是具体介绍&#xff1a; 1. 思想素质测评&#xff1a; ✅价值观&#xff1a;考察考生对工作、职业、企业等方面的价值观念和态度&…

【笔记】变压器-热损耗-频响曲线推导 - 04 额定功率处损耗特性

0.最大的问题 - 散热 对变压器这类功率器件&#xff0c;最大的问题是散热的效率。因为传统的电路基板热导率并不高&#xff0c;几乎和良性导热材料有近乎两个数量级的导热差异&#xff0c;所以&#xff0c;会采用特殊的导热技术&#xff0c;把热量尽可能快地传导到散热片。 传…

定高虚拟列表:让大数据渲染变得轻松

定高虚拟列表 基本认识 在数据如潮水般涌来的今天&#xff0c;如何高效地展示和管理这些数据成为了开发者们面临的一大挑战&#xff0c;传统的列表渲染方式在处理大量数据时&#xff0c;往往会导致页面卡顿、滚动不流畅等问题&#xff0c;严重影响用户体验&#xff08;在页面…

我的博客网站为什么又回归Blazor了

引言 在博客网站的开发征程中&#xff0c;站长可谓是一路披荆斩棘。从最初的构思到实践&#xff0c;先后涉足了多种开发技术&#xff0c;包括 MVC、Razor Pages、Vue、Go、Blazor 等。在这漫长的过程中&#xff0c;网站版本更迭近 10 次&#xff0c;每一个版本都凝聚着站长的心…

Uniapp安装Pinia并持久化(Vue3)

安装pinia 在uni-app的Vue3版本中&#xff0c;Pinia已被内置&#xff0c;无需额外安装即可直接使用&#xff08;Vue2版本则内置了Vuex&#xff09;。 HBuilder X项目&#xff1a;直接使用&#xff0c;无需安装。CLI项目&#xff1a;需手动安装&#xff0c;执行yarn add pinia…

<网络> 协议

目录 文章目录 一、认识协议 1. 协议概念 2. 结构化数据传输 3. 序列化和反序列化 二、网络计算器 1. 封装socket类 2. 协议定制 request类的序列化和反序列化 response类的序列化和反序列化 报头的添加与去除 Json序列化工具 Jsoncpp 的主要特点: Jsoncpp 的使用方法: 3. Ser…

群控系统服务端开发模式-应用开发-文件上传功能开发

一、文件上传路由 在根目录下route文件夹中app.php文件中&#xff0c;添加文件上传功能路由&#xff0c;代码如下&#xff1a; Route::post(upload/file,common.Upload/file);// 上传文件接口 二、功能代码开发 在根目录下app文件夹下common文件夹中创建上传控制器并命名为Up…

pycharm小游戏贪吃蛇及pygame模块学习()

由于代码量大&#xff0c;会逐渐发布 一.pycharm学习 在PyCharm中使用Pygame插入音乐和图片时&#xff0c;有以下这些注意事项&#xff1a; 插入音乐&#xff1a; - 文件格式支持&#xff1a;Pygame常用的音乐格式如MP3、OGG等&#xff0c;但MP3可能需额外安装库&#xf…

检索增强和知识冲突学习笔记

检索增强生成任务&#xff08;Retrieval-Augmented Generation, RAG&#xff09;是一种自然语言处理技术&#xff0c;它结合了信息检索和生成模型&#xff0c;用于生成高质量的文本输出。具体来说&#xff0c;RAG 模型在生成文本时&#xff0c;会先通过检索模块从外部知识库或文…

从0开始深度学习(25)——多输入多输出通道

之前我们都只研究了一个通道的情况&#xff08;二值图、灰度图&#xff09;&#xff0c;但实际情况中很多是彩色图像&#xff0c;即有标准的RGB三通道图片&#xff0c;本节将更深入地研究具有多输入和多输出通道的卷积核。 1 多输入通道 当输入包含多个通道时&#xff0c;需要…

网管平台(进阶篇):如何正确的管理网络设备?

网络设备作为构建计算机网络的重要基石&#xff0c;扮演着数据传输、连接和管理的关键角色。从交换机、路由器到防火墙、网关&#xff0c;各类网络设备共同协作&#xff0c;形成了高效、稳定的网络系统。本文将详细介绍网络设备的种类&#xff0c;并探讨如何正确管理这些设备&a…

论文 | Teaching Algorithmic Reasoning via In-context Learning

这篇论文《通过上下文学习教授算法推理》探讨了如何通过上下文学习&#xff08;In-context Learning, ICL&#xff09;有效训练大型语言模型&#xff08;LLMs&#xff09;以进行算法推理。以下是从多个角度对这项工作的详细解读&#xff1a; 1. 问题陈述与研究动机 算法推理的…

RK3568平台(基础篇)性能分析工具

一.Linux 性能优化工具简介 Linux 系统性能指标无非就是这几个方面,CPU、内存、磁盘 I/O、文件系统、网络等相关指标。不同的性能指标都有对应的具体命令工具进行查看与监控,接下来我们将介绍一些常见的 Linux 系统性能指标及其对应的命令工具(通过命令工具找出 Linux 系统性…

2024阿里云CTF Web writeup

《Java代码审计》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484219&idx1&sn73564e316a4c9794019f15dd6b3ba9f6&chksmc0e47a67f793f371e9f6a4fbc06e7929cb1480b7320fae34c32563307df3a28aca49d1a4addd&scene21#wechat_redirect 前言 又是周末…

Bartender 5 for Mac 菜单栏管理软件 安装教程【保姆级教程,操作简单小白轻松上手使用】

Mac分享吧 文章目录 Bartender 5 for Mac 菜单栏管理软件 安装完成&#xff0c;软件打开效果一、Bartender 5 菜单栏管理软件 Mac电脑版——v5.2.3⚠️注意事项&#xff1a;1️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件3️⃣&#xff1a;打开软件&#xff0c;根据自己…

职场逆袭!学会管理上司,你也能成为职场赢家

书友们&#xff0c;不要错过了&#xff01;我挖到了一本真正让我彻夜难眠的小说&#xff0c;情节跌宕起伏&#xff0c;角色鲜活得就像从书里跳出来陪你聊天。每一页都是新的惊喜&#xff0c;绝对让你欲罢不能。要是你也在寻找那种让人上瘾的阅读体验&#xff0c;这本书就是你的…

Actor-Critic方法【A2C,A3C,Policy Gradient】

强化学习笔记系列目录 第一章 强化学习基本概念 第二章 贝尔曼方程 第三章 贝尔曼最优方程 第四章 值迭代和策略迭代 第五章 强化学习实例分析:GridWorld 第六章 蒙特卡洛方法 第七章 Robbins-Monro算法 第八章 多臂老虎机 第九章 强化学习实例分析:CartPole 第十章 时序差分法…

若依管理系统使用已有 Nacos 部署流程整理

背景 玩了一下开源项目 RuoYi 管理系统Cloud 版&#xff0c;卡住的地方是&#xff1a;它用到了 nacos 配置管理&#xff0c;如果用的 nacos 环境是单机且是内置数据库的话&#xff0c;该怎么配置呢&#xff1f; 本文整理本机启动 RuoYi Cloud 应用本地部署的过程&#xff0c;…

数字信号处理-FPGA插入不同误码率的模拟源

module data_error_injector (input clk, // 时钟信号&#xff0c;50MHzinput reset, // 复位信号&#xff0c;高有效input DIN_EN, // 数据输入使能&#xff0c;高有效input [7:0] ERROR_LEVEL, // 错误等级…