30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置

22、Flink 的table api与sql之创建表的DDL

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、SQL客户端
    • 1、启动 SQL 客户端命令行界面
    • 2、执行 SQL 查询
    • 3、配置
      • 1、环境配置文件
        • 1、flink cli配置文件说明如下
        • 2、重启策略(Restart Strategies)
      • 2)、依赖
      • 3)、自定义函数(User-defined Functions)
        • 1、构造函数参数
    • 4、Catalogs
    • 5、分离的 SQL 查询
    • 6、SQL 视图
    • 7、临时表(Temporal Table)
    • 8、局限与未来


本文介绍了flink cli的启动、使用以及通过创建kafka、filesystem等例子介绍了配置文件的使用,同时也简单的介绍了视图、临时表等内容。
本文依赖环境是flink、kafka环境可用,flink的版本是1.13.5、jdk8.

一、SQL客户端

Flink 的 Table & SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的表程序中。此外,这些程序在提交到集群前需要用构建工具打包。这或多或少限制了 Java/Scala 程序员对 Flink 的使用。

SQL 客户端 的目的是提供一种简单的方式来编写、调试和提交表程序到 Flink 集群上,而无需写一行 Java 或 Scala 代码。SQL 客户端命令行界面(CLI) 能够在命令行中检索和可视化分布式应用中实时产生的结果。
部署请参考:1、Flink1.12.7或1.13.5详细介绍及本地安装部署、验证 和 2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤

1、启动 SQL 客户端命令行界面

SQL Client 脚本也位于 Flink 的 bin 目录中。将来,用户可以通过启动嵌入式 standalone 进程或通过连接到远程 SQL 客户端网关来启动 SQL 客户端命令行界面。目前仅支持 embedded 模式。可以通过以下方式启动 CLI:

#启动flink sql客户端
./bin/sql-client.sh embedded -d 配置文件
#或
./bin/sql-client.sh embedded
#或
./bin/sql-client.sh#注意:需要使用部署flink集群的用户启动
[alanchan@server1 bin]$ sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/flink-1.13.5/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for '/usr/local/flink-1.13.5/conf/sql-client-defaults.yaml'...not found.
Command history file path: /home/alanchan/.flink-sql-history▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░        ▒▒▒▓██▒  ▒░██▒   ▒▒▓▓█▓▓▒░      ▒██████▒         ░▒▓███▒    ▒█▒█▒░▓█            ███   ▓░▒██▓█       ▒▒▒▒▒▓██▓░▒░▓▓██░ █   ▒▒░       ███▓▓█ ▒█▒▒▒████░   ▒▓█▓      ██▒▒▒ ▓███▒░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓███▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒▓█   ▒█▓   ░     █░                ▒█              █▓█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒███   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░______ _ _       _       _____  ____  _         _____ _ _            _  BETA   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Flink SQL> select 1;

默认情况下,SQL 客户端将从 ./conf/sql-client-defaults.yaml 中读取配置。有关环境配置文件结构的更多信息,请参见本文配置部分。

2、执行 SQL 查询

命令行界面启动后,你可以使用 HELP 命令列出所有可用的 SQL 语句。输入第一条 SQL 查询语句并按 Enter 键执行,可以验证你的设置及集群连接是否正确:

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in setFlink SQL> use default_database;
[INFO] Execute statement succeed.Flink SQL> show tables;
Empty setFlink SQL> SELECT 'Hello World';

在这里插入图片描述

该查询不需要 table source,并且只产生一行结果。CLI 将从集群中检索结果并将其可视化。按 Q 键退出结果视图。

CLI 为维护和可视化结果提供三种模式。

  • 表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET execution.result-mode=table;Flink SQL> SET execution.result-mode=table;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.
  • 变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。
SET execution.result-mode=changelog;Flink SQL> SET execution.result-mode=changelog;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.
  • Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):
SET execution.result-mode=tableau;Flink SQL> SET execution.result-mode=tableau;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.

注意当你使用这个模式运行一个流式查询的时候,Flink 会将结果持续的打印在当前的屏幕之上。如果这个流式查询的输入是有限的数据集, 那么Flink在处理完所有的数据之后,会自动的停止作业,同时屏幕上的打印也会相应的停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停掉作业同时停止屏幕上的打印。

你可以用如下查询来查看三种结果模式的运行情况:


SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

此查询执行一个有限字数示例:

变更日志模式 下,看到的结果应该类似于:
在这里插入图片描述
表格模式 下,可视化结果表将不断更新,直到表程序以如下内容结束:
在这里插入图片描述

Tableau模式 下,如果这个查询以流的方式执行,那么将显示以下内容:
在这里插入图片描述

如果这个查询以批的方式执行,显示的内容如下:

±------±----+
| name | cnt |
±------±----+
| Alice | 1 |
| Bob | 2 |
| Greg | 1 |
±------±----+
3 rows in set

这几种结果模式在 SQL 查询的原型设计过程中都非常有用。这些模式的结果都存储在 SQL 客户端 的 Java 堆内存中。为了保持 CLI 界面及时响应,变更日志模式仅显示最近的 1000 个更改。表格模式支持浏览更大的结果,这些结果仅受可用主内存和配置的最大行数(max-table-result-rows)的限制。

在批处理环境下执行的查询只能用表格模式或者Tableau模式进行检索。

定义查询语句后,可以将其作为长时间运行的独立 Flink 作业提交给集群。为此,其目标系统需要使用 INSERT INTO 语句指定存储结果。配置部分解释如何声明读取数据的 table source,写入数据的 sink 以及配置其他表程序属性的方法。

3、配置

SQL 客户端启动时可以添加 CLI 选项,具体如下。

./bin/sql-client.sh embedded --helpMode "embedded" submits Flink jobs from the local machine.Syntax: embedded [OPTIONS]"embedded" mode options:-d,--defaults <environment file>      The environment properties with which every new session is initialized.Properties might be overwritten by session properties.-e,--environment <environment file>   The environment properties to be  imported into the session. It mightoverwrite default environment properties.-h,--help                             Show the help message with descriptions of all options.-hist,--history <History file path>   The file which you want to save the command history into. If notspecified, we will auto-generate one under your user's home directory.-j,--jar <JAR file>                   A JAR file to be imported into thesession. The file might containuser-defined classes needed for theexecution of statements such asfunctions, table sources, or sinks.Can be used multiple times.-l,--library <JAR directory>          A JAR file directory with which everynew session is initialized. The filesmight contain user-defined classesneeded for the execution ofstatements such as functions, tablesources, or sinks. Can be usedmultiple times.-pyarch,--pyArchives <arg>            Add python archive files for job. Thearchive files will be extracted tothe working directory of python UDFworker. Currently only zip-format issupported. For each archive file, atarget directory be specified. If thetarget directory name is specified,the archive file will be extracted toa name can directory with thespecified name. Otherwise, thearchive file will be extracted to adirectory with the same name of thearchive file. The files uploaded viathis option are accessible viarelative path. '#' could be used asthe separator of the archive filepath and the target directory name.Comma (',') could be used as theseparator to specify multiple archivefiles. This option can be used toupload the virtual environment, thedata files used in Python UDF (e.g.:--pyArchivesfile:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutablepy37.zip/py37/bin/python). The datafiles could be accessed in PythonUDF, e.g.: f = open('data/data.txt','r').-pyexec,--pyExecutable <arg>          Specify the path of the pythoninterpreter used to execute thepython UDF worker (e.g.:--pyExecutable/usr/local/bin/python3). The pythonUDF worker depends on Python 3.5+,Apache Beam (version == 2.23.0), Pip(version >= 7.1.0) and SetupTools(version >= 37.0.0). Please ensurethat the specified environment meetsthe above requirements.-pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.These files will be added to thePYTHONPATH of both the local clientand the remote python UDF worker. Thestandard python resource filesuffixes such as .py/.egg/.zip ordirectory are all supported. Comma(',') could be used as the separatorto specify multiple files (e.g.:--pyFilesfile:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).-pyreq,--pyRequirements <arg>         Specify a requirements.txt file whichdefines the third-party dependencies.These dependencies will be installedand added to the PYTHONPATH of thepython UDF worker. A directory whichcontains the installation packages ofthese dependencies could be specifiedoptionally. Use '#' as the separatorif the optional parameter exists(e.g.: --pyRequirementsfile:///tmp/requirements.txt#file:///tmp/cached_dir).-s,--session <session identifier>     The identifier for a session.'default' is the default identifier.-u,--update <SQL update statement>    Experimental (for testing only!):Instructs the SQL Client toimmediately execute the given updatestatement after starting up. Theprocess is shut down after thestatement has been submitted to thecluster and returns an appropriatereturn code. Currently, this featureis only supported for INSERT INTOstatements that declare the targetsink table.

1、环境配置文件

1、flink cli配置文件说明如下

默认情况下,SQL 客户端将从 ./conf/sql-client-defaults.yaml 中读取配置。
SQL 查询执行前需要配置相关环境变量。环境配置文件 定义了 catalog、table sources、table sinks、用户自定义函数和其他执行或部署所需属性。
每个环境配置文件是常规的 YAML 文件,例子如下。

tables:- name: Users_Testing_alanchantype: source-tableupdate-mode: appendconnector:type: filesystempath: "/usr/local/bigdata/testdata/flink/users.csv"format:type: csvfields:- name: u_iddata-type: INT- name: u_namedata-type: VARCHAR- name: u_agedata-type: INT- name: u_balancedata-type: DOUBLEline-delimiter: "\n"comment-prefix: "#"schema:- name: u_iddata-type: INT- name: u_namedata-type: VARCHAR- name: u_agedata-type: INT- name: u_balancedata-type: DOUBLE- name: Users_Testing_alanchan_Viewtype: viewquery: "SELECT u_id,u_name,u_age FROM Users_Testing_alanchan"# 定义用户自定义函数
# functions:
#   - name: myUDF
#     from: class
#     class: foo.bar.AggregateUDF
#     constructor:
#       - 7.6
#       - false# 定义 catalogs
# catalogs:
#   - name: catalog_1
#      type: hive
#      property-version: 1
#      hive-conf-dir: ...
#    - name: catalog_2
#      type: hive
#      property-version: 1
#      default-database: mydb2
#      hive-conf-dir: ...# 改变表程序基本的执行行为属性。
execution:planner: blink                    # 可选: 'blink' (默认)或 'old'type: streaming                   # 必选:执行模式为 'batch' 或 'streaming'result-mode: table                # 必选:'table' 或 'changelog'max-table-result-rows: 1000000    # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)time-characteristic: event-time   # 可选: 'processing-time' 或 'event-time' (默认)parallelism: 1                    # 可选:Flink 的并行数量(默认为 1)periodic-watermarks-interval: 200 # 可选:周期性 watermarks 的间隔时间(默认 200 ms)max-parallelism: 16               # 可选:Flink 的最大并行数量(默认 128)min-idle-state-retention: 0       # 可选:表程序的最小空闲状态时间max-idle-state-retention: 0       # 可选:表程序的最大空闲状态时间current-catalog: default_catalog  # 可选:当前会话 catalog 的名称(默认为 'default_catalog')current-database: default_database # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)restart-strategy:                 # 可选:重启策略(restart-strategy)type: fallback                  #   默认情况下“回退”到全局重启策略# 用于调整和调优表程序的配置选项。
# 在专用的”配置”页面上可以找到完整的选项列表及其默认值。
configuration:table.optimizer.join-reorder-enabled: truetable.exec.spill-compression.enabled: truetable.exec.spill-compression.block-size: 128kb# 描述表程序提交集群的属性。
deployment:response-timeout: 5000

上述配置:

  • 定义一个从 CSV 文件中读取的 table source Users_Testing_alanchan 所需的环境,
  • 定义了一个视图 Users_Testing_alanchan_View ,该视图是用 SQL 查询声明的虚拟表,
  • 定义了一个用户自定义函数 myUDF,该函数可以使用类名和两个构造函数参数进行实例化,已注释掉
  • 连接到两个 Hive catalogs 并用 catalog_1 来作为当前目录,用 mydb1 来作为该目录的当前数据库,已注释掉
  • streaming 模式下用 blink planner 来运行时间特征为 event-time 和并行度为 1 的语句,
  • 在 table 结果模式下运行试探性的(exploratory)的查询,
  • 并通过配置选项对联结(join)重排序和溢出进行一些计划调整。

重新启动flink cli,内容如下:

  • users.csv文件内容
    需要将该文件放在flink集群中的每台机器上
1,alan,18,20
2,alanchan,19,30
3,alanchanchn,20,40
4,alan_chan,25,60
  • flink cli查询

查询表和视图

Flink SQL> select * from Users_Testing_alanchan ;
  • 表显示如下
    在这里插入图片描述
  • 视图显示如下
Flink SQL> select * from Users_Testing_alanchan_View;

在这里插入图片描述

根据使用情况,配置可以被拆分为多个文件。因此,一般情况下(用 --defaults 指定默认环境配置文件)以及基于每个会话(用 --environment 指定会话环境配置文件)来创建环境配置文件。每个 CLI 会话均会被属于 session 属性的默认属性初始化。例如,默认环境配置文件可以指定在每个会话中都可用于查询的所有 table source,而会话环境配置文件仅声明特定的状态保留时间和并行性。启动 CLI 应用程序时,默认环境配置文件和会话环境配置文件都可以被指定。如果未指定默认环境配置文件,则 SQL 客户端将在 Flink 的配置目录中搜索 ./conf/sql-client-defaults.yaml。

在 CLI 会话中设置的属性(如 SET 命令)优先级最高:
CLI commands > session environment file > defaults environment file

2、重启策略(Restart Strategies)

重启策略控制 Flink 作业失败时的重启方式。与 Flink 集群的全局重启策略相似,更细精度的重启配置可以在环境配置文件中声明。

Flink 支持以下策略:

execution:# 退回到 flink-conf.yaml 中定义的全局策略restart-strategy:type: fallback# 作业直接失败并且不尝试重启restart-strategy:type: none# 最多重启作业的给定次数restart-strategy:type: fixed-delayattempts: 3      # 作业被宣告失败前的重试次数(默认:Integer.MAX_VALUE)delay: 10000     # 重试之间的间隔时间,以毫秒为单位(默认:10 秒)# 只要不超过每个时间间隔的最大故障数就继续尝试restart-strategy:type: failure-ratemax-failures-per-interval: 1   # 每个间隔重试的最大次数(默认:1)failure-rate-interval: 60000   # 监测失败率的间隔时间,以毫秒为单位delay: 10000                   # 重试之间的间隔时间,以毫秒为单位(默认:10 秒)

2)、依赖

SQL 客户端不要求用 Maven 或者 SBT 设置 Java 项目。相反,你可以以常规的 JAR 包给集群提交依赖项。你也可以分别(用 --jar)指定每一个 JAR 包或者(用 --library)定义整个 library 依赖库。为连接扩展系统(如 Apache Kafka)和相应的数据格式(如 JSON),Flink提供了开箱即用型 JAR 捆绑包(ready-to-use JAR bundles)。这些 JAR 包各个发行版都可以从 Maven 中央库中下载到。

更多的关于外部连接系统示例参考:
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache HBase示例(5)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

如下例子展示了从 Apache Kafka 中读取 csv 文件并作为 table source 的环境配置文件。

# kafka表
tables:- name: Test_Flink_Kafka_Source_SQLtype: source-tableupdate-mode: appendconnector:property-version: 1type: kafkaversion: "universal"topic: t_kafkasource_testingstartup-mode: earliest-offsetproperties:bootstrap.servers: "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092"group.id: flink_alan_kafkaformat:property-version: 1type: csvschema: "ROW<t_id LONG, t_name STRING, t_balance DOUBLE, t_age INT, t_insert_time TIMESTAMP>"schema:- name: t_iddata-type: BIGINT- name: t_namedata-type: STRING- name: t_balancedata-type: DOUBLE- name: t_agedata-type: INT- name: rowTimedata-type: TIMESTAMP(3)rowtime:timestamps:type: "from-field"from: "t_insert_time"watermarks:type: "periodic-bounded"delay: "60000"- name: procTimedata-type: TIMESTAMP(3)proctime: true# 改变表程序基本的执行行为属性。
execution:planner: blink                    # 可选: 'blink' (默认)或 'old'type: streaming                   # 必选:执行模式为 'batch' 或 'streaming'result-mode: table                # 必选:'table' 或 'changelog'max-table-result-rows: 1000000    # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)time-characteristic: event-time   # 可选: 'processing-time' 或 'event-time' (默认)parallelism: 1                    # 可选:Flink 的并行数量(默认为 1)periodic-watermarks-interval: 200 # 可选:周期性 watermarks 的间隔时间(默认 200 ms)max-parallelism: 16               # 可选:Flink 的最大并行数量(默认 128)min-idle-state-retention: 0       # 可选:表程序的最小空闲状态时间max-idle-state-retention: 0       # 可选:表程序的最大空闲状态时间current-catalog: default_catalog  # 可选:当前会话 catalog 的名称(默认为 'default_catalog')current-database: default_database # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)restart-strategy:                 # 可选:重启策略(restart-strategy)type: fallback                  #   默认情况下“回退”到全局重启策略# 用于调整和调优表程序的配置选项。
# 在专用的”配置”页面上可以找到完整的选项列表及其默认值。
configuration:table.optimizer.join-reorder-enabled: truetable.exec.spill-compression.enabled: truetable.exec.spill-compression.block-size: 128kb# 描述表程序提交集群的属性。
deployment:response-timeout: 5000

运行上面的配置需要在flink的lib文件夹下增加关于kafka的依赖包,上述例子中增加了如下包
本文的依赖环境是flink 1.13.5

flink-sql-connector-kafka_2.11-1.13.5.jar
# 要重启集群

Test_Flink_Kafka_Source_SQL 表的结果格式与绝大多数的 csv 格式相似。此外,它还添加了 rowtime 属性 rowTime 和 processing-time 属性 procTime。
connector 和 format 都允许定义属性版本(当前版本为 1 )以便将来向后兼容。

  • 验证
# kafka主题操作
kafka-topics.sh --delete --topic t_kafkasource_testing --bootstrap-server server1:9092
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource_testing --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_testing# 测试数据
1,alan,15,18,2022-08-07 09:18:25
2,alanchan,20,19,2022-08-07 09:20:25
3,alanchanchn,25,20,2022-08-07 09:22:25
4,alan_chan,30,21,2022-08-07 09:24:25
5,alan_chan_chn,45,22,2022-08-07 09:26:25# kafka客户端操作
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_testing
>1,alan,15,18,2022-08-07 09:18:25
>2,alanchan,20,19,2022-08-07 09:20:25
>3,alanchanchn,25,20,2022-08-07 09:22:25
>4,alan_chan,30,21,2022-08-07 09:24:25
>5,alan_chan_chn,45,22,2022-08-07 09:26:25
  • 查询结果
    在这里插入图片描述

3)、自定义函数(User-defined Functions)

SQL 客户端允许用户创建用户自定义的函数来进行 SQL 查询。当前,这些自定义函数仅限于 Java/Scala 编写的类以及 Python 文件。

为提供 Java/Scala 的自定义函数,你首先需要实现和编译函数类,该函数继承自 ScalarFunction、 AggregateFunction 或 TableFunction(19、Flink 的table api与sql之内置函数: Table API 和 SQL 中的内置函数)。一个或多个函数可以打包到 SQL 客户端的 JAR 依赖中。

所有函数在被调用之前,必须在环境配置文件中提前声明。functions 列表中每个函数类都必须指定

  • 用来注册函数的 name,
  • 函数的来源 from(目前仅限于 class(Java/Scala UDF),

Java/Scala UDF 必须指定:

  • 声明了全限定名的函数类 class 以及用于实例化的 constructor 参数的可选列表。
functions:- name: java_udf               # required: name of the functionfrom: class                  # required: source of the functionclass: ...                   # required: fully qualified class name of the functionconstructor:                 # optional: constructor parameters of the function class- ...                      # optional: a literal parameter with implicit type- class: ...               # optional: full class name of the parameterconstructor:             # optional: constructor parameters of the parameter's class- type: ...            # optional: type of the literal parametervalue: ...           # optional: value of the literal parameter- name: python_udf             # required: name of the functionfrom: python                 # required: source of the function fully-qualified-name: ...    # required: fully qualified class name of the function    

对于 Java/Scala UDF,要确保函数类指定的构造参数顺序和类型都要严格匹配。

1、构造函数参数

根据用户自定义函数可知,在用到 SQL 语句中之前,有必要将构造参数匹配对应的类型。
如上述示例所示,当声明一个用户自定义函数时,可以使用构造参数来配置相应的类,有以下三种方式:

  • 隐式类型的文本值:SQL 客户端将自动根据文本推导对应的类型。目前,只支持 BOOLEAN、INT、 DOUBLE 和 VARCHAR 。

如果自动推导的类型与期望不符(例如,你需要 VARCHAR 类型的 false),可以改用显式类型。

- true         # -> BOOLEAN (case sensitive)
- 42           # -> INT
- 1234.222     # -> DOUBLE
- foo          # -> VARCHAR
  • 显式类型的文本值:为保证类型安全,需明确声明 type 和 value 属性的参数。
- type: DECIMALvalue: 11111111111111111

下表列出支持的 Java 参数类型和与之相对应的 SQL 类型。
在这里插入图片描述

  • (嵌套)类实例:除了文本值外,还可以通过指定构造参数的 class 和 constructor 属性来创建(嵌套)类实例。这个过程可以递归执行,直到最后的构造参数是用文本值来描述的。
- class: foo.bar.paramClassconstructor:- StarryName- class: java.lang.Integerconstructor:- class: java.lang.Stringconstructor:- type: VARCHARvalue: 3

4、Catalogs

Catalogs 可以由 YAML 属性集合定义,并且在 SQL 客户端启动之前自动注册到运行环境中。

用户可以指定在 SQL CLI 中哪些 catalog 要被作为当前的 catalog,以及哪个数据库的 catalog 可以用于当前数据库。

catalogs:- name: catalog_1type: hiveproperty-version: 1default-database: mydb2hive-conf-dir: <path of Hive conf directory>- name: catalog_2type: hiveproperty-version: 1hive-conf-dir: <path of Hive conf directory>execution:...current-catalog: catalog_1current-database: mydb1

更多关于 catalog 的内容,参考 24、Flink 的table api与sql之Catalogs。

5、分离的 SQL 查询

为定义端到端的 SQL 管道,SQL 的 INSERT INTO 语句可以向 Flink 集群提交长时间运行的分离查询。查询产生的结果输出到除 SQL 客户端外的扩展系统中。这样可以应对更高的并发和更多的数据。CLI 自身在提交后不对分离查询做任何控制。

INSERT INTO MyTableSink SELECT * FROM MyTableSource

sink MyTableSink 必须在环境配置文件中声明。查看更多关于 Flink 支持的外部系统及其配置信息,参见
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache HBase示例(5)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

如下展示 Apache Kafka 的 sink 示例。

CREATE TABLE Table_Sink_Kafka (t_id BIGINT, t_name STRING, t_balance DOUBLE, t_age INT, t_insert_time TIMESTAMP(3)
) WITH ('connector' = 'kafka','topic' = 't_kafka_sink_flink','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092','format' = 'csv'
);Flink SQL> CREATE TABLE Table_Sink_Kafka (
>     t_id BIGINT, 
>     t_name STRING, 
>     t_balance DOUBLE, 
>     t_age INT, 
>     t_insert_time TIMESTAMP(3)
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 't_kafka_sink_flink',
>     'scan.startup.mode' = 'earliest-offset',
>     'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>     'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> select * from Test_Flink_Kafka_Source_SQL;
[INFO] Result retrieval cancelled.-- 查询结果如下

在这里插入图片描述
SQL 客户端要确保语句成功提交到集群上。一旦提交查询,CLI 将展示关于 Flink 作业的相关信息。

Flink SQL> INSERT INTO Table_Sink_Kafka SELECT t_id,t_name,t_balance,t_age,rowTime FROM Test_Flink_Kafka_Source_SQL;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 40769216d90fa12fd0696e8e0a6dec2dFlink SQL> select * from Table_Sink_Kafka;
-- 插入后的数据如下

在这里插入图片描述

提交后,SQL 客户端不追踪正在运行的 Flink 作业状态。提交后可以关闭 CLI 进程,并且不会影响分离的查询。Flink 的重启策略负责容错。取消查询可以用 Flink 的 web 接口、命令行或 REST API 。

6、SQL 视图

视图是一张虚拟表,允许通过 SQL 查询来定义。视图的定义会被立即解析与验证。然而,提交常规 INSERT INTO 或 SELECT 语句后不会立即执行,在访问视图时才会真正执行。

视图可以用环境配置文件或者 CLI 会话来定义。

下例展示如何在一个文件里定义多张视图。视图注册的顺序和定义它们的环境配置文件一致。支持诸如 视图 A 依赖视图 B ,视图 B 依赖视图 C 的引用链。

tables:
# filesystem表- name: Users_Testing_view_alanchantype: source-tableupdate-mode: appendconnector:type: filesystempath: "/usr/local/bigdata/testdata/flink/users.csv"format:type: csvfields:- name: u_iddata-type: INT- name: u_namedata-type: VARCHAR- name: u_agedata-type: INT- name: u_balancedata-type: DOUBLEline-delimiter: "\n"comment-prefix: "#"schema:- name: u_iddata-type: INT- name: u_namedata-type: VARCHAR- name: u_agedata-type: INT- name: u_balancedata-type: DOUBLE- name: Users_Testing_alanchan_View1type: viewquery: "SELECT u_id,u_name,u_age FROM Users_Testing_view_alanchan"- name: Users_Testing_alanchan_View2type: viewquery: "SELECT u_id,u_name,u_balance FROM Users_Testing_view_alanchan"

相较于 table soruce 和 sink,会话环境配置文件中定义的视图具有最高优先级。
-验证

Flink SQL> select * from Users_Testing_view_alanchan;Flink SQL> select * from Users_Testing_alanchan_View2;

在这里插入图片描述

在这里插入图片描述

视图还可以在 CLI 会话中用 CREATE VIEW 语句来创建:

CREATE VIEW MyNewView AS select u_name from Users_Testing_view_alanchan;Flink SQL> select * from MyNewView;

在这里插入图片描述

视图能在 CLI 会话中创建,也能用 DROP VIEW 语句删除:

Flink SQL> DROP VIEW MyNewView;
[INFO] Execute statement succeed.Flink SQL> show tables;
+------------------------------+
|                   table name |
+------------------------------+
| Users_Testing_alanchan_View1 |
| Users_Testing_alanchan_View2 |
|  Users_Testing_view_alanchan |
+------------------------------+
3 rows in set

CLI 中视图的定义仅限于上述语法。将来版本会支持定义视图结构以及在表名中加入转义的空格。

7、临时表(Temporal Table)

临时表是在变化的历史记录表上的(参数化)视图,该视图在某个特定时间点返回表的内容。这对于在特定的时间戳将一张表的内容联结另一张表是非常有用的。更多信息见15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置页面。

下例展示如何定义一张临时表 SourceTemporalTable:

tables:# 定义包含对临时表的更新的 table source (或视图)- name: HistorySourcetype: source-tableupdate-mode: appendconnector: # ...format: # ...schema:- name: integerFielddata-type: INT- name: stringFielddata-type: STRING- name: rowtimeFielddata-type: TIMESTAMP(3)rowtime:timestamps:type: from-fieldfrom: rowtimeFieldwatermarks:type: from-source# 在具有时间属性和主键的变化历史记录表上定义临时表- name: SourceTemporalTabletype: temporal-tablehistory-table: HistorySourceprimary-key: integerFieldtime-attribute: rowtimeField  # could also be a proctime field

如例子中所示,table source,视图和临时表的定义可以相互混合。它们按照在环境配置文件中定义的顺序进行注册。例如,临时表可以引用一个视图,该视图依赖于另一个视图或 table source。

8、局限与未来

当前的 SQL 客户端仅支持嵌入式模式。在1.17版本后提供基于 REST 的 SQL 客户端网关(Gateway) 的功能。

以上,介绍了flink cli的启动、使用以及通过创建kafka、filesystem等例子介绍了配置文件的使用,同时也简单的介绍了视图、临时表等内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/34123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iphone拷贝照片中间带E自动去重软件,以及java程序如何打包成jar和exe

文章目录 一、前提二、问题描述三、原始处理方式四、程序处理4.1 java程序如何打包exe4.1.1 首先打包jar4.1.2 开始生成exe4.1.3 软件使用方式 4.2 更换图标4.2.1 更换swing的打包jar图标4.2.2 更换exe图标 4.3 如何使生成的exe在没有java环境的电脑上运行4.3.1 Inno Setup打包…

el-select 动态添加多个下拉框

实现的效果如下: 主要的代码如下: 这是formdata 的结构 主要的逻辑 在这个 methods

Linux网络协议和管理

Linux网络协议和管理 一.网络设备基本知识 图1-网络设备基本知识 二.TCP/IP协议栈简介 1.概述 网络协议通常工作在不同的层中&#xff0c;每一层分别负责不同的通信功能。一个协议族&#xff0c; 比如T C P / I P&#xff0c;是一组不同层次上的多个协议的组合。T C P / I P通…

UVA-1601 万圣节后的早晨 题解答案代码 算法竞赛入门经典第二版

GitHub - jzplp/aoapc-UVA-Answer: 算法竞赛入门经典 例题和习题答案 刘汝佳 第二版 以三个点的当前位置作为状态&#xff0c;广度优先遍历&#xff0c;找到终点即为最短次数。 注意&#xff1a; 一次可以移动多个点&#xff0c;但是每个点只能移动一步。在同一次中&#xf…

工单管理系统有什么优点?工单系统是如何提高企业服务质量和运营效率的?

工单管理系统是一款基于云平台打造的高效报修工单管理系统&#xff0c;为企业报修管理、维保流程优化和后勤决策分析提供全面支持。通过应用工单管理系统&#xff0c;企业能够轻松提升报修效率&#xff0c;降低人工成本&#xff0c;同时提高后勤管理的质量和效益。系统利用先进…

快速上手React:从概述到组件与事件处理

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

Java进阶(1)——JVM的内存分配 反射Class类的类对象 创建对象的几种方式 类加载(何时进入内存JVM) 注解 反射+注解的案例

目录 引出java内存分配java内存分布概略图堆方法区常量池 创建对象内存分配 反射class文件的底层类加载顺序1.检查2.开辟静态资源空间3.常量池4.其他...5.创建一个唯一的类的对象获取Class对象的几种方式 创建对象几种方式new 看到new : new Book()反射 Class.forName(“包名.类…

逆向破解学习-割绳子

试玩 支付失败&#xff0c;请检查网络设置 Hook成功 Hook代码 import android.app.Application; import android.content.Context;import de.robv.android.xposed.XC_MethodHook; import de.robv.android.xposed.XposedHelpers; import de.robv.android.xposed.callbacks.XC_…

vue2 封装 webSocket 开箱即用

第一步&#xff1a; 下载 webSocket npm install vue-native-websocket --save 第二步&#xff1a; 需要在 main.js 中 引入 import websocket from vue-native-websocket; Vue.use(websocket, , {connectManually: true, // 手动连接format: json, // json格式reconnection:…

SpringMVC的架构有什么优势?——表单和数据校验(四)

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

深度学习(37)—— 图神经网络GNN(2)

深度学习&#xff08;37&#xff09;—— 图神经网络GNN&#xff08;2&#xff09; 这一期主要是一些简单示例&#xff0c;针对不同的情况&#xff0c;使用的数据都是torch_geometric的内置数据集 文章目录 深度学习&#xff08;37&#xff09;—— 图神经网络GNN&#xff08…

list模拟实现【引入反向迭代器】

文章目录 1.适配器1.1传统意义上的适配器1.2语言里的适配器1.3理解 2.list模拟实现【注意看反向迭代器】2.1 list_frame.h2.2riterator.h2.3list.h2.4 vector.h2.5test.cpp 3.反向迭代器的应用1.使用要求2.迭代器的分类 1.适配器 1.1传统意义上的适配器 1.2语言里的适配器 容…

基于python+MobileNetV2算法模型实现一个图像识别分类系统

一、目录 算法模型介绍模型使用训练模型评估项目扩展 二、算法模型介绍 图像识别是计算机视觉领域的重要研究方向&#xff0c;它在人脸识别、物体检测、图像分类等领域有着广泛的应用。随着移动设备的普及和计算资源的限制&#xff0c;设计高效的图像识别算法变得尤为重要。…

fork函数和exec族函数的结合使用 的案例

首先回顾之前所讲&#xff0c;在说明“为什么要创建进程”的时候&#xff0c;提到过以下两个原因&#xff1a; 其中第一个原因很好理解&#xff0c;而第二个原因就包含了上节所讲的exec族函数的知识点&#xff0c;并且不管是之前的博文还是上节的exec&#xff0c;都提到了一点“…

重启服务器引发的Docker异常

公司使用云服务器需要硬盘扩容&#xff0c;服务器重启才生效。 重启以后发现拉取远程镜像的命令登录失败了&#xff01; 然后发现找不到容器和镜像列表了&#xff0c;但是容器都启动了。 查看docker运行状态都是正常的 systemctl is-active docker systemctl status docker.…

爬虫015_python异常_页面结构介绍_爬虫概念介绍---python工作笔记034

来看python中的异常 可以看到不做异常处理因为没有这个文件所以报错了 来看一下异常的写法

exec族函数

本节学习exec族函数&#xff0c;并大量参考了以下链接&#xff1a; linux进程---exec族函数(execl, execlp, execle, execv, execvp, execvpe)_云英的博客-CSDN博客 exec族函数函数的作用 我们用fork函数创建新进程后&#xff0c;经常会在新进程中调用exec函数去执行另外一个程…

Fortinet安全专家问答实录|如何防护暴力破解、撞库攻击

黑客攻防&#xff0c;一个看似神秘&#xff0c;但却必不可缺的领域。近期&#xff0c;全球网络与安全融合领域领导者Fortinet&#xff08;Nasdaq&#xff1a;FTNT&#xff09;&#xff0c;开启了Fortinet DEMO DAY系列实战攻防演练线上直播&#xff0c;让人人都能零距离观摩黑客…

AI 绘画Stable Diffusion 研究(六)sd提示词插件

大家好&#xff0c;我是风雨无阻。 今天为大家推荐一款可以有效提升我们使用 Stable Diffusion WebUI 效率的插件&#xff0c; 它就是 prompt-all-in-one&#xff0c; 它不但能直接将 WebUI 中的中文提示词转换为英文&#xff0c;还能一键为关键词加权重&#xff0c;更能建立常…

消息中间件 —— 初识Kafka

文章目录 1、Kafka简介1.1、消息队列1.1.1、为什么要有消息队列&#xff1f;1.1.2、消息队列1.1.3、消息队列的分类1.1.4、p2p 和 发布订阅MQ的比较1.1.5、消息系统的使用场景1.1.6、常见的消息系统 1.2、Kafka简介1.2.1、简介1.2.2、设计目标1.2.3、kafka核心的概念 2、Kafka的…