【大数据】Flink SQL 语法篇(一):CREATE

Flink SQL 语法篇(一)

  • 1.建表语句
  • 2.表中的列
    • 2.1 常规列(物理列)
    • 2.2 元数据列
    • 2.3 计算列
  • 3.定义 Watermark
  • 4.Create Table With 子句
  • 5.Create Table Like 子句

CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。

目前 Flink SQL 支持下列 CREATE 语句:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

1.建表语句

下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同名表已经在 Catalog 中存在了,则无法注册。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n][ <watermark_definition> ][ <table_constraint> ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] ]<physical_column_definition>:column_name column_type [ <column_constraint> ] [COMMENT column_comment]<column_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED<table_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED<metadata_column_definition>:column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]<computed_column_definition>:column_name AS computed_column_expression [COMMENT column_comment]<watermark_definition>:WATERMARK FOR rowtime_column_name AS watermark_strategy_expression<source_table>:[catalog_name.][db_name.]table_name<like_options>:
{{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

2.表中的列

2.1 常规列(物理列)

物理列 是数据库中所说的 常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING
) WITH (...
);

2.2 元数据列

元数据列 是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。

例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,-- 读取 kafka 本身自带的时间戳`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka'...
);

元数据列可以用于后续数据的处理,或者写入到目标表中。

INSERT INTO MyTable 
SELECT user_id, name, record_time + INTERVAL '1' SECOND 
FROM MyTable;

如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,-- 读取 kafka 本身自带的时间戳`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH ('connector' = 'kafka'...
);

关于 Flink SQL 的每种 Connector 都提供了哪些 metadata 字段,详细可见 官网文档。

如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致的话,程序运行时会自动 cast 强转。但是这要求两种数据类型是可以强转的。

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,-- 将时间戳强转为 BIGINT`timestamp` BIGINT METADATA
) WITH ('connector' = 'kafka'...
);

默认情况下,Flink SQL Planner 认为 metadata 列是可以 读取 也可以 写入 的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。

那么在往一个表写入的场景下,我们就可以使用 VIRTUAL 关键字来标识某个元数据列不写入到外部存储中(不持久化)。

CREATE TABLE MyTable (-- sink 时会写入`timestamp` BIGINT METADATA,-- sink 时不写入`offset` BIGINT METADATA VIRTUAL,`user_id` BIGINT,`name` STRING,
) WITH ('connector' = 'kafka'...
);

在上面这个案例中,Kafka 引擎的 offset 是只读的。所以我们在把 MyTable 作为数据源(输入)表时,Schema 中是包含 offset 的。在把 MyTable 作为数据汇(输出)表时,Schema 中是不包含 offset 的。如下:

所以这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset 列,否则 Flink SQL 任务会直接报错。

2.3 计算列

计算列 其实就是在写建表的 DDL 时,可以拿已有的一些列经过一些自定义的运算生成的新列。这些列本身是没有以物理形式存储到数据源中的。

CREATE TABLE MyTable (`user_id` BIGINT,`price` DOUBLE,`quantity` DOUBLE,-- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity`cost` AS price * quanitity,
) WITH ('connector' = 'kafka'...
);

计算列可以包含其他列、常量或者函数,但是不能写一个子查询进去。

小伙伴萌这时会问到一个问题,既然只能包含列、常量或者函数计算,我就直接在 DML Query 代码中写就完事了呗,为啥还要专门在 DDL 中定义呢?

没错,如果只是简单的四则运算的话直接写在 DML 中就可以,但是 计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义)。比如要把输入数据的时间格式标准化。处理时间、事件时间分别举例如下:

  • 处理时间:使用 PROCTIME() 函数来定义处理时间列。
  • 事件时间:事件时间的时间戳可以在声明 Watermark 之前进行预处理。比如,如果字段不是 TIMESTAMP(3) 类型或者时间戳是嵌套在 JSON 字符串中的,则可以使用计算列进行预处理。

❗注意:和虚拟 metadata 列是类似的,计算列也是只能读不能写的。

也就是说,我们在把 MyTable 作为数据源(输入)表时,Schema 中是包含 cost 的。

在把 MyTable 作为数据汇(输出)表时,Schema 中是不包含 cost 的。

-- 当做数据源(输入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)-- 当做数据汇(输出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)

3.定义 Watermark

Watermark 是在 Create Table 中进行定义的。具体 SQL 语法标准是:

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
  • rowtime_column_name:表的事件时间属性字段。该列必须是 TIMESTAMP(3)TIMESTAMP_LTZ(3) 类,这个时间可以是一个计算列。
  • watermark_strategy_expression:定义 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。

注意:

  • 如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。
  • Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由 pipeline.auto-watermark-interval 进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。

Flink SQL 提供了几种 WATERMARK 生产策略:

  • 有界无序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。此类策略就可以用于设置最大乱序时间,假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND,则生成的是运行 5s 延迟的 Watermark。一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。
  • 严格升序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column。一般基本不用这种方式。如果你能保证你的数据源的时间戳是严格升序的,那就可以使用这种方式。严格升序代表 Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。
  • 递增:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。

4.Create Table With 子句

CREATE TABLE KafkaTable (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
)

可以看到 DDL 中 With 子句就是在建表时,描述数据源、数据汇的具体外部存储的元数据信息的

一般 With 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的 With 配置项都是不同的。

注意:

  • Flink SQL 中 Connector 其实就是 Flink 用于链接外部数据源的接口。举一个类似的例子,在 Java 中想连接到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去链接。映射到 Flink SQL 中,在 Flink SQL 中要连接到 Kafka,需要使用 kafka connector
  • Flink SQL 已经提供了一系列的内置 Connector,具体可见:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。

回到上述案例中,With 声明了以下几项信息:

  • 'connector' = 'kafka':声明外部存储是 Kafka。
  • 'topic' = 'user_behavior':声明 Flink SQL 任务要连接的 Kafka 表的 topicuser_behavior
  • 'properties.bootstrap.servers' = 'localhost:9092':声明 Kafka 的 server iplocalhost:9092
  • 'properties.group.id' = 'testGroup':声明 Flink SQL 任务消费这个 Kafka topic,会使用 testGroupgroup id 去消费。
  • 'scan.startup.mode' = 'earliest-offset':声明 Flink SQL 任务消费这个 Kafka topic 会从最早位点开始消费。
  • 'format' = 'csv':声明 Flink SQL 任务读入或者写出时对于 Kafka 消息的序列化方式是 csv 格式。

从这里也可以看出来 With 中具体要配置哪些配置项都是和每种 Connector 决定的。

5.Create Table Like 子句

Like 子句是 Create Table 子句的一个延伸。

下面定义了一张 Orders 表:

CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3)
) WITH ( 'connector' = 'kafka','scan.startup.mode' = 'earliest-offset'
);

但是忘记定义 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定义一张带 Watermark 的新表:

CREATE TABLE Orders_with_watermark (-- 1. 添加了 WATERMARK 定义WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (-- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表
LIKE Orders;

上面这个语句的效果就等同于:

CREATE TABLE Orders_with_watermark (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH ('connector' = 'kafka','scan.startup.mode' = 'latest-offset'
);

不过这种不常使用。就不过多介绍了。如果小伙伴萌感兴趣,直接去 官网 参考具体注意事项。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/655751.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue使用百度地图实现聚合的效果(vue-baidu-map)

Vue使用百度地图实现聚合的效果(vue-baidu-map) 安装插件&#xff1a;yarn add vue-baidu-map在main.js中全局引入密钥&#xff08;在百度开发者中心注册&#xff09;&#xff1a;import BaiduMap from vue-baidu-map Vue.use(BaiduMap, {ak: your_app_key // 百度地图秘钥 })…

PySimpleGUI界面读取PDF转换Excel

PySimpleGUI 是一个用于创建图形用户界面的 Python 库&#xff0c;而 PDF 文件和 Excel 文件是两种不同的数据格式。要将 PDF 文件转换为 Excel 文件&#xff0c;你需要使用额外的库&#xff0c;如 pdf2excel。 下面是一个示例流程&#xff0c;展示了如何使用 PySimpleGUI 创建…

Compose | UI组件(十一) | Spacer - 留白

文章目录 前言Spacer组件的参数说明Spacer组件的使用 总结 前言 Spacer组件是让两组件之间留有空白间隔 Spacer组件的参数说明 Spacer只有一个修饰符&#xff0c;修饰留空白的大小和比例&#xff0c;颜色 Spacer(modifier: Modifier)Spacer组件的使用 Row {Box(modifier M…

Ruby安装演示教程

当涉及到 Ruby 的安装过程时&#xff0c;一种常见的方法是通过 RVM&#xff08;Ruby Version Manager&#xff09;来进行安装和管理。以下是在 Linux 系统上使用 RVM 安装 Ruby 的步骤演示教程&#xff1a; 1、安装 RVM&#xff1a; # 在终端中执行以下命令来下载并安装 RVM…

【React教程】(2) React之JSX入门与列表渲染、条件渲染详细代码示例

目录 JSX环境配置基本语法规则在 JSX 中嵌入 JavaScript 表达式在 JavaScript 表达式中嵌入 JSXJSX 中的节点属性声明子节点JSX 自动阻止注入攻击在 JSX 中使用注释JSX 原理列表循环DOM Elements 列表渲染语法高亮 条件渲染示例1&#xff1a;示例2&#xff1a;示例3&#xff08…

Learn to Earn,Move星航计划第三期诚邀您探索编程和区块链的乐趣

*以下文章来源于MoveFuns &#xff0c;作者MoveFuns DAO 星航计划是一个 Web3 技术的公益计划,旨在引导更多的人加入开源社区,学习Move语言&#xff0c;了解Web3。本期星航计划由 MoveFuns Dao 发起&#xff0c;由Sui官方基金会支持&#xff0c;汇集了 Web3开发领域内的专业导…

FullStack之Django(1)开发环境配置

FullStack之Django(1)开发环境配置 author: Once Day date&#xff1a;2022年2月11日/2024年1月27日 漫漫长路&#xff0c;才刚刚开始… 全系列文档请查看专栏: FullStack开发_Once_day的博客-CSDN博客Django开发_Once_day的博客-CSDN博客 具体参考文档: The web framewor…

键盘记录器Python代码

键盘记录器完整代码 from pynput.keyboard import Key, Listener import logging logging.basicConfig(filename("keylog.txt"), levellogging.DEBUG, format" %(asctime)s - %(message)s") def on_press(key):logging.info(str(key)) with Listener(on_p…

mysql 正则表达式用法(一)

记录下关于mysql中regexp 正则匹配字符串的相关用法 一、匹配字符类 [:alnum:]  任意字母和数字(同[a-zA-Z0-9]) [:alpha:]  任意字符(同[a-zA-Z]) [:blank:]  空格和制表(同[\t]) [:cntrl:]  ASCII控制字符(ASCII 0到31和127) [:digit:]  任意数字(同[0-9]) [:graph:] …

Leetcode 203 移除链表元素

Leetcode 203 移除链表元素 准备工作1&#xff09;ListNode基本结构2&#xff09;初始化ListNode集合 解法一&#xff1a;遍历判定解法二&#xff1a;递归判定 Leetcode 203 移除链表元素 准备工作 1&#xff09;ListNode基本结构 public class ListNode {public int val;pu…

图灵之旅--ArrayList顺序表LinkedList链表栈Stack队列Queue

目录 线性表顺序表ArrayList简介ArrayList使用ArrayList的构造ArrayList常见操作ArrayList的遍历ArrayList的扩容机制利用ArrayList洗牌ArrayList的优缺点 链表链表的实现双向链表的实现 LinkedListLinkedList引入LinkedList的使用LinkedList的构造LinkedList的常用方法介绍Lin…

pytorch nearest upsample整数型tensor

在用 torch.nn.Upsample 给分割 label 上采样时报错&#xff1a;RuntimeError: "upsample_nearest2d_out_frame" not implemented for Long。 参考 [1-3]&#xff0c;用 [3] 给出的实现。稍微扩展一下&#xff0c;支持 h、w 用不同的 scale factor&#xff0c;并测试…

ArcGIS Pro如何新建字段

无论是地图制作还是数据分析&#xff0c;字段的操作是必不可少的&#xff0c;在某些时候现有的字段不能满足需求还需要新建字段&#xff0c;这里为大家讲解一下在ArcGIS Pro中怎么新建字段&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载的水…

pytorch安装教程(Anaconda + GPU)

可以去nvidia官网更新驱动 获取下载pytorch的命令地址&#xff1a;Start Locally | PyTorch 在这里可以找到旧版本的cuda的命令&#xff1a;Previous PyTorch Versions | PyTorch 如果使用conda没有安装成功的话&#xff0c;就使用pip&#xff1a;

ToF传感器在移动机器人中的作用

原创 | 文 BFT机器人 在日新月异的机器人技术领域&#xff0c;技术的无缝整合正引领着人类与机器交互方式的革新潮流。ToF传感器作为变革性创新的一个例子&#xff0c;对移动机器人更好地感知周围环境起到了决定性的作用。 ToF传感器与激光雷达技术在创建深度图方面有着异曲同…

大模型视觉理解能力更进一步,谷歌提出全新像素级对齐模型PixelLLM

论文题目&#xff1a;Pixel Aligned Language Models 论文链接&#xff1a;https://arxiv.org/abs/2312.09237 项目主页&#xff1a;Pixel Aligned Language Models 近一段时间以来&#xff0c;大型语言模型&#xff08;LLM&#xff09;在计算机视觉领域中也取得了巨大的成功&a…

Unity 观察者模式(实例详解)

文章目录 简介示例1 - 简单的文本更新通知示例2 - 多观察者监听游戏分数变化示例3 - 事件系统实现观察者模式示例4 - 泛型观察者和可序列化的事件系统示例5 - 使用C#委托简化版 简介 在Unity中实现观察者模式&#xff0c;我们可以创建一个Subject&#xff08;目标/主题&#x…

前端面试题-js部分-数组去重-数组扁平化-伪数组转数组-面向对象的继承方式(ES5)

前端面试题-js部分-数组去重-数组扁平化-伪数组转数组-面向对象的继承方式ES5 数组去重数组扁平化伪数组转换为数组面向对象的继承方式&#xff08;ES5&#xff09; 数组去重 1.利用es6 set 去重 Set 类型不允许有值重复 let arr1 [1, 2, 4, 3, 5, 7, 1, 8, 2, 4, 9]console.…

WebRTC系列-自定义媒体数据加密

文章目录 1. 对外加密接口2. 对外加密实现前面的文章都有提过WebRTC使用的加密方式是SRTP这个库提供的,这个三方库这里就不做介绍,主要是对rtp包进行加密;自然的其调用也是WebRTC的rtp相关模块;同时在WebRTC里也提供一个自定义加密的接口,本文将围绕这个接口做介绍及分析;…

【郑益慧】模拟电子技术:7.Mos管的工作原理

Mos管的工作原理 Mos管的出现&#xff0c;几乎不怎么耗电。因此在集成电路中起了非常大的作用 在某些方面确实比晶体三极管强。 基本原理&#xff1a;依靠电场效应来控制。 电场效应几乎是没有电流的&#xff0c;没有电流几乎是没有功率的。 从控制上来说&#xff0c;消耗…