7.spark sql编程

概述

spark 版本为 3.2.4,注意 RDDDataFrame 的代码出现的问题及解决方案

本文目标如下:

  • RDD ,Datasets,DataFrames 之间的区别
  • 入门
    • SparkSession
    • 创建 DataFrames
    • DataFrame 操作
    • 编程方式运行 sql 查询
    • 创建 Datasets
    • DataFramesRDDs 互相转换
      • 使用反射推断模式
      • 编程指定 Schema

参考 Spark 官网

相关文章链接如下

文章链接
spark standalone环境安装地址
Spark的工作与架构原理地址
使用spark开发第一个程序WordCount程序及多方式运行代码地址
RDD编程指南地址
RDD持久化地址

RDD ,Datasets,DataFrames 之间的区别

Datasets , DataFrames和 RDD

Dataset 是一个分布式的数据集合,DatasetSpark 1.6 中添加的一个新接口,它增益了 RDD (强类型,可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建,然后使用函数转换(map、flatMap、filter等)进行操作。数据集API有Scala和Java版本。Python不支持数据集API。

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的DataFrame APIScalaJavaPythonR中可用。在Scala API中,DataFrame只是Dataset[Row]的一个类型别名。而在Java API中,用户需要使用Dataset<Row>来表示DataFrame

DataFrame=RDD+SchemaRDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

入门

Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互,包括SQLDataset API。计算结果时,使用相同的执行引擎,与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作

people.json

people.json文件准备
在这里插入图片描述

SparkSession

Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

创建 DataFrames

使用 SparkSession,通过存在的RDDhive 表,或其它的Spark data sources 程序创建 DataFrames

val df = spark.read.json("/tmp/people.json")
df.show()

执行如下图
在这里插入图片描述

DataFrame 操作

使用数据集进行结构化数据处理的基本示例如下

// 需要引入 spark.implicits._ 才可使用 $
// This import is needed to use the $-notation
import spark.implicits._
// 打印schema 以树格式
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)// 仅显示 name 列
// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 显示所有,age 加1
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+// 过滤 人的 age 大于 21
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+// 按 age 分组统计
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

spark-shell 执行如下图
在这里插入图片描述
在这里插入图片描述

编程方式运行 sql 查询

df.createOrReplaceTempView("people")val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

执行如下:

scala> df.createOrReplaceTempView("people")scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

创建 Datasets

Datasets类似于RDD,不是使用Java序列化或Kryo,而是使用专门的编码器来序列化对象,以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作,如过滤、排序和哈希,而无需将字节反序列化为对象。

case class Person(name: String, age: Long)// 为 case classes 创建编码器
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()// 为能用类型创建编码器,并提供 spark.implicits._ 引入 
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)// 通过定义类,将按照名称映射,DataFrames 能被转成 Dataset 
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "/tmp/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

执行如下:

scala> case class Person(name: String, age: Long)
defined class Personscala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]scala> primitiveDS.map(_ + 1).collect()
res1: Array[Int] = Array(2, 3, 4)scala> val path = "/tmp/people.json"
path: String = /tmp/people.jsonscala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]scala> peopleDS.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

DataFrames 与 RDDs 互相转换

Spark SQL支持两种不同的方法将现有RDD转换为Datasets

  • 第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,当知道 schema 结构的时间,会有更好的效果。
  • 第二种方法是通过编程接口,构造 schema,然后将其应用于现有的RDD。虽然此方法更详细,直至运行时,才能知道他们的字段和类型,用于构造 Datasets

使用反射推断模式

代码如下:

object RddToDataFrameByReflect {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("RddToDataFrameByReflect").master("local").getOrCreate()// 用于从RDD到DataFrames的隐式转换// For implicit conversions from RDDs to DataFramesimport spark.implicits._// Create an RDD of Person objects from a text file, convert it to a Dataframeval peopleDF = spark.sparkContext.textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by Sparkval teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager => "Name: " + teenager(0)).show()// or by field nameteenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()}case class Person(name: String, age: Long)
}

执行如下图:
在这里插入图片描述

编码问题

关于 Spark 官网 上复杂类型编码问题,直接加下面一句代码

teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))

报以下图片错误
在这里插入图片描述
将原有代码改变如下:

 // 没有为 Dataset[Map[K,V]] 预先定义编码器,需要自己定义// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]// 也可以如下操作// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))// Array(Map("name" -> "Justin", "age" -> 19))

在这里插入图片描述
通过这一波操作,就可以理解什么情况下,需要编码器,以及编码器的作用

编程指定 Schema

代码如下:

object RddToDataFrameByProgram {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local").getOrCreate()import org.apache.spark.sql.Rowimport org.apache.spark.sql.types._// 加上此解决报错问题import spark.implicits._// Create an RDDval peopleRDD = spark.sparkContext.textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")// The schema is encoded in a stringval schemaString = "name age"// Generate the schema based on the string of schemaval fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))val schema = StructType(fields)// Convert records of the RDD (people) to Rowsval rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))// Apply the schema to the RDDval peopleDF = spark.createDataFrame(rowRDD, schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL can be run over a temporary view created using DataFramesval results = spark.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes => "Name: " + attributes(0)).show()}
}

执行如下图
在这里插入图片描述

官方文档的代码不全问题

Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
results.map(attributes => "Name: " + attributes(0)).show()

在这里插入图片描述
加下以下代码

// 加上此解决报错问题
import spark.implicits._

如下图解决
在这里插入图片描述

结束

spark sql 至此结束,如有问题,欢迎评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/132317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

创建基于多任务的并发服务器

有几个请求服务的客户端&#xff0c;我们就创建几个子进程。 这个过程有以下三个阶段&#xff1a; 这里父进程传递的套接字文件描述符&#xff0c;实际上不需要传递&#xff0c;因为子进程会复制父进程拥有的所有资源。 #include <stdio.h> #include <stdlib.h>…

如何再kali中下载iwebsec靶场

这个靶场有三种搭建方法&#xff1a; 第一种是在线靶场&#xff1a;http://www.iwebsec.com:81/ 第二种是虚拟机版本的&#xff0c;直接下载到本地搭建 官网地址下载&#xff1a;http://www.iwebsec.com/ 而第三种就是利用docker搭建这个靶场&#xff0c;我这里是用kali进行…

window10 定时任务

window10 定时任务 1、背景2、目标3、思路4、实操4.1、设置定时任务4.2、配置策略4.3、验证 1、背景 项目上由于业务调试需要&#xff0c;开具了一台window10系统&#xff0c;此台window10为项目组公共使用&#xff0c;为防止误操作分配了不通的账号&#xff0c;日常使用各自账…

学习视频剪辑:巧妙运用中画、底画,制作画中画,提升视频效果

随着数字媒体的普及&#xff0c;视频剪辑已经成为一项重要的技能。在视频剪辑过程中&#xff0c;制作画中画可以显著提升视频效果、信息传达和吸引力。本文讲解云炫AI智剪如何巧妙运用中画、底画批量制作画中画来提升视频剪辑水平&#xff0c;提高剪辑效率。 操作1、先执行云…

​Z时代时尚SUV新宠:起亚赛图斯值不值得年轻人买?

在当今汽车行业中&#xff0c;随着消费者偏好的多样化和年轻化&#xff0c;汽车制造商们正面临着前所未有的挑战与机遇。在2023年上海车展上&#xff0c;起亚汽车公司正式发布了全新紧凑级SUV——赛图斯。这款车型不仅标志着起亚对年轻消费市场的深入洞察&#xff0c;也展现了公…

Springboot+vue的导师双选管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的导师双选管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的导师双选管理系统&#xff0c;采用M&#xff08;model&a…

软件测试需不需要懂代码?

无论是刚入测试行业的萌新&#xff0c;还是已经在测试行业闯荡了两三年的小司机们&#xff0c;都会琢磨一个问题&#xff1a;如果要持续发展下去&#xff0c;我要不要懂代码&#xff1f; 在软件测试初级阶段&#xff0c;不需要编程能力。但是任何一个职业&#xff0c;都会追求…

微服务之Nacos注册管理

文章目录 一、Nacos安装步骤1.安装地址2.安装版本3.目录说明4.端口配置5.启动 二、Nacos服务注册1.Nacos依赖2.客户端修改配置文件3.启动效果图4.总结 三、Nacos服务集群属性1.服务跨集群调用问题2.服务集群属性3.总结 四、Nacos根据集群负载均衡1.修改配置文件2.设置集群服务类…

一题都看不懂,大厂的面试是真的变态......

最近我的一个读者朋友去了字节面试&#xff0c;来给我发信息吐槽&#xff0c;说字节的面试太困难了&#xff0c;像他这种三年经验的测试员&#xff0c;在技术面&#xff0c;居然一题都答不上来&#xff0c;这要多高的水平才能有资格去面试字节的测试岗位。 确实&#xff0c;字…

浅析应急疏散照明设计在高层建筑中的应用

【摘要】作为工程设计人员&#xff0c;对高层建筑的应急照明设计应有足够的认识和重视&#xff0c;以保证在出现失火事件时&#xff0c;可以有效地引导建筑内的人员安全逃离、正确疏散&#xff0c;这是建筑设计的*大价值所在。在设计应急照明时&#xff0c;应根据当地的情况选择…

【操作系统】2009年408真题第 46 题

文章目录 题目描述1&#xff09;依次访问上述三个虚地址&#xff0c;各需多少时间&#xff1f;给出计算过程2&#xff09;基于上述访问序列&#xff0c;虚地址1565H的物理地址是多少&#xff1f;请说明理由 原题 & 官方题解 题目描述 46&#xff08;8分&#xff09;请求分…

LiveMeida视频接入网关

一、产品简介 视频接入网关主要部署在视频存储节点或视频汇聚节点&#xff0c;面向不同用户&#xff0c;主要用于对接不同厂家、不同型号的摄像机设备&#xff0c;获取摄像机视频后&#xff0c;以统一标准的视频格式和传输协议&#xff0c;将视频推送至上层联网/应用平台。可…

CS5523设计资料|替代LT9711方案|MIPI转LVDS方案|CS5523规格书

ASL CS5523是MIPI DSI输入、DP/e DP输出转换芯片。MIPI DSI最多支持4个通道&#xff0c;每个通道的最大运行速度为1.5Gps。对于DP 1.2输出&#xff0c;它由4个数据通道组成&#xff0c;支持1.62Gbps和2.7Gbps的链路速率。支持1.62Gbps和2.7Gbps的链路速率。它支持2560的最高分辨…

前端 | (十四)canvas基本用法 | 尚硅谷前端HTML5教程(html5入门经典)

文章目录 &#x1f4da;canvas基本用法&#x1f407;什么是canvas(画布)&#x1f407;替换内容&#x1f407;canvas标签的两个属性&#x1f407;渲染上下文 &#x1f4da;绘制矩形&#x1f407;绘制矩形&#x1f407;strokeRect时&#xff0c;边框像素渲染问题&#x1f407;添加…

公开IP属地信息如何保护用户的隐私?

公开IP属地信息通常涉及与用户或组织的隐私有关&#xff0c;因此在公开此类信息时需要非常小心&#xff0c;以避免侵犯他人的隐私权。以下是触碰底线的几种情况以及如何保护网络安全和用户隐私&#xff1a; 个人隐私保护&#xff1a; 公开IP属地信息可能泄露用户的物理位置&…

最新版一媒体7.3、星媒体、皮皮剪辑,视频MD ,安卓手机剪辑去重神器+搬运脚本+去视频重软件工具

最新版一媒体app安卓版介绍&#xff1a; 这是一款功能强大的视频搬运工具&#xff0c;内置海量视频编辑工具&#xff0c;支持一键智能化处理、混剪、搬运、还能快速解析和去水印等等&#xff0c;超多实用功能等着您来体验&#xff01; 老牌手机剪辑去重神器&#xff0c;用过的…

uniapp 编译到模拟器(mumu)

一开始我是用逍遥模拟器&#xff0c;但这个玩意突然不好使了&#xff0c;一直加载卡在这页面 1、下载 官网下载&#xff1a;mumu模拟器12 2、打开mumu多开器&#xff0c;在右上角adb查看端口号 3、打开mumu模拟器 4、打开HBuiderX 工具—设置—运行配置 5、配置电脑的系统…

了解web3,什么是web3

Web3是指下一代互联网&#xff0c;它基于区块链技术&#xff0c;将各种在线活动更加安全、透明和去中心化。Web3是一个广义的概念&#xff0c;它包括了很多方面&#xff0c;如数字货币、去中心化应用、智能合约等等。听不懂且大多数人听到这个东西&#xff0c;直觉感觉就像骗子…

Python新手必读:容器类型使用的实用小贴士

更多资料获取 &#x1f4da; 个人网站&#xff1a;涛哥聊Python Python提供了多种容器类型&#xff0c;如列表&#xff08;List&#xff09;、元组&#xff08;Tuple&#xff09;、集合&#xff08;Set&#xff09;、字典&#xff08;Dictionary&#xff09;等&#xff0c;用于…

linux查看文件夹使用情况以及查看文件大小

查看文件夹的使用情况&#xff0c;包含已用和可用空间大小 df -h 文件夹路径然后再查看里面某一个文件夹占用大小 du -sh /data1、ls ls 命令是 Linux 中最常用的文件和目录列表命令之一。它可以显示文件的各种属性&#xff0c;包括文件大小。 ls -l <文件名>上述命令…