目录
第1关:SparkSQL加载和保存
任务描述
相关知识
加载数据
直接在文件上运行SQL
保存到路径
保存模式介绍
保存到持久表
存储和排序或分区
编程要求
测试说明
第2关:Parquet文件介绍
任务描述
相关知识
编程方式加载Parquet文件
Parquet分区
结构合并
元数据刷新
参数配置
编程要求
测试说明
第3关:json文件介绍
任务描述
相关知识
json文件介绍
编程要求
测试说明
第4关:JDBC读取数据源
任务描述
相关知识
使用JDBC如何读取数据源
编程要求
测试说明
第1关:SparkSQL加载和保存
- 任务描述
- 相关知识
- 加载数据
- 直接在文件上运行SQL
- 保存到路径
- 保存模式介绍
- 保存到持久表
- 存储和排序或分区
- 编程要求
- 测试说明
任务描述
本关任务:编写一个SparkSQL
程序,完成加载和保存数据。
相关知识
为了完成本关任务,你需要掌握:
-
加载数据
-
直接在文件上运行
SQL
-
保存到路径
-
保存模式介绍
-
保存到持久表
-
存储和排序或分区
加载数据
DataFrameReader
用于从外部存储系统(例如文件系统,键值存储等)加载数据集的接口。使用SparkSession.read
来访问它。
DataFrameReader
提供了(json
,parquet
,jdbc
,orc
,libsvm
,csv
,text
)格式支持,DataFrameReader.load(String... paths)
方法支持多个路径的数据源,默认使用parquet
格式(除非另有配置,spark.sql.sources.default
)用于所有操作
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
您还可以手动指定将要使用的数据源以及要传递给数据源的任何其他选项。数据源通过其全名指定(即org.apache.spark.sql.parquet
),但内置的来源,你也可以使用自己的短名称(json
,parquet
,jdbc
,orc
,libsvm
,csv
,text
)。从任何数据源类型加载的 Dataset 都可以使用此语法转换为其他类型。
要加载JSON
文件,您可以使用:
//加载
Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
要加载CSV
文件,您可以使用:
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
直接在文件上运行SQL
您可以直接使用SQL
查询该文件,而不是使用读取API
将文件加载到 Dataset
并进行查询。
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`src/main/resources/users.parquet`");
保存到路径
DataFrameWriter
用于将数据集写入外部存储系统的接口(例如文件系统,键值存储等), 使用Dataset.write
访问它。
使用DataFrameWriter.save(String path)
,就可以将Dataset
的内容保存在指定的路径中。
//写入并保存到指定路径
peopleDF.select("name","age").write().format("parquet").save("F:\\test\\anamesAndAges");
保存模式介绍
save()
方法支持设置保存模式,使用DataFrameWriter.mode(SaveMode saveMode)
可用于指定将Dataset
保存到数据源的预期行为,指定如何处理现有数据(例如,执行时设置类型为Overwrite
,则数据将在写出新数据之前被删除。)但需要注意的这些保存模式不使用任何锁定并且不是原子的。
SaveMode
类型如下:
Scala/Java | 含义 |
---|---|
SaveMode.ErrorIfExists (默认) | 将Dataset 保存到数据源时,如果数据已存在,则会引发异常。 |
SaveMode.Append | 将Dataset 保存到数据源时,如果数据/表已存在,则Dataset的内容应附加到现有数据。 |
SaveMode.Overwrite | 覆盖模式意味着在将Dataset 保存到数据源时,如果数据/表已经存在,则预期现有数据将被Dataset 的内容覆盖。 |
SaveMode.Ignore | 忽略模式意味着在将Dataset 保存到数据源时,如果数据已存在,则预期保存操作不会保存Dataset 的内容而不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL 中的类似。 |
//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
peopleDF.select("name","age").write().mode("overwrite").save("F:\\test\\anamesAndAges")
保存到持久表
Dataset
也可以使用saveAsTable
命令将持久表保存到Hive Metastore
中。请注意,使用此功能不需要现有的Hive
部署。Spark
将为您创建默认的本地Hive Metastore
(使用Derby
)。
与createOrReplaceTempView
命令不同,saveAsTable
将实现DataSet
的内容并创建指向Hive Metastore
中数据的指针。只要您保持与同一 Metastore
的连接,即使您的Spark
程序重新启动后,持久表仍然存在。可以通过spark.sql()
方法通过表的名称调用来创建持久表的Dataset
。
对于基于文件的数据源,例如text
,parquet
,json
等,您可以通过path
选项指定自定义表路径 ,例如:
df.write.option("path", "/some/path").saveAsTable("t")
删除表时,将不会删除自定义表路径,并且表数据仍然存在。如果未指定自定义表路径,则Spark
会将数据写入仓库目录下的默认表路径。删除表时,也将删除默认表路径。
从Spark 2.1
开始,持久数据源表将每个分区元数据存储在Hive Metastore
中。这带来了几个好处:
由于Metastore
只能返回查询所需的分区,因此不再需要在表的第一个查询中发现所有分区。
Hive DDL ALTER TABLE PARTITION ... SET LOCATION
现在可用于使用 Datasource API
创建的表。
请注意,在创建外部数据源表(带有path
选项的表)时,默认情况下不会收集分区信息。要同步Metastore
中的分区信息,可以调用MSCK REPAIR TABLE
。
//写入到t1表
peopleDF.select("name","age").write().saveAsTable("t1")
存储和排序或分区
对于基于文件的数据源,还可以对输出进行存储和排序或分区。存储和排序仅适用于持久表:
peopleDF.write().bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed");
分区可以在save
和saveAsTable
时使用
usersDF
.write()
.partitionBy("name")
.format("parquet")
.save("people");
分区后结构:
可以对单个表名称进行分区
peopleDF
.write()
.partitionBy("name")
.bucketBy(42, "age")
.saveAsTable("people");
分区后结构:
编程要求
在右侧编辑器补充代码,加载people.json
文件,以覆盖的方式保存到people
路径里,继续加载people1.json
文件,以附加的方式保存到people
路径里,最后以表格形式显示people
里前20
行Dataset
。
people.json
、people1.json
文件内容分别如下:
{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}
{"name":"Michael", "salary":"6000"}
{"name":"Andy", "age":30 , "salary":"9000"}
{"name":"Justin", "age":19 , "salary":"6900"}
测试说明
平台将对你编写的代码进行评测:
预期输出:
+----+-------+------+
| age| name|salary|
+----+-------+------+
| 21| 张三| 3000|
| 22| 李四| 4500|
| 23| 王五| 7500|
|null|Michael| 6000|
| 30| Andy| 9000|
| 19| Justin| 6900|
+----+-------+------+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;public class Test1 {public static void main(String[] args) throws AnalysisException {SparkSession spark = SparkSession .builder().appName("test1").master("local").getOrCreate();/********* Begin *********/spark.read().format("json").load("people.json").write().mode(SaveMode.Append).save("people"); spark.read().format("json").load("people1.json").write().mode(SaveMode.Append).save("people"); spark.read().load("people").show(); /********* End *********/}}
第2关:Parquet文件介绍
- 任务描述
- 相关知识
- 编程方式加载Parquet文件
- Parquet分区
- 结构合并
- 元数据刷新
- 参数配置
- 编程要求
- 测试说明
任务描述
本关任务:编写Parquet
分区文件,并输出表格内容
相关知识
为了完成本关任务,你需要掌握:
-
编程方式加载
Parquet
文件 -
Parquet
分区 -
结构合并
-
元数据刷新
-
Parquet
参数配置
编程方式加载Parquet文件
Parquet
是一种柱状格式,许多其他数据处理系统都支持它。Spark SQL
支持读取和写入Parquet
文件,这些文件自动保留原始数据的模式。在编写 Parquet
文件时,出于兼容性原因,所有列都会自动转换为可为空:
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
//Dataset写入到Parquet文件
peopleDF.write().parquet("people.parquet");
//读入上面创建的Parquet文件。
//Parquet文件是自描述的,因此保留了模式
//加载Parquet文件的结果也是Dataset
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet文件也可用于创建临时视图,然后在SQL语句
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Parquet分区
表分区是Hive
等系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值在每个分区目录的路径中编码。所有内置文件源(包括Text / CSV / JSON / ORC / Parquet
)都能够自动发现和推断分区信息。
例如,我们可以使用以下目录结构将所有以前使用的填充数据存储到分区表中,使用两个额外的列,gender
和country
作为分区列:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
通过SparkSession.read.parquet
或者SparkSession.read.load
方法读取路径path/to/table
,Spark SQL
将自动从路径中提取分区信息。返回的 Dataset
的结构信息为:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
请注意,分区列的数据类型是自动推断的。目前,支持数字数据类型,日期,时间戳和字符串类型。有时,可能不希望自动推断分区列的数据类型。
对于这些用例,可以配置是否允许自动推断类型spark.sql.sources.partitionColumnTypeInference.enabled
,默认为true
。禁用类型推断后,分区列的数据类型为字符串类型。如下:
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL基本示例")
.master("local")
.config("spark.sql.sources.partitionColumnTypeInference.enabled" , "false")
.getOrCreate();
spark.read().load("people").printSchema();
// age由自动推断的数字数据类型,变成字符串类型
// root
// |-- name: string (nullable = true)
// |-- age: string (nullable = true)
// |-- gender: string (nullable = true)
// |-- country: string (nullable = true)
从Spark 1.6.0
开始,分区默认只查找给定路径下的分区。对于上面的示例,如果用户传递path/to/table/gender=male
给 SparkSession.read.parquet
或者SparkSession.read.load
,gender
则不会将其视为分区列。
如果用户需要指定分区发现应该开始的基本路径,则可以basePath
在option
中进行设置。例如,当path / to / table / gender = male
是数据的路径,用户将basePath
设置为path / to / table /
时,gender
将成为分区列。
//不设置basePath
spark.read().load("people/age=21").printSchema();
//root
//|-- name: string (nullable = true)
//|-- salary: string (nullable = true)
//设置basePath
spark.read().option("basePath", "people").load("people/age=21").printSchema();
//root
//|-- name: string (nullable = true)
//|-- salary: string (nullable = true)
//|-- age: integer (nullable = true)
结构合并
Parquet
支持模式演变。用户可以从简单模式开始,并根据需要逐渐向模式添加更多列。通过这种方式,用户可能最终得到具有不同但相互兼容的模式的多个 Parquet
文件。Parquet
数据源现在能够自动检测这种情况并合并所有这些文件的模式。
由于模式合并是一项相对昂贵的操作,并且在大多数情况下不是必需的,默认是关闭的。可以通过一下两个方法启用它:
-
option
设置参数mergeSchema
为true
-
将全局
config
设置spark.sql.parquet.mergeSchema
为true
示例如下:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// Getters and setters...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// Getters and setters...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// 创建一个简单的Dateset,存储到一个分区目录中
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
//在新的分区目录中创建另一个Dataset,
//添加新列并删除现有列
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// 读取分区表
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
//最终模式由Parquet文件中的所有3列组成
//分区列出现在分区目录路径中
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
元数据刷新
Spark SQL
缓存Parquet
元数据以获得更好的性能。启用Hive Metastore Parquet
表转换后,还会缓存这些转换表的元数据。
如果这些表由Hive
或其他外部工具更新,则需要手动刷新它们以确保元数据一致。
spark.catalog().refreshTable("my_table");
参数配置
可以使用SQL setConf
上的方法SparkSession
或通过SET key=value
使用SQL
运行 命令来完成Parquet
的配置。
参数名称 | 默认值 | 描述 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 其他一些Parquet 生成系统,特别是Impala ,Hive 和旧版本的Spark SQL ,在写出Parquet 模式时不区分二进制数据和字符串。此标志告诉Spark SQL 将二进制数据解释为字符串,以提供与这些系统的兼容性。 |
spark.sql.parquet.int96AsTimestamp | true | 一些Parquet 生产系统,特别是Impala 和Hive ,将时间戳存储到INT96 中。此标志告诉Spark SQL 将INT96 数据解释为时间戳,以提供与这些系统的兼容性。 |
spark.sql.parquet.compression.codec | ture | 设置编写Parquet 文件时使用的压缩编解码器。如果在特定于表的选项/属性中指定了“compression ”或“parquet.compression ”,则优先级为“compression ”,“parquet.compression ”,“spark.sql.parquet.compression.codec ”。可接受的值包括:none ,uncompressed ,snappy ,gzip ,lzo ,brotli ,lz4 ,zstd 。请注意,zstd 需要在Hadoop 2.9.0 之前安装ZStandardCodec ,brotli 需要安装BrotliCodec 。 |
spark.sql.parquet.filterPushdown | true | 设置为true 时启用Parquet 过滤器下推优化。 |
spark.sql.hive.convertMetastoreParquet | true | 设置为false 时,Spark SQL 将使用Hive SerDe 作为镶木桌而不是内置支持。 |
spark.sql.parquet.mergeSchema | false | 如果为true ,则Parquet 数据源合并从所有数据文件收集的模式,否则,如果没有可用的摘要文件,则从摘要文件或随机数据文件中选取模式。 |
spark.sql.parquet.writeLegacyFormat | false | 如果为true ,则数据将以Spark 1.4 及更早版本的方式写入。例如,十进制值将以Apache Parquet 的固定长度字节数组格式写入,其他系统(如Apache Hive 和Apache Impala )也使用该格式。如果为false ,将使用Parquet 中的较新格式。例如,小数将以基于int 的格式写入。如果Parquet 输出旨在用于不支持此较新格式的系统,请设置为true 。 |
编程要求
在右侧编辑器补充代码,把文件people
、people1
存在people
路径下,通过id=1
和id=2
进行分区,以表格方式显示前20
行内容。
people.json
、people1.json
文件内容分别如下:
{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}
{"name":"Michael", "salary":"6000"}
{"name":"Andy", "age":30 , "salary":"9000"}
{"name":"Justin", "age":19 , "salary":"6900"}
测试说明
平台将对你编写的代码进行评测:
预期输出:
+----+-------+------+---+
| age| name|salary| id|
+----+-------+------+---+
| 21| 张三| 3000| 1|
| 22| 李四| 4500| 1|
| 23| 王五| 7500| 1|
|null|Michael| 6000| 2|
| 30| Andy| 9000| 2|
| 19| Justin| 6900| 2|
+----+-------+------+---+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;public class Test2 {public static void main(String[] args) throws AnalysisException {SparkSession spark = SparkSession .builder().appName("test1").master("local").getOrCreate();/********* Begin *********/spark.read().format("json").load("people.json").write().parquet("people/id=1"); spark.read().format("json").load("people1.json").write().parquet("people/id=2"); spark.read().load("people").show(); /********* End *********/}}
第3关:json文件介绍
- 任务描述
- 相关知识
- json文件介绍
- 编程要求
- 测试说明
任务描述
本关任务:编写一个sparksql
程序,统计平均薪水。
相关知识
为了完成本关任务,你需要掌握json
文件介绍及使用。
json文件介绍
Spark SQL
可以自动推断JSON
数据集的模式并将其加载为Dataset<Row>
。可以使用SparkSession.read().json()
。
请注意,作为json
文件提供的文件不是典型的JSON
文件。每行必须包含一个单独的,自包含的有效JSON
对象。有关更多信息,请参阅JSON Lines
文本格式,也称为换行符分隔的JSON
。
例如:
{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}
对于常规多行JSON
文件,请将multiLine
选项设置为true
。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
//路径指向JSON数据集。
//路径可以是单个文本文件,也可以是存储文本文件的目录
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
//可以使用printSchema()方法显示推断的模式
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
//使用Dataset创建临时视图
people.createOrReplaceTempView("people");
//可以使用spark提供的sql方法运行SQL语句
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
//或者,可以为表示的JSON数据集创建Dataset
//数据集<String>每个字符串存储一个JSON对象。
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
编程要求
在右侧编辑器补充代码,通过people
文件和people1
文件,统计薪水平均值。
people.json
、people1.json
文件内容分别如下:
{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}
{"name":"Michael", "salary":"6000"}
{"name":"Andy", "age":30 , "salary":"9000"}
{"name":"Justin", "age":19 , "salary":"6900"}
测试说明
平台将对你编写的代码进行评测:
预期输出:
+---------------------------+
|avg(CAST(salary AS DOUBLE))|
+---------------------------+
| 6150.0|
+---------------------------+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;public class Test3 {public static void main(String[] args) throws AnalysisException {SparkSession spark = SparkSession .builder().appName("test1").master("local").getOrCreate();/********* Begin *********/spark.read().format("json").load("people.json").createOrReplaceTempView("people"); spark.read().format("json").load("people1.json").createOrReplaceTempView("people1"); spark.sql("select avg(salary) from ( select salary from people union all select salary from people1) a").show(); /********* End *********/}}
第4关:JDBC读取数据源
- 任务描述
- 相关知识
- 使用JDBC如何读取数据源
- 编程要求
- 测试说明
任务描述
本关任务:编写sparksql
程序,保存文件信息到mysql
,并从mysql
进行读取。
相关知识
为了完成本关任务,你需要掌握如何使用JDBC
读取数据源。
使用JDBC如何读取数据源
Spark SQL
还包括一个可以使用JDBC
从其他数据库读取数据的数据源,与使用JdbcRDD
相比,此功能应该更受欢迎。这是因为结果作为DataSet
返回,可以在Spark SQL
中轻松处理,也可以与其他数据源连接。
JDBC
数据源也更易于使用Java
或Python
,因为它不需要用户提供 ClassTag
。(请注意,这与Spark SQL JDBC
服务器不同,后者允许其他应用程序使用Spark SQL
运行查询)。
首先,您需要在spark
类路径中包含特定数据库的JDBC
驱动程序。例如,要从 Spark Shell
连接到postgres
,您将运行以下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
可以使用Data Sources API
将远程数据库中的表加载为Dataset
或Spark SQL
临时视图。用户可以在数据源选项中指定JDBC
连接属性。 user
和 password
通常作为用于登录数据源的连接属性提供。除连接属性外,Spark
还支持以下不区分大小写的选项:
常量名称 | 含义 |
---|---|
url | 要连接的JDBC URL。可以在URL中指定特定于源的连接属性。例如,jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 应该读取或写入的JDBC表。请注意,在读取路径中使用它时,可以使用在FROMSQL查询的子句中有效的任何内容。例如,您也可以在括号中使用子查询,而不是完整的表。不允许同时指定dbtable 和query 选项。 |
query | 将用于将数据读入Spark的查询。指定的查询将被括起来并在FROM子句中用作子查询。Spark还会为子查询子句分配别名。 |
driver | 用于连接到此URL的JDBC驱动程序的类名。 |
partitionColumn, lowerBound, upperBound | 如果指定了任何选项,则必须全部指定这些选项。另外, numPartitions必须指定。它们描述了在从多个工作者并行读取时如何对表进行分区。 partitionColumn必须是相关表中的数字,日期或时间戳列。请注意,lowerBound和upperBound只是用来决定分区步幅,而不是在表中过滤行。因此,表中的所有行都将被分区并返回。此选项仅适用于阅读。 |
numPartitions | 表读取和写入中可用于并行的最大分区数。这还确定了最大并发JDBC连接数。如果要写入的分区数超过此限制,我们通过coalesce(numPartitions)在写入之前调用将其减少到此限制。 |
queryTimeout | 驱动程序等待Statement对象执行到指定秒数的秒数。零意味着没有限制。在写入路径中,此选项取决于JDBC驱动程序如何实现API setQueryTimeout,例如,h2 JDBC驱动程序检查每个查询的超时而不是整个JDBC批处理。它默认为0。 |
fetchsize | JDBC提取大小,用于确定每次往返要获取的行数。这可以帮助JDBC驱动程序的性能,默认为低读取大小(例如,Oracle有10行)。此选项仅适用于阅读。 |
batchsize | JDBC批处理大小,用于确定每次往返要插入的行数。这可以帮助JDBC驱动程序的性能。此选项仅适用于书写。它默认为1000。 |
isolationLevel | 事务隔离级别,适用于当前连接。它可以是一个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连接对象定义,缺省值为标准事务隔离级别READ_UNCOMMITTED。此选项仅适用于书写。请参阅文档java.sql.Connection。 |
sessionInitStatement | 在向远程数据库打开每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句(或PL / SQL块)。使用它来实现会话初始化代码。例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") |
truncate | 这是JDBC编写器相关选项。当SaveMode.Overwrite启用时,此选项会导致火花截断,而不是删除和重建其现有的表。这可以更有效,并防止删除表元数据(例如,索引)。但是,在某些情况下,例如新数据具有不同的架构时,它将无法工作。它默认为false。此选项仅适用于书写。 |
cascadeTruncate | 这是JDBC编写器相关选项。如果JDBC数据库(PostgreSQL和Oracle目前)启用并支持,则此选项允许执行a TRUNCATE TABLE t CASCADE(在TRUNCATE TABLE ONLY t CASCADE执行PostgreSQL的情况下执行a以防止无意中截断后代表)。这将影响其他表,因此应谨慎使用。此选项仅适用于书写。它默认为有问题的JDBC数据库的默认级联截断行为,isCascadeTruncate在每个JDBCDialect中指定。 |
createTableOptions | 这是JDBC编写器相关选项。如果指定,则此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB.)。此选项仅适用于书写。 |
createTableColumnTypes | 创建表时要使用的数据库列数据类型而不是默认值。数据类型信息应以与CREATE TABLE列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的类型应该是有效的spark sql数据类型。此选项仅适用于写入。 |
customSchema | 用于从JDBC连接器读取数据的自定义架构。例如,"id DECIMAL(38, 0), name STRING"。您还可以指定部分字段,其他字段使用默认类型映射。例如,"id DECIMAL(38, 0)"。列名应与JDBC表的相应列名相同。用户可以指定Spark SQL的相应数据类型,而不是使用默认值。此选项仅适用于阅读。 |
pushDownPredicate | 用于启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种情况下,Spark会尽可能地将过滤器下推到JDBC数据源。否则,如果设置为false,则不会将过滤器下推到JDBC数据源,因此所有过滤器都将由Spark处理。当Spark通过比JDBC数据源更快地执行谓词过滤时,谓词下推通常会被关闭。 |
//注意:可以通过load / save或jdbc方法实现JDBC加载和保存
//从JDBC源
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
//将数据保存到mysql源
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&cha\fracterEncoding=utf-8")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
//写入
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
编程要求
在右侧编辑器补充代码,读取people
、people1
文件到mysql
的people
表,并从people
表里读取内容,以表格方式显示前20
行内容。
people.json
、people1.json
文件内容分别如下:
{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}
{"name":"Michael", "salary":"6000"}
{"name":"Andy", "age":30 , "salary":"9000"}
{"name":"Justin", "age":19 , "salary":"6900"}
测试说明
平台将对你编写的代码进行评测:
预期输出:
+----+-------+------+
| age| name|salary|
+----+-------+------+
| 21| 张三| 3000|
| 22| 李四| 4500|
| 23| 王五| 7500|
|null|Michael| 6000|
| 30| Andy| 9000|
| 19| Justin| 6900|
+----+-------+------+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;public class Test4 {public static void case4(SparkSession spark) {/********* Begin *********/Dataset<Row> load = spark.read().format("json").load("people.json"); load.write() .format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "people") .option("user", "root") .option("password", "123123") .mode(SaveMode.Overwrite) .save(); Dataset<Row> load1 = spark.read().format("json").load("people1.json"); load1.write() .format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "people") .option("user", "root") .option("password", "123123") .mode(SaveMode.Append) .save(); Dataset<Row> load2 = spark.read() .format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "people") .option("user", "root") .option("password", "123123").load(); load2.show(); /********* End *********/}}