支持以下引擎
Spark
Flink
SeaTunnel Zeta
关键特性
批处理
精确一次性处理
列投影
并行处理
支持用户自定义拆分
支持查询 SQL 并实现投影效果
描述
通过 JDBC 读取外部数据源数据。
支持的数据源信息
Datasource | Supported versions | Driver | Url | Maven |
---|---|---|---|---|
Vertica | Different dependency version has different driver class. | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433/vertica | Download |
## 数据库依赖 |
请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录中
例如,Vertica 数据源:cp vertica-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
数据类型映射
Vertical Data type | SeaTunnel Data type |
---|---|
BIT | BOOLEAN |
TINYINT TINYINT UNSIGNED SMALLINT SMALLINT UNSIGNED MEDIUMINT MEDIUMINT UNSIGNED INT INTEGER YEAR | INT |
INT UNSIGNED INTEGER UNSIGNED BIGINT | LONG |
BIGINT UNSIGNED | DECIMAL(20,0) |
DECIMAL(x,y)(Get the designated column's specified column size.<38) | DECIMAL(x,y) |
DECIMAL(x,y)(Get the designated column's specified column size.>38) | DECIMAL(38,18) |
DECIMAL UNSIGNED | DECIMAL((Get the designated column's specified column size)+1, (Gets the designated column's number of digits to right of the decimal point.))) |
FLOAT FLOAT UNSIGNED | FLOAT |
DOUBLE DOUBLE UNSIGNED | DOUBLE |
CHAR VARCHAR TINYTEXT MEDIUMTEXT TEXT LONGTEXT JSON | STRING |
DATE | DATE |
TIME | TIME |
DATETIME TIMESTAMP | TIMESTAMP |
TINYBLOB MEDIUMBLOB BLOB LONGBLOB BINARY VARBINAR BIT(n) | BYTES |
GEOMETRY UNKNOWN | Not supported yet |
源选项
Name | Type | Required | Default | Description |
---|---|---|---|---|
url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:vertica://localhost:5433/vertica |
driver | String | Yes | - | The jdbc class name used to connect to the remote data source, if you use Vertica the value is com.vertica.jdbc.Driver . |
user | String | No | - | Connection instance user name |
password | String | No | - | Connection instance password |
query | String | Yes | - | Query statement |
connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete |
partition_column | String | No | - | The column name for parallelism's partition, only support numeric type,Only support numeric type primary key, and only can config one column. |
partition_lower_bound | Long | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. |
partition_upper_bound | Long | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. |
partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism |
fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure the row fetch size used in the query toimprove performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value. |
common-options | No | - | Source plugin common parameters, please refer to Source Common Options for details |
- 提示
如果未设置 partition_column
,则会在单一并发中运行;如果设置了 partition_column
,则将根据任务的并发性进行并行执行。
任务示例
简单示例:
此示例在单一并行中查询您的测试“数据库”中的 type_bin 'table'
16 个数据,并查询其所有字段。您还可以指定要查询的字段,以便将最终输出显示在控制台上。
env {您可以在此处设置 Flink 配置
execution.parallelism = 2
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
query = "select * from type_bin limit 16"
}
}transform {
# 如果您想获取有关如何配置 seatunnel 的更多信息,并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/transform-v2/sql
}sink {
Console {}
}
并行示例:
并行读取您的查询表,使用您配置的 shard 字段和 shard 数据。如果要读取整个表,可以这样做。
source {
Jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
# 根据需要定义查询逻辑
query = "select * from type_bin"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
并行边界示例:
根据查询的上限和下限指定数据更加高效,根据您配置的上限和下限来读取数据源更加高效
source {
Jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
# 根据需要定义查询逻辑
query = "select * from type_bin"
partition_column = "id"
# 读取起始边界
partition_lower_bound = 1
# 读取结束边界
partition_upper_bound = 500
partition_num = 10
}
}
本文由 白鲸开源科技 提供发布支持!