Spark-SQL概述:是Spark用于结构化数据处理的模块,前身是Shark。Shark基于Hive开发,使SQL-on-Hadoop性能大幅提升,但对Hive依赖制约了Spark发展。SparkSQL汲取Shark优点并重新开发,在数据兼容、性能优化和组件扩展上优势显著。2014年6月,Shark停止开发,发展出SparkSQL和Hive on Spark两个支线。
Spark-SQL特点:易整合,能无缝整合SQL查询和Spark编程;统一数据访问,以相同方式连接不同数据源;兼容Hive,可在已有仓库运行SQL或HQL;支持标准数据连接,能通过JDBC或ODBC连接。
DataFrame介绍:是基于RDD的分布式数据集,类似二维表格,与RDD区别在于带有schema元信息,支持嵌套数据类型,API更友好,性能比RDD高,因为查询计划经Spark catalyst optimiser优化。
DataSet介绍:是分布式数据集合,是DataFrame的扩展。兼具RDD强类型、使用lambda函数的能力和Spark SQL优化执行引擎的优点,用样例类定义结构信息,是强类型的,DataFrame是DataSet的特例(DataFrame = DataSet[Row]) ,可通过 as 方法相互转换。
SparkSession:是Spark最新的SQL查询起始点,它整合了SQLContext和HiveContext的功能,内部封装了SparkContext,负责实际的计算。在spark-shell中,系统会自动创建名为spark的SparkSession对象。
DataFrame
创建方式:可通过Spark的数据源创建
SQL语法查询:使用SQL语法风格查询数据时,需要借助临时视图或全局视图。先读取JSON文件创建DataFrame,如 val df1 = spark.read.json("data/user.json") ;接着对DataFrame创建临时表 df1.createOrReplaceTempView("people") ,之后就能通过SQL语句查询,如 val sqlDF = spark.sql("select * from people") ,并使用 sqlDF.show 展示结果。创建全局表时,首次运行可能报错,需将hive-site.xml文件复制到spark的conf路径下(最好把整个hive目录放在本地文件系统中),完成配置后创建全局表 df1.createGlobalTempView("people1") ,可通过 spark.sql("SELECT * FROM global_temp.people1").show() 等语句查询展示数据。
DataFrame的DSL语法:DataFrame提供DSL管理结构化数据,使用时无需创建临时视图。操作包括创建DataFrame;选取特定列,如 df.select("username").show() ;进行列运算 df.select($"username",$"age" + 1).show ;筛选数据 df.filter($"age">18).show ;按列分组统计 df.groupBy("age").count.show 。
2. RDD转换为DataFrame:在IDEA开发时,RDD与DataFrame或DataSet相互操作需引入 import spark.implicits._ (spark为SparkSession对象变量名,且必须用val修饰),spark-shell中自动导入。可直接将RDD转为DataFrame ;实际开发常借助样例类转换,定义 case class User(name:String, age:Int) 后,通过 sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show 实现转换 。
DataFrame转换为RDD:DataFrame可直接获取内部RDD 。获取的RDD存储类型为Row,可通过 collect 方法收集数据,如 val array = rdd.collect ,还能通过 getAs 方法按列名获取数据。
DataSet操作
创建DataSet:可使用样例类序列,如定义 case class Person(name: String, age: Long) 后,通过 Seq(Person("zhangsan",2)).toDS() 创建;也能用基本类型序列创建,如 Seq(1,2,3,4,5).toDS ,但实际更多从RDD获取DataSet。
RDD与DataSet相互转换:包含case类的RDD能自动转为DataSet,如 sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS ;DataSet可直接获取内部RDD,如 val rdd = res3.rdd 。
DataFrame和DataSet转换:DataFrame是DataSet的特例,二者可相互转换。DataFrame转DataSet,定义样例类后用 as 方法,如 val ds = df.as[User] ;DataSet转DataFrame使用 toDF 方法,如 val df = ds.toDF 。
RDD、DataFrame、DataSet关系
版本产生顺序:Spark1.0推出RDD,Spark1.3出现DataFrame,Spark1.6引入DataSet 。
共性:都是分布式弹性数据集,有惰性机制、共同函数,操作需 import spark.implicits._ ,会自动缓存运算,都有分区概念,DataFrame和DataSet可模式匹配获取字段信息。
区别:RDD常与spark mllib使用,不支持sparksql操作;DataFrame每行类型为Row,访问列值需解析,支持SparkSQL操作和便捷保存方式;DataSet与DataFrame成员函数相同,每行数据类型自定义,获取行信息更自由。三者可相互转换。