Flink流批一体计算(21):Flink SQL之Flink DDL

目录

执行 CREATE 语句

Python脚本

Java代码

SQL语句

列定义

物理/常规列

元数据列

计算列

WATERMARK

PRIMARY KEY

PARTITIONED BY

AS select_statement


Flink SQL是为了简化计算模型、降低您使用Flink门槛而设计的一套符合标准SQL语义的开发语言。

执行 CREATE 语句

Python脚本
table_env = TableEnvironment.create(...)# 对已经注册的表进行 SQL 查询# 注册名为 “Orders” 的表table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");# 在表上执行 SQL 查询,并把得到的结果作为一个新的表result = table_env.sql_query("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");# 对已注册的表进行 INSERT 操作# 注册 TableSinktable_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")# 在表上执行 INSERT 语句并向 TableSink 发出结果table_env \.execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Java代码
Environmentsettings settings = Environmentsettings.newInstance()...TableEnvironment tableEnv = TableEnvironment.create(settings);// 对已注册的表进行 SQL 查询// 注册名为 “Orders” 的表tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");// 在表上执行 SQL 查询,并把得到的结果作为一个新的表Table result = tableEnv.sqlQuery("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");// 对已注册的表进行 INSERT 操作// 注册 TableSinktableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");// 在表上执行 INSERT 语句并向 TableSink 发出结果tableEnv.executeSql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

SQL语句
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);[INFO] Table has been created.Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);[INFO] Table has been created.Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';[INFO] Submitting SQL update statement to the cluster...

列定义

物理/常规列

物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此,物理列表示从外部系统读取和写入的有效数据。其他类型的列可以在物理列之间声明,但不会影响最终的物理模式。

以下语句创建一个仅包含常规列的表:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING

) WITH (

  ...

);

元数据列

元数据列是SQL标准的扩展,元数据列由metadata关键字指示。例如,可以使用元数据列从Kafka记录读取时间戳,并将时间戳写入Kafka,以进行基于时间的操作。连接器和格式文档列出了每个组件的可用元数据字段。但是,在表的模式中声明元数据列是可选的。

以下语句创建一个表,其中包含引用元数据字段时间戳的附加元数据列:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp

) WITH (

  'connector' = 'kafka'

  ...

);

每个元数据字段都由基于字符串的键标识,并具有文档化的数据类型。例如,Kafka连接器公开了一个元数据字段,该字段具有键时间戳和数据类型timestamp_LTZ3),可用于读取和写入记录。

在上面的示例中,元数据列record_time成为表模式的一部分,可以像常规列一样进行转换和存储,为方便起见,如果列名应用作标识元数据键,则可以省略FROM子句。

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `timestamp` TIMESTAMP_LTZ(3) METADATA    -- use column name as metadata key

) WITH (

  'connector' = 'kafka'

  ...

);

计算列

计算列是使用语法column_name AS Computed_column_expression生成的虚拟列。

计算列计算可以引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不物理存储在表中。列的数据类型是从给定表达式自动派生的,不必手动声明。

例如,计算列可以定义为

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `price` DOUBLE,

  `quantity` DOUBLE,

  `cost` AS price * quanitity,  -- evaluate expression and supply the result to queries

) WITH (

  'connector' = 'kafka'

  ...

);

Flink中通常使用计算列来定义CREATETABLE语句中的时间属性。

使用系统的PROCTIME()函数,可以通过proc AS PROCIME()轻松定义processing time属性。

Event time属性时间戳可以在WATERMARK声明之前预处理。例如,如果原始字段不是TIMESTAMP3)类型或嵌套在JSON字符串中,则可以使用计算列。

WATERMARK

WATERMARK 定义了表的Event time属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记event time的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。

使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。

Flink 提供了几种常用的 watermark 策略。

  • 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。

  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

发出到目前为止已观察到的最大时间戳减 1 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。

  • 有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一个 5 秒延迟的 watermark 策略。

CREATE TABLE Orders (

    `user` BIGINT,

    product STRING,

    order_time TIMESTAMP(3),

    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

) WITH (......);

PRIMARY KEY

主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。

Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。

Notes:  CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

PARTITIONED BY

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。

AS select_statement

表也可以通过一个 CTAS 语句中的查询结果来创建并填充数据,CTAS 是一种简单、快捷的创建表并插入数据的方法。

CTAS 有两个部分,SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查询 CREATE 部分从 SELECT 查询中获取列信息,并创建目标表。  CREATE TABLE 类似,CTAS 要求必须在目标表的 WITH 子句中指定必填的表属性。

CTAS 的建表操作需要依赖目标 Catalog。比如,Hive Catalog 会自动在 Hive 中创建物理表。但是基于内存的 Catalog 只会将表的元信息注册在执行 SQL Client 的内存中。

CREATE TABLE my_ctas_table

WITH (

    'connector' = 'kafka',

    ...

)

AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意 CTAS 有如下约束:

  • 暂不支持创建临时表。
  • 暂不支持指定列信息。
  • 暂不支持指定 Watermark。
  • 暂不支持创建分区表。
  • 暂不支持主键约束。

注意 目前,CTAS 创建的目标表是非原子性的,如果在向表中插入数据时发生错误,该表不会被自动删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/180184.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

freerots启动过程分析(qemu仿真RISC-V架构为例)

1、前言 本文是基于qemu上virt板子适配的freertos系统源码进行讲解qemu安装可参考博客:《qemu源码下载和安装》;freertos移植到qemu上运行可参考博客:《移植freertos到qemu上运行》; 2、汇编代码部分 汇编文件:FreeR…

集成学习的两种常见策略:bagging VS. boosting

chatGPT回答,记在这里。 集成学习是一种通过组合多个弱学习器来构建一个更强大的学习器的方法。其中,bagging和boosting是两种常见的集成学习策略。 一、bagging & boosting 简介 Bagging(自助聚集法): Bagging…

Web框架与Django路由层

Web框架 一 web框架 Web框架(Web framework)是一种开发框架,用来支持动态网站、网络应用和网络服务的开发。这大多数的web框架提供了一套开发和部署网站的方式,也为web行为提供了一套通用的方法。web框架已经实现了很多功能&…

golang面试题:reflect(反射包)如何获取字段tag​?为什么json包不能导出私有变量的tag?

问题 json包里使用的时候,会结构体里的字段边上加tag,有没有什么办法可以获取到这个tag的内容呢? 举例 tag信息可以通过反射(reflect包)内的方法获取,通过一个例子加深理解。 package mainimport ("…

基于单片机的智能饮水机控制系统(论文+源码)

1. 系统设计 本次智能饮水机控制系统的设计研究一款以STC89C52单片机为核心的智能饮水机控制系统,其主要功能设计如下: 1.该饮水机利用DS18B20数字温度传感器实时采集饮水机内水的温度,其检测温度范围为0-100℃,精度0.1℃&#…

拆解按摩器:有意思的按键与LED控制电路,学习借鉴一下!

拆解 外观和配色个人感觉还行,比较青春 拉开拉链,拆开外面的布面,里面还有一层纱面 按键部分使用魔术贴固定 拆开纱面后,看到里面的结构,整体是一个海绵 可以看到如下,电池,按键板,充电线的三条…

Java 设计模式——建造者模式

目录 1.概述2.结构3.实例3.1.产品类3.2.抽象建造者类3.3.具体建造者类3.4.指挥者类3.5.测试 4.优缺点5.使用场景6.模式扩展7.创建者模式对比 1.概述 建造者模式 (Builder Pattern) 是一种创建型设计模式,用于创建复杂对象。它将对象的构建过程分离成独立的部分&…

单片机开发常见问题集合

文章目录 发送串口数据偶尔丢失字节 发送串口数据偶尔丢失字节 场景: 在STM32单片机中进行串口数据发送,在Linux/Windows上进行串口数据接收,会偶发出现接收到的数据有某些字节丢失。 分析: 在STM32中可以使用printf用于发送串口…

前端 | iframe框架标签应用

文章目录 📚嵌入方式📚图表加载显示📚100%嵌入及滑动条问题📚加载动画保留 前情提要: 计划用iframe把画好的home1.html(echarts各种图表组成的html数据大屏)嵌入整合到index.html(搭…

快速筛出EXCEL行中的重复项

比如A列是一些恶意IP需要导入防火墙,但包括一些重复项,为不产生错误,需要把重复项筛出来: 1、给A列排序,让重复项的内容排在相邻的行 2、在B列中写一个条件函数:IF(A1A2,1,0),然后下拉至行尾完成…

java设计模式 开闭原则

开闭原则(Open-Closed Principle,OCP)是面向对象设计中的一个重要原则,它指导着我们如何设计和组织代码,以便使系统在扩展性和可维护性方面更加优秀。 开闭原则的定义是:软件实体(类、模块、函数…

ESP32-Web-Server 实战编程-通过网页控制设备的 GPIO

ESP32-Web-Server 实战编程-通过网页控制设备的 GPIO 概述 前述博客讲解了 Web 编程的基本知识,包括 HTML、CSS、JavaScript 三个部分,从这节开始,我们进入实战部分,在实际项目中进一步学习 ESP32-Web 编程。 GPIO &#xff08…

WebGL笔记:图形旋转的原理和实现

旋转 1 )旋转的概念 三维物体的旋转要比位移复杂一点,三维物体的旋转需要满足以下条件: 旋转轴旋转方向旋转角度 场景举例 模型站在旋转轴的起点进行旋转模型要往左转还是往右转,就是旋转的方向模型旋转的大小就是旋转角度 2 &…

人工智能_AI服务器安装清华开源_CHATGLM大语言模型_GLM-6B安装部署_人工智能工作笔记0092

看到的这个开源的大模型,很牛,~关键让我们自己也可以部署体验一把了,虽然不知道具体内部怎么构造的但是,也可以自己使用也挺好. 可以部署在自己的机器上也可以部署在云服务器上. 安装以后,是可以使用python代码进行提问,然后返回结果的,这样就可以实现我们自己的chat应用了, …

仿美团外卖源码/在线外卖平台源码PHP/支持多商户+多样化配送费+本土外卖+支持第三方配送

源码简介: 进云仿美团外卖源码,作为外卖平台源码,它不仅支持多商户、多样化配送费、本土外卖,还支持第三方配送。 进云仿美团外卖源码是一个进云源生插件,支持多商户多样化配送费模式本土外卖平台支持第三方配送&…

excel表格在线编辑(开源版)

文章目录 前言一、Luckysheetvue3vite 例子如有启发,可点赞收藏哟~ 前言 本文记录好用的开源在线表格 具体如图显示 另外记录下更名后的univer~,如下图(有兴趣可自行详细了解) univer 在线思维导图 一、Luckysheet 参考git…

Java Web基础教程

Java Web基础教程 1. Servlet基础 1.1 什么是Servlet Servlet是JavaEE中的标准组件之一,专门用于处理客户端的HTTP请求。并且它必须依赖于Servlet容器(Tomcat就是一个标准的Servlet容器)才能运行。因为Servlet实例的创建和销毁都是由容器负…

蓝桥杯day02——移动机器人

1.题目 有一些机器人分布在一条无限长的数轴上,他们初始坐标用一个下标从 0 开始的整数数组 nums 表示。当你给机器人下达命令时,它们以每秒钟一单位的速度开始移动。 给你一个字符串 s ,每个字符按顺序分别表示每个机器人移动的方向。L 表…

基于Vue+SpringBoot的个人健康管理系统

项目编号: S 040 ,文末获取源码。 \color{red}{项目编号:S040,文末获取源码。} 项目编号:S040,文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 健康档案模块2.2 体检档案模块2.3 健…

C语言第三十六弹--实现转移表的多种方法

使用C语言通过多种方法实现转移表 方法一、普通法 思路:如图实现多种操作,首先创建菜单,需要运行一次再判断条件,所以通过do{}while(); 循环来实现多次。有多种选择,使用switch case选择语句,再在对应case…