18、Flink的SQL 支持的操作和语法

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4

26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
  • 一、SQL
    • 1、数据类型
    • 2、保留关键字
  • 二、SQL入门
    • 1、Flink SQL环境准备
      • 1)、安装Flink及提交任务方式
      • 2)、SQL客户端使用介绍
      • 3)、简单示例
    • 2、Source 表介绍及示例
    • 3、连续查询介绍及示例
    • 4、Sink 表介绍及示例


本文简单的介绍了SQL和SQL的入门,并以三个简单的示例进行介绍,由于示例涉及到其他的内容,需要了解更深入的内容请参考相关的文章。。
本文依赖flink和kafka、hadoop集群能正常使用。
本文分为2个部分,即介绍了Flink SQL和入门,并提供了完整的可验证通过的示例。

一、SQL

本文描述了 Flink 所支持的 SQL 语言,包括数据定义语言(Data Definition Language,DDL)、数据操纵语言(Data Manipulation Language,DML)以及查询语言。Flink 对 SQL 的支持基于实现了 SQL 标准的 Apache Calcite。

本文列出了目前(截至版本1.17) Flink SQL 所支持的所有语句:

  • SELECT (Queries),具体内容参考文章:27、Flink 的SQL之SELECT (Queries)

  • CREATE TABLE, CATALOG, DATABASE, VIEW, FUNCTION
    具体内容参考文章:22、Flink 的table api与sql之创建表的DDL

  • DROP TABLE, DATABASE, VIEW, FUNCTION

  • ALTER TABLE, DATABASE, FUNCTION

  • INSERT

  • ANALYZE TABLE
    具体内容参考文章:28、Flink 的SQL之DROP 语句、ALTER 语句、INSERT 语句、ANALYZE 语句

  • UPDATE

  • DELETE

  • SQL HINTS

  • DESCRIBE

  • EXPLAIN

  • USE

  • SHOW

  • LOAD

  • UNLOAD

具体内容参考文章: 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE

1、数据类型

通用类型与(嵌套的)复合类型 (如:POJO、tuples、rows、Scala case 类) 都可以作为行的字段。

复合类型的字段任意的嵌套可被 值访问函数(内置函数) 访问。

通用类型将会被视为一个黑箱,且可以被 用户自定义函数 传递或引用。

对于 DDL 语句而言,我们支持所有在 数据类型 页面中定义的数据类型。

SQL查询不支持部分数据类型(cast 表达式或字符常量值)。
如:STRING, BYTES, RAW, TIME§ WITHOUT TIME ZONE, TIME§ WITH LOCAL TIME ZONE, TIMESTAMP§ WITHOUT TIME ZONE, TIMESTAMP§ WITH LOCAL TIME ZONE, ARRAY, MULTISET, ROW.

更多内容,请参考文章:14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性

2、保留关键字

虽然 SQL 的特性并未完全实现,但是一些字符串的组合却已经被预留为关键字以备未来使用。如果你希望使用以下字符串作为你的字段名,请在使用时使用反引号将该字段名包起来(如 value, count )。

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANALYZE, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMNS, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MODULES, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, RAW, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE

二、SQL入门

Flink SQL 使得使用标准 SQL 开发流应用程序变的简单。如果你曾经在工作中使用过兼容 ANSI-SQL 2011 的数据库或类似的 SQL 系统,那么就很容易学习 Flink。

1、Flink SQL环境准备

1)、安装Flink及提交任务方式

参考文章:
1、Flink1.12.7或1.13.5详细介绍及本地安装部署、验证
2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤

2)、SQL客户端使用介绍

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

3)、简单示例

Flink SQL>SET execution.result-mode=tableau;Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in setFlink SQL> use default_database;
[INFO] Execute statement succeed.Flink SQL> show tables;
Empty setFlink SQL> SELECT 'Hello World';
+----+--------------------------------+
| op |                         _o__c0 |
+----+--------------------------------+
| +I |                    Hello World |
+----+--------------------------------+
Received a total of 1 row
Flink SQL> show functions;
Hive Session ID = 5d34cbf8-5984-4ec0-8527-e06a948ad7ca
+--------------------------------+
|                  function name |
+--------------------------------+
|                              ! |
|                             != |
|                          $sum0 |
|                              % |
|                              & |
|                              * |
|                              + |
|                              - |
|                              / |
|                              < |
|                             <= |
|                            <=> |
|                             <> |
|                              = |
|                             == |
|                              > |
|                             >= |
|                         IFNULL |
|               SOURCE_WATERMARK |
|                         TYPEOF |
|                              ^ |
|           _legacy_grouping__id |
|                            abs |
|                           acos |
|                     add_months |
|                    aes_decrypt |
|                    aes_encrypt |
|                            and |
|                          array |
|                 array_contains |
|                             as |
|                            asc |
|                          ascii |
|                           asin |
|                    assert_true |
|                assert_true_oom |
|                             at |
|                           atan |
|                          atan2 |
|                            avg |
|                         base64 |
|                        between |
|                         bigint |
|                            bin |
|                         binary |
|                   bloom_filter |
|                        boolean |
|                         bround |
|                    cardinality |
|          cardinality_violation |
|                           case |
|                           cast |
|                           cbrt |
|                           ceil |
|                        ceiling |
|                           char |
|                     charLength |
|                    char_length |
|               character_length |
|                            chr |
|                       coalesce |
|                        collect |
|                   collect_list |
|                    collect_set |
|                  compute_stats |
|                         concat |
|                      concat_ws |
|                 context_ngrams |
|                           conv |
|                           corr |
|                            cos |
|                           cosh |
|                            cot |
|                          count |
|                      covar_pop |
|                     covar_samp |
|                          crc32 |
|                   create_union |
|                    currentDate |
|                   currentRange |
|                     currentRow |
|            currentRowTimestamp |
|                    currentTime |
|               currentTimestamp |
|             current_authorizer |
|               current_database |
|                 current_groups |
|                   current_user |
|                           date |
|                     dateFormat |
|                       date_add |
|                    date_format |
|                       date_sub |
|                       datediff |
|                            day |
|                     dayofmonth |
|                      dayofweek |
|                        decimal |
|                         decode |
|                        degrees |
|                           desc |
|                       distinct |
|                            div |
|                         divide |
|                         double |
|                              e |
|                        element |
|                            elt |
|                         encode |
|             encryptphonenumber |
|                            end |
|             enforce_constraint |
|                         equals |
|                            exp |
|                        explode |
|                        extract |
|                  extract_union |
|                      factorial |
|                          field |
|                    find_in_set |
|                        flatten |
|                          float |
|                          floor |
|                      floor_day |
|                     floor_hour |
|                   floor_minute |
|                    floor_month |
|                  floor_quarter |
|                   floor_second |
|                     floor_week |
|                     floor_year |
|                  format_number |
|                     fromBase64 |
|                  from_unixtime |
|             from_utc_timestamp |
|                            get |
|                get_json_object |
|                     get_splits |
|                    greaterThan |
|             greaterThanOrEqual |
|                       greatest |
|                       grouping |
|                           hash |
|                            hex |
|              histogram_numeric |
|                           hour |
|                             if |
|                     ifThenElse |
|                             in |
|                in_bloom_filter |
|                        in_file |
|                          index |
|                        initCap |
|                        initcap |
|                         inline |
|                          instr |
|                            int |
|              internal_interval |
|              interval_day_time |
|            interval_year_month |
|                        isFalse |
|                     isNotFalse |
|                      isNotNull |
|                      isNotTrue |
|                         isNull |
|                         isTrue |
|                        isfalse |
|                     isnotfalse |
|                      isnotnull |
|                      isnottrue |
|                         isnull |
|                         istrue |
|                    java_method |
|                     json_tuple |
|                       last_day |
|                          lcase |
|                          least |
|                         length |
|                       lessThan |
|                lessThanOrEqual |
|                    levenshtein |
|                           like |
|                        likeall |
|                        likeany |
|                             ln |
|                      localTime |
|                 localTimestamp |
|                         locate |
|                            log |
|                          log10 |
|                           log2 |
|                 logged_in_user |
|                          lower |
|                      lowerCase |
|                           lpad |
|                          ltrim |
|                            map |
|                       map_keys |
|                     map_values |
|                           mask |
|                   mask_first_n |
|                      mask_hash |
|                    mask_last_n |
|              mask_show_first_n |
|               mask_show_last_n |
|                      matchpath |
|                            max |
|                            md5 |
|                            min |
|                          minus |
|                    minusPrefix |
|                         minute |
|                            mod |
|                          month |
|                 months_between |
|                    murmur_hash |
|                   named_struct |
|                       negative |
|                       next_day |
|                         ngrams |
|                           noop |
|                  noopstreaming |
|                    noopwithmap |
|           noopwithmapstreaming |
|                            not |
|                     notBetween |
|                      notEquals |
|                         nullif |
|                            nvl |
|                   octet_length |
|                             or |
|                           over |
|                        overlay |
|                      parse_url |
|                parse_url_tuple |
|                     percentile |
|              percentile_approx |
|                             pi |
|                           plus |
|                           pmod |
|                     posexplode |
|                       position |
|                       positive |
|                            pow |
|                          power |
|                         printf |
|                       proctime |
|                        quarter |
|                        radians |
|                           rand |
|                    randInteger |
|                        rangeTo |
|                        reflect |
|                       reflect2 |
|                         regexp |
|                  regexpExtract |
|                  regexpReplace |
|                 regexp_extract |
|                 regexp_replace |
|                      regr_avgx |
|                      regr_avgy |
|                     regr_count |
|                 regr_intercept |
|                        regr_r2 |
|                     regr_slope |
|                       regr_sxx |
|                       regr_sxy |
|                       regr_syy |
|                reinterpretCast |
|                         repeat |
|                        replace |
|                 replicate_rows |
|    restrict_information_schema |
|                        reverse |
|                          rlike |
|                          round |
|                            row |
|                        rowtime |
|                           rpad |
|                          rtrim |
|                         second |
|                      sentences |
|                            sha |
|                           sha1 |
|                           sha2 |
|                         sha224 |
|                         sha256 |
.................

至此,我们的环境都准备好了。

2、Source 表介绍及示例

与所有 SQL 引擎一样,Flink 查询操作是在表上进行。与传统数据库不同,Flink 不在本地管理静态数据;相反,它的查询在外部表上连续运行。

Flink 数据处理流水线开始于 source 表。source 表产生在查询执行期间可以被操作的行;它们是查询时 FROM 子句中引用的表。这些表可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。

可以通过 SQL 客户端或使用环境配置文件来定义表。SQL 客户端支持类似于传统 SQL 的 SQL DDL 命令。标准 SQL DDL 用于创建,修改,删除表。

Flink 支持不同的连接器和格式相结合以定义表。相关内容在本Flink专栏中均有介绍,请参考:alanchanchn的专栏-Flink专栏

下面是一个示例,定义一个以 CSV 文件作为存储格式的 source 表。由于Flink创建表涉及较多的内容,关于下面的示例请参考文章:16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)

Flink SQL> show catalogs;
Hive Session ID = 008f6263-1b8e-4eb7-b034-a2c8651809f1
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in setFlink SQL> use catalog default_catalog;
Hive Session ID = 1b1a3fb2-e303-4c2a-bfc8-5f38c47aa0f6
[INFO] Execute statement succeed.Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in setFlink SQL> use default_database;
[INFO] Execute statement succeed.Flink SQL> show tables;
Empty setFlink SQL> CREATE TABLE alan_first_table (
>     t_id BIGINT, 
>     t_name STRING, 
>     t_balance DOUBLE, 
>     t_age INT
> ) WITH (
>   'connector' = 'filesystem',           
>   'path' = 'hdfs://HadoopHAcluster/flinktest/firstdemo/', 
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> show tables;
+------------------+
|       table name |
+------------------+
| alan_first_table |
+------------------+
1 row in set
---能查出来数据是有前提的,那就是在创建表之前,我已经在hdfs://HadoopHAcluster/flinktest/firstdemo目录下上传了5个文件,每个文件一条数据
[alanchan@server4 testdata]$ hadoop fs -ls hdfs://HadoopHAcluster/flinktest/firstdemo
Found 5 items
-rw-r--r--   3 alanchan supergroup         15 2023-09-07 10:24 hdfs://HadoopHAcluster/flinktest/firstdemo/dim_user1.txt
-rw-r--r--   3 alanchan supergroup         19 2023-09-07 10:24 hdfs://HadoopHAcluster/flinktest/firstdemo/dim_user2.txt
-rw-r--r--   3 alanchan supergroup         22 2023-09-07 10:24 hdfs://HadoopHAcluster/flinktest/firstdemo/dim_user3.txt
-rw-r--r--   3 alanchan supergroup         20 2023-09-07 10:24 hdfs://HadoopHAcluster/flinktest/firstdemo/dim_user4.txt
-rw-r--r--   3 alanchan supergroup         24 2023-09-07 10:24 hdfs://HadoopHAcluster/flinktest/firstdemo/dim_user5.txtFlink SQL> select * from alan_first_table;
+----+----------------------+--------------------------------+--------------------------------+-------------+
| op |                 t_id |                         t_name |                      t_balance |       t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I |                    5 |                  alan_chan_chn |                          52.23 |          38 |
| +I |                    3 |                    alanchanchn |                          32.23 |          28 |
| +I |                    1 |                           alan |                          12.23 |          18 |
| +I |                    4 |                      alan_chan |                          12.43 |          29 |
| +I |                    2 |                       alanchan |                          22.23 |          10 |
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 5 rows---带条件查询
Flink SQL> select * from alan_first_table where t_balance >=20;
+----+----------------------+--------------------------------+--------------------------------+-------------+
| op |                 t_id |                         t_name |                      t_balance |       t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I |                    3 |                    alanchanchn |                          32.23 |          28 |
| +I |                    2 |                       alanchan |                          22.23 |          10 |
| +I |                    5 |                  alan_chan_chn |                          52.23 |          38 |
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 3 rows

可以从该表中定义一个连续查询,当新行可用时读取并立即输出它们的结果。

3、连续查询介绍及示例

虽然最初设计时没有考虑流语义,但 SQL 是用于构建连续数据流水线的强大工具。Flink SQL 与传统数据库查询的不同之处在于,Flink SQL 持续消费到达的行并对其结果进行更新。

一个连续查询永远不会终止,并会产生一个动态表作为结果。动态表是 Flink 中 Table API 和 SQL 对流数据支持的核心概念。

连续流上的聚合需要在查询执行期间不断地存储聚合的结果。例如,假设你需要从传入的数据流中计算每个部门的员工人数。查询需要维护每个部门最新的计算总数,以便在处理新行时及时输出结果。

关于连续查询更多的内容,参考文章:15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置

下面的示例说明:
1、在flink创建一张表,提交连续查询的任务(其实就是一个查询session,动态显示表内的数据)
2、为方便模拟,使用kafka作为消息源,即表的连接类型为kafka,也即需要有kafka的运行环境
3、sql客户端的环境与本文上述示例一致
4、关于该示例更多的信息参考:16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)

Flink SQL> CREATE TABLE alanchan_kafka_table (
>     `id` INT,
>     name STRING,
>     age INT,
>     balance DOUBLE
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 't_kafka_source',
>     'scan.startup.mode' = 'earliest-offset',
>     'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>     'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> show tables;
+----------------------+
|           table name |
+----------------------+
|     alan_first_table |
| alanchan_kafka_table |
+----------------------+
2 rows in set
-----kafka一条一条写入数据,下文中的查询结果会根据kafka中发送的消息逐条展示出来------
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafka_source
>1,alan,15,100
>2,alanchan,20,200
>3,alanchanchn,25,300
>4,alan_chan,30,400
>5,alan_chan_chn,50,45 
>Flink SQL> select * from alanchan_kafka_table;
+----+-------------+--------------------------------+-------------+--------------------------------+
| op |          id |                           name |         age |                        balance |
+----+-------------+--------------------------------+-------------+--------------------------------+
| +I |           1 |                           alan |          15 |                          100.0 |
| +I |           2 |                       alanchan |          20 |                          200.0 |
| +I |           3 |                    alanchanchn |          25 |                          300.0 |
| +I |           4 |                      alan_chan |          30 |                          400.0 |
| +I |           5 |                  alan_chan_chn |          50 |                           45.0 |

4、Sink 表介绍及示例

当运行此查询时,SQL 客户端实时但是以只读方式提供输出。存储结果,作为报表或仪表板的数据来源,需要写到另一个表。这可以使用 INSERT INTO 语句来实现。本节中引用的表称为 sink 表。INSERT INTO 语句将作为一个独立查询被提交到 Flink 集群中。

------创建数据源表,该表不能查询
Flink SQL> CREATE TABLE source_table (
>  userId INT,
>  age INT,
>  balance DOUBLE,
>  userName STRING
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='100',
>  'fields.userId.kind'='sequence',
>  'fields.userId.start'='1',
>  'fields.userId.end'='1000',
> 
>  'fields.balance.kind'='random',
>  'fields.balance.min'='1',
>  'fields.balance.max'='100',
> 
>  'fields.age.min'='1',
>  'fields.age.max'='1000',
> 
>  'fields.userName.length'='10'
> );
[INFO] Execute statement succeed.
----创建sink表,hdfs文件夹不需要手动创建,flink会自己创建
Flink SQL> CREATE TABLE alan_sink_table (
>     t_id BIGINT, 
>     t_name STRING, 
>     t_balance DOUBLE, 
>     t_age INT
> ) WITH (
>   'connector' = 'filesystem',           
>   'path' = 'hdfs://HadoopHAcluster/flinktest/firstsinkdemo/', 
>   'format' = 'csv'                
> );
[INFO] Execute statement succeed.
------批量插入sink表,也可以是动态的,但需要设置数据刷新频率,否则查不到结果,该事情在本Flink专栏中有说明
------此处也是提交一个flink任务,此处用的是yarn-session模式
Flink SQL> INSERT INTO alan_sink_table 
> SELECT userId ,userName,balance,age FROM source_table;Job ID: c2e1985745c5c938c56e26f8efe5a8db------查询结果如下
Flink SQL> select * from alan_sink_table;+----+----------------------+--------------------------------+--------------------------------+-------------+
| op |                 t_id |                         t_name |                      t_balance |       t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I |                    1 |                     d0c7d38b94 |              31.52935530019297 |         802 |
| +I |                    2 |                     b880adc262 |              45.43292342494475 |         556 |
| +I |                    3 |                     e1ce373b2e |             39.595138772111014 |         459 |
| +I |                    4 |                     3bd1242679 |              78.58761035208113 |         585 |
| +I |                    5 |                     88ba47bb2b |              4.870598793833649 |         508 |
| +I |                    6 |                     72bdba9132 |              48.33565877511729 |         115 |
| +I |                    7 |                     0fa82976d1 |               52.6978279057911 |         353 |
| +I |                    8 |                     8d546bab93 |             20.403401648898576 |         391 |
| +I |                    9 |                     9eb957d512 |              82.16967630094122 |         323 |
| +I |                   10 |                     5423755f01 |              49.12646233699912 |         769 |
| +I |                   11 |                     da6c7936ea |             16.877530563314846 |         687 |
| +I |                   12 |                     3ef87eb75a |              68.65154273578702 |         434 |
| +I |                   13 |                     e08320e927 |              8.403066874855323 |         292 |
| +I |                   14 |                     03e1ccfc69 |              98.61326426348097 |         653 |
......
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 1000 rows

提交后,它将运行并将结果直接存储到 sink 表中,而不是将结果加载到系统内存中。

以上,简单的介绍了SQL和SQL的入门,并以三个简单的示例进行介绍,由于示例涉及到其他的内容,需要了解更深入的内容请参考相关的文章。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/133948.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

05【保姆级】-GO语言的标识符

之前我学过C、Java、Python语言时总结的经验&#xff1a; 先建立整体框架&#xff0c;然后再去抠细节。先Know how&#xff0c;然后know why。先做出来&#xff0c;然后再去一点点研究&#xff0c;才会事半功倍。适当的囫囵吞枣。因为死抠某个知识点很浪费时间的。对于GO语言&a…

vscode + cmake + opencv example

nice try on macos CMakeLists.txt cmake_minimum_required(VERSION 3.20) #添加OPENCV库 #指定OpenCV版本&#xff0c;代码如下 #find_package(OpenCV 3.3 REQUIRED) #如果不需要指定OpenCV版本&#xff0c;代码如下 find_package(OpenCV REQUIRED)#添加OpenCV头文件 includ…

智能语音和自然语言处理技术

一、定义 智能语音和自然语言处理技术是指通过计算机技术实现人机交互的一种技术。它可以让计算机和人类之间进行自然而流畅的交流&#xff0c;从而实现更高效、更便捷、更智能的信息交流和处理。 智能语音和自然语言处理技术主要包括语音识别、语音合成、自然语言理解、自然…

CRM客户管理系统究竟是什么?如何实施?

很多销售人员都不是特别喜欢使用信息化软件&#xff0c;然而从销售经理的角度看&#xff0c;信息化又的确提升了团队的管理效率和业绩。追究这些矛盾的原因&#xff0c;无外乎几点&#xff1a; 认知角度 → 销售员&#xff1a;数据没用又浪费我时间 VS 销售经理&#xff1a;数…

openssl交叉编译 (ubuntu+arm)

1.下载安装包 wget https://www.openssl.org/source/openssl-1.1.1w.tar.gz 2.解压安装包 tar -zxvf openssl-1.1.1l.tar.gz 3.进入源码文件夹-修改编译器 CCarm-linux-gnueabihf-gcc 4.配置编译参数 ./config no-asm -shared --prefix/home/alientek/sp_test/openssl/sp…

Springboot 集成 mongodb

一、引入依赖 1.1 Maven <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> 二、yml 配置文件 data:mongodb:# 基础链接参数# 连接的库database: mongo…

单例模式读取配置文件

单例模式&#xff08;Singleton Pattern&#xff09;是一种常见的设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点以获取该实例。当你需要在应用程序中读取配置文件时&#xff0c;使用单例模式可以确保你只创建一个配置对象&#xff0c;以避免重…

聊聊定时器 setTimeout 的时延问题

给大家推荐一个实用面试题库 1、前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;web前端面试题库 全局的 setTimeout() 方法设置一个定时器&#xff0c;一旦定时器到期&#xff0c;就会执行一个函数或指定的代码片…

字符函数和字符串函数详解

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 1. 字符分类函数 2. 字符转换函数 3. strlen的使用和模拟实现 3.1strlen的使用&#xff1a; 3.2strlen的模拟实现&#xff1a; 4. strcpy的使用和模拟实现 4.1strc…

漏刻有时百度地图API实战开发(1)华为手机无法使用addEventListener click 的兼容解决方案

现象 漏刻有时项目开发中的调用了百度地图API&#xff0c;在PC端、IOS和安卓机型测试都没有问题。但是使用华为手机部分型号时&#xff0c;前端在监听点击事件的时候是使用 map.addEventListener(click,function(){...})&#xff0c;无法触发。或 原理 通过监听touchstart和…

FreeRTOS_低功耗Tickless模式

目录 1. STM32F4 低功耗模式 1.1 睡眠(Sleep)模式 1.2 停止(Stop)模式 1.3 待机(Standby)模式 2. Tickless 模式详解 2.1 如何降低功耗 2.2 Tickless 具体实现 2.2.1 宏 configUSE_TICKLESS_IDLE 2.2.2 宏 portSUPPRESS_TICKS_AND_SLEEP() 2.2.3 宏 configPRE_SLEEP_…

利用Caddy实现http反向代理

利用Caddy实现http反向代理 1 Caddy是什么 Caddy是一个开源的&#xff0c;使用Golang编写的&#xff0c;支持HTTP/2的Web服务端。它的一个显著特征就是默认启用HTTPS。 和nginx类似。 2 多个后端服务 假如现在有3个后端http服务&#xff1a;分别在启动在 app1 http://10…

Django初窥门径-自定义附件存储模型

前言 Django自带了一个名为FileField的字段&#xff0c;用于处理文件上传。然而&#xff0c;有时我们需要更多的控制权&#xff0c;例如定义文件的存储路径、文件名以及文件类型。在本篇文章中&#xff0c;我们将探讨如何自定义Django附件存储模型。 创建attachment应用 pyt…

Python|OpenCV-图像的添加和混合操作(8)

前言 本文是该专栏的第8篇,后面将持续分享OpenCV计算机视觉的干货知识,记得关注。 在使用OpenCV库对图像操作的时候,有时需要对图像进行运算操作,类似于加法,减法,位操作等处理。而本文,笔者将针对OpenCV对图像的添加,混合以及位操作进行详细的介绍说明和使用。 下面,…

ZKP8.1 Polynomial-IOP and Polynomial Commitment Schemes

ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 8: FRI-based Polynomial Commitments and Fiat-Shamir (Justin Thaler) 8.1 Polynomial-IOP and Polynomial Commitment Schemes Recall: build an efficient SNARK Recall: Polynomial-IOP P’s first message in the pro…

YOLOv8轻量化模型:模型轻量化设计 | 轻量级可重参化EfficientRep| 来自YOLOv6思想

💡💡💡本文解决什么问题:在几乎不保证精度下降的前提下,轻量级模型创新设计 EfficientRep 在关键点检测任务中 | GFLOPs从9.6降低至8.5, mAP50从0.921下降至0.912,mAP50-95从0.697提升至0.779 YOLO轻量化模型专栏:http://t.csdnimg.cn/AeaEF 1.YOLOv6介绍 论文…

pandas教程:Data Transformation 数据变换、删除和替换

文章目录 7.2 Data Transformation&#xff08;数据变换&#xff09;1 删除重复值2 Transforming Data Using a Function or Mapping&#xff08;用函数和映射来转换数据&#xff09;3 Replacing Values&#xff08;替换值&#xff09;4 Renaming Axis Indexes&#xff08;重命…

API接口安全设计

简介 HTTP接口是互联网各系统之间对接的重要方式之一&#xff0c;使用HTTP接口开发和调用都很方便&#xff0c;也是被大量采用的方式&#xff0c;它可以让不同系统之间实现数据的交换和共享。 由于HTTP接口开放在互联网上&#xff0c;所以我们就需要有一定的安全措施来保证接口…

#龙迅视频转换IC LT7911D是一款高性能Type-C/DP/EDP 转MIPI®DSI/CSI/LVDS 芯片,适用于VR/显示应用。

1.说明 应用功能&#xff1a;LT7911D适用于DP1.2转MIPIDSI/MIPICSI/LVDS&#xff0c;EDP转MIPIDSI/MIPICSI/LVDS&#xff0c;TYPE-C转MIPIDSI/MIPICSI/LVDS应用方案 分辨率&#xff1a;单PORT高达4K30HZ&#xff0c;双PORT高达4K 60HZ 工作温度范围&#xff1a;−40C to 85C 产…

多比特杯武汉工程大学第六届ACM新生赛 A,L

为什么要演奏春日影&#xff01;&#xff01;&#xff01; 看到题目所说&#xff0c;若 b i b_i bi​在 i i i 之前&#xff0c;则…,那么很容易联想到拓扑排序&#xff0c;再仔细看题&#xff0c;对于每个 b i b_i bi​,我们都想其对应的 i 进行连边&#xff0c;那么我们很容…