Contents
- 1 Ways to Build a DataFrame
  - Approach 1: JavaBean + reflection
    - 1.1 Create the Scala case class
    - 1.2 Create the Scala object
  - Approach 2: dynamic programming
- 2 Ways to Build a DataSet
- 3 Converting Between RDD, DataFrame, and DataSet
  - 3.1 [RDD --> DataFrame] and [RDD --> DataSet]
  - 3.2 [DataFrame --> RDD] and [DataFrame --> DataSet]
  - 3.3 [DataSet --> RDD] and [DataSet --> DataFrame]
1 Ways to Build a DataFrame
Approach 1: JavaBean + reflection
1.1 Create the Scala case class
package _02SparkSQL

// A shared case class (the "JavaBean") used by all of the examples below
case class _02student(id: Int, name: String, gender: String, age: Int)
1.2 Create the Scala object
package _02SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

// Build a DataFrame via the JavaBean + reflection approach
object _02createDataFrame {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession object
    val spark = SparkSession.builder()
      .appName("CreateDataFrame")
      .master("local[*]")
      .getOrCreate()

    val list = List(
      _02student(id = 1, name = "张三", gender = "男", age = 18),
      _02student(id = 2, name = "李四", gender = "女", age = 26),
      _02student(id = 3, name = "王五", gender = "男", age = 34),
      _02student(id = 4, name = "赵六", gender = "女", age = 45)
    )

    // The implicit conversions from this SparkSession are required for toDF()
    import spark.implicits._
    val frame: DataFrame = list.toDF()

    frame.printSchema()
    /*
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- gender: string (nullable = true)
     |-- age: integer (nullable = false)
    */

    frame.show()
    /*
    +---+----+------+---+
    | id|name|gender|age|
    +---+----+------+---+
    |  1|张三|    男| 18|
    |  2|李四|    女| 26|
    |  3|王五|    男| 34|
    |  4|赵六|    女| 45|
    +---+----+------+---+
    */
  }
}
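As a side note, toDF also accepts explicit column names, which is useful when the case-class field names are not the column names you want in the table. A minimal sketch (the renamed columns here are purely illustrative):

// Same data, different column names; still requires import spark.implicits._
val renamed: DataFrame = list.toDF("sid", "sname", "sgender", "sage")
renamed.printSchema()  // columns are now sid, sname, sgender, sage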
Approach 2: dynamic programming
Note: this approach introduces three new classes.
[Row]: represents one row of the two-dimensional table, i.e. one record (one object).
[StructType]: the metadata of the whole table; it is a collection of StructField.
[StructField]: the metadata of one field/column of the table (mainly the column name, its type, and whether it may be null).
Summary:
Both approaches are very common, but dynamic programming is the more flexible one: the JavaBean approach fixes the data format up front in the case class, so the schema cannot be changed later, whereas a programmatic schema can be assembled at runtime, as the sketch below shows.
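To make that flexibility concrete, here is a minimal sketch in which the schema is assembled at runtime from a plain list of (column name, type) pairs; the columns value is a hypothetical stand-in for whatever really drives your schema (a config file, user input, and so on):

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// Hypothetical runtime description of the columns
val columns = List(
  ("id", DataTypes.IntegerType),
  ("name", DataTypes.StringType)
)
// The schema is built from data instead of being hard-coded in a case class
val schema = StructType(columns.map {
  case (name, dataType) => StructField(name, dataType, nullable = true)
})

The full example below builds the same kind of schema by hand.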
package _02SparkSQL

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Build a DataFrame via dynamic programming (a programmatic schema)
object _03createDataFrame {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession object
    val spark = SparkSession.builder()
      .appName("03createDataFrame")
      .master("local[*]")
      .getOrCreate()

    // Build the underlying RDD; SparkSession wraps a SparkContext internally
    val rowRDD = spark.sparkContext.makeRDD(List(
      // Each Row represents the content of one row
      Row(1, "张三", "男", 18),
      Row(2, "李四", "女", 23),
      Row(3, "王五", "男", 35),
      Row(4, "赵六", "女", 56)
    ))

    // The table's metadata: column name, column type, nullability
    val schema = StructType(List(
      // Provide one StructField per column in the Rows above
      StructField("id", DataTypes.IntegerType, false),
      StructField("name", DataTypes.StringType, false),
      StructField("gender", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false)
    ))

    // Build the DataFrame object from the RDD plus the schema
    val frame: DataFrame = spark.createDataFrame(rowRDD, schema)

    frame.printSchema()
    /*
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
     |-- gender: string (nullable = false)
     |-- age: integer (nullable = false)
    */

    frame.show()
    /*
    +---+----+------+---+
    | id|name|gender|age|
    +---+----+------+---+
    |  1|张三|    男| 18|
    |  2|李四|    女| 23|
    |  3|王五|    男| 35|
    |  4|赵六|    女| 56|
    +---+----+------+---+
    */
  }
}
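As an aside: in reasonably recent Spark versions (2.3 and later, to the best of my knowledge) the same schema can also be written as a DDL string. Note that columns created this way are nullable by default, unlike the explicit false flags above:

import org.apache.spark.sql.types.StructType

// Equivalent schema from a DDL string (all fields nullable by default)
val schema2: StructType = StructType.fromDDL("id INT, name STRING, gender STRING, age INT")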
2 Ways to Build a DataSet
A DataSet is the strongly typed upgrade of DataFrame; it is created in a similar way, with a few differences.
When creating a Dataset, pay attention to the data format: it must be a case class or a basic data type, and you must bring the encoders into scope via import spark.implicits._ so the corresponding metadata can be extracted; otherwise the code will not compile.
package _02SparkSQL

import org.apache.spark.sql.{Dataset, SparkSession}

object _04createDataSet {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession object
    val session = SparkSession.builder()
      .appName("CreateDataSet")
      .master("local[*]")
      .getOrCreate()

    // A List of case-class instances to back the DataSet
    val list = List(
      _02student(id = 1, name = "咪咪", gender = "男", age = 6),
      _02student(id = 2, name = "凯凯", gender = "男", age = 8),
      _02student(id = 3, name = "超超", gender = "男", age = 7),
      _02student(id = 4, name = "大宝", gender = "女", age = 9)
    )

    // Build a DataSet from the List (its elements are case-class objects)
    import session.implicits._
    val ds: Dataset[_02student] = list.toDS()

    ds.printSchema()
    /*
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- gender: string (nullable = true)
     |-- age: integer (nullable = false)
    */

    ds.show()
    /*
    +---+----+------+---+
    | id|name|gender|age|
    +---+----+------+---+
    |  1|咪咪|    男|  6|
    |  2|凯凯|    男|  8|
    |  3|超超|    男|  7|
    |  4|大宝|    女|  9|
    +---+----+------+---+
    */

    // Basic data types are supported as well
    val list2 = List(1, 2, 3, 4, 5, 6, 7, 8)
    val ds2: Dataset[Int] = list2.toDS()

    ds2.printSchema()
    /*
    root
     |-- value: integer (nullable = false)
    */

    ds2.show()
    /*
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    |    5|
    |    6|
    |    7|
    |    8|
    +-----+
    */
  }
}
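For completeness: besides toDS(), SparkSession also offers createDataset, which takes an explicit Seq and relies on the same implicit encoders. A minimal sketch reusing list and session from the example above:

// Equivalent to list.toDS(); still needs import session.implicits._ for the encoder
val ds3: Dataset[_02student] = session.createDataset(list)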
3 Converting Between RDD, DataFrame, and DataSet
3.1 [RDD --> DataFrame] and [RDD --> DataSet]
package _02SparkSQL

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object _05RDDToDataFrame {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession object
    val spark = SparkSession.builder()
      .appName("RDDToDataFrame")
      .master("local[*]")
      .getOrCreate()

    val rdd = spark.sparkContext.makeRDD(List(
      _02student(id = 1, name = "咪咪", gender = "男", age = 6),
      _02student(id = 2, name = "凯凯", gender = "男", age = 8),
      _02student(id = 3, name = "超超", gender = "男", age = 7),
      _02student(id = 4, name = "大宝", gender = "女", age = 9)
    ))
    println(rdd.collect().toBuffer)
    /*
    ArrayBuffer(_02student(1,咪咪,男,6), _02student(2,凯凯,男,8), _02student(3,超超,男,7), _02student(4,大宝,女,9))
    */

    /**
     * RDD to DataFrame
     */
    // The implicit conversions come from the SparkSession object
    import spark.implicits._
    val frame: DataFrame = rdd.toDF()
    frame.show()
    /*
    +---+----+------+---+
    | id|name|gender|age|
    +---+----+------+---+
    |  1|咪咪|    男|  6|
    |  2|凯凯|    男|  8|
    |  3|超超|    男|  7|
    |  4|大宝|    女|  9|
    +---+----+------+---+
    */

    /**
     * RDD to DataSet
     */
    val dataset: Dataset[_02student] = rdd.toDS()
    dataset.show()
    /*
    +---+----+------+---+
    | id|name|gender|age|
    +---+----+------+---+
    |  1|咪咪|    男|  6|
    |  2|凯凯|    男|  8|
    |  3|超超|    男|  7|
    |  4|大宝|    女|  9|
    +---+----+------+---+
    */
  }
}
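Note that a case class is not strictly required for RDD --> DataFrame: an RDD of tuples works too, with the column names passed to toDF. A minimal sketch assuming the same spark session and import spark.implicits._ as above:

// RDD of tuples --> DataFrame with explicit column names
val tupleRdd = spark.sparkContext.makeRDD(List((1, "咪咪"), (2, "凯凯")))
val frame2: DataFrame = tupleRdd.toDF("id", "name")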
3.2 [DataFrame --> RDD] and [DataFrame --> DataSet]
This continues inside the same _05RDDToDataFrame object; the SparkSession, rdd, and frame setup is identical to 3.1 and is omitted here (the complete file is attached at the end).

/**
 * DataFrame to RDD
 */
// Row is the row object a DataFrame is built from
val rdd1: RDD[Row] = frame.rdd
rdd1.foreach(row => {
  println(row)
  /*
  [2,凯凯,男,8]
  [4,大宝,女,9]
  [1,咪咪,男,6]
  [3,超超,男,7]
  */

  // Fetch a value by column index with getXxx, where Xxx is the column's data type
  // (indexes work like array subscripts: from 0 to length - 1)
  val id = row.getInt(0)
  val name = row.getString(1)
  val gender = row.getString(2)
  // ...or fetch by column name with getAs
  val age = row.getAs[Int]("age")
  println(id + " " + name + " " + gender + " " + age)
  /*
  3 超超 男 7
  1 咪咪 男 6
  2 凯凯 男 8
  4 大宝 女 9
  */
})

/**
 * DataFrame to DataSet
 */
// A DataFrame is in fact just a special case of Dataset
val dataset2: Dataset[_02student] = frame.as[_02student]
dataset2.show()
/*
+---+----+------+---+
| id|name|gender|age|
+---+----+------+---+
|  1|咪咪|    男|  6|
|  2|凯凯|    男|  8|
|  3|超超|    男|  7|
|  4|大宝|    女|  9|
+---+----+------+---+
*/
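The "special case" remark is literal: in Spark's own source, DataFrame is nothing more than a type alias for Dataset[Row], which is why frame.as[_02student] and toDF() are such lightweight conversions (.as only needs the column names to match fields of compatible types):

// In Spark's org.apache.spark.sql package object:
//   type DataFrame = Dataset[Row]
// so this assignment type-checks without any conversion at all:
val asDataset: Dataset[Row] = frame  // frame is the DataFrame from the example above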
3.3 [DataSet --> RDD] and [DataSet --> DataFrame]
Again inside the same _05RDDToDataFrame object, with the setup and the RDD --> DataSet step unchanged from 3.1:

/**
 * DataSet to RDD
 */
val rdd2: RDD[_02student] = dataset.rdd
println(rdd2.collect().toBuffer)
/*
ArrayBuffer(_02student(1,咪咪,男,6), _02student(2,凯凯,男,8), _02student(3,超超,男,7), _02student(4,大宝,女,9))
*/

/**
 * DataSet to DataFrame
 */
val frame1: DataFrame = dataset.toDF()
frame1.show()
/*
+---+----+------+---+
| id|name|gender|age|
+---+----+------+---+
|  1|咪咪|    男|  6|
|  2|凯凯|    男|  8|
|  3|超超|    男|  7|
|  4|大宝|    女|  9|
+---+----+------+---+
*/
Here is the complete file:
package _02SparkSQL

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object _05RDDToDataFrame {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession object
    val spark = SparkSession.builder()
      .appName("RDDToDataFrame")
      .master("local[*]")
      .getOrCreate()

    val rdd = spark.sparkContext.makeRDD(List(
      _02student(id = 1, name = "咪咪", gender = "男", age = 6),
      _02student(id = 2, name = "凯凯", gender = "男", age = 8),
      _02student(id = 3, name = "超超", gender = "男", age = 7),
      _02student(id = 4, name = "大宝", gender = "女", age = 9)
    ))
    println(rdd.collect().toBuffer)

    // RDD to DataFrame
    // The implicit conversions come from the SparkSession object
    import spark.implicits._
    val frame: DataFrame = rdd.toDF()
    frame.show()  // (output shown in section 3.1)

    // RDD to DataSet
    val dataset: Dataset[_02student] = rdd.toDS()
    dataset.show()  // (output shown in section 3.1)

    // DataFrame to RDD
    // Row is the row object a DataFrame is built from
    val rdd1: RDD[Row] = frame.rdd
    rdd1.foreach(row => {
      println(row)
      // Fetch by column index (0 to length - 1) with getXxx,
      // where Xxx is the column's data type, or by name with getAs
      val id = row.getInt(0)
      val name = row.getString(1)
      val gender = row.getString(2)
      val age = row.getAs[Int]("age")
      println(id + " " + name + " " + gender + " " + age)
    })

    // DataFrame to DataSet
    // A DataFrame is just a special case of Dataset
    val dataset2: Dataset[_02student] = frame.as[_02student]
    dataset2.show()  // (output shown in section 3.2)

    // DataSet to RDD
    val rdd2: RDD[_02student] = dataset.rdd
    println(rdd2.collect().toBuffer)

    // DataSet to DataFrame
    val frame1: DataFrame = dataset.toDF()
    frame1.show()  // (output shown in section 3.3)
  }
}