Spark RDD、DataFrame、DataSet比较

在Spark的学习当中,RDD、DataFrame、DataSet可以说都是需要着重理解的专业名词概念。尤其是在涉及到数据结构的部分,理解清楚这三者的共性与区别,非常有必要。


RDD,作为Spark的核心数据抽象,是Spark当中不可或缺的存在,而在SparkSQL中,Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。

RDD、DataFrame、DataSet三者的共性:
RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利。
三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算。
三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
三者都有partition的概念。三者有许多共同的函数,如filter,排序等。

DataFrame、DataSet和RDD有什么区别?

首先从版本的产生上来看:RDD(Spark1.0)—>Dataframe(Spark1.3)—>Dataset(Spark1.6)

RDD:
RDD一般和spark mlib同时使用。
RDD不支持sparksql操作。
DataFrame:
①与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值。
②DataFrame引入了schema和off-heap
schema:RDD每一行的数据,结构都是一样的。这个结构就存储在schema中。Spark通过schame就能够读懂数据,因此在通信和IO时就只需要序列化和反序列化数据,而结构的部分就可以省略了。
off-heap:意味着JVM堆以外的内存,这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中,当要操作数据时,就直接操作off-heap内存。由于Spark理解schema,所以知道该如何操作。
off-heap就像地盘,schema就像地图,Spark有地图又有自己地盘了,就可以自己说了算了,不再受JVM的限制,也就不再收GC的困扰了。
③结构化数据处理非常方便,支持Avro,CSV,Elasticsearch数据等,也支持Hive,MySQL等传统数据表。
④兼容Hive,支持Hql、UDF
有schema和off-heap概念,DataFrame解决了RDD的缺点,但是却丢了RDD的优点。DataFrame不是类型安全的(只有编译后才能知道类型错误),API也不是面向对象风格的。
Dataset:
①DataSet集中了RDD的优点(强类型和可以用强大lambda函数)以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map/flatmap/filter)进行多种操作。
②DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。DataSet通过Encoder实现了自定义的序列化格式,使得某些操作可以在无需序列化情况下进行。另外Dataset还进行了包括Tungsten优化在内的很多性能方面的优化。
③Dataset<Row>等同于DataFrame(Spark 2.X)

RDD、DataFrame、DataSet的创建:

创建RDD
在Spark中创建RDD的方式主要分为2种:
1.读取内存数据创建RDD
2.读取文件创建RDD
3.通过其他RDD创建RDD

1、读取内存数据创建RDD
读取内存数据创建RDD,Spark主要提供了两个方法:parallelize和makeRDD。
使用makeRDD创建RDD的时候还可以指定分区数量。 

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("CreateRDD"))
// 从内存中创建RDD,将内存中集合的数据作为处理的数据源
val seq = Seq[Int](elems = 1,2,3,4)
// parallelize方法创建RDD
// val rdd = sc.parallelize(seq)
// makeRDD方法创建RDD
// val rdd = sc.makeRDD(seq)
// 指定分区数量创建RDD
val rdd = sc.makeRDD(seq,3)
rdd.collect().foreach(println)
sc.stop()

2、读取文件创建RDD
读取文件创建RDD,Spark提供了textFile和wholeTextFiles方法:
textFile:以行为单位进行读取数据,
wholeTextFiles:以文件为单位读取数据,读取的结果为元组形式,第一个值为文件路径,第二个值为文件内容。 

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Rdd_File"))
// textFile方法读取文件创建RDD
// val rdd = sc.textFile(path = "test.txt")
// textFile方法也是可以指定分区数量的
// val rdd = sc.textFile(path = "test.txt", 3)
// wholeTextFiles方法读取多个文件创建RDD
val rdd = sc.wholeTextFiles(path = "test*.txt")
rdd.collect().foreach(println)
sc.stop()

3、通过其他RDD创建RDD 

val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.textFile("D:\\develop\\workspace\\bigdata2021\\spark2021\\input")
val flatRDD: RDD[String] = rdd.flatMap(_.split(" "))
sc.stop()
 

创建DataFrame

1、通过Seq生成

val spark = SparkSession
  .builder()
  .appName(this.getClass.getSimpleName).master("local")
  .getOrCreate()

val df = spark.createDataFrame(Seq(
  ("ming", 20, 15552211521L),
  ("hong", 19, 13287994007L),
  ("zhi", 21, 15552211523L)
)) toDF("name", "age", "phone")

df.show()

2、读取Json文件生成

json文件内容
{"name":"ming","age":20,"phone":15552211521}
{"name":"hong", "age":19,"phone":13287994007}
{"name":"zhi", "age":21,"phone":15552211523}


    val dfJson = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/student.json")
    dfJson.show()

3、读取csv文件生成

csv文件
name,age,phone
ming,20,15552211521
hong,19,13287994007
zhi,21,15552211523

val dfCsv = spark.read.format("csv").option("header", true).load("/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/students.csv")
dfCsv.show()

4、通过Json格式的RDD生成(弃用)

    val sc = spark.sparkContext
    import spark.implicits._
    val jsonRDD = sc.makeRDD(Array(
      "{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}",
      "{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}",
      "{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}"
    ))

    val jsonRddDf = spark.read.json(jsonRDD)
    jsonRddDf.show()

5、通过Json格式的DataSet生成

val jsonDataSet = spark.createDataset(Array(
  "{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}",
  "{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}",
  "{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}"
))
val jsonDataSetDf = spark.read.json(jsonDataSet)

jsonDataSetDf.show()

6、通过csv格式的DataSet生成

   val scvDataSet = spark.createDataset(Array(
      "ming,20,15552211521",
      "hong,19,13287994007",
      "zhi,21,15552211523"
    ))
    spark.read.csv(scvDataSet).toDF("name","age","phone").show()

7、动态创建schema

    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("phone", LongType, true)
    ))
    val dataList = new util.ArrayList[Row]()
    dataList.add(Row("ming",20,15552211521L))
    dataList.add(Row("hong",19,13287994007L))
    dataList.add(Row("zhi",21,15552211523L))
    spark.createDataFrame(dataList,schema).show()

8、通过jdbc创建

    //第八种:读取数据库(mysql)
    val options = new util.HashMap[String,String]()
    options.put("url", "jdbc:mysql://localhost:3306/spark")
    options.put("driver","com.mysql.jdbc.Driver")
    options.put("user","root")
    options.put("password","hollysys")
    options.put("dbtable","user")

    spark.read.format("jdbc").options(options).load().show()

创建Dateset

1、通过createDataset(seq,list,rdd)

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, SparkSession}

object CreateDataset {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[4]").appName(this.getClass.getName).getOrCreate()
    //   需要导入隐式转换
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext
    //通过seq创建Dataset
    val seqDs: Dataset[Int] =spark.createDataset(1 to 10)
    //通过list创建Dataset
    val listDs: Dataset[(String, Int)] = spark.createDataset(List(("a",1),("b",2),("c",3)))
    //通过rdd创建Dataset
    val rddDs: Dataset[(String, Int, Int)] = spark.createDataset(sc.parallelize(List(("a",1,2),("b",2,3),("c",3,4))))

    seqDs.show()
    listDs.show()
    rddDs.show()
  }

}

2、通过case class

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

import scala.collection.mutable

object CreateDataSetByCaseClass {

  case class Point(label:String,x:Double,y:Double)
  case class Category(id:Long,name:String)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[4]").appName(this.getClass.getName).getOrCreate()
    //   需要导入隐式转换
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext
    //通过Point的样例类创建一个seq,并将它转化为Dataset
    val points: Dataset[Point] = Seq(Point("bar",2.6,3.5),Point("foo",4.0,3.7)).toDS()
    //通过Category的样例类创建一个seq,并将它转化为Dataset
    val categories: Dataset[Category] = Seq(Category(1,"bar"),Category(2,"foo")).toDS()
    //进行join连接,注意这里需要传入三个”=“,这时一个方法
     points.join(categories,points("label")===categories("name")).show()

    //通过Point的样例类创建一个List,并将它转化为Dataset
    val points2: Dataset[Point] = List(Point("bar",2.6,3.5),Point("foo",4.0,3.7)).toDS()
    //通过Category的样例类创建一个List,并将它转化为Dataset
    val categories2: Dataset[Category] = List(Category(1,"bar"),Category(2,"foo")).toDS()
    //进行join连接,注意这里需要传入三个”=“,这时一个方法
    points2.join(categories2,points2("label")===categories2("name")).show()

    //通过Point的样例类创建一个RDD,并将它转化为Dataset
    val points3: Dataset[Point] = sc.parallelize(List(Point("bar",2.6,3.5),Point("foo",4.0,3.7))).toDS()
    //通过Category的样例类创建一个RDD,并将它转化为Dataset
    val categories3: Dataset[Category] = sc.parallelize(List(Category(1,"bar"),Category(2,"foo"))).toDS()
    points3.join(categories3,points3("label")===categories3("name")).show()
  }
}


RDD、DataFrame、DataSet三者之间的转换:

1.RDD与DataFrame转换

(1)toDF方法:将RDD转换为DataFrame;

## 创建RDD
val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "ww", 20), (2, "ss", 30), (3, "xx", 40)))
## 指定列名
val df: DataFrame = rdd.toDF("id", "name", "age")
## 不指定列名
val df1: DataFrame = rdd.toDF()
## 展示
df.show()
df1.show()

(2)rdd方法:将DataFrame转换为RDD。

val rowRDD: RDD[Row] = df.rdd
## 输出
rowRDD.collect().foreach(println)

2.DataFrame与DataSet转换

(1)as方法:将DataFrame转换为DataSet,使用 as[] 方法时需要指明数据类型或者采用样例类的方式;

## 引入隐式转换
import spark.implicits._
## 创建样例类(不能创建于main方法中)
case class User(id:Int,name:String,age:Int)
## 指定数据类型
val ds: Dataset[(Int,String,Int)] = df.as[(Int,String,Int)]
## 采用样例类
val ds1: Dataset[User] = df.as[User]
## 展示
ds.show()
ds1.show()

(2)toDF方法:将DataSet转换为DataFrame。

## 转换
val df2: DataFrame = ds.toDF()
## 展示
df2.show()

3.RDD与DataSet转换

(1)toDS方法:将RDD转换为DataSet,使用 toDS() 方法时可以先将数据包装为样例类的形式也可以直接以数据类型输出;

## 通过case将样例类User与数据进行匹配
val ds2: Dataset[User] = rdd.map {
  case (id, name, age) => {
    User(id, name, age)
  }
}.toDS()
## 直接转换
val ds3: Dataset[(Int, String, Int)]rdd.toDS()
## 展示
ds2.show()
ds3.show()

(2)rdd方法:将DataSet转换为RDD

## 转换
val userRDD: RDD[User] = ds1.rdd
## 输出
userRDD.collect().foreach(println)


编程要求
DD 转换成 DataFrame、Dataset: 
1、读取list数据创建 RDD; 
2、将 RDD转换为 DataFrame,并指定列名为("id","name","sex","age"); 
3、将 RDD转换为 DataSet,并以样例类的方式转换。
DataFrame 转换成 RDD、DataSet: 
1、读取staff.josn文件创建 DataFrame; 
2、将 DataFrame转换为 RDD; 
3、将 DataFrame转换为 DataSet。
DataSet 转换成 RDD、DataFrame: 
1、读取staff2.json文件创建 DataSet,并以Staff样例类的方法创建; 
2、将 DataSet转换为 DataFrame; 
3、将 DataSet转换为 RDD。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, sql}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object sparkSql_transform {
 
  case class Message()
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark =SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
 
    val list = List((202201, "Mark", "female", 23), (202202, "Peter", "female", 24), (202203, "Anna", "male", 21))
 
    val path1 = "/data/workspace/myshixun/step1/data/staff.json"
    val path2 = "/data/workspace/myshixun/step1/data/staff2.json"
 
      /********* Begin *********/
 
    /********* RDD 转换成 DataFrame、DataSet *********/
    // 读取list数据创建RDD
    val rdd:RDD[(Int,String,String,Int)]=spark.sparkContext.makeRDD(list)
 
    // 将RDD转换为DataFrame,指定列名为("id","name","sex","age"),并打印输出
    val df:DataFrame=rdd.toDF("id","name","sex","age")
    df.show()
 
    // 将RDD转换为DataSet,以样例类的方式转换,并打印输出
    val ds=rdd.map{line=>Staff(line._1,line._2,line._3,line._4)}.toDS()
    ds.show()
 
    /********* DataFrame 转换成 RDD、DataSet *********/
 
    // 读取staff.josn文件创建DataFrame
    val df1: DataFrame = spark.read.json(path1)
 
    // 将DataFrame转换为RDD,并打印输出
    val rdd1=df1.rdd
    rdd1.collect().foreach(println)
 
    // 将DataFrame转换为DataSet,并打印输出
    val ds1=df1.as[Staff]
    ds1.show()
 
    /********* DataSet 转换成 RDD、DataFrame *********/
    // 读取staff2.json文件创建DataSet,并以Staff样例类的方法创建
    val ds2: Dataset[Staff] = spark.read.json(path2).as[Staff]
    
    // 将DataSet转换为DataFrame,并打印输出
    val df2=ds2.toDF
    df2.show()
 
    // 将DataSet转换为RDD,并打印输出
    val rdd2=ds2.rdd
    rdd2.collect().foreach(println)
   
      /********* End *********/
 
    // TODO 关闭环境
    spark.close()
 
  }
  // Staff样例类
  case class Staff(id: BigInt,name: String,sex: String,age: BigInt) 
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/226052.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言写的 mini版的 http 服务器 , 很详细

文章目录 效果展示整体架构流程技术细节完整代码 效果展示 例如&#xff1a;htpp://192.168.23.140/home.html -> 正确的请求格式 home.html 这个资源是放在我们服务器里面的 , 并不是随便访问的资源,当然我们可以放很多的资源进去. 整体架构流程 整个实现的流…

无心剑英译朱自清《匆匆》

匆匆 Vanished in Haste 朱自清 By Zhu Ziqing 燕子去了,有再来的时候;杨柳枯了,有再青的时候;桃花谢了,有再开的时候。但是,聪明的,你告诉我,我们的日子为什么一去不复返呢?——是有人偷了他们罢:那是谁?又藏在何处呢?是他们自己逃走了罢:现在又到了哪里呢? Sw…

如何使用Docker进行容器的备份和恢复

一 简介&#xff1a; 在使用Docker进行应用程序的容器化部署时&#xff0c;我们经常需要对容器进行备份和恢复操作。备份容器可以保证数据的安全性&#xff0c;而恢复操作可以帮助我们快速恢复出现问题的容器。本文将介绍如何使用Docker进行容器的备份和恢复&#xff0c;同时提…

Buck电源设计常见的一些问题(二)MOS管炸机问题

MOS管炸机问题 1.概述2.MOS管的相关参数3.过电压失效4.过电流失效5.静电放电和热失效1.概述 在我们做电源产品或者电机控制器时候,经常会坏MOS管。我相信90%以上的硬件工程师在职场生涯中都会遇到这类问题。然而这类问题也总是让人防不胜防。经常我们都会开玩笑的说,没烧过管…

【数据结构第 6 章 ④】- 用 C 语言实现图的深度优先搜索遍历和广度优先搜索遍历

目录 一、深度优先搜索 1.1 - 深度优先搜索遍历的过程 1.2 - 深度优先搜索遍历的算法实现 二、广度优先搜索 2.1 - 广度优先搜索遍历的过程 2.2 - 广度优先搜索遍历的算法实现 和树的遍历类似&#xff0c;图的遍历也是从图中某一顶点出发&#xff0c;按照某种方法对图中所…

壹基金瑞金东升社区儿童服务站上演“甜蜜冬日”亲子DIY蛋糕秀

12月9日上午&#xff0c;一场温情满溢的亲子DIY蛋糕活动&#xff0c;在壹基金瑞金东升社区儿童服务站拉开了帷幕&#xff0c;空气里有香甜的奶油味道&#xff0c;浓浓的温馨气息感染着在场的每一个人。 自己动手做的&#xff0c;才有意义&#xff0c;蛋糕DIY是一项很好的亲子活…

想做游戏开发,我应该会点啥?

在知乎上&#xff0c;经常能看到类似“如何入门游戏开发”这样的问题&#xff0c;这篇文章&#xff0c;我试着概括性的对游戏开发所需要的技能做一个总结&#xff0c;希望大家对游戏开发能有一个基本的认识~ 游戏开发基础要求高么&#xff1f; 和其他程序猿一样&#xff0c;要…

前后端交互—开发一个完整的服务器

代码下载 初始化 新建 apiServer 文件夹作为项目根目录&#xff0c;并在项目根目录中运行如下的命令&#xff0c;初始化包管理配置文件: npm init -y运行如下的命令&#xff0c;安装 express、cors: npm i express cors在项目根目录中新建 app.js 作为整个项目的入口文件&a…

12V升18V4A同步升压恒压WT3210

12V升18V4A同步升压恒压WT3210 WT3210 是一款高功率密度的全集成同步升压转换器&#xff0c;内部集成的功率MOSFET管导通电阻为上管8mΩ和下管15mΩ。可为便携式系统提供空间小尺寸 解决方案。WT3210具有 2.7V 至 20V 的宽输入电压范围&#xff0c;应用在单节或两节锂电池的便携…

关于嵌入式开发的一些信息汇总:嵌入式C开发人员、嵌入式系统Linux

关于嵌入式开发的一些信息汇总&#xff1a;嵌入式C开发人员、嵌入式系统Linux 1 关于嵌入式 C 开发人员1.1 嵌入式 C 开发人员必须具备的一些基本技能是&#xff1a;1.2 嵌入式C开发的应用案例 2 如何学习用于嵌入式系统的 Linux2.1 如何学习Linux2.1.1 第一步&#xff1a;创建…

漏刻有时百度地图API实战开发(12)(切片工具的使用、添加自定义图层TileLayer)

TileLayer向地图中添加自定义图层 var tileLayer new BMap.TileLayer();tileLayer.getTilesUrl function (tileCoord, zoom) {var x tileCoord.x;var y tileCoord.y;return images/tiles/ zoom /tile- x _ y .png;}var lockMap new BMap.MapType(lock_map, tileLaye…

【MySQL】MySQL 在 Centos 7环境安装教程

文章目录 1.卸载不要的环境2.检查系统安装包3.获取mysql官方yum源4.安装mysql yum 源&#xff0c;对比前后yum源5.安装mysql服务6.查看配置文件和数据存储位置7.启动服务和查看启动服务8.登录9.配置my.cnf 1.卸载不要的环境 先检查是否有mariadb存在 ps ajx |grep mariadb如果…

C语言定长数组 变长数组 柔性数组

C语言定长数组 变长数组 柔性数组 文章目录 C语言定长数组 变长数组 柔性数组1. 定长数组2. 变长数组3. 柔性数组3.1 结构体的大小3.2 柔性数组的使用 1. 定长数组 在C99标准之前&#xff0c;C语言在创建数组的时候&#xff0c;数组的大小只能使用常量&#xff0c;常量表达式来…

【EI会议征稿】第三届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2024)

第三届电子信息工程、大数据与计算机技术国际学术会议&#xff08;EIBDCT 2024&#xff09; 2024 3rd International Conference on Electronic Information Engineering, Big Data and Computer Technology 第三届电子信息工程、大数据与计算机技术国际学术会议&#xff08;…

小狐狸GPT付费2.4.9弹窗版学习源码介绍

小狐狸GPT付费2.4.9弹窗版学习源码是一套基于GPT&#xff08;Generative Pre-trained Transformer&#xff09;模型的开源代码库&#xff0c;旨在帮助开发者快速构建和训练自己的语言模型。该源码集成了多个先进的自然语言处理技术&#xff0c;包括预训练、微调、对话生成等&am…

云服务领取证书,注册域名,实现nginx服务配置证书

目录 1.登录网址腾讯云 2.腾讯云注册域名 3.实名认证&#xff0c;上传信息 4.域名注册 5.领取证书 6.域名与证书绑定 7.下载证书 8.设置环境 9.域名解析 10. 本地域名解析 11.上传证书到目录 12.nginx配置文件做地址重写到证书域名 13.配置证书conf 14.设置index…

磁盘坏道修复工具-是一款非常方便实用的磁盘坏道修复软件-供大家学习研究参考

1、支持磁盘数据擦除。 2、杜绝因硬盘坏道&#xff0c;而产生个人隐私数据泄露的问题。 3、支持对该磁盘格式化。 下载&#xff1a;https://download.csdn.net/download/weixin_43097956/88625682

关于“Python”的核心知识点整理大全22

目录 ​编辑 9.4.2 在一个模块中存储多个类 虽然同一个模块中的类之间应存在某种相关性&#xff0c;但可根据需要在一个模块中存储任意数量的 类。类Battery和ElectricCar都可帮助模拟汽车&#xff0c;因此下面将它们都加入模块car.py中&#xff1a; car.py my_electric_car…

基于 Gin 的 HTTP 代理上网行为记录 demo

前言: 前端时间写了好几篇使用 Gin 框架来做 HTTP 代理 demo 的文章&#xff0c;然后就想着做一个记录上网行为的小工具&#xff0c;就是简单记录看看平时访问了什么网站&#xff08;基于隧道代理的&#xff0c;不是中间人代理&#xff0c;所以只能记录去了哪里&#xff0c;不能…

wps左上角有绿色小三角的数字如何求和

1.这个状态是求和不了的&#xff0c;使用求和公式求出来的也是0 2.进行如下操作 3.转换好后 则可以求和成功了