通用方式
SparkSQL提供了通用的数据加载方式,使用spark.read.loa方法,并可通过format指定数据类型(如csv、jdbc、json、orc、parquet、textFile)。
load方法后需传入数据路径(针对csv、jdbc、json、orc、parquet、textFile格式)。
option方法用于设置特定格式的参数,如jdbc的url、user、password、dbtable。
特定格式加载
Parquet:Spark SQL的默认数据源,无需指定format即可载。
JSON:Spark SQL能自动推测JSON数据集结构,使用spark.read.json(path)加载。注意,每行应为一个JSON串。
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)
查询数据:可以通过SQL语句查询JSON数据。
val resDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"
CSV:需指定format为csv,并可通过option设置分隔符、是否推断schema、是否包含表头等信息。
MySQL:通过JDBC从关系型数据库读取数据,使用spark.read.format("jdbc").option(...)方式,并传入数据库连接信息。
数据保存
通用方式
使用df.write.save方法保存数据,同样可通过format指定数据类型。
save方法后需传入保存路径(针对csv、orc、parquet、textFile格式)。
option方法用于设置特定格式的参数。
保存操作可使用SaveMode来指明如何处理数据,如覆盖(overwrite)、追加(append)等,通过mode方法设置。
特定格式保存
与加载类似,Parquet、JSON、CSV等格式均可通过指定format进行保存。
MySQL等关系型数据库的写入也通过JDBC实现,需指定format为jdbc,并传入数据库连接信息及表名。
注意事项
在处理JSON数据时,需确保文件格式符合Spark的要求,即每行一个JSON串。
在读取CSV文件时,可通过设置option来指定分隔符、是否推断schema等信息,以便正确解析文件内容。
在通过JDBC连接数据库时,需确保数据库驱动已正确导入,并正确配置数据库连接信息。
在保存数据时,需根据实际需求选择合适的SaveMode,以避免数据覆盖或丢失。
Spark SQL与Hive的集成
Spark SQL可以编译时包含Hive支持,从而提供对Hive表访问、UDF(用户自定义函数)、Hive查询语言(HQL)等特性的支持。在使用时,无需事先安装Hive,但最好在编译Spark SQL时引入Hive支持。
IDEA通过JDBC对MySQL进行操作:
读取数据
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//通用的load方式读取
spark.read.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/system")
.option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver
.option("user","root")
.option("password","123456")
.option("dbtable","user")
.load().show()
spark.stop()
//通用的load方法的另一种形式
spark.read.format("jdbc")
.options(
Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))
.load().show()
//通过JDBC
val pros :Properties = new Properties()
pros.setProperty("user","root")
pros.setProperty("password","123456")
val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)
df.show()
写入数据
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),
Stu("zs", 30)))
val ds:Dataset[Stu] = rdd.toDS()
ds.write.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/system")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","123456")
.option("dbtable","user2")
.mode(SaveMode.Append)
.save()
spark.stop()