SparkSQL数据源

目录

第1关:SparkSQL加载和保存

任务描述

相关知识

加载数据

直接在文件上运行SQL

保存到路径

保存模式介绍

保存到持久表

存储和排序或分区

编程要求

测试说明

第2关:Parquet文件介绍

任务描述

相关知识

编程方式加载Parquet文件

Parquet分区

结构合并

元数据刷新

参数配置

编程要求

测试说明

第3关:json文件介绍

任务描述

相关知识

json文件介绍

编程要求

测试说明

第4关:JDBC读取数据源

任务描述

相关知识

使用JDBC如何读取数据源

编程要求

测试说明


第1关:SparkSQL加载和保存

  • 任务描述
  • 相关知识
    • 加载数据
    • 直接在文件上运行SQL
    • 保存到路径
    • 保存模式介绍
    • 保存到持久表
    • 存储和排序或分区
  • 编程要求
  • 测试说明

任务描述

本关任务:编写一个SparkSQL程序,完成加载和保存数据。

相关知识

为了完成本关任务,你需要掌握:

  1. 加载数据

  2. 直接在文件上运行SQL

  3. 保存到路径

  4. 保存模式介绍

  5. 保存到持久表

  6. 存储和排序或分区

加载数据

DataFrameReader用于从外部存储系统(例如文件系统,键值存储等)加载数据集的接口。使用SparkSession.read来访问它。

DataFrameReader提供了(jsonparquetjdbcorclibsvmcsvtext)格式支持,DataFrameReader.load(String... paths)方法支持多个路径的数据源,默认使用parquet格式(除非另有配置,spark.sql.sources.default)用于所有操作

 
  1. Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");

您还可以手动指定将要使用的数据源以及要传递给数据源的任何其他选项。数据源通过其全名指定(即org.apache.spark.sql.parquet),但内置的来源,你也可以使用自己的短名称(jsonparquetjdbcorclibsvmcsvtext)。从任何数据源类型加载的 Dataset 都可以使用此语法转换为其他类型。

要加载JSON文件,您可以使用:

 
  1. //加载
  2. Dataset<Row> peopleDF =
  3. spark.read().format("json").load("examples/src/main/resources/people.json");

要加载CSV文件,您可以使用:

 
  1. Dataset<Row> peopleDFCsv = spark.read().format("csv")
  2. .option("sep", ";")
  3. .option("inferSchema", "true")
  4. .option("header", "true")
  5. .load("examples/src/main/resources/people.csv");
直接在文件上运行SQL

您可以直接使用SQL查询该文件,而不是使用读取API将文件加载到 Dataset并进行查询。

 
  1. Dataset<Row> sqlDF =
  2. spark.sql("SELECT * FROM parquet.`src/main/resources/users.parquet`");
保存到路径

DataFrameWriter用于将数据集写入外部存储系统的接口(例如文件系统,键值存储等), 使用Dataset.write访问它。

使用DataFrameWriter.save(String path),就可以将Dataset的内容保存在指定的路径中。

 
  1. //写入并保存到指定路径
  2. peopleDF.select("name","age").write().format("parquet").save("F:\\test\\anamesAndAges");
保存模式介绍

save()方法支持设置保存模式,使用DataFrameWriter.mode(SaveMode saveMode)可用于指定将Dataset保存到数据源的预期行为,指定如何处理现有数据(例如,执行时设置类型为Overwrite,则数据将在写出新数据之前被删除。)但需要注意的这些保存模式不使用任何锁定并且不是原子的。

SaveMode 类型如下:

Scala/Java含义
SaveMode.ErrorIfExists (默认)Dataset保存到数据源时,如果数据已存在,则会引发异常。
SaveMode.AppendDataset保存到数据源时,如果数据/表已存在,则Dataset的内容应附加到现有数据。
SaveMode.Overwrite覆盖模式意味着在将Dataset保存到数据源时,如果数据/表已经存在,则预期现有数据将被Dataset的内容覆盖。
SaveMode.Ignore忽略模式意味着在将Dataset保存到数据源时,如果数据已存在,则预期保存操作不会保存Dataset的内容而不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL中的类似。
 
  1. //覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
  2. peopleDF.select("name","age").write().mode("overwrite").save("F:\\test\\anamesAndAges")
保存到持久表

Dataset也可以使用saveAsTable 命令将持久表保存到Hive Metastore中。请注意,使用此功能不需要现有的Hive部署。Spark将为您创建默认的本地Hive Metastore(使用Derby)。

createOrReplaceTempView命令不同,saveAsTable将实现DataSet的内容并创建指向Hive Metastore中数据的指针。只要您保持与同一 Metastore的连接,即使您的Spark程序重新启动后,持久表仍然存在。可以通过spark.sql()方法通过表的名称调用来创建持久表的Dataset

对于基于文件的数据源,例如textparquetjson等,您可以通过path选项指定自定义表路径 ,例如:

 
  1. df.write.option("path", "/some/path").saveAsTable("t")

删除表时,将不会删除自定义表路径,并且表数据仍然存在。如果未指定自定义表路径,则Spark会将数据写入仓库目录下的默认表路径。删除表时,也将删除默认表路径。

Spark 2.1开始,持久数据源表将每个分区元数据存储在Hive Metastore中。这带来了几个好处:

由于Metastore只能返回查询所需的分区,因此不再需要在表的第一个查询中发现所有分区。

Hive DDL ALTER TABLE PARTITION ... SET LOCATION现在可用于使用 Datasource API创建的表。

请注意,在创建外部数据源表(带有path选项的表)时,默认情况下不会收集分区信息。要同步Metastore中的分区信息,可以调用MSCK REPAIR TABLE

 
  1. //写入到t1表
  2. peopleDF.select("name","age").write().saveAsTable("t1")
存储和排序或分区

对于基于文件的数据源,还可以对输出进行存储和排序或分区。存储和排序仅适用于持久表:

 
  1. peopleDF.write().bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed");

分区可以在savesaveAsTable时使用

 
  1. usersDF
  2. .write()
  3. .partitionBy("name")
  4. .format("parquet")
  5. .save("people");

分区后结构:

可以对单个表名称进行分区

 
  1. peopleDF
  2. .write()
  3. .partitionBy("name")
  4. .bucketBy(42, "age")
  5. .saveAsTable("people");

分区后结构:

编程要求

在右侧编辑器补充代码,加载people.json文件,以覆盖的方式保存到people路径里,继续加载people1.json文件,以附加的方式保存到people路径里,最后以表格形式显示people里前20Dataset

people.jsonpeople1.json文件内容分别如下:

 
  1. {"age":21,"name":"张三", "salary":"3000"}
  2. {"age":22,"name":"李四", "salary":"4500"}
  3. {"age":23,"name":"王五", "salary":"7500"}
 
  1. {"name":"Michael", "salary":"6000"}
  2. {"name":"Andy", "age":30 , "salary":"9000"}
  3. {"name":"Justin", "age":19 , "salary":"6900"}
测试说明

平台将对你编写的代码进行评测:

预期输出:

 
  1. +----+-------+------+
  2. | age| name|salary|
  3. +----+-------+------+
  4. | 21| 张三| 3000|
  5. | 22| 李四| 4500|
  6. | 23| 王五| 7500|
  7. |null|Michael| 6000|
  8. | 30| Andy| 9000|
  9. | 19| Justin| 6900|
  10. +----+-------+------+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;public class Test1 {public static void main(String[] args) throws AnalysisException {SparkSession  spark  =  SparkSession .builder().appName("test1").master("local").getOrCreate();/********* Begin *********/spark.read().format("json").load("people.json").write().mode(SaveMode.Append).save("people");  spark.read().format("json").load("people1.json").write().mode(SaveMode.Append).save("people");  spark.read().load("people").show();  /********* End *********/}}

第2关:Parquet文件介绍

  • 任务描述
  • 相关知识
    • 编程方式加载Parquet文件
    • Parquet分区
    • 结构合并
    • 元数据刷新
    • 参数配置
  • 编程要求
  • 测试说明

任务描述

本关任务:编写Parquet分区文件,并输出表格内容

相关知识

为了完成本关任务,你需要掌握:

  1. 编程方式加载Parquet文件

  2. Parquet分区

  3. 结构合并

  4. 元数据刷新

  5. Parquet参数配置

编程方式加载Parquet文件

Parquet是一种柱状格式,许多其他数据处理系统都支持它。Spark SQL支持读取和写入Parquet文件,这些文件自动保留原始数据的模式。在编写 Parquet文件时,出于兼容性原因,所有列都会自动转换为可为空:

 
  1. import org.apache.spark.api.java.function.MapFunction;
  2. import org.apache.spark.sql.Encoders;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
  6. //Dataset写入到Parquet文件
  7. peopleDF.write().parquet("people.parquet");
  8. //读入上面创建的Parquet文件。
  9. //Parquet文件是自描述的,因此保留了模式
  10. //加载Parquet文件的结果也是Dataset
  11. Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
  12. // Parquet文件也可用于创建临时视图,然后在SQL语句
  13. parquetFileDF.createOrReplaceTempView("parquetFile");
  14. Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
  15. Dataset<String> namesDS = namesDF.map(
  16. (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
  17. Encoders.STRING());
  18. namesDS.show();
  19. // +------------+
  20. // | value|
  21. // +------------+
  22. // |Name: Justin|
  23. // +------------+
Parquet分区

表分区是Hive等系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值在每个分区目录的路径中编码。所有内置文件源(包括Text / CSV / JSON / ORC / Parquet)都能够自动发现和推断分区信息。

例如,我们可以使用以下目录结构将所有以前使用的填充数据存储到分区表中,使用两个额外的列,gendercountry作为分区列:

 
  1. path
  2. └── to
  3. └── table
  4. ├── gender=male
  5. │ ├── ...
  6. │ │
  7. │ ├── country=US
  8. │ │ └── data.parquet
  9. │ ├── country=CN
  10. │ │ └── data.parquet
  11. │ └── ...
  12. └── gender=female
  13. ├── ...
  14. ├── country=US
  15. │ └── data.parquet
  16. ├── country=CN
  17. │ └── data.parquet
  18. └── ...

通过SparkSession.read.parquet或者SparkSession.read.load方法读取路径path/to/tableSpark SQL将自动从路径中提取分区信息。返回的 Dataset的结构信息为:

 
  1. root
  2. |-- name: string (nullable = true)
  3. |-- age: long (nullable = true)
  4. |-- gender: string (nullable = true)
  5. |-- country: string (nullable = true)

请注意,分区列的数据类型是自动推断的。目前,支持数字数据类型,日期,时间戳和字符串类型。有时,可能不希望自动推断分区列的数据类型。

对于这些用例,可以配置是否允许自动推断类型spark.sql.sources.partitionColumnTypeInference.enabled,默认为true。禁用类型推断后,分区列的数据类型为字符串类型。如下:

 
  1. SparkSession spark = SparkSession
  2. .builder()
  3. .appName("Java Spark SQL基本示例")
  4. .master("local")
  5. .config("spark.sql.sources.partitionColumnTypeInference.enabled" , "false")
  6. .getOrCreate();
  7. spark.read().load("people").printSchema();
  8. // age由自动推断的数字数据类型,变成字符串类型
  9. // root
  10. // |-- name: string (nullable = true)
  11. // |-- age: string (nullable = true)
  12. // |-- gender: string (nullable = true)
  13. // |-- country: string (nullable = true)

Spark 1.6.0开始,分区默认只查找给定路径下的分区。对于上面的示例,如果用户传递path/to/table/gender=maleSparkSession.read.parquet或者SparkSession.read.loadgender则不会将其视为分区列。

如果用户需要指定分区发现应该开始的基本路径,则可以basePathoption中进行设置。例如,当path / to / table / gender = male是数据的路径,用户将basePath设置为path / to / table /时,gender将成为分区列。

 
  1. //不设置basePath
  2. spark.read().load("people/age=21").printSchema();
  3. //root
  4. //|-- name: string (nullable = true)
  5. //|-- salary: string (nullable = true)
  6. //设置basePath
  7. spark.read().option("basePath", "people").load("people/age=21").printSchema();
  8. //root
  9. //|-- name: string (nullable = true)
  10. //|-- salary: string (nullable = true)
  11. //|-- age: integer (nullable = true)
结构合并

Parquet支持模式演变。用户可以从简单模式开始,并根据需要逐渐向模式添加更多列。通过这种方式,用户可能最终得到具有不同但相互兼容的模式的多个 Parquet文件。Parquet数据源现在能够自动检测这种情况并合并所有这些文件的模式。

由于模式合并是一项相对昂贵的操作,并且在大多数情况下不是必需的,默认是关闭的。可以通过一下两个方法启用它:

  1. option设置参数mergeSchematrue

  2. 将全局config设置spark.sql.parquet.mergeSchematrue

示例如下:

 
  1. import java.io.Serializable;
  2. import java.util.ArrayList;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. import org.apache.spark.sql.Dataset;
  6. import org.apache.spark.sql.Row;
  7. public static class Square implements Serializable {
  8. private int value;
  9. private int square;
  10. // Getters and setters...
  11. }
  12. public static class Cube implements Serializable {
  13. private int value;
  14. private int cube;
  15. // Getters and setters...
  16. }
  17. List<Square> squares = new ArrayList<>();
  18. for (int value = 1; value <= 5; value++) {
  19. Square square = new Square();
  20. square.setValue(value);
  21. square.setSquare(value * value);
  22. squares.add(square);
  23. }
  24. // 创建一个简单的Dateset,存储到一个分区目录中
  25. Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
  26. squaresDF.write().parquet("data/test_table/key=1");
  27. List<Cube> cubes = new ArrayList<>();
  28. for (int value = 6; value <= 10; value++) {
  29. Cube cube = new Cube();
  30. cube.setValue(value);
  31. cube.setCube(value * value * value);
  32. cubes.add(cube);
  33. }
  34. //在新的分区目录中创建另一个Dataset,
  35. //添加新列并删除现有列
  36. Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
  37. cubesDF.write().parquet("data/test_table/key=2");
  38. // 读取分区表
  39. Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
  40. mergedDF.printSchema();
  41. //最终模式由Parquet文件中的所有3列组成
  42. //分区列出现在分区目录路径中
  43. // root
  44. // |-- value: int (nullable = true)
  45. // |-- square: int (nullable = true)
  46. // |-- cube: int (nullable = true)
  47. // |-- key: int (nullable = true)
元数据刷新

Spark SQL缓存Parquet元数据以获得更好的性能。启用Hive Metastore Parquet表转换后,还会缓存这些转换表的元数据。

如果这些表由Hive或其他外部工具更新,则需要手动刷新它们以确保元数据一致。

 
  1. spark.catalog().refreshTable("my_table");
参数配置

可以使用SQL setConf上的方法SparkSession或通过SET key=value使用SQL运行 命令来完成Parquet的配置。

参数名称默认值描述
spark.sql.parquet.binaryAsStringfalse其他一些Parquet生成系统,特别是ImpalaHive和旧版本的Spark SQL,在写出Parquet模式时不区分二进制数据和字符串。此标志告诉Spark SQL将二进制数据解释为字符串,以提供与这些系统的兼容性。
spark.sql.parquet.int96AsTimestamptrue一些Parquet生产系统,特别是ImpalaHive,将时间戳存储到INT96中。此标志告诉Spark SQLINT96数据解释为时间戳,以提供与这些系统的兼容性。
spark.sql.parquet.compression.codecture设置编写Parquet文件时使用的压缩编解码器。如果在特定于表的选项/属性中指定了“compression”或“parquet.compression”,则优先级为“compression”,“parquet.compression”,“spark.sql.parquet.compression.codec”。可接受的值包括:noneuncompressedsnappygziplzobrotlilz4zstd。请注意,zstd需要在Hadoop 2.9.0之前安装ZStandardCodecbrotli需要安装BrotliCodec
spark.sql.parquet.filterPushdowntrue设置为true时启用Parquet过滤器下推优化。
spark.sql.hive.convertMetastoreParquettrue设置为false时,Spark SQL将使用Hive SerDe作为镶木桌而不是内置支持。
spark.sql.parquet.mergeSchemafalse如果为true,则Parquet数据源合并从所有数据文件收集的模式,否则,如果没有可用的摘要文件,则从摘要文件或随机数据文件中选取模式。
spark.sql.parquet.writeLegacyFormatfalse如果为true,则数据将以Spark 1.4及更早版本的方式写入。例如,十进制值将以Apache Parquet的固定长度字节数组格式写入,其他系统(如Apache HiveApache Impala)也使用该格式。如果为false,将使用Parquet中的较新格式。例如,小数将以基于int的格式写入。如果Parquet输出旨在用于不支持此较新格式的系统,请设置为true
编程要求

在右侧编辑器补充代码,把文件peoplepeople1存在people路径下,通过id=1id=2进行分区,以表格方式显示前20行内容。

people.jsonpeople1.json文件内容分别如下:

 
  1. {"age":21,"name":"张三", "salary":"3000"}
  2. {"age":22,"name":"李四", "salary":"4500"}
  3. {"age":23,"name":"王五", "salary":"7500"}
 
  1. {"name":"Michael", "salary":"6000"}
  2. {"name":"Andy", "age":30 , "salary":"9000"}
  3. {"name":"Justin", "age":19 , "salary":"6900"}
测试说明

平台将对你编写的代码进行评测:

预期输出:

 
  1. +----+-------+------+---+
  2. | age| name|salary| id|
  3. +----+-------+------+---+
  4. | 21| 张三| 3000| 1|
  5. | 22| 李四| 4500| 1|
  6. | 23| 王五| 7500| 1|
  7. |null|Michael| 6000| 2|
  8. | 30| Andy| 9000| 2|
  9. | 19| Justin| 6900| 2|
  10. +----+-------+------+---+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;public class Test2 {public static void main(String[] args) throws AnalysisException {SparkSession  spark  =  SparkSession .builder().appName("test1").master("local").getOrCreate();/********* Begin *********/spark.read().format("json").load("people.json").write().parquet("people/id=1");  spark.read().format("json").load("people1.json").write().parquet("people/id=2");  spark.read().load("people").show();     /********* End *********/}}

第3关:json文件介绍

  • 任务描述
  • 相关知识
    • json文件介绍
  • 编程要求
  • 测试说明

任务描述

本关任务:编写一个sparksql程序,统计平均薪水。

相关知识

为了完成本关任务,你需要掌握json文件介绍及使用。

json文件介绍

Spark SQL可以自动推断JSON数据集的模式并将其加载为Dataset<Row>。可以使用SparkSession.read().json()

请注意,作为json文件提供的文件不是典型的JSON文件。每行必须包含一个单独的,自包含的有效JSON对象。有关更多信息,请参阅JSON Lines文本格式,也称为换行符分隔的JSON

例如:

 
  1. {"age":21,"name":"张三", "salary":"3000"}
  2. {"age":22,"name":"李四", "salary":"4500"}
  3. {"age":23,"name":"王五", "salary":"7500"}

对于常规多行JSON文件,请将multiLine选项设置为true

 
  1. import org.apache.spark.sql.Dataset;
  2. import org.apache.spark.sql.Row;
  3. //路径指向JSON数据集。
  4. //路径可以是单个文本文件,也可以是存储文本文件的目录
  5. Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
  6. //可以使用printSchema()方法显示推断的模式
  7. people.printSchema();
  8. // root
  9. // |-- age: long (nullable = true)
  10. // |-- name: string (nullable = true)
  11. //使用Dataset创建临时视图
  12. people.createOrReplaceTempView("people");
  13. //可以使用spark提供的sql方法运行SQL语句
  14. Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
  15. namesDF.show();
  16. // +------+
  17. // | name|
  18. // +------+
  19. // |Justin|
  20. // +------+
  21. //或者,可以为表示的JSON数据集创建Dataset
  22. //数据集<String>每个字符串存储一个JSON对象。
  23. List<String> jsonData = Arrays.asList(
  24. "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
  25. Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
  26. Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
  27. anotherPeople.show();
  28. // +---------------+----+
  29. // | address|name|
  30. // +---------------+----+
  31. // |[Columbus,Ohio]| Yin|
  32. // +---------------+----+
编程要求

在右侧编辑器补充代码,通过people文件和people1文件,统计薪水平均值。

people.jsonpeople1.json文件内容分别如下:

 
  1. {"age":21,"name":"张三", "salary":"3000"}
  2. {"age":22,"name":"李四", "salary":"4500"}
  3. {"age":23,"name":"王五", "salary":"7500"}
 
  1. {"name":"Michael", "salary":"6000"}
  2. {"name":"Andy", "age":30 , "salary":"9000"}
  3. {"name":"Justin", "age":19 , "salary":"6900"}
测试说明

平台将对你编写的代码进行评测:

预期输出:

 
  1. +---------------------------+
  2. |avg(CAST(salary AS DOUBLE))|
  3. +---------------------------+
  4. | 6150.0|
  5. +---------------------------+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;public class Test3 {public static void main(String[] args) throws AnalysisException {SparkSession  spark  =  SparkSession .builder().appName("test1").master("local").getOrCreate();/********* Begin *********/spark.read().format("json").load("people.json").createOrReplaceTempView("people");  spark.read().format("json").load("people1.json").createOrReplaceTempView("people1");  spark.sql("select avg(salary) from ( select salary from people union all select salary from people1) a").show();  /********* End *********/}}

第4关:JDBC读取数据源

  • 任务描述
  • 相关知识
    • 使用JDBC如何读取数据源
  • 编程要求
  • 测试说明

任务描述

本关任务:编写sparksql程序,保存文件信息到mysql,并从mysql进行读取。

相关知识

为了完成本关任务,你需要掌握如何使用JDBC读取数据源。

使用JDBC如何读取数据源

Spark SQL 还包括一个可以使用JDBC从其他数据库读取数据的数据源,与使用JdbcRDD相比,此功能应该更受欢迎。这是因为结果作为DataSet返回,可以在Spark SQL中轻松处理,也可以与其他数据源连接。

JDBC数据源也更易于使用JavaPython,因为它不需要用户提供 ClassTag。(请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。

首先,您需要在spark类路径中包含特定数据库的JDBC驱动程序。例如,要从 Spark Shell连接到postgres,您将运行以下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可以使用Data Sources API将远程数据库中的表加载为DatasetSpark SQL临时视图。用户可以在数据源选项中指定JDBC连接属性。 userpassword通常作为用于登录数据源的连接属性提供。除连接属性外,Spark 还支持以下不区分大小写的选项:

常量名称含义
url要连接的JDBC URL。可以在URL中指定特定于源的连接属性。例如,jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable应该读取或写入的JDBC表。请注意,在读取路径中使用它时,可以使用在FROMSQL查询的子句中有效的任何内容。例如,您也可以在括号中使用子查询,而不是完整的表。不允许同时指定dbtablequery选项。
query将用于将数据读入Spark的查询。指定的查询将被括起来并在FROM子句中用作子查询。Spark还会为子查询子句分配别名。
driver用于连接到此URL的JDBC驱动程序的类名。
partitionColumn, lowerBound, upperBound如果指定了任何选项,则必须全部指定这些选项。另外, numPartitions必须指定。它们描述了在从多个工作者并行读取时如何对表进行分区。 partitionColumn必须是相关表中的数字,日期或时间戳列。请注意,lowerBound和upperBound只是用来决定分区步幅,而不是在表中过滤行。因此,表中的所有行都将被分区并返回。此选项仅适用于阅读。
numPartitions表读取和写入中可用于并行的最大分区数。这还确定了最大并发JDBC连接数。如果要写入的分区数超过此限制,我们通过coalesce(numPartitions)在写入之前调用将其减少到此限制。
queryTimeout驱动程序等待Statement对象执行到指定秒数的秒数。零意味着没有限制。在写入路径中,此选项取决于JDBC驱动程序如何实现API setQueryTimeout,例如,h2 JDBC驱动程序检查每个查询的超时而不是整个JDBC批处理。它默认为0。
fetchsizeJDBC提取大小,用于确定每次往返要获取的行数。这可以帮助JDBC驱动程序的性能,默认为低读取大小(例如,Oracle有10行)。此选项仅适用于阅读。
batchsizeJDBC批处理大小,用于确定每次往返要插入的行数。这可以帮助JDBC驱动程序的性能。此选项仅适用于书写。它默认为1000。
isolationLevel事务隔离级别,适用于当前连接。它可以是一个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连接对象定义,缺省值为标准事务隔离级别READ_UNCOMMITTED。此选项仅适用于书写。请参阅文档java.sql.Connection。
sessionInitStatement在向远程数据库打开每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句(或PL / SQL块)。使用它来实现会话初始化代码。例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate这是JDBC编写器相关选项。当SaveMode.Overwrite启用时,此选项会导致火花截断,而不是删除和重建其现有的表。这可以更有效,并防止删除表元数据(例如,索引)。但是,在某些情况下,例如新数据具有不同的架构时,它将无法工作。它默认为false。此选项仅适用于书写。
cascadeTruncate这是JDBC编写器相关选项。如果JDBC数据库(PostgreSQL和Oracle目前)启用并支持,则此选项允许执行a TRUNCATE TABLE t CASCADE(在TRUNCATE TABLE ONLY t CASCADE执行PostgreSQL的情况下执行a以防止无意中截断后代表)。这将影响其他表,因此应谨慎使用。此选项仅适用于书写。它默认为有问题的JDBC数据库的默认级联截断行为,isCascadeTruncate在每个JDBCDialect中指定。
createTableOptions这是JDBC编写器相关选项。如果指定,则此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB.)。此选项仅适用于书写。
createTableColumnTypes创建表时要使用的数据库列数据类型而不是默认值。数据类型信息应以与CREATE TABLE列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的类型应该是有效的spark sql数据类型。此选项仅适用于写入。
customSchema用于从JDBC连接器读取数据的自定义架构。例如,"id DECIMAL(38, 0), name STRING"。您还可以指定部分字段,其他字段使用默认类型映射。例如,"id DECIMAL(38, 0)"。列名应与JDBC表的相应列名相同。用户可以指定Spark SQL的相应数据类型,而不是使用默认值。此选项仅适用于阅读。
pushDownPredicate用于启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种情况下,Spark会尽可能地将过滤器下推到JDBC数据源。否则,如果设置为false,则不会将过滤器下推到JDBC数据源,因此所有过滤器都将由Spark处理。当Spark通过比JDBC数据源更快地执行谓词过滤时,谓词下推通常会被关闭。
 
  1. //注意:可以通过load / save或jdbc方法实现JDBC加载和保存
  2. //从JDBC源
  3. Dataset<Row> jdbcDF = spark.read()
  4. .format("jdbc")
  5. .option("url", "jdbc:postgresql:dbserver")
  6. .option("dbtable", "schema.tablename")
  7. .option("user", "username")
  8. .option("password", "password")
  9. .load();
  10. Properties connectionProperties = new Properties();
  11. connectionProperties.put("user", "username");
  12. connectionProperties.put("password", "password");
  13. Dataset<Row> jdbcDF2 = spark.read()
  14. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
  15. //将数据保存到mysql源
  16. jdbcDF.write()
  17. .format("jdbc")
  18. .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&cha\fracterEncoding=utf-8")
  19. .option("dbtable", "schema.tablename")
  20. .option("user", "username")
  21. .option("password", "password")
  22. .save();
  23. jdbcDF2.write()
  24. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
  25. //写入
  26. jdbcDF.write()
  27. .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  28. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
编程要求

在右侧编辑器补充代码,读取peoplepeople1文件到mysqlpeople表,并从people表里读取内容,以表格方式显示前20行内容。

people.jsonpeople1.json文件内容分别如下:

 
  1. {"age":21,"name":"张三", "salary":"3000"}
  2. {"age":22,"name":"李四", "salary":"4500"}
  3. {"age":23,"name":"王五", "salary":"7500"}
 
  1. {"name":"Michael", "salary":"6000"}
  2. {"name":"Andy", "age":30 , "salary":"9000"}
  3. {"name":"Justin", "age":19 , "salary":"6900"}
测试说明

平台将对你编写的代码进行评测:

预期输出:

 
  1. +----+-------+------+
  2. | age| name|salary|
  3. +----+-------+------+
  4. | 21| 张三| 3000|
  5. | 22| 李四| 4500|
  6. | 23| 王五| 7500|
  7. |null|Michael| 6000|
  8. | 30| Andy| 9000|
  9. | 19| Justin| 6900|
  10. +----+-------+------+
package com.educoder.bigData.sparksql2;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;public class Test4 {public static void case4(SparkSession spark) {/********* Begin *********/Dataset<Row> load = spark.read().format("json").load("people.json");  load.write()  .format("jdbc")  .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8")  .option("dbtable", "people")  .option("user", "root")  .option("password", "123123")  .mode(SaveMode.Overwrite)  .save();  Dataset<Row> load1 = spark.read().format("json").load("people1.json");  load1.write()  .format("jdbc")  .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8")  .option("dbtable", "people")  .option("user", "root")  .option("password", "123123")  .mode(SaveMode.Append)  .save();  Dataset<Row> load2 = spark.read()  .format("jdbc")  .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8")  .option("dbtable", "people")  .option("user", "root")  .option("password", "123123").load();  load2.show();  /********* End *********/}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/8671.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Canvas实现画板

Canvas如何实现画板功能。 <!DOCTYPE html> <html> <head><title>Canvas 画板</title><style>canvas {border: 1px solid black;}</style> </head> <body><canvas id"canvas" width"800" heig…

第41天:WEB攻防-ASP应用HTTP.SYS短文件文件解析Access注入数据库泄漏

第四十一天 一、ASP-SQL注入-Access数据库 1.解释 ACCESS数据库无管理帐号密码&#xff0c;顶级架构为表名&#xff0c;列名&#xff08;字段&#xff09;&#xff0c;数据&#xff0c;所以在注入猜解中一般采用字典猜解表和列再获取数据&#xff0c;猜解简单但又可能出现猜解…

flask和django的对比

文章目录 1. 简介2. 安装和设置3. 路由和视图4. ORM5. 管理界面6. 社区和文档7. 性能结论 当涉及构建 Web 应用程序时&#xff0c;Flask 和 Django 是两个最受欢迎的 Python Web 框架之一。它们都提供了强大的工具和功能&#xff0c;但在某些方面却有所不同。本文将对 Flask 和…

Vue-路由介绍

目录 一、思考引入 二、路由介绍 一、思考引入 单页面应用程序&#xff0c;之所以开发效率高&#xff0c;性能高&#xff0c;用户体验好&#xff0c;是因为页面按需更新。 而如果要按需更新&#xff0c;首先需要明确&#xff1a;访问路径和组件的对应关系。该关系通过路由来…

microsoft的azure语音,开发环境运行正常,发布到centos7线上服务器之后,无法运行

最近在做AI语音对话的功能&#xff0c;用到了azure的语音语音服务&#xff0c;开发的时候还算顺利&#xff0c;部署到线上后&#xff0c;发现在正式服上无法完成语音转文本的操作&#xff0c;提示&#xff1a; org.springframework.web.util.NestedServletException: Handler d…

数字图像处理知识点

数字图像处理知识点 一、绪论1、数字图像处理相关概念2、数字图像处理流程1.3 数字图像处理主要研究内容二、视觉与色度基础1、图像传感器与二维成像原理2、三基色2.1 三基色原理2.2 亮度方程3、HSI模型3.1 HSI模型优点3.2 RGB到HSI转换三、数字图像处理基础1、图像的数字化及表…

linux Shell编程之条件语句

条件测试操作 test命令 条件测试操作 Shell环境根据命令执行后的返回状态值&#xff08;$?&#xff09;来判断是否执行成功&#xff0c;当返回值为0&#xff08;真true&#xff09;时表示成功&#xff0c;返回值为非0值&#xff08;假false&#xff09;时表示失败或异常。 t…

C++ | Leetcode C++题解之第77题组合

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> temp;vector<vector<int>> ans;vector<vector<int>> combine(int n, int k) {// 初始化// 将 temp 中 [0, k - 1] 每个位置 i 设置为 i 1&#xff0c;即 [0, k - 1] 存…

如何在您的域名中使用 Google Apps 创建 SPF 记录

关于 SPF 记录 SPF 记录是一种域名服务&#xff08;DNS&#xff09;记录&#xff0c;用于标识哪些邮件服务器被允许代表您的域发送电子邮件。它与在您的 DNS 区域中添加 MX 或 A 记录一样简单。 为什么它很重要&#xff1f; 如今&#xff0c;几乎所有滥用电子邮件消息都携带…

OpenSPG docker 安装教程

文章目录 前言自述 一、OpenSPG1.介绍 二、安装步骤1.安装服务端2.客户端部署 前言 自述 我最近是想结合chatglm3-6b和知识图谱做一个垂直领域的技术规范的问答系统&#xff0c;过程中也遇到了很多困难&#xff0c;在模型微调上&#xff0c;在数据集收集整理上&#xff0c;在知…

面向侧扫声纳目标检测的YOLOX-ViT知识精馏

面向侧扫声纳目标检测的YOLOX-ViT知识精馏 摘要IntroductionRelated WorkYOLOv-ViTKnowledge DistillationExperimental Evaluation Knowledge Distillation in YOLOX-ViT for Side-Scan Sonar Object Detection 摘要 在本文中&#xff0c;作者提出了YOLOX-ViT这一新型目标检测…

Sealos急速部署生产用k8s集群

最近一段时间部署k8s全部使用sealos了&#xff0c;整体使用感觉良好&#xff0c;基本没有什么坑。推荐给大家。 使用 Sealos&#xff0c;可以安装一个不包含任何组件的裸 Kubernetes 集群。 最大的好处是提供 99 年证书&#xff0c;用到我跑路是足够了。不用像之前kubeadm安装…

【6D位姿估计】FoundationPose 支持6D位姿估计和跟踪 CVPR 2024

前言 本文介绍6D位姿估计的方法FoundationPose&#xff0c;是CVPR 2024的满分论文&#xff0c;支持6D位姿估计和跟踪。 通过大规模的合成数据训练&#xff0c;具有强大的泛化能力&#xff0c;在测试新物体时&#xff0c;无需进行微调。 论文地址&#xff1a;FoundationPose:…

1688数据分析实操技巧||1688商品数据采集接口 数据分析

今天&#xff0c;聊一聊B2B平台的数据分析&#xff0c;以1688国内站为例。 1688平台数据接口 1688也属于阿里巴巴的体系&#xff0c;跟淘宝天猫运营很像&#xff0c;因此很多淘宝天猫的玩法调整后也适用于1688。数据分析也是如此。 在1688搞数据分析&#xff0c;搞数据化运营可…

绘唐3 零基础系列教程

绘唐3 团长董事长,即可下载工具 第一讲:安装注册 日期:2024-04-01 17:50:10 录制文件:查看 第二讲:SD,MJ出图配置演示出图 日期:2024-04-01 18:06:46 录制文件:查看

【笔试训练】day22

1.添加字符 求最少不相等的位数&#xff0c;可以先求最多相等的位数。 在添加字符之前&#xff0c;A和B最多相等的位数是多少&#xff1f;由于A后面可以添加字符&#xff0c;也就使得A字符可以在B的任意一个位置开始比较。遍历一遍这个比较的起点&#xff0c;从这个起点开始跟…

Linux中的简单操作 ls/tar/pwd/cd/mkdir/touch 等

目录 前言 安装和卸载软件包 ls 查看指定路径下的文件和文件夹 tar 解压缩/压缩命令 pwd 查看当前路径 cd 改变目录 mkdir 创建目录 递归创建 rm rmdir 删除文件或目录 touch 创建文件 ll、echo、重定向符&#xff08;>,>>&#xff09; ll echo 重定向符…

SQL查询语句(四)模糊查询

前文介绍的查询语句&#xff0c;无论是利用常规的数学运算符&#xff0c;还是IN&#xff0c;BETWEEN和EXISTS等范围查询关键字&#xff0c;本质上都属于精确查询的范围&#xff0c;也就是说&#xff0c;我们在条件中写明了完全限定死的条件。而有些场景&#xff0c;我们的条件并…

安全继电器的使用和作用

目录 一、什么是安全继电器 二、安全继电器的接线方式 三、注意事项 四、总结 一、什么是安全继电器 安全继电器是由多个继电器与硬件电路组合而成的一种模块&#xff0c;是一种电路组成单元&#xff0c;其目的是要提高安全因素。完整点说&#xff0c;应该叫成安全继电器模…

【JavaEE网络】HTTP响应详解:状态码、报头与正文的全面解析

目录 HTTP响应&#xff08;Response&#xff09;认识 "状态码" (status code)认识响应 “报头”&#xff08;header&#xff09;认识响应 “正文”&#xff08;body&#xff09; HTTP响应&#xff08;Response&#xff09; 响应&#xff1a; 首行响应头空行正文 认…