SparkSQL的编程模型(DataFrame和DataSet)

1.2 SparkSQL的编程模型(DataFrame和DataSet)

1.2.1  编程模型简介

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

  • SQL SQL不用多说,就和Hive操作一样,但是需要清楚一点的时候,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。 同时支持,通用SQL和HQL。

  • DataFrame和Dataset DataFrameDataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。 Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。 一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。

1.2.2 RDD\DataSet\DataFrame

RDD

弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法

从字面上就能看出的几个特点:

  • 弹性:

    数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换

    RDD出错后可自动重新计算(通过血缘自动容错)

    可checkpoint(设置检查点,用于容错),可persist或cache(缓存)

    里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整

  • 分布式:

    RDD中的数据可存放在多个节点上

  • 数据集:

    数据的集合,没啥好说的

相对于与DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)

DataFrame

DataFrame:理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)

假设RDD中的两行数据长这样;

1张三20
2李四21
3王五22

那么在DataFrame中数据就变成这样;

ID:IntName:StringAge:Int
1张三20
2李四21
3王五22

从上面两个表格可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法等,有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。,不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

Dataset

相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束

假设RDD中的两行数据长这样;

1张三20
2李四21
3王五22

那么在DataFrame中数据就变成这样;

ID:IntName:StringAge:Int
1张三20
2李四21
3王五22

那么在DataSet中数据就变成这样;

Person(id:Int,Name:String,Age:Int)
Person(1,张三,20)
Person(2,李四,21)
Person(3,王五,22)
目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是想骂人,这也是引入Dataset的一个重要原因。
​
使用DataFrame的代码中json文件中并没有score字段,但是能编译通过,但是运行时会报异常!如下图代码所示.
val df1 = spark.read.json( "/tmp/people.json")
// json文件中没有score字段,但是能编译通过
val df2 = df1.filter("score > 60")df2.show()

而使用Dataset实现,会在IDE中就报错,出错提前到了编译之前

val ds1 = spark.read.json( "/tmp/people.json" ).as[ People]
// 使用dataset这样写,在IDE中就能发现错误
val ds2 = ds1.filter(_.score < 60)
val ds3 = ds1.filter(_.age < 18)
// 打印
ds3.show( )

总体来说DS这种方式更加合理,并且更加人性化,比较适合程序员的开发及使用,而且Spark在2.X版本以后也在推行开发者开发中使用DS进行开发。

1.2.3 SparkSQL的编程入口
在SparkSQL中的编程模型,不在是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。
1.2.4 SparkSQL基本编程

创建SparkSQL的模块

创建工程省略,直接在原有工程引入Pom即可

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version>
</dependency>
1.2.5 SparkSQL编程初体验
  • SparkSession的构建

val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")
//.enableHiveSupport()//支持hive的相关操作.getOrCreate()
  • 基本编程

object _01SparkSQLOps {def main(args: Array[String]): Unit = {
​val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")
//                .enableHiveSupport()//支持hive的相关操作.getOrCreate()//加载数据val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")//二维表结构pdf.printSchema()//数据内容 select * from tblpdf.show()//具体的查询 select name, age from tblpdf.select("name", "age").show()import spark.implicits._//导入sparksession中的隐式转换操作,增强sql的功能pdf.select($"name",$"age").show()//列的运算,给每个人的年龄+10 select name, age+10,height-1 from tblpdf.select($"name",$"height" - 1, new Column("age").+(10)).show()//起别名  select name, age+10 as age,height-1  as height from tblpdf.select($"name",($"height" - 1).as("height"), new Column("age").+(10).as("age")).show()//做聚合统计 统计不同年龄的人数 select age, count(1) counts from tbl group by agepdf.select($"age").groupBy($"age").count().show()//条件查询 获取年龄超过18的用户  select * from tbl where age > 18// pdf.select("name", "age", "height").where($"age".>(18)).show()pdf.select("name", "age", "height").where("age > 18").show()//sql//pdf.registerTempTable()//在spark2.0之后处于维护状态,使用createOrReplaceTempView/*从使用范围上说,分为global和非globalglobal是当前SparkApplication中可用,非global只在当前SparkSession中可用从创建的角度上说,分为createOrReplace和不ReplacecreateOrReplace会覆盖之前的数据create不Replace,如果视图存在,会报错*/pdf.createOrReplaceTempView("people")// 使用SQL语法进行处理spark.sql("""|select| age,| count(1) as countz|from people|group by age""".stripMargin).show// 打印输出spark.stop()}
}
1.2.6 SparkSQL编程模型的操作

DataFrame的构建方式

构建方式有两,一种通过Javabean+反射的方式来进行构建;还有一种的话通过动态编码的方式来构建。

  • JavaBean+反射

import org.apache.spark.sql.{DataFrame, SparkSession}
​
/*** 创建DataFrame方式* 反射方式*/
object _02SparkSQLCreateDFOps {def main(args: Array[String]): Unit = {// 创建执行入口val spark = SparkSession.builder().appName("createDF").master("local").getOrCreate()// 创建集合数据,并将数据封装到样例类中val list = List(student(1,"王凯",0,23),student(2,"赵凯",0,32),student(3,"姜华劲",1,24))// 导入隐式转换import spark.implicits._// 创建DF,创建DF的同时可以进行字段名重命名val df: DataFrame = list.toDF("ids","names","genders","ages")// 打印输出df.printSchema()df.show()// 关闭spark.stop()}
}
// 构建反射方式的样例类
case class student(id:Int,name:String,gender:Int,age:Int)
  • 动态编程

object _02SparkSQLDataFrameOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkSQLDataFrame").getOrCreate()
/*使用动态编程的方式构建DataFrameRow-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象*/
val row:RDD[Row] = spark.sparkContext.parallelize(List(Row(1, "李伟", 1, 180.0),Row(2, "汪松伟", 2, 179.0),Row(3, "常洪浩", 1, 183.0),Row(4, "麻宁娜", 0, 168.0)
))
//表对应的元数据信息
val schema = StructType(List(StructField("id", DataTypes.IntegerType, false),StructField("name", DataTypes.StringType, false),StructField("gender", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)
))
​
val df = spark.createDataFrame(row, schema)
df.printSchema()
df.show()}
}

说明:这里学习三个新的类:

Row:代表的是二维表中的一行记录,或者就是一个Java对象

StructType:是该二维表的元数据信息,是StructField的集合

StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)

  • 总结: 这两种方式,都是非常常用,但是动态编程更加的灵活,因为javabean的方式的话,提前要确定好数据格式类型,后期无法做改动。

Dataset的构建方式

Dataset是DataFrame的升级版,创建方式和DataFrame类似,但有不同。

//dataset的构建
object _03SparkSQLDatasetOps {def main(args: Array[String]): Unit = {
​val spark = SparkSession.builder().appName("SparkSQLDataset").master("local[*]").getOrCreate()
​//dataset的构建val list = List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29))import spark.implicits._val ds = spark.createDataset[Student](list)ds.printSchema()ds.show()spark.stop()}
}
case class Student(id:Int, name:String, gender:Int, age:Int)

注意:出现如下错误

在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过

RDD和DataFrame以及DataSet的互相转换

RDD--->DataFrame

    def beanRDD2DataFrame(spark:SparkSession): Unit = {val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29)))val sdf =spark.createDataFrame(stuRDD, classOf[Student])sdf.printSchema()sdf.show()}

RDD--->Dataset

def  rdd2Dataset(spark:SparkSession): Unit = {val stuRDD = spark.sparkContext.parallelize(List(Student(1, "王盛芃", 1, 19),Student(2, "李金宝", 1, 49),Student(3, "张海波", 1, 39),Student(4, "张文悦", 0, 29)))import spark.implicits._val ds:Dataset[Student] = spark.createDataset(stuRDD)
​ds.show()
}   
case class Student(id:Int, name:String, gender:Int, age:Int)

在RDD转换为DataFrame和Dataset的时候可以有更加简单的方式

import spark.implicits._
rdd.toDF()
rdd.toDS()

DataFrame--->RDD

val rdd:RDD[Row] = df.rdd
rdd.foreach(row => {//            println(row)val id = row.getInt(0)val name = row.getString(1)val gender = row.getInt(2)val height = row.getAs[Double]("height")println(s"id=${id},name=$name,gender=$gender,height=$height")
})

DataFrame--->Dataset

无法直接将DataFrame转化为Dataset

Dataset --->RDD

val stuDS: Dataset[Student] = list2Dataset(spark)//dataset --> rdd
val stuRDD:RDD[Student] = stuDS.rdd
stuRDD.foreach(println)

Dataset--->DataFrame

val stuDS: Dataset[Student] = list2Dataset(spark)      
//dataset --->dataframe
val df:DataFrame = stuDS.toDF()
df.show()

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/236266.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

企业需要哪些数字化管理系统?

企业需要哪些数字化管理系统&#xff1f; ✅企业引进管理系统肯定是为了帮助整合和管理大量的数据&#xff0c;从而优化业务流程&#xff0c;提高工作效率和生产力。 ❌但是&#xff0c;如果各个系统之间不互通、无法互相关联数据的话&#xff0c;反而会增加工作量和时间成本…

【递归 回溯】LeetCode-226. 翻转二叉树

226. 翻转二叉树。 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 1&#xff1a; 输入&#xff1a;root [4,2,7,1,3,6,9] 输出&#xff1a;[4,7,2,9,6,3,1]示例 2&#xff1a; 输入&#xff1a;root [2,1,3] 输出&#xf…

写给测试同学的福利 | 招募

一、简介 寻找空闲时间有能力进行有偿协助测试的人员&#xff0c;协助大厂产品进行测试优化产品工作 二、要求 1.安卓设备2.具备熟练使用手机的能力 三、你可以得到 1.优先体验相关产品新功能、了解产品走向2.任务有偿&#xff1a;每次任务5-30米 四、需要了解事项 1.嫌…

生于越南,“开源改变了我的人生!”

注&#xff1a;本文精选自《新程序员 007&#xff1a;大模型时代的开发者》&#xff0c;欢迎点击订购。 作者 | 王启隆 责编 | 唐小引 出品 | 《新程序员》编辑部 随着人工智能浪潮的席卷&#xff0c;开源不再仅仅是计算机领域的一个话题&#xff0c;而是成为推动技术创新…

让测试效率起飞的8款浏览器兼容性测试工具,你get了吗?

浏览器的兼容性问题&#xff0c;是指不同浏览器使用内核及所支持的 HTML 等网页语言标准不同&#xff0c;用户客户端的环境不同造成的显示效果不能达到理想效果。 对于用户而言&#xff0c;无论使用哪款浏览器&#xff0c;期望看到的效果是正常的统一的。市面上发布的浏览器版本…

JVS低代码和智能BI(自助式数据分析)12.19更新功能说明

低代码更新功能 新增: 1、表单组件&#xff1a;标题、分割线、按钮等非数据组件增加小程序端隐藏设置&#xff1b; 隐藏设置允许开发者对表单组件中的非数据组件进行隐藏&#xff0c;例如&#xff0c;可能只想展示表单的部分内容&#xff0c;或者希望在特定条件下显示或隐藏…

<JavaEE> 网络编程 -- 网络编程和 Socket 套接字

目录 一、网络编程的概念 1&#xff09;什么是网络编程&#xff1f; 2&#xff09;网络编程中的基本概念 1> 收发端 2> 请求和响应 3> 客户端和服务端 二、Socket套接字 1&#xff09;什么是“套接字”&#xff1f; 2&#xff09;Socket套接字的概念 3&…

整数比较(比较4个数并从小到大输出)C语言xdoj94

描述&#xff1a; 从键盘输入四个整数&#xff0c;要求按由小到大的顺序输出。 输入说明&#xff1a; 输入四个整数&#xff0c;以空格间隔。 输出说明&#xff1a; 输出排序后的整数&#xff0c;以空格间隔。 输入样例 样例1输入 -99 9 99 -9 输出样例 样例1输出 -99 -9 9 99 …

关于“Python”的核心知识点整理大全32

目录 12.6.4 调整飞船的速度 settings.py ship.py alien_invasion.py 12.6.5 限制飞船的活动范围 ship.py 12.6.6 重构 check_events() game_functions.py 12.7 简单回顾 12.7.1 alien_invasion.py 12.7.2 settings.py 12.7.3 game_functions.py 12.7.4 ship.py …

JavaGUI(但期末速成版)之JFrame和JDialog

前言 学到期末发现越来越没时间来细写这些东西了&#xff0c;毕竟蒟蒻博主的发展方向主要需要学的不是Java&#xff0c;但为了期末高分通过&#xff0c;也不得不花一些精力上来&#xff0c;于是有了这样一篇速成GUI&#xff0c;本篇会以十分精简的语言来学习&#xff0c;主打一…

CN3302 PFM升压型双节电池充电控制集成电路外置MOS管 双节锂电池充电IC

CN3302是一款工作于2.7V到6.5V的PFM升压型双节鲤电池充电控制集成电路。CN3302采用恒流和准恒压模式(Quasi-CVTM)对电池进行充电管理&#xff0c;内部集成有基准电压源&#xff0c;电感电流检测单元&#xff0c;电池电压检测电路和片外场效应品体管驱动电路等&#xff0c;具有外…

企业网络常用技术-快速生成树RSTP原理与配置

STP协议虽然能够解决环路问题&#xff0c;但是收敛速度慢&#xff0c;影响了用户通信质量。如果STP网络的拓扑结构频繁变化&#xff0c;网络也会频繁失去连通性&#xff0c;从而导致用户通信频繁中断。IEEE于2001年发布的802.1w标准定义了快速生成树协议RSTP&#xff08;Rapid …

微信基于StarRocks的湖仓一体实践

作者&#xff1a;StarRocks Active Contributer、微信 OLAP 内核研发工程师 微信作为国内活跃用户最多的社交软件&#xff0c;其数据平台建设经历了从 Hadoop 到 ClickHouse 亚秒级实时数仓的阶段&#xff0c;但仍旧面临着数据体验割裂、存储冗余的问题。通过 StarRocks 的湖仓…

时钟偏移与时钟抖动

1、时钟偏移 如下图所示&#xff0c;由于布局布线导致&#xff0c;clk到达三个触发器的时间是不一样的。这个就是时钟偏移&#xff0c;对每个触发器而言&#xff0c;不会改变时钟周期。 2、时钟抖动 如下图所示&#xff0c;指芯片的某一个给定点上时钟周期发生暂时性变化&…

【Spring】15 ApplicationContextAware 接口

文章目录 1. 简介2. 作用3. 使用3.1 创建并实现接口3.2 配置 Bean 信息3.3 创建启动类3.4 启动 4. 应用场景总结 Spring 框架提供了许多回调接口&#xff0c;用于在 Bean 的生命周期中执行特定的操作。ApplicationContextAware 接口是其中之一&#xff0c;它允许 Bean 获取对 A…

SM5321 是一款带动态路径管理的开关型单节锂电池充电电路。

SM5321 2.5A.1MHz 带动态路径管理的开关型单节锂电池充电电路 简介&#xff1a; SM5321 是一款带动态路径管理的开关型单节锂电池充电电路。SM5321 可提供 2.5A 的充电电流&#xff0c;特别适合移动电源、平板电脑等配备超大容量锂电池的设备。SM5321 内部集成了大电流同步降压…

SpringBoot 使用Quartz执行定时任务对象时无法注入Bean问题

文章目录 问题描述解决方案结束语 大家好&#xff01;今天是2023年12月212日 | 农历十一月初九(距离2024年还有一周左右的时间)&#xff0c;最近还是比较忙的&#xff0c;忙着搞钱&#xff0c;毕竟马上过年啦。 问题描述 感谢大家对我一直以来的支持与帮助&#xff0c;今天这边…

IDEA 黑色主题很难看到鼠标

“控制面板”—搜索“鼠标”关键字—选择“更改鼠标设置” 参考&#xff1a; IDEA 黑色主题很难看到鼠标

JDBC常见的几种连接池使用(C3PO、Druid、HikariCP 、DBCP)

✨前言✨ 本篇作为主要在于介绍jdbc数据库连接池&#xff0c;以及多种连接池的用法 &#x1f352;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f4dd;私信必回哟&#x1f601; &#x1f352;博主将持续更新学习记录收获&#xff0c;友友们有任何问题可以在评论区留言 文章目…

计算机基础:网络基础

目录 一.网线制作 1.制作所需要工具 网线制作标准 ​编辑 2.水晶头使用 3.网线钳使用 4.视频教学 二.集线器、交换机介绍 1.OSI七层模型 2.TCP/IP四层参考模型 3.集线器、交换机。路由器介绍 集线器 交换机 路由器 区别 三.路由器的配置 1.路由器设置 说明书 设…