16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例:JDBC
    • 1、maven依赖(java编码依赖)
    • 2、创建 JDBC 表
      • 1)、创建jdbc表,并插入、查询
      • 2)、批量插入表数据
      • 3)、JDBC 表在时态表关联中作为维表
    • 3、连接器参数
    • 4、已弃用的配置
      • 5、特性
      • 1)、键处理
      • 2)、分区扫描
      • 3)、Lookup Cache
      • 4)、幂等写入
    • 5、JDBC Catalog
      • 1)、JDBC Catalog 的使用
      • 2)、JDBC Catalog for PostgreSQL
      • 3)、JDBC Catalog for MySQL
    • 6、数据类型映射


本文简单的介绍了flink sql读取外部系统的jdbc示例(每个示例均是验证通过的,并且具体给出了运行环境的版本)。
本文依赖环境是hadoop、kafka、mysql环境好用,如果是ha环境则需要zookeeper的环境。

一、Table & SQL Connectors 示例:JDBC

1、maven依赖(java编码依赖)

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>

在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
在这里插入图片描述
驱动jar需放在flink的安装目录lib下,且需要重启服务。
本示例jar包有

flink-connector-jdbc_2.11-1.13.6.jar
mysql-connector-java-5.1.5.jar 或
mysql-connector-java-6.0.6.jar(1.17版本中使用的mysql驱动,用上面mysql驱动有异常信息)

2、创建 JDBC 表

JDBC table 可以按如下定义,以下示例中包含创建表、批量插入以及left join的维表。

1)、创建jdbc表,并插入、查询

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (id BIGINT,name STRING,age INT,status BOOLEAN,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'users'
);-------------------具体事例----------------------------------
-- 在 Flink SQL 中注册一张 MySQL 表 'user'
CREATE TABLE Alan_JDBC_User_Table (id BIGINT,name STRING,age INT,balance DOUBLE,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.10.44:3306/test','table-name' = 'user'
);-- mysql中的数据
mysql> select * from user;
+----+-------------+------+---------+-----------------------+------------+
| id | name        | age  | balance | email                 | pwd        |
+----+-------------+------+---------+-----------------------+------------+
|  1 | aa6         |   61 |   60000 | 6@163.com             | 123456     |
|  2 | aa4         |   71 |   70000 | 7@163.com             | 7123       |
|  4 | test        | NULL |    NULL | NULL                  | NULL       |
|  5 | test2       | NULL |    NULL | NULL                  | NULL       |
|  7 | alanchanchn |   19 |     800 | alan.chan.chn@163.com | vx         |
|  8 | alanchan    |   19 |     800 | alan.chan.chn@163.com | sink mysql |
+----+-------------+------+---------+-----------------------+------------+
6 rows in set (0.00 sec)---------在flink sql中建表并查询--------
Flink SQL> CREATE TABLE Alan_JDBC_User_Table (
>   id BIGINT,
>   name STRING,
>   age INT,
>   balance DOUBLE,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://192.168.10.44:3306/test',
>    'table-name' = 'user'
> );
[INFO] Execute statement succeed.Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                            aa6 |          61 |                        60000.0 |
| +I |                    2 |                            aa4 |          71 |                        70000.0 |
| +I |                    4 |                           test |      (NULL) |                         (NULL) |
| +I |                    5 |                          test2 |      (NULL) |                         (NULL) |
| +I |                    7 |                    alanchanchn |          19 |                          800.0 |
| +I |                    8 |                       alanchan |          19 |                          800.0 |
+----+----------------------+--------------------------------+-------------+--------------------------------+
Received a total of 6 rows

2)、批量插入表数据

-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;---------创建数据表----------------------
CREATE TABLE source_table (userId INT,age INT,balance DOUBLE,userName STRING,t_insert_time AS localtimestamp,WATERMARK FOR t_insert_time AS t_insert_time
) WITH ('connector' = 'datagen','rows-per-second'='5','fields.userId.kind'='sequence','fields.userId.start'='1','fields.userId.end'='5000','fields.balance.kind'='random','fields.balance.min'='1','fields.balance.max'='100','fields.age.min'='1','fields.age.max'='1000','fields.userName.length'='10'
);-- 从另一张表 "source_table" 将数据写入到 JDBC 表中
INSERT INTO Alan_JDBC_User_Table
SELECT userId,  userName, age, balance FROM source_table;-- 查看 JDBC 表中的数据
select * from Alan_JDBC_User_Table;---------------flink sql中查询----------------------------------
Flink SQL> INSERT INTO Alan_JDBC_User_Table
> SELECT userId,  userName, age, balance FROM source_table;
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e91cd3c41ac20aaf8eab79f0094f9e46Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                     728297a8d9 |         410 |                           35.0 |
| +I |                    3 |                     643c2226cd |         142 |                           80.0 |
......
-------------验证mysql中的数据是否写入,此处只查总数----------------
mysql> select count(*) from user;
+----------+
| count(*) |
+----------+
|     2005 |
+----------+
1 row in set (0.00 sec)

3)、JDBC 表在时态表关联中作为维表

-- 1、创建 JDBC 表在时态表关联中作为维表
CREATE TABLE Alan_JDBC_User_Table (id BIGINT,name STRING,age INT,balance DOUBLE,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.10.44:3306/test','table-name' = 'user'
);
-----2、查询表中的数据(实际数据是之前测试的结果)   -----
Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                     728297a8d9 |         410 |                           35.0 |
| +I |                    3 |                     643c2226cd |         142 |                           80.0 |
| +I |                    4 |                     6115f11f01 |         633 |                           69.0 |
| +I |                    5 |                     044ba5fa2f |          74 |                           71.0 |
| +I |                    6 |                     98a112dc87 |         729 |                           54.0 |
| +I |                    7 |                     705326a369 |         846 |                           99.0 |
| +I |                    8 |                     532692924f |         872 |                           79.0 |
| +I |                    9 |                     b816802948 |         475 |                           67.0 |
| +I |                   10 |                     06906bebb2 |         109 |                           57.0 |
......-----3、创建事实表,以kafka表作为代表   -----
CREATE TABLE Alan_KafkaTable_3 (user_id BIGINT, -- 用户iditem_id BIGINT, -- 商品idaction STRING,  -- 用户行为ts     BIGINT,  -- 用户行为发生的时间戳proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件时间WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND  -- 在eventTime上定义watermark
) WITH ('connector' = 'kafka','topic' = 'testtopic','properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
);-----4、发送kafka消息,同时观察事实表中的数据   -----
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic testtopic
>1,1001,"login",1692593500222
>2,1002,"p_read",1692593502242
>Flink SQL> select * from Alan_KafkaTable_3;+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op |              user_id |              item_id |                         action |                   ts |                proctime |              event_time |
+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I |                    1 |                 1001 |                          login |        1692593500222 | 2023-08-22 05:33:38.830 | 2023-08-22 05:39:54.439 |
| +I |                    2 |                 1002 |                         p_read |        1692593502242 | 2023-08-22 05:33:38.833 | 2023-08-22 05:40:41.284 |
Query terminated, received a total of 2 rows-----5、以jdbc的维表进行关联查询事实表数据-----
SELECTkafkamessage.user_id, kafkamessage.item_id,kafkamessage.action,  jdbc_dim_table.name,jdbc_dim_table.age,jdbc_dim_table.balance
FROM Alan_KafkaTable_3 AS kafkamessage 
LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;Flink SQL> SELECT
>   kafkamessage.user_id, 
>   kafkamessage.item_id,
>   kafkamessage.action,  
>   jdbc_dim_table.name,
>   jdbc_dim_table.age,
>   jdbc_dim_table.balance
> FROM Alan_KafkaTable_3 AS kafkamessage 
> LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| op |              user_id |              item_id |                         action |                           name |         age |                        balance |
+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                 1001 |                          login |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                 1002 |                         p_read |                     728297a8d9 |         410 |                           35.0 |
  • java
    该部分示例仅仅是以java实现创建表及查询,简单示例。
// 注册名为 “jdbcOutputTable” 的JDBC表String sinkDDL = "create table jdbcOutputTable (" + "id bigint not null, " + "name varchar(20) , " + "age int ,"+"balance bigint,"+"pwd varchar(20),"+"email varchar(20) , PRIMARY KEY (id) NOT ENFORCED" +") with (" + " 'connector.type' = 'jdbc', "				+ " 'connector.url' = 'jdbc:mysql://192.168.10.44:3306/test', " + " 'connector.table' = 'user', " + " 'connector.driver' = 'com.mysql.jdbc.Driver', "				+ " 'connector.username' = 'root', " + " 'connector.password' = '123456' )";tenv.executeSql(sinkDDL);String sql = "SELECT *  FROM jdbcOutputTable ";String sql2 = "SELECT *  FROM jdbcOutputTable  where name like '%alan%'";Table table = tenv.sqlQuery(sql2);table.printSchema();DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);result.print();env.execute();//运行结果
(`id` BIGINT NOT NULL,`name` VARCHAR(20),`age` INT,`balance` BIGINT,`pwd` VARCHAR(20),`email` VARCHAR(20)
)15> (true,+I[7, alanchanchn, 19, 800, vx, alan.chan.chn@163.com])
15> (true,+I[8, alanchan, 19, 800, sink mysql, alan.chan.chn@163.com])

3、连接器参数

在这里插入图片描述
在这里插入图片描述

4、已弃用的配置

这些弃用配置已经被上述的新配置代替,而且最终会被弃用。请优先考虑使用新配置。
在这里插入图片描述

5、特性

1)、键处理

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。

有关 PRIMARY KEY 语法的更多详细信息,请参见 22、Flink 的table api与sql之创建表的DDL。

2)、分区扫描

为了在并行 Source task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。

如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。

scan.partition.lower-bound 和 scan.partition.upper-bound 用于决定分区的起始位置和过滤表中的数据。如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。

  • scan.partition.column:输入用于进行分区的列名。
  • scan.partition.num:分区数。
  • scan.partition.lower-bound:第一个分区的最小值。
  • scan.partition.upper-bound:最后一个分区的最大值。

3)、Lookup Cache

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。

默认情况下,lookup cache 是未启用的,你可以将 lookup.cache 设置为 PARTIAL 参数来启用。

lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。
默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。
当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。 当缓存命中最大缓存行 lookup.partial-cache.max-rows 或当行超过 lookup.partial-cache.expire-after-write 或 lookup.partial-cache.expire-after-access 指定的最大存活时间时,缓存中的行将被设置为已过期。 缓存中的记录可能不是最新的,用户可以将缓存记录超时设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。

所以要做好吞吐量和正确性之间的平衡。

默认情况下,flink 会缓存主键的空查询结果,你可以通过将 lookup.partial-cache.cache-missing-key 设置为 false 来切换行为。

4)、幂等写入

如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,这种方式确保了幂等性。

如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。

除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是用户期待的。

由于 upsert 没有标准的语法,因此下表描述了不同数据库的 DML 语法:
在这里插入图片描述

5、JDBC Catalog

JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。

// Postgres Catalog & MySQL Catalog 支持的方法
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);
其他的 Catalog 方法现在尚不支持。

1)、JDBC Catalog 的使用

本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。
本处描述的版本是flink 1.17,flink1.13版本只支持postgresql,在1.13版本中执行会出现如下异常:

[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Catalog for 'org.apache.flink.connector.jdbc.dialect.MySQLDialect@1bc49bc5' is not supported yet.

JDBC catalog 支持以下参数:

  • name:必填,catalog 的名称。

  • default-database:必填,默认要连接的数据库。

  • username:必填,Postgres/MySQL 账户的用户名。

  • password:必填,账户的密码。

  • base-url:必填,(不应该包含数据库名)
    对于 Postgres Catalog base-url 应为 “jdbc:postgresql://:” 的格式。
    对于 MySQL Catalog base-url 应为 “jdbc:mysql://:” 的格式。

  • sql

---需要将mysql-connector-java-6.0.6.jar、flink-connector-jdbc-3.1.0-1.17.jar放在flink的lib目录,并重启flink集群
CREATE CATALOG alan_catalog WITH('type' = 'jdbc','default-database' = 'test','username' = 'root','password' = '123456','base-url' = 'jdbc:mysql://192.168.10.44:3306'
);USE CATALOG alan_catalog;
---------------------------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = '123456',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.Flink SQL> show CATALOGS;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in setFlink SQL> use CATALOG alan_catalog;
[INFO] Execute statement succeed.
  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);String name            = "my_catalog";
String defaultDatabase = "mydb";
String username        = "...";
String password        = "...";
String baseUrl         = "..."JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("my_catalog", catalog);// 设置 JdbcCatalog 为会话的当前 catalog
tableEnv.useCatalog("my_catalog");
  • yaml
execution:...current-catalog: alan_catalog  # 设置目标 JdbcCatalog 为会话的当前 catalogcurrent-database: testcatalogs:- name:alan_catalogtype: jdbcdefault-database: testusername: ...password: ...base-url: ...

2)、JDBC Catalog for PostgreSQL

  • PostgreSQL 元空间映射
    除了数据库之外,postgreSQL 还有一个额外的命名空间 schema。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为 “public”,每个 schema 可以包含多张表。 在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用 schema_name.table_name 或只有 table_name,其中 schema_name 是可选的,默认值为 “public”。

因此,Flink Catalog 和 Postgres 之间的元空间映射如下:
在这里插入图片描述
Flink 中的 Postgres 表的完整路径应该是 “..<schema.table>”。如果指定了 schema,请注意需要转义 <schema.table>。

这里提供了一些访问 Postgres 表的例子:

-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;-- 扫描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定义 schema 不能省略,并且必须与表一起转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

3)、JDBC Catalog for MySQL

  • MySQL 元空间映射
    MySQL 实例中的数据库与 MySQL Catalog 注册的 catalog 下的数据库处于同一个映射层级。一个 MySQL 实例可以拥有多个数据库,每个数据库可以包含多张表。 在 Flink 中,当查询由 MySQL catalog 注册的表时,用户可以使用 database.table_name 或只使用 table_name,其中 database 是可选的,默认值为创建 MySQL Catalog 时指定的默认数据库。

因此,Flink Catalog 和 MySQL catalog 之间的元空间映射如下:
在这里插入图片描述
Flink 中的 MySQL 表的完整路径应该是 “<catalog>.<db>.<table>”。

这里提供了一些访问 MySQL 表的例子(在版本1.17中完成):

-- 扫描 默认数据库(test)中的 'person' 表
select * from alan_catalog.test.person;
select * from test.person;
select * from person;-- 扫描 'cdhhive' 数据库中的 'version' 表,
select * from alan_catalog.cdhhive.version;
select * from cdhhive.version;
select * from version;---------------具体操作详见下文------------------
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = '123456',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in setFlink SQL> select * from alan_catalog.test.person;+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rowsFlink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.Flink SQL> select * from test.person;+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rowsFlink SQL> use alan_catalog.test;
[INFO] Execute statement succeed.Flink SQL> select * from person;+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rowsFlink SQL> select * from alan_catalog.cdhhive.version;+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 rowFlink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.Flink SQL> select * from cdhhive.version;+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 rowFlink SQL> use alan_catalog.cdhhive;
[INFO] Execute statement succeed.Flink SQL> select * from version;+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row

6、数据类型映射

Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。

在这里插入图片描述
在这里插入图片描述
以上,简单的介绍了flink sql读取外部系统的jdbc示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/49893.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3D旅游情景实训教学展示

随着科技的不断发展&#xff0c;情景实训教学在教育领域中的应用越来越广泛。通过虚拟现实技术&#xff0c;3D视觉技术&#xff0c;计算机技术等为学生提供了一个身临其境的学习环境&#xff0c;让他们能够在模拟的场景中学习和实践&#xff0c;从而更好地理解和掌握知识。 3D虚…

Docker搭建LNMP运行Wordpress平台

一、项目1.1 项目环境1.2 服务器环境1.3 任务需求 二、Linux 系统基础镜像三、Nginx1、建立工作目录2、编写 Dockerfile 脚本3、准备 nginx.conf 配置文件4、生成镜像5、创建自定义网络6、启动镜像容器7、验证 nginx 四、Mysql1、建立工作目录2、编写 Dockerfile3、准备 my.cnf…

Ubuntu系统安装之后首需要做的事情

Ubuntu系统的初步环境搭建 1、换源2、显卡3、浏览器4、输入法5、终端6、ROS7、VSCode8、设置时间与win一致9、 TimeShift10、 Anaconda&#xff08;考虑装不装&#xff09; 1、换源 点开Software&&Update&#xff0c;找到Ubuntu Software中的Download from&#xff0c…

Linux学习记录——이십오 多线程(2)

文章目录 1、理解原生线程库线程局部存储 2、互斥1、并发代码&#xff08;抢票&#xff09;2、锁3、互斥锁的实现原理 3、线程封装1、线程本体2、封装锁 4、线程安全5、死锁6、线程同步1、条件变量1、接口2、demo代码 1、理解原生线程库 线程库在物理内存中存在&#xff0c;也…

[bug日志]springboot多模块启动,在yml配置启动端口8081,但还是启动了8080

【问题描述】 配置的启动端口是8081&#xff0c;实际启动端口是8080 【解决方法】 1.检查application.yml的配置是否有错误(配置项中&#xff0c;显示白色就错&#xff0c;橙色无措) 2.检查pom.xml的打包方式配置项配置&#xff0c;主pom.xml中的配置项一般为&#xff1a;&l…

【hello git】初识Git

目录 一、简述Git 二、Linux 下 Git 的安装&#xff1a;CentOS 2.1 基本命令 2.2 示例&#xff1a; 三、Linux 下 Git 的安装&#xff1a;ubuntu 3.1 基本命令 3.2 示例&#xff1a; 一、简述Git Git &#xff1a;版本控制器&#xff0c;记录每次的修改以及版本迭代的一个管…

day 37 | ● 1049. 最后一块石头的重量 II ● 494. 目标和 ● 474.一和零

1049. 最后一块石头的重量 II 与前一道分割等和子集的思路差不多&#xff0c;都是01背包问题。因为是采用滚动数组的形式&#xff0c;所以必须要倒序遍历才可以。 dp[i]代表着在i的限制下最大的承重。所以另一半就是all - dp【all / 2】 func lastStoneWeightII(stones []int…

流媒体内容分发终极解决方案:当融合CDN与P2P视频交付结合

前言 随着互联网的发展&#xff0c;流媒体视频内容日趋增多&#xff0c;已经成为互联网信息的主要承载方式。相对传统的文字&#xff0c;图片等传统WEB应用&#xff0c;流媒体具有高数据量&#xff0c;高带宽、高访问量和高服务质量要求的特点&#xff0c;而现阶段互联网“尽力…

中介者模式-协调多个对象之间的交互

在深圳租房市场&#xff0c;有着许多的“二房东”&#xff0c;房主委托他们将房子租出去&#xff0c;而租客想要租房的话&#xff0c;也是和“二房东”沟通&#xff0c;租房期间有任何问题&#xff0c;找二房东解决。对于房主来说&#xff0c;委托给“二房东”可太省事了&#…

【图论】缩点的综合应用(一)

一.缩点的概念 缩点&#xff0c;也称为点缩法&#xff08;Vertex Contraction&#xff09;&#xff0c;是图论中的一种操作&#xff0c;通常用于缩小图的规模&#xff0c;同时保持了图的某些性质。这个操作的目标是将图中的一些节点合并为一个超级节点&#xff0c;同时调整相关…

LLM 生成式配置的推理参数温度 top k tokens等 Generative configuration inference parameters

在这个视频中&#xff0c;你将了解一些方法和相关的配置参数&#xff0c;这些参数可以用来影响模型在下一个词生成时的最终决策方式。如果你在Hugging Face网站或AWS的游乐场中使用过LLMs&#xff0c;你可能已经看到了这些控制选项&#xff0c;用来调整LLM的行为。每个模型都暴…

Java接口详解

接口 接口的概念 在现实生活中&#xff0c;接口的例子比比皆是&#xff0c;比如&#xff1a;笔记本上的USB口&#xff0c;电源插座等。 电脑的USB口上&#xff0c;可以插&#xff1a;U盘&#xff0c;鼠标&#xff0c;键盘等所有符合USB协议的设备 电源插座插孔上&#xff0c;…

【C++】—— 简述C++11新特性

序言&#xff1a; 从本期开始&#xff0c;我将会带大家学习的是关于C11 新增的相关知识&#xff01;废话不多说&#xff0c;我们直接开始今天的学习。 目录 &#xff08;一&#xff09;C11简介 &#xff08;二&#xff09;统一的列表初始化 1、&#xff5b;&#xff5d;初始…

百望云华为云共建零售数字化新生态 聚焦数智新消费升级

零售业是一个充满活力和创新的行业&#xff0c;但也是当前面临很大新挑战和新机遇的行业。数智新消费时代&#xff0c;数字化转型已经成为零售企业必须面对的重要课题。 8 月 20 日-21日&#xff0c;以“云上创新 韧性增长”为主题的华为云数智新消费创新峰会2023在成都隆重召…

爬虫selenium获取元素定位方法总结(动态获取元素)

目录 元素 查看元素信息 元素定位 通过元素id定位 通过元素name定位 通过xpath表达式定位 绝对路径 相对路径 通过完整超链接定位 通过部分链接定位 通过标签定位 通过类名进行定位 通过css选择器进行定位 id选择器 class选择器 标签选择器 属性选择器 定位带…

蓝帽杯半决赛2022

手机取证_1 iPhone手机的iBoot固件版本号:&#xff08;答案参考格式&#xff1a;iBoot-1.1.1&#xff09; 直接通过盘古石取证 打开 取证大师和火眼不知道为什么都无法提取这个 手机取证_2 该手机制作完备份UTC8的时间&#xff08;非提取时间&#xff09;:&#xff08;答案…

技术的巅峰演进:深入解析算力网络的多层次技术设计

在数字化时代的浪潮中&#xff0c;网络技术正以前所未有的速度演进&#xff0c;而算力网络作为其中的一颗明星&#xff0c;以其多层次的技术设计引领着未来的网络构架。本文将带您深入探索算力网络独特的技术之旅&#xff0c;从底层协议到分布式控制&#xff0c;为您呈现这一创…

opencv进阶19-基于opencv 决策树cv::ml::DTrees 实现demo示例

opencv 中创建决策树 cv::ml::DTrees类表示单个决策树或决策树集合&#xff0c;它是RTrees和 Boost的基类。 CART是二叉树&#xff0c;可用于分类或回归。对于分类&#xff0c;每个叶子节点都 标有类标签&#xff0c;多个叶子节点可能具有相同的标签。对于回归&#xff0c;每…

cuml机器学习GPU库 sklearn升级版AutoDL使用

CUML库 最近在做机器学习任务的时候发现我自己的数据集太大&#xff0c;直接用sklearn 跑起来时间很长&#xff0c;然后问GPT得知了有CUML库&#xff0c;后来去研究了一下&#xff0c;发现这个库只支持linux系统&#xff0c;从官网直接获取下载命令基本上也实现不了最后&#…

outlook等客户端报错:-ERR Login fail. Please using weixin token to login

使用outlook配置腾讯邮箱后&#xff0c;无法收取邮件&#xff0c;点击接收/发送所有文件夹&#xff0c; 提示报错&#xff1a; 任务“testqq.com - 正在接收”报告了错误(0x800CCC92):“电子邮件服务器拒绝您登录。请在“帐户设置”中验证此帐户的用户名及密码。 响应服务器:…