Spark SQL----连接其他数据库的JDBC

Spark SQL----连接其他数据库的JDBC

  • 数据源选项

Spark SQL还包括一个数据源,可以使用JDBC从其他数据库读取数据。与使用 JdbcRDD相比,应该优先使用此功能。这是因为结果以DataFrame的形式返回,并且可以很容易地在Spark SQL中进行处理或与其他数据源join。JDBC数据源也更容易从Java或Python中使用,因为它不需要用户提供ClassTag。(请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。
要开始,你需要在spark类路径中包含特定数据库的JDBC驱动程序。例如,要从Spark Shell连接到postgres,你需要运行以下命令:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

数据源选项

Spark支持以下JDBC不区分大小写的选项。JDBC的数据源选项可以通过以下方式设置:

  • 以下类的.option/.options方法
    • DataFrameReader
    • DataFrameWriter
  • CREATE TABLE USING DATA_SOURCE处的OPTIONS子句

对于连接属性,用户可以在数据源选项中指定JDBC连接属性。用户和密码通常作为登录到数据源的连接属性提供。

Property NameDefaultMeaningScope
url(none)要连接到的JDBC:subprotocol:subname 形式的JDBC URL。特定于源的连接属性可以在URL中指定。例如, jdbc:postgresql://localhost/test?user=fred&password=secretread/write
dbtable(none)应该读取或写入的JDBC表。请注意,当在读路径中使用它时,可以使用SQL查询的FROM子句中有效的任何内容。例如,除了完整的表,还可以在括号中使用子查询。不允许同时指定dbtable和query选项。read/write
query(none)将用于把数据读取到Spark中的查询。指定的查询将用括号括起来,并用作FROM子句中的子查询。Spark还将为子查询子句分配一个别名。例如,spark将向JDBC源发出以下形式的查询。SELECT <columns> FROM (<user_specified_query>) spark_gen_alias。 下面是使用此选项时的几个限制。1. 不允许同时指定dbtable和query选项。2. 不允许同时指定query和partitionColumn选项。当需要指定partitionColumn选项时,可以使用dbtable选项来指定子查询,并且可以使用作为dbtable的一部分提供的子查询别名来限定分区列。例子: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“query”, “select c1, c2 from t1”).load()read/write
prepareQuery(none)与query一起构成最终查询的前缀。由于指定的查询将作为FROM子句中的子查询括起来,而且有些数据库不支持子查询中的所有子句,因此prepareQuery属性提供了一种运行此类复杂查询的方法。作为示例,spark将向JDBC Source发出如下形式的查询。 <prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias。 下面是几个例子。1. MSSQL Server不接受子查询中的WITH子句,但可以将这样的查询拆分为prepareQuery和query: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“prepareQuery”, “WITH t AS (SELECT x, y FROM tbl)”).option(“query”, “SELECT * FROM t WHERE x > 10”).load() 2. MSSQL Server不接受子查询中的临时表子句,但可以将这样的查询拆分为prepareQuery和query: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“prepareQuery”, “(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)”).option(“query”, “SELECT * FROM #TempTable”).load()read/write
driver(none)用于连接到此URL的JDBC驱动程序的类名。read/write
partitionColumn, lowerBound, upperBound(none)如果指定了其中任何一个选项,则必须指定所有这些选项。此外,还必须指定numPartitions。它们描述了在从多个workers并行读取数据时如何对表进行分区。partitionColumn必须是相关表中的数字列、日期列或时间戳列。注意,lowerBound和upperBound仅用于决定分区步长(stride),而不是用于过滤表中的行。因此表中的所有行都将被分区并返回。此选项仅适用于读取。例子: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“dbtable”, “(select c1, c2 from t1) as subq”).option(“partitionColumn”, “c1”).option(“lowerBound”, “1”).option(“upperBound”, “100”).option(“numPartitions”, “3”).load()read
numPartitions(none)在表读取和写入过程中可用于并行的最大分区数。这也决定了并发JDBC连接的最大数量。如果要写入的分区数超过了这个限制,我们会在写入之前通过调用coalesce(numPartitions)将其减少到这个限制。read/write
queryTimeout0驱动程序将等待语句对象执行的秒数到给定的秒数。零表示没有限制。在写路径中,这个选项取决于JDBC驱动程序如何实现API setQueryTimeout,例如,h2 JDBC驱动程序检查每个查询的超时,而不是整个JDBC batch。read/write
fetchsize0JDBC获取大小,它决定每次往返要获取多少行。这可以帮助JDBC驱动程序的性能,默认为低读取大小(例如Oracle的10行)。read
batchsize1000JDBC批处理大小,它决定每次往返要插入多少行。这有助于提高JDBC驱动程序的性能。此选项仅适用于写入。write
isolationLevelREAD_UNCOMMITTED事务隔离级别,适用于当前连接。它可以是NONE、READ_COMMITTED、READ_UNCOMMITTED、REPEATABLE_READ或SERIALIZABLE中的一个,对应于JDBC的Connection对象定义的标准事务隔离级别,默认为READ_UNCOMMITTED。请参考java.sql.Connection中的文档。write
sessionInitStatement(none)在向远程DB打开每个数据库会话之后,在开始读取数据之前,该选项执行自定义SQL语句(或PL/SQL块)。使用它来实现会话初始化代码。例子: option(“sessionInitStatement”, “”“BEGIN execute immediate ‘alter session set “_serial_direct_read”=true’; END;”“”)read
truncatefalse这是一个与JDBC writer 相关的选项。当启用“SaveMode.Overwrite”时,此选项会导致Spark截断现有表,而不是删除并重新创建它。这可以提高效率,并防止删除表元数据(如索引)。然而,在某些情况下,例如当新数据具有不同的schema时,它将不起作用。若出现故障,用户应关闭truncate选项以再次使用DROP TABLE。此外,由于TRUNCATE TABLE在DBMS之间的行为不同,使用它并不总是安全的。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect和OracleDialect支持此功能,而PostgresDialect和默认JDBCDirect不支持。对于未知和不受支持的JDBCDirect,用户选项truncate将被忽略。write
cascadeTruncate所讨论的JDBC数据库的默认级联truncate行为,在每个JDBCDialect中的isCascadeTruncate中指定这是一个与JDBC writer相关的选项。如果JDBC数据库(目前是PostgreSQL 和Oracle )启用并支持此选项,则允许执行TRUNCATE TABLE t CASCADE(在PostgreSQL 的情况下,执行TRUNCATE TABLE ONLY t CASCADE以防止无意中truncate descendant表)。这将影响其他表,因此应谨慎使用。write
createTableOptions这是一个与JDBC writer相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如, CREATE TABLE t (name string) ENGINE=InnoDB.).write
createTableColumnTypes(none)创建表时要使用的数据库列数据类型,而不是默认值。数据类型信息应该以与CREATE TABLE列语法相同的格式指定(例如:“name CHAR(64), comments VARCHAR(1024)”)。指定的类型应该是有效的spark sql数据类型。write
customSchema(none)用于从JDBC连接器读取数据的自定义schema。例如:id DECIMAL(38, 0), name STRING。你还可以指定部分字段,其他字段使用默认类型映射。例如,“id DECIMAL(38, 0)”。列名应该与JDBC表的相应列名相同。用户可以指定相应的Spark SQL数据类型,而不是使用默认值。read
pushDownPredicatetrue启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种情况下,Spark将尽可能地将filters下推到JDBC数据源。否则,如果设置为false,则不会将任何筛选器下推到JDBC数据源,因此所有filters都将由Spark处理。当Spark执行谓词过滤的速度比JDBC数据源快时,通常关闭谓词下推。read
pushDownAggregatetrue在V2 JDBC数据源中启用或禁用聚合下推的选项。默认值为true,在这种情况下,Spark将下推聚合(aggregates)到JDBC数据源。否则,如果设置为false,聚合将不会下推到JDBC数据源。当Spark执行聚合的速度比JDBC数据源更快时,聚合下推通常会关闭。请注意,当且仅当所有聚合函数和相关过滤器都可以向下推时,聚合才能下推。如果numPartitions等于1,或者group by key与partitionColumn相同,Spark将完全下推聚合到数据源,而不会对数据源输出应用最终聚合。否则,Spark将对数据源输出应用最终聚合。read
pushDownLimittrue启用或禁用LIMIT下推到V2 JDBC数据源的选项。LIMIT下推还包括LIMIT + SORT,也称为Top N运算符。默认值为true,在这种情况下,Spark将LIMIT或LIMIT+SORT一起下推到JDBC数据源。否则,如果设置为false,则LIMIT或LIMIT + SORT不会下推到JDBC数据源。如果numPartitions大于1,即使LIMIT或LIMIT with SORT被下推,Spark仍会对数据源的结果应用LIMIT 或LIMIT +SORT。否则,如果LIMIT或LIMIT+ SORT被下推,并且numPartitions等于1,Spark将不会对数据源的结果应用LIMIT 或LIMIT +SORT。read
pushDownOffsettrue启用或禁用OFFSET下推到V2 JDBC数据源的选项。默认值为true,在这种情况下,Spark将把OFFSET下推到JDBC数据源。否则,如果设置为false, Spark将不会尝试将OFFSET下推到JDBC数据源。如果pushDownOffset为true并且numPartitions等于1,OFFSET将被下推到JDBC数据源。否则,OFFSET不会被下推,Spark仍然对数据源的结果应用OFFSET。read
pushDownTableSampletrue启用或禁用TABLESAMPLE下推到V2 JDBC数据源的选项。默认值为true,在这种情况下,Spark将TABLESAMPLE下推到JDBC数据源。否则,如果该值设置为false,则不会将TABLESAMPLE下推到JDBC数据源。read
keytab(none)JDBC客户端的kerberos keytab文件的位置(必须通过spark-submit的–files选项或手动将其预先上传到所有节点)。当找到路径信息时,Spark认为keytab是手动分布的,否则假定为–files。如果同时定义了keytab和principal,那么Spark将尝试进行kerberos身份验证。read/write
principal(none)为JDBC客户端指定kerberos principal名称。如果同时定义了keytab和principal,那么Spark将尝试进行kerberos身份验证。read/write
refreshKrb5Configfalse此选项控制在建立新连接之前是否刷新JDBC客户端的kerberos配置。如果要刷新配置,请设置为true,否则设置为false。默认值为false。请注意,如果将此选项设置为true并尝试建立多个连接,则可能会出现争用情况。一种可能的情况如下。1.refreshKrb5Config标志被设置为具有安全上下文1。2.JDBC连接provider用于相应的DBMS。3.修改了krb5.conf,但JVM还没有意识到必须重新加载它。4.Spark成功验证安全上下文1。5.JVM从修改后的krb5.conf中加载安全上下文2。6.Spark恢复之前保存的安全上下文1。7.修改后的krb5.conf内容消失了read/write
connectionProvider(none)用于连接到此URL的JDBC连接provider的名称,例如db2、mssql。必须是加载了JDBC数据源的providers之一。当多个provider可以处理指定的驱动程序和选项时,用于消除歧义。所选的provider不能被spark.sql.sources.disabledJdbcConnProviderList禁用。read/write
preferTimestampNTZfalse当该选项设置为true时,所有时间戳都推断为TIMESTAMP WITHOUT TIME ZONE。否则,时间戳将被读取为带有本地时区的TIMESTAMP。read

请注意,JDBC驱动程序并不总是支持使用keytab的kerberos身份验证。
在使用keytab和principal配置选项之前,请确保满足以下要求:

  • 所包含的JDBC驱动程序版本支持使用keytab进行kerberos身份验证。
  • 有一个内置的连接provider,它支持所使用的数据库。
    以下数据库有一个内置的连接provider:
  • DB2
  • MariaDB
  • MS Sql
  • Oracle
  • PostgreSQL
    如果不满足要求,请考虑使用JdbcConnectionProvider developer API来处理自定义身份验证。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.load()jdbcDF2 = spark.read \.jdbc("jdbc:postgresql:dbserver", "schema.tablename",properties={"user": "username", "password": "password"})# Specifying dataframe column data types on read
jdbcDF3 = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.option("customSchema", "id DECIMAL(38, 0), name STRING") \.load()# Saving data to a JDBC source
jdbcDF.write \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.save()jdbcDF2.write \.jdbc("jdbc:postgresql:dbserver", "schema.tablename",properties={"user": "username", "password": "password"})# Specifying create table column data types on write
jdbcDF.write \.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \.jdbc("jdbc:postgresql:dbserver", "schema.tablename",properties={"user": "username", "password": "password"})

在Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/35328.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React 中 useState 和 useReducer 的联系和区别

文章目录 使用场景使用 useState使用 useReducer 联系区别用法状态更新逻辑适用场景可读性和可维护性 使用场景 使用 useState 状态逻辑简单。只涉及少量的状态更新。需要快速和简单的状态管理。 使用 useReducer 状态逻辑复杂。涉及多个子状态或多种状态更新逻辑。需要更好…

Vatee万腾平台:一站式智慧服务,让生活更美好

在数字化浪潮席卷全球的今天&#xff0c;我们生活的方方面面都在经历着前所未有的变革。Vatee万腾平台凭借其一站式智慧服务&#xff0c;正成为推动这场变革的重要力量&#xff0c;让我们的生活变得更加美好。 Vatee万腾平台&#xff0c;作为一家专注于提供智慧服务的领军企业&…

【运维】如何分析和清理 Linux 根目录的磁盘空间使用情况

要分析根目录(/)使用了这么多空间&#xff0c;您可以使用以下几种方法来找出具体的占用情况&#xff1a; 1. 使用 du 命令 du 命令可以显示目录或文件的磁盘使用情况。 运行以下命令来找出根目录下的哪些目录占用了大量空间&#xff1a; sudo du -h --max-depth1 / | sort …

基于weixin小程序校园快递系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;用户管理&#xff0c;订单管理&#xff0c;快递管理&#xff0c;快递记录管理&#xff0c;公告管理&#xff0c;基础数据管理 小程序功能包括&#xff1a;系统首页&#xff0c;…

企业有必要安装数据文件加密软件吗?哇!这么多好处

需要的 一、查看以下分析&#xff0c;便能得出结论 安全防护提升&#xff1a;禁止拷贝、打印、截屏等&#xff0c;还能够设置文件的浏览次数、有效期&#xff0c;提供多层次的文档保护措施。 核心机密保护&#xff1a;企业的核心机密文件、技术资料、客户资料等重要信息是公…

MySQL——Update语句详解

update 修改谁 &#xff08;条件&#xff09; set 原来的值 新值 -- 修改学员的名字(指定条件&#xff0c;只改一列) UPDATE student SET name 辰阳 WHERE id 1; -- 修改学员的名字(不指定条件&#xff0c;全改) UPDATE student SET name 宝宝-- 语法&#xff1a; -- U…

【MySQL备份】mysqldump篇

目录 1.简介 2.基本用途 3.命令格式 3.1常用选项 3.2常用命令 4.备份脚本 5.定时执行备份脚本 1.简介 mysqldump 是 MySQL 数据库管理系统的命令行实用程序&#xff0c;用于创建数据库的逻辑备份。它能够导出数据库的结构&#xff08;如表结构、视图、触发器等&#xf…

reactjs18 中使用@reduxjs/toolkit同步异步数据的使用

react18 中使用@reduxjs/toolkit 1.安装依赖包 yarn add @reduxjs/toolkit react-redux2.创建 store 根目录下面创建 store 文件夹,然后创建 index.js 文件。 import {configureStore } from "@reduxjs/toolkit"; import {counterReducer } from "./feature…

Does a vector database maintain pre-vector chunked data for RAG systems?

题意&#xff1a;一个向量数据库是否为RAG系统维护预向量化分块数据&#xff1f; 问题背景&#xff1a; I believe that when using an LLM with a Retrieval-Augmented Generation (RAG) approach, the results retrieved from a vector search must ultimately be presented…

WIFI各版本的带宽

带宽的定义&#xff1a; 带宽在网络领域通常指信道带宽&#xff0c;即信号在频谱中占用的频宽&#xff0c;单位是MHz&#xff08;兆赫&#xff09;。在无线通信中&#xff0c;带宽越宽&#xff0c;能够传输的数据量越大&#xff0c;因此信道带宽直接影响着数据传输速率。WiFi标…

FairGuard游戏加固无缝兼容 Android 15 预览版

2024年6月25日&#xff0c;谷歌发布了 Android 15 Beta 3 &#xff0c;作为Android 15 “平台稳定性”的里程碑版本&#xff0c;谷歌建议所有应用、游戏、SDK、库和游戏引擎开发者都将“平台稳定性”里程碑版本作为规划最终兼容性测试和公开发布的目标。 安卓开发者博客提供的版…

【2024-热-办公软件】ONLYOFFICE8.1版本桌面编辑器测评

在今日快速发展的数字化办公环境中&#xff0c;选择一个功能全面且高效的办公软件是至关重要的。最近&#xff0c;我有幸体验了ONLYOFFICE 8.1版本的桌面编辑器&#xff0c;这款软件不仅提供了强大的编辑功能&#xff0c;还拥有众多改进&#xff0c;让办公更加流畅和高效。在本…

货运大模型的未来:轻量化、场景化

“加快数字化和智能化转型发展&#xff0c;已成为物流行业的重要战略方向。”6月25日&#xff0c;在第十九届中国国际物流节暨第二十一届中国国际运输与物流博览会&2024亚洲物流双年展在上海开幕&#xff0c;中国交通运输协会会长、原铁道部副部长胡亚东在开幕致辞中表示。…

Python实践项目讲解:如何用制作一个桌面宠物

制作一个桌面宠物&#xff08;Desktop Pet&#xff09;在Python中通常涉及多个步骤&#xff0c;包括创建宠物的图形界面、添加动画效果、处理用户交互等。下面是一个简化的步骤指南&#xff0c;帮助你开始使用Python制作桌面宠物&#xff1a; 选择图形库&#xff1a; Tkinter&…

《我的第一本编程书》-Sunaba编程中的存储区与数字

第一个Sunaba的代码是 存储区[65049]→500000 其中存储区相当于在内存中申请了一个空间&#xff0c;这个空间用来存储数据 有点类似其他变量并赋值 存储区后面的数字是在sunaba界面存储的坐标&#xff0c;修改这个坐标&#xff0c;输出的位置不同。 →后面的数字是存储的数…

新能源行业必会基础知识-----电力市场概论笔记-----绪论

新能源行业知识体系-------主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/139946830 目录 1. 电力市场的定义2. 对传统电力系统理论的挑战 1. 电力市场的定义 1. 我国电力市场的进程 我国新一轮电力体制改革的5大亮点&…

量子信息基础知识与实践指南

量子信息是一门涉及量子力学和信息理论的交叉学科&#xff0c;它探讨如何利用量子力学的性质来传输、存储和处理信息。以下是关于量子信息的基础知识和实践指南&#xff1a; 量子信息的基础知识&#xff1a; 量子比特&#xff08;Qubit&#xff09;&#xff1a; 量子比特是量子…

【Echarts】散点图 制作 气泡 类型图表

目录 需求主要代码效果展示注 需求 需参照设计图画出对应图表 主要代码 /**** 数据 ****/ this.dataList [...Array(8).keys()].map((item) > {return {ywlxmc: 业务类型 (item 1),sl: item > 4 ? 50 : 70} })/**** 气泡样式 ****/ const styleList [{offset: [56…

NVIDIA控制面板3D设置一栏中不能通过预览更改图形设置的解决办法

今天因为GeForce Experience弹窗让我更新之后&#xff0c;手欠直接删掉了 然后图中标出的两个选项就没了 解决方法很简单&#xff0c;就是下回来&#xff0c;hhh https://www.nvidia.cn/geforce/drivers/ 直接下载就行&#xff0c;不用管版本&#xff0c;但是这种驱动千万不要…

基于Patroni的Citus高可用环境部署

1. 前言 Citus是一个非常实用的能够使PostgreSQL具有进行水平扩展能力的插件&#xff0c;或者说是一款以PostgreSQL插件形式部署的基于PostgreSQL的分布式HTAP数据库。本文简单说明Citus的高可用技术方案&#xff0c;并实际演示基于Patroni搭建Citus HA环境的步骤。 2. 技术方…