【SQL篇】一、Flink动态表与流的关系以及DDL语法

文章目录

  • 1、启动SQL客户端
  • 2、SQL客户端常用配置
  • 3、动态表和持续查询
  • 4、将流转为动态表
  • 5、用SQL持续查询
  • 6、动态表转为流
  • 7、时间属性
  • 8、DDL-数据库相关
  • 9、DDL-表相关

在这里插入图片描述

1、启动SQL客户端

启动Flink(基于yarn-session模式为例):

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

再启动sql的客户端:

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session

简单看下:

show databases;

2、SQL客户端常用配置

设置结果的显示模式,默认table,还可以设置为tableau、changelog

SET sql-client.execution.result-mode=changelog;

设置执行环境,默认streaming,也可以设置batch

SET execution.runtime-mode=streaming;

设置默认并行度:

SET parallelism.default=1;

设置状态TTL:

SET table.exec.state.ttl=1000;

通过SQL文件初始化,可以发现,exit退出客户端时,刚创建的库表都被清空了,这个SQL初始化文件就是在启动客户端时你想执行的SQL语句

# 创建SQL文件vim conf/sql-client-init.sqlSET sql-client.execution.result-mode=tableau;
CREATE DATABASE mydatabase;
# 启动时,-i指定SQL文件/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql

3、动态表和持续查询

和MySQL等关系型表不同的是,无限流下,会有源源不断的数据过来进入表中,即动态表,来一条数据,往表中插入一条数据。对应的,想获取最新结果就要写条SQL去不间断的查询,即持续查询(每次数据到来都会触发查询操作),持续查询的结果也是一个动态表。

关系型表流处理的动态表
处理的数据对象字段元组的有界集合字段元组的无限序列
查询时对数据的访问可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入
查询终止条件生成固定大小的结果集后终止永不停止, 根据持续收到的数据不断更新查询结果

在这里插入图片描述

如图,持续查询的流程为:

  • 流(stream)被转换为动态表(dynamic table)
  • 对动态表进行持续查询(continuous query),生成新的动态表
  • 生成的动态表被转换成流

如此,就通过执行SQL实现了对数据流的处理。

4、将流转为动态表

把流看作一张表,来一条数据,insert一次,比如有个记录用户点击事件的无限流:

在这里插入图片描述

5、用SQL持续查询

代码中定义一条查询SQL:

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

此时,结果表(动态),可能是简单的insert,如Bob这条数据,也可能是对旧数据的更新update,如Alice,这就是更新查询。 此时,结果表转DataStream调用toChangelogStream()方法。
在这里插入图片描述

修改查询SQL,使用TUMBLE加一个开窗,每个窗口触发时,输出结果,此时对结果表就只有insert追加数据,没有update,即追加查询。

在这里插入图片描述

6、动态表转为流

仅追加(Append-only)流

动态表仅仅通过insert来修改,转为流时,对应一个仅追加的流,流中的每条数据,就是动态表的每行数据。

撤回(Retract)流

流中有添加消息add和撤回消息retract两种,对应表中:

  • insert就是add消息
  • delete就是retract
  • update就是被改行的retract+新结果的add

在这里插入图片描述

更新插入(Upsert)流

流中有更新插入消息upsert和删除消息delete两种,对应表中:

  • update和insert是upsert消息
  • delete为delete消息

在这里插入图片描述

最后,注意,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流两种。

7、时间属性

在表中加个时间字段,数据类型为TimeStamp,分为事件时间和处理时间。事件时间通过watermark语句来定义:

CREATE TABLE EventTable(user STRING,url STRING,ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...
);# TIMESTAMP后的3为精确度

以上,ts字段为事件时间属性,且基于ts设置5s的水位线延迟,注意,延迟秒数5必须加单引号。时间戳类型需要转为秒或者毫秒时,可:

# ...
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
# ...
3即精确到毫秒

定义处理时间属性用procTime函数:

CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()
) WITH (...
);

8、DDL-数据库相关

建库:

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1=val1, key2=val2, ...)

查询所有库:

SHOW DATABASES

查当前库:

SHOW CURRENT DATABASE

修改库的某些属性:

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

删库:

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

注意,

  • RESTRICT:删除非空数据库会触发异常。默认启用
  • CASCADE:删除非空数据库也会删除所有相关的表和函数,慎用

切换当前库:

USE database_name;

9、DDL-表相关

建表:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name# 字段({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]# 定义watermark[ <watermark_definition> ][ <table_constraint> ][ , ...n])   # 注释  [COMMENT table_comment]# 分区[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]# Flink特色WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] | AS select_query ]

关于表中的字段:physical_column就是常规列。metadata_column是元数据列,可访问到数据源本身的一些元数据,必须加METADATA关键字标识,如:读取数据写入Kafka时,Kafka引擎给数据打上的时间戳标记:

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  # !!!
) WITH ('connector' = 'kafka'...
);

自定义的列名称和 Connector 中定义 metadata 字段的名称一样时,后面的FROM省略:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA  # !!!
) WITH (
'connector' = 'kafka'
...
);

自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致时,会自动强转,因此这两个类型必须可以强转:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);

默认metadata_column列可读可写,加VIRTUAL表示只读:

CREATE TABLE MyTable (`timestamp` BIGINT METADATA, `offset` BIGINT METADATA VIRTUAL,  # !!!!`user_id` BIGINT,`name` STRING,
) WITH ('connector' = 'kafka'...
);

computed_column即计算列,把几列的计算结果做为新列,这在关系型SQL中一般在查询语句中完成,而不存成一个新列。

CREATE TABLE MyTable (`user_id` BIGINT,`price` DOUBLE,`quantity` DOUBLE,`cost` AS price * quanitity   # !!!
) WITH ('connector' = 'kafka'...
);

主键的定义,只支持 not enforced:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced   # !!!
) WITH (
'connector' = 'kafka'
...
);

with子句,用于指定这个表相关的外部系统的相关配置,如Kafka:

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`name` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

like子句,即在现有表的基础上,创建另一种表:

CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3)
) WITH ( 'connector' = 'kafka','scan.startup.mode' = 'earliest-offset'
);CREATE TABLE Orders_with_watermark (-- Add watermark definitionWATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (-- Overwrite the startup-mode'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;  # !!!!

举例:新表中的value字段加偏引号是因为value和关键字冲突了

CREATE TABLE test(id INT, ts BIGINT, vc INT
) WITH (
'connector' = 'print'
);CREATE TABLE test1 (`value` STRING
)
LIKE test;

create-table-as-select,即CTAS语句,通过查询结果创建表:

CREATE TABLE my_ctas_table
WITH ('connector' = 'kafka',...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;# 注意此时不能自己来定义列

查所有表:

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

查某张表信息:

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

修改表名:

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

修改表属性:

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

删表:

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/132368.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink SQL 窗口聚合详解

1.滚动窗⼝&#xff08;TUMBLE&#xff09; **滚动窗⼝定义&#xff1a;**滚动窗⼝将每个元素指定给指定窗⼝⼤⼩的窗⼝&#xff0c;滚动窗⼝具有固定⼤⼩&#xff0c;且不重叠。 例如&#xff0c;指定⼀个⼤⼩为 5 分钟的滚动窗⼝&#xff0c;Flink 将每隔 5 分钟开启⼀个新…

从零开始制作一个割草机器人

项目背景 为啥要做一个割草机器人呢&#xff1f;&#xff08;个人因素&#xff1a;我梦想就是做一款人形机器人保护人类&#xff0c;解放人类&#xff09; 基础准备&#xff1a;我们公司本身做过高精度&#xff0c;基于高精度的技术扩展到农机自动化驾驶。目前可以实现AB线拖…

数字人IP为何成家电品牌年轻化营销黑马?

伴随着数字人概念的出现&#xff0c;家电品牌逐渐通过3D虚拟数字人定制&#xff0c;让数字人成为内容、变现一体的IP&#xff0c;形成一定影响力的品牌效应&#xff0c;利用长线内容沉淀粉丝&#xff0c;使品牌实现年轻化营销。 *图片源于网络 如近日在海尔智家旗下品牌发布会上…

uniapp踩坑之项目:uniapp数字键盘组件—APP端

//在components文件夹创建digitKeyboard文件夹&#xff0c;再创建digitKeyboard.vue <!-- 数字键盘 --> <template><view class"digit-keyboard"><view class"digit-keyboard_bg" tap"hide"></view><view clas…

平面扫描(Plane-sweeping)深度体会

先看文章 三维重建之平面扫描算法&#xff08;Plane-sweeping&#xff09;_plane sweeping_小玄玄的博客-CSDN博客 Plane Sweeping | 平面扫描 - 知乎 (zhihu.com) 注意平面Dm,这是其中一个平面&#xff0c;平面上有一个M点&#xff0c;这个点也再物体上。所以会被摄像机看到…

SSD-1B:Segmind的加速稳定扩散模型

Segmind 稳定扩散模型 (SSD-1B) 是稳定扩散 XL (SDXL) 缩小 50% 的精简版本&#xff0c;可提供 60% 的加速&#xff0c;同时保持高质量的文本到图像生成功能。 它已经过各种数据集的训练&#xff0c;包括 Grit 和 Midjourney scrap 数据&#xff0c;以增强其根据文本提示创建各…

docker---dockerfile相关知识

第 3 章 Docker 高级实践 在这一部分我们主要来介绍一些Docker的高级内容&#xff1a; Dockerfile 和 Docker compose 3.1 Dockerfile Dockerfile我们从下面的几个方面来介绍&#xff1a; Dockerfile简介 Dockerfile快速入门 Dockerfile详解 Dockerfile简单 实践 3.1.1 Docke…

python-opencv写入视频文件无法播放

python-opencv写入视频文件无法播放 在采用Python写OpenCV的视频时&#xff0c;生成的视频总是无法播放&#xff0c;大小只有不到两百k&#xff0c;播放器提示视频已经损坏。网上搜了一些方法&#xff0c;记录下解决办法。 代码如下 fourcc cv2.VideoWriter_fourcc(*MJPG) fp…

腾讯云CVM服务器操作系统镜像大全

腾讯云CVM服务器的公共镜像是由腾讯云官方提供的镜像&#xff0c;公共镜像包含基础操作系统和腾讯云提供的初始化组件&#xff0c;公共镜像分为Windows和Linux两大类操作系统&#xff0c;如TencentOS Server、Windows Server、OpenCloudOS、CentOS Stream、CentOS、Ubuntu、Deb…

性能工作站,双十一大促,超值推荐:蝰蛇峡谷 NUC12SNKi7迷你主机,优惠抢购!

近年来&#xff0c;ITX主机和小型化系统变得越来越受欢迎。英特尔的NUC受到许多玩家们的关注。作为mini主机的代表NUC小巧设计和灵活性使它成为很多玩家和科技爱好者的选择。它的高性能和可玩性使得它在迷你型准系统市场上备受推崇。双11来临之际&#xff0c;我们分析下哪款高性…

世微LED 大功率升压恒流驱动芯片 平板显示LED背光板灯串恒流控制器 AP9193

概述 AP9193 是一款高效率、高精度的升 压型大功率 LED 灯恒流驱动控制芯片。 AP9193 内置高精度误差放大器&#xff0c;固 定关断时间控制电路&#xff0c;恒流驱动电路等&#xff0c; 特别适合大功率、多个高亮度 LED 灯的串 恒流驱动。 AP9193 采用固定关断时间的控制方 式…

产业园区中工业厂房的能源综合配置——工业园区综合能源数字化系统建设方案

以下内容转自微信公众号&#xff1a;PPP产业大讲堂&#xff0c;《产业园区中工业厂房的能源综合配置》。 园区工业地产中能源综合配置存在的问题 我国园区工业地产建设已历经近40年的发展, 园区在区域经济发展、产业集聚方面发挥了重要的载体和平台作用, 有力推动了我国社会经…

聊一聊关于手机Charge IC的电流流向

关于手机Charge&#xff0c;小白在以前的文章很少讲&#xff0c;一是这部分东西太多&#xff0c;过于复杂。二是总感觉写起来欠缺点什么。但后来想一想&#xff0c;本是抱着互相学习来写文章的心理态度&#xff0c;还是决定尝试写一些。 关于今天要讲的关于手机Charge的内容&a…

史上最全Windows安全工具汇总

史上最全Windows安全工具锦集来源于网络整理&#xff0c;安全性自测。 下载方式&#xff1a;史上最全Windows安全工具汇总

shiro 框架使用学习

简介 Shiro安全框架是Apache提供的一个强大灵活的安全框架Shiro安全框架提供了认证、授权、企业会话管理、加密、缓存管理相关的功能&#xff0c;使用Shiro可以非常方便的完成项目的权限管理模块开发 Shiro的整体架构 1、Subject ​ Subject即主体&#xff08;可以把当前用户…

Mysql Cluster (NDB - Network Database) - 分布式

Mysql高可用架构 复制&#xff08;Replication&#xff09; 是本文中所有 MySQL 技术的基础。包括&#xff1a;异步复制、半同步复制&#xff0c;增强半同步复制。InnoDB 副本集&#xff08;MySQL InnoDB ReplicaSet&#xff09; 无缝衔接其他 MySQL 官方提供的应用程序&#…

没有MES管理系统,先用数据采集设备能有用吗

在当前的数字化时代&#xff0c;企业纷纷意识到了数字化转型的重要性。数据被誉为新型生产要素&#xff0c;对于企业的运营和决策具有至关重要的作用。在数字化转型的过程中&#xff0c;许多企业面临着一个共同的问题&#xff1a;如何获取所需的数据&#xff1f; 有两家企业在…

偶数矩阵判断【C语言作业】

题目 若一个布尔矩阵所有行和所有列的和都是偶数&#xff0c;则称为偶数矩阵。请编写一个程序&#xff0c;判断一个布尔矩阵是否是偶数矩阵。 要求&#xff1a; &#xff08;1&#xff09;输入:首先输入一个正整数n(n<100),代表该矩阵的大小&#xff0c;接下来是n行n列的矩…

TCP/IP协议群

TCP/IP协议群 什么是TCP/IP协议群 从字面意义上讲&#xff0c;有人可能会认为 TCP/IP 是指 TCP 和 IP 两种协议。实际生活当中有时也确实就是指这两种协议。然而在很多情况下&#xff0c;它只是利用 IP 进行通信时所必须用到的协议群的统称。具体来说&#xff0c;IP 或 ICMP、…

微信小程序 uCharts的使用方法

一、背景 微信小程序项目需要渲染一个柱状图&#xff0c;使用uCharts组件完成 uCharts官网指引&#x1f449;&#xff1a;uCharts官网 - 秋云uCharts跨平台图表库 二、实现效果 三、具体使用 进入官网查看指南&#xff0c;有两种方式进行使用&#xff1a;分别是原生方式与组…