Flink sql 1.17笔记

环境准备

# 启动hadoop集群
# 启动Flink yarn session
(base) [link999@hadoop102 flink-1.17.0]$ bin/yarn-session.sh -d# 启动finksql客户端
(base) [link999@hadoop102 flink-1.17.0]$ bin/sql-client.sh -s yarn-session# 如果有初始化文件
bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
  • 检查State是RUNNING 后启用Flink sql客户端在这里插入图片描述在这里插入图片描述

常用配置

结果显示模式
#默认table,还可以设置为tableau、changelog
SET sql-client.execution.result-mode=tableau;执行环境
SET execution.runtime-mode=streaming; #默认streaming,也可以设置batch默认并行度
SET parallelism.default=1;设置状态TTL
SET table.exec.state.ttl=1000;通过sql文件初始化
(1)创建sql文件
vim conf/sql-client-init.sqlSET sql-client.execution.result-mode=tableau;
CREATE DATABASE mydatabase;(2)启动时,指定sql文件
/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
  • 初始化文件
SET sql-client.execution.result-mode=tableau;create DATABASE db_flink;
use db_flink;CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000', 'fields.vc.kind'='random', 'fields.vc.min'='2', 'fields.vc.max'='100'
);CREATE TABLE sink (id INT, ts BIGINT, vc INT
) WITH (
'connector' = 'print'
);

流处理中的表

在这里插入图片描述

将动态表转为流

  • 仅追加流
  • 撤回流(先删再插)
  • 更新流

时间属性

  • 事件时间
    在这里插入图片描述
 ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL '5' SECOND时间戳类型必须是 TIMESTAMP 或者TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒(BIGINT 类型),这种情况可以通过如下方式转换
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  • 处理时间
  • 在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。
    在创建表的DDL(CREATE TABLE语句)中,可以增加一个额外的字段,通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性。
CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()
) WITH (...
);

数据库相关操作

1)创建数据库
(1)语法
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1=val1, key2=val2, ...)(2)案例
CREATE DATABASE db_flink;2)查询数据库
(1)查询所有数据库
SHOW DATABASES(2)查询当前数据库
SHOW CURRENT DATABASE3)修改数据库
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)4)删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
RESTRICT:删除非空数据库会触发异常。默认启用
CASCADE:删除非空数据库也会删除所有相关的表和函数。
DROP DATABASE db_flink2;5)切换当前数据库
USE database_name;

表相关操作

创建表 语法
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n][ <watermark_definition> ][ <table_constraint> ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] | AS select_query ]
  • 创建简单表,在控制台输出
CREATE table test(id int,ts bigint,vc int
) with ('connector' = 'print');
  • 元数据列
'recore_time' timestamp_ltz(3) metadata from 'timestamp'如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样, FROM xxx 子句可省略
`timestamp` TIMESTAMP_LTZ(3) METADATA可以使用VIRTUAL关键字排除元数据列的持久化(表示只读)`offset` BIGINT METADATA VIRTUAL,
  • 计算列
`cost` AS price * quanitity
  • watermark

有界无序: WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘string’ timeUnit 。

  • 其他方式创建表
CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3)
) WITH ( 'connector' = 'kafka','scan.startup.mode' = 'earliest-offset'
);CREATE TABLE Orders_with_watermark (-- Add watermark definitionWATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (-- Overwrite the startup-mode'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
查询表
查看表
(1)查看所有表
SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]
如果没有指定数据库,则从当前数据库返回表。
LIKE子句中sql pattern的语法与MySQL方言的语法相同:
	%匹配任意数量的字符,甚至零字符,\%匹配一个'%'字符。
	_只匹配一个字符,\_只匹配一个'_'字符(2)查看表信息
{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name3)修改表
(1)修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
(2)修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)4)删除表
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

查询

自动化生成数据
1)创建数据生成器源表
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);CREATE TABLE sink (id INT, ts BIGINT, vc INT
) WITH (
'connector' = 'print'
);2)查询源表
select * from source3)插入sink表并查询
INSERT INTO sink select  * from source;
select * from sink;
with字句
with source_tmp as (select id, id+10 as new_idfrom source)select id, new_id from source_tmp;
SELECT & WHERE 子句
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)SELECT vc + 10 FROM source WHERE id >10
SELECT DISTINCT 子句
  • 注意TTL大小的设置
SELECT DISTINCT vc FROM source;
分组聚合
  • SUM()、MAX()、MIN()、AVG()以及COUNT()
  • 这种聚合方式,就叫作“分组聚合”(group aggregation)。想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。
CREATE TABLE source1 (
dim STRING,
user_id BIGINT,
price BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);CREATE TABLE sink1 (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);insert into sink1
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source1
group by
dim,
-- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min, 
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)
  • 多维分析
    Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets:
SELECTsupplier_id
, rating
, product_id
, COUNT(*)
FROM (
VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4)
)
-- 供应商id、产品id、评级
AS Products(supplier_id, product_id, rating)  
GROUP BY GROUPING SETS((supplier_id, product_id, rating),(supplier_id, product_id),(supplier_id, rating),(supplier_id),(product_id, rating),(product_id),(rating),()
);
分组窗口聚合
  • 从1.13版本开始,分组窗口聚合已经标记为过时,鼓励使用更强大、更有效的窗口TVF聚合
  • SQL中只支持基于时间的窗口,不支持基于元素个数的窗口。
  • tumble(time_attr, interval):滚动窗口
  • hop(time_attr, interval, interval):滑动窗口
  • session(time_attr, interval):会话窗口
1)	准备数据
CREATE TABLE ws (id INT,vc INT,pt AS PROCTIME(), --处理时间et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.id.min' = '1','fields.id.max' = '3','fields.vc.min' = '1','fields.vc.max' = '100'
);2)滚动窗口示例(时间属性字段,窗口长度)
select  
id,
TUMBLE_START(et, INTERVAL '5' SECOND)  wstart,
TUMBLE_END(et, INTERVAL '5' SECOND)  wend,
sum(vc) sumVc
from ws
group by id, TUMBLE(et, INTERVAL '5' SECOND);3)滑动窗口(时间属性字段,滑动步长,窗口长度)
select  
id,
HOP_START(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND)   wstart,
HOP_END(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND)  wend,sum(vc) sumVc
from ws
group by id, HOP(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND);4)会话窗口(时间属性字段,会话间隔)
select  
id,
SESSION_START(et, INTERVAL '5' SECOND)  wstart,
SESSION_END(et, INTERVAL '5' SECOND)  wend,
sum(vc) sumVc
from ws
group by id, SESSION(et, INTERVAL '5' SECOND);
窗口表值函数(TVF)聚合
  • 对于窗口表值函数,窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段window_start和window_end
  • 语法
FROM TABLE(
窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL时间…)
)
GROUP BY [window_start,][window_end,] --可选
1)滚动窗口
SELECT 
window_start, 
window_end, 
id , SUM(vc) 
sumVC
FROM TABLE(TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, id;2)	滑动窗口
要求: 窗口长度=滑动步长的整数倍(底层会优化成多个小滚动窗口)
SELECT window_start, window_end, id , SUM(vc) sumVC
FROM TABLE(HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, id;3)累积窗口
累积窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。SELECT 
window_start, 
window_end, 
id , 
SUM(vc) sumVC
FROM TABLE(CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS , INTERVAL '6' SECONDS))
GROUP BY window_start, window_end, id;

在这里插入图片描述

4)grouping sets多维分析
SELECT 
window_start, 
window_end, 
id , 
SUM(vc) sumVC
FROM TABLE(TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end,
rollup( (id) )
--  cube( (id) )
--  grouping sets( (id),()  )
;

在这里插入图片描述

Over 聚合
  • ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序
  • (1)按照时间区间聚合
-- 统计每个传感器前10秒到现在收到的水位数据条数。
SELECT id, et, vc,count(vc) OVER (PARTITION BY idORDER BY etRANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS cnt
FROM ws

在这里插入图片描述

--也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口,可以多次使用:
SELECT id, et, vc,
count(vc) OVER w AS cnt,
sum(vc) OVER w AS sumVC
FROM ws
WINDOW w AS (PARTITION BY idORDER BY etRANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
);

在这里插入图片描述

  • (2)按照行数聚合
统计每个传感器前5条到现在数据的平均水位
SELECT id, et, vc,avg(vc) OVER (PARTITION BY idORDER BY etROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS avgVC
FROM ws也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口:
SELECT id, et, vc,
avg(vc) OVER w AS avgVC,
count(vc) OVER w AS cnt
FROM ws
WINDOW w AS (PARTITION BY idORDER BY etROWS BETWEEN 5 PRECEDING AND CURRENT ROW
)

在这里插入图片描述

特殊语法 —— TOP-N
1)语法
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
select id,et,vc,rownum
from 
(select id,et,vc,row_number() over(partition by id order by vc desc ) as rownumfrom ws
)
where rownum<=3;

在这里插入图片描述

特殊语法 —— Deduplication去重
  • 去重,也即上文介绍到的TopN 中 row_number = 1 的场景,但是这里有一点不一样在于其排序字段一定是时间属性列,可以降序,不能是其他非时间属性的普通列。
1)语法
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
2)案例
对每个传感器的水位值去重
select id,et,vc,rownum
from 
(select id,et,vc,row_number() over(partition by id,vc order by et ) as rownumfrom ws
)
where rownum=1;

在这里插入图片描述

联结(Join)查询
  • Regular Join:

  • Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]

  • Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null] ,然后输出 +[L, R]

  • Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反

  • Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[null, R] ;对左流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] )。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R] ,输出+[L, R] ,右流数据到达为例:回撤 -[L, null] ,输出 +[L, R]

  • Regular Join 的注意事项: 实时 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 区别在于, 等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游; 非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联

--数据准备
CREATE TABLE ws (id INT,vc INT,pt AS PROCTIME(), --处理时间et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.id.min' = '1','fields.id.max' = '3','fields.vc.min' = '1','fields.vc.max' = '100'
);CREATE TABLE ws1 (id INT,vc INT,pt AS PROCTIME(), --处理时间et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间WATERMARK FOR et AS et - INTERVAL '0.001' SECOND   --watermark
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.id.min' = '3','fields.id.max' = '5','fields.vc.min' = '1','fields.vc.max' = '100'
);
--等值内连接(INNER Equi-JOIN)
SELECT *
FROM ws
INNER JOIN ws1
ON ws.id = ws1.id--等值外连接
SELECT *
FROM ws
LEFT JOIN ws1
ON ws.id = ws1.idSELECT *
FROM ws
RIGHT JOIN ws1
ON ws.id = ws1.idSELECT *
FROM ws
FULL OUTER JOIN ws1
ON ws.id = ws.id
  • 间隔联结查询:除了常规的联结条件外,还多了一个时间间隔的限制
  • (1)ltime = rtime
    (2)ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
    (3)ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND
SELECT *
FROM ws,ws1
WHERE ws.id = ws1. id
AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND 

在这里插入图片描述

  • 维表联结查询
  • Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找
  • 上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段。
表A
JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名
ON xx.字段=别名.字段
CREATE TABLE Customers (id INT,name STRING,country STRING,zip STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://hadoop102:3306/customerdb','table-name' = 'customers'
);-- order表每来一条数据,都会去mysql的customers表查找维度数据SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS oJOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;
Order by 和 limit
  • 支持 Batch\Streaming,但在实时任务中一般用的非常少。
    实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。
SELECT *
FROM ws
ORDER BY et, id desc;SELECT *
FROM ws
LIMIT 3;

在这里插入图片描述
在这里插入图片描述

SQL Hints
  • 在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。
select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;
集合操作
1UNIONUNION ALL
UNION:将集合合并并且去重
UNION ALL:将集合合并,不做去重。(SELECT id FROM ws) UNION (SELECT id FROM ws1);
(SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);2IntersectIntersect All
Intersect:交集并且去重
Intersect ALL:交集不做去重(SELECT id FROM ws) INTERSECT (SELECT id FROM ws1);
(SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);3ExceptExcept All
Except:差集并且去重
Except ALL:差集不做去重(SELECT id FROM ws) EXCEPT (SELECT id FROM ws1);
(SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);4In 子查询
In 子查询的结果集只能有一列
SELECT id, vc
FROM ws
WHERE id IN (
SELECT id FROM ws1
)
系统函数
  • https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/systemfunctions/
Module操作
  • Module 允许 Flink 扩展函数能力。目前 Flink 包含了以下三种 Module:

  • CoreModule:CoreModule 是 Flink 内置的 Module,Flink 默认开启的 Module 就是 CoreModule。

  • HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)

  • 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module

1)语法
-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸载
UNLOAD MODULE module_name-- 查看
SHOW MODULES;
SHOW FULL MODULES;--Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF,可以使用下面语法更改顺序:
USE MODULE hive,core;
需要先引入 hive 的 connector。
(1)上传jar包到flink的lib中
上传hive connector
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/注意:拷贝hadoop的包,解决依赖冲突问题
cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/2)重启flink集群和sql-client
(3)加载hive module
-- hive-connector内置了hive module,提供了hive自带的系统函数,注意hive的版本
load module hive with ('hive-version'='3.1.2');
show modules;
show functions;-- 可以调用hive的split函数
select split('a,b', ',');

在这里插入图片描述

常用 Connector 读写

  • https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/overview/
kafka
  • 1、下载kafka的jar包
  • 2、上传到目录 /opt/module/flink-1.17.0/lib/
  • 3、 重启yarn-session、sql-client
    在这里插入图片描述
  • 普通kafka表
1)创建Kafka的映射表
CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'hadoop103:9092','properties.group.id' = 'atguigu',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets''scan.startup.mode' = 'earliest-offset',-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed', --每个 Flink partition 最终对应最多一个 Kafka partition。'topic' = 'ws1','format' = 'json'
)2)插入Kafka表
insert into t1(id,ts,vc) select * from source;3)查询Kafka表
select * from t1;

在这里插入图片描述
在这里插入图片描述

  • upsert-kafka表
  • 如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。
    Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
1)创建upsert-kafka的映射表(必须定义主键)
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED 
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'hadoop102:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);2)插入upsert-kafka表
insert into t2 select  id,sum(vc) sumVC  from source group by id;3)查询upsert-kafka表
upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。
select * from t2;

在这里插入图片描述

File
上传所需jar包到lib下
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/
cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.0/lib/2)更换planner依赖
只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动,但这是Hive集成的推荐设置。
mv /opt/module/flink-1.17.0/opt/flink-table-planner_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/flink-table-planner_2.12-1.17.0.jarmv /opt/module/flink-1.17.0/lib/flink-table-planner-loader-1.17.0.jar /opt/module/flink-1.17.0/opt/flink-table-planner-loader-1.17.0.jar
1)创建FileSystem映射表
CREATE TABLE t3( id int, ts bigint , vc int )
WITH ('connector' = 'filesystem','path' = 'hdfs://hadoop102:8020/data/t3','format' = 'csv'
);2)写入
insert into t3 select * from source;3)查询
select * from t3 where id = 1;

在这里插入图片描述
在这里插入图片描述

JDBC(MySQL)
  • Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。
  • 在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink这样可以保证幂等性。
  • 上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下:
    flink-connector-jdbc-3.1.0-1.17.jar
    mysql-connector-j-8.0.31.jar
1)mysql的test库中建表
CREATE TABLE `ws2` (`id` int(11) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf82)创建JDBC映射表
CREATE TABLE t4
(id                      INT,ts                   BIGINT,
vc                     INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector'='jdbc','url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = '000000','connection.max-retry-timeout' = '60s','table-name' = 'ws2','sink.buffer-flush.max-rows' = '500','sink.buffer-flush.interval' = '5s','sink.max-retries' = '3','sink.parallelism' = '1'
);3)查询
select * from t4;4)写入
insert into t4 select * from source;

在这里插入图片描述

savepoint

  • 1、 先提交一个insert作业
1)创建Kafka的映射表
CREATE TABLE tc( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'hadoop103:9092','properties.group.id' = 'atguigu',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets''scan.startup.mode' = 'earliest-offset',-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed', --每个 Flink partition 最终对应最多一个 Kafka partition。'topic' = 'ws2','format' = 'json'
);2)插入Kafka表
insert into tc(id,ts,vc) select * from source;3)查看job ID
show jobs;

在这里插入图片描述

  • 2、停止作业,触发savepoint
SET state.checkpoints.dir='hdfs://hadoop102:8020/check';
SET state.savepoints.dir='hdfs://hadoop102:8020/sp';STOP JOB 'e56fe0bd5df28b07b50181a9de985171' WITH SAVEPOINT;

在这里插入图片描述
在这里插入图片描述

  • 3、从savepoint恢复
-- 设置从savepoint恢复的路径, 之后直接提交sql,就会从savepoint恢复
SET execution.savepoint.path=' hdfs://hadoop102:8020/sp/savepoint-e56fe0-be00e39fac52';  --允许跳过无法还原的保存点状态
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; 
  • 4、恢复后重置路径
指定execution.savepoint.path后,将影响后面执行的所有DML语句,可以使用RESET命令重置这个配置选项。RESET execution.savepoint.path;如果出现reset没生效可以重启sql-client

在这里插入图片描述

catalog

  • Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
catalog类型
  • 基于内存实现的 Catalog,所有元数据只在session 的生命周期内可用。默认自动创建,会有名为“default_catalog”的内存Catalog,这个Catalog默认只有一个名为“default_database”的数据库,会区分大小写。
  • JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。
  • HiveCatalog:有两个用途,一是单纯作为 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称。
  • 用户自定义 Catalog
JdbcCatalog(MySQL)
  • JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。
  • 创建Catalog
    JdbcCatalog支持以下选项:
     name:必需,Catalog名称。
     default-database:必需,连接到的默认数据库。
     username: 必需,Postgres/MySQL帐户的用户名。
     password:必需,该帐号的密码。
     base-url:必需,数据库的jdbc url(不包含数据库名)
    对于Postgres Catalog,是"jdbc:postgresql://:<端口>"
    对于MySQL Catalog,是"jdbc: mysql://:<端口>"
  • jar包 依赖等配置前面已经使用过,此处省略
1)创建catalog
CREATE CATALOG my_jdbc_catalog WITH('type' = 'jdbc','default-database' = 'test','username' = 'root','password' = '000000','base-url' = 'jdbc:mysql://hadoop102:3306'
);2)查看Catalog
SHOW CATALOGS;
--查看当前的CATALOG
SHOW CURRENT CATALOG;3)使用指定Catalog
USE CATALOG my_jdbc_catalog;4)查看当前的CATALOG
SHOW CURRENT CATALOG;

在这里插入图片描述

  • 对当前jdbc进行查询写入等操作
    在这里插入图片描述
    在这里插入图片描述
hivecatalog
  • jar包 依赖等配置前面已经使用过,此处省略

  • 启动外置的hive metastore服务

[link999@hadoop102 hive]$ hive --service metastore &查看
netstat -anp|grep 9083
ps -ef|grep -i metastore
  • 创建hivecatalog
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'default','hive-conf-dir' = '/opt/module/hive/conf'
);查看Catalog
SHOW CATALOGS;--查看当前的CATALOG
SHOW CURRENT CATALOG;使用指定Catalog
USE CATALOG myhive;--查看当前的CATALOG
SHOW CURRENT CATALOG;

在这里插入图片描述

  • 读写Hive表
SHOW DATABASES; -- 可以看到hive的数据库USE test;  -- 可以切换到hive的数据库SHOW TABLES; -- 可以看到hive的表SELECT * from ws; --可以读取hive表INSERT INTO ws VALUES(1,1,1); -- 可以写入hive表

IDEA中编写FlinkSQL

需要引入的依赖
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
创建表、查询表、写入表

“表环境”主要负责:
(1)注册Catalog和表;
(2)执行 SQL 查询;
(3)注册用户自定义函数(UDF);
(4)DataStream 和表之间的转换。

public class sqldemo {public static void main(String[] args) throws Exception {// 创建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建表环境StreamTableEnvironment TableEnv = StreamTableEnvironment.create(env);// 创建表TableEnv.executeSql("CREATE TABLE source ( \n" +"    id INT, \n" +"    ts BIGINT, \n" +"    vc INT\n" +") WITH ( \n" +"    'connector' = 'datagen', \n" +"    'rows-per-second'='1', \n" +"    'fields.id.kind'='random', \n" +"    'fields.id.min'='1', \n" +"    'fields.id.max'='5', \n" +"    'fields.ts.kind'='sequence', \n" +"    'fields.ts.start'='1', \n" +"    'fields.ts.end'='10', \n" +"    'fields.vc.kind'='random', \n" +"    'fields.vc.min'='1', \n" +"    'fields.vc.max'='50'\n" +");");TableEnv.executeSql("CREATE TABLE sink (\n" +"    id INT, \n" +"    ts BIGINT, \n" +"    vc INT\n" +") WITH (\n" +"'connector' = 'print'\n" +");");// 查询sqlTable table = TableEnv.sqlQuery("select id, min(vc), max(vc) from source group by id;");// 把table对象,注册成表名(创建了一个“虚拟表”)TableEnv.createTemporaryView("tmp", table);TableEnv.sqlQuery("select * from tmp;");// 输出表 sql用法TableEnv.executeSql("insert into sink select * from tmp");// 用table api来查询
//        Table source = TableEnv.from("source");
//        Table result = source
//                .where($("id").isGreater(5))
//                .groupBy($("id"))
//                .aggregate($("vc").sum().as("sumVC"))
//                .select($("id"), $("sumVC"));
//
//        // 输出表 table api用法
//        result.executeInsert("sink");}
}
流转表、表转流
public class tablestreamdemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> sensorDS = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s2", 2L, 2),new WaterSensor("s2", 3L, 3),new WaterSensor("s5", 5L, 5),new WaterSensor("s5", 2L, 2));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 流转表Table sensorTable = tableEnv.fromDataStream(sensorDS);Table sensorTable1 = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));tableEnv.createTemporaryView("sensor", sensorTable);Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where vc > 2;");Table sumTable = tableEnv.sqlQuery("select id, sum(vc) from sensor group by id;");// 表转流// 追加流tableEnv.toDataStream(filterTable).print("filter0");tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");// changelog流tableEnv.toChangelogStream(sumTable).print("sum");// 只要代码中调用了datastreamAPI,就需要execute,否则不需要env.execute();}
}
自定义函数UDF
  • 标量函数(Scalar Functions),一进一出

  • 表函数(Table Functions),一进多出

  • 聚合函数

  • 表聚合函数

问题记录

  • 1、TableAlreadyExistException :表已经存在
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.TableAlreadyExistException: Table (or view) myflink.sink already exists in Catalog default_
catalog.
  • 2、java.net.ConnectException: 拒绝连接
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: 拒绝连接

重启Flink 集群:(base) [link999@hadoop102 flink-1.17.0]$ bin/yarn-session.sh -d

  • 3、资源不足,可以把之前跑过的job关掉,释放出资源重新跑
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.End of exception on server side>]
    在这里插入图片描述

  • 4、sql语法问题

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "\'CONNECTor\'" at line 9, column 3.
Was expecting one of:<EOF> "AS" ..."LIKE" ...";" ...
  • 5、缺少依赖或依赖冲突在这里插入图片描述
  • 6、 select * from t3 where id = ‘1’;因为ID是int类型的,不能加引号,去掉引号即可
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.ValidationException: implicit type conversion between INTEGER and CHAR is not supported now

感谢

  • 本文是尚硅谷Flink1.17的学习笔记,非常感谢尚硅谷的课程,本文用于学习和查阅。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/82821.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Cloud Alibaba Ribbon负载均衡器

文章目录 Ribbon 负载均衡器环境搭建1.依赖2.配置3.修改其默认的负载均衡策略3.1 验证 4.创建自定义的Rule4.1 MyRule&#xff08;&#xff09;4.2 在配置config类中配置 5.饥饿加载6.我只想访问不想被别的访问 Ribbon 负载均衡器 背景 Ribbon 是一个用于客户端负载均衡的开源…

【ElementUI】ElementUI Tooltip 根据内容判断是否显示、文字提示自定义样式

【ElementUI】ElementUI Tooltip 根据内容判断是否显示、文字提示自定义样式 封装组件自定义内容 <template><span v-if"[, null, undefined].indexOf(content) -1"><el-tooltip :content"content" effect"light" placement&q…

Pytorch从零开始实战04

Pytorch从零开始实战——猴痘病识别 本系列来源于365天深度学习训练营 原作者K同学 文章目录 Pytorch从零开始实战——猴痘病识别环境准备数据集模型选择模型训练数据可视化其他模型图片预测 环境准备 本文基于Jupyter notebook&#xff0c;使用Python3.8&#xff0c;Pytor…

前端-layui动态渲染表格行列与复杂表头合并

说在前面&#xff1a; 最近一直在用layui处理表格 写的有些代码感觉还挺有用的&#xff0c;顺便记录下来方便以后查看使用&#xff1b; HTML处代码 拿到id 渲染位置表格 <div class"layui-table-body salaryTable"><table class"layui-table" i…

GE WES5120 5120-1506 自动化控制模块

GE WES5120 5120-1506 是一种自动化控制模块&#xff0c;通常用于工业自动化和控制系统中&#xff0c;用于监测和控制各种工业过程。这种类型的自动化控制模块在多个应用领域都有广泛的用途&#xff0c;包括但不限于以下几个领域&#xff1a; 制造业&#xff1a; WES5120 5120-…

Nginx配置负载均衡时访问地址无法生效

场景还原 今天有小伙伴练习Nginx配置负载均衡时总是无法使用配置好的网址访问 配置文件信详情 http {# 负载均衡 后端IP地址和端口 webservers 策略 轮询upstream webservers{server 192.168.1.100:8080 weight90; server 127.0.0.1:8080 weight10; }server{listen 80;ser…

Lostash同步Mysql数据到ElasticSearch(二)logstash脚本配置和常见坑点

1. logstash脚本编写&#xff08;采用单文件对应单表实例&#xff09; 新建脚本文件夹 cd /usr/local/logstash mkdir sql & cd sql vim 表名称.conf #如: znyw_data_gkb_logstash.conf 建立文件夹&#xff0c;保存资源文件更新Id mkdir -p /data/logstash/data/last_r…

【FAQ】安防监控系统/视频云存储/监控平台EasyCVR服务器解释器出现变更该如何修改?

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

【数据结构】图的基本概念,图的存储结构(邻接矩阵;邻接表;十字链表;邻接多重表)

欢~迎~光~临~^_^ 目录 1、图的基本概念 2、图的存储结构 2.1邻接矩阵 2.2邻接表 2.3十字链表 2.4邻接多重表 2.5图的四种存储结构的对比 1、图的基本概念 图是由一组节点&#xff08;通常称为顶点&#xff09;和一组连接这些节点的边&#xff08;通常称为边&#xff0…

密码学概论

1.密码学的三大历史阶段&#xff1a; 第一阶段 古典密码学 依赖设备&#xff0c;主要特点 数据安全基于算法的保密&#xff0c;算法不公开&#xff0c;只要破译算法 密文就会被破解&#xff0c; 在1883年第一次提出 加密算法应该基于算法公开 不影响密文和秘钥的安全&#xff…

添加一个仅管理员可见的页面

例如我新加一个页面 申请一个路由 《插播》 前端是如何知道我们是管理员的呢&#xff0c;ant-design框架会帮我们存到InitialState里&#xff0c;做为全局变量 在access.ts里我们获取到了用户是否为管理员 &#xff08;用户存在且为管理员&#xff09; 框架为我们打通了个路由…

JADE盲分离算法仿真

JADE算法原理 JADE 算法首先通过去均值预白化等预处理过程得到解相关的混合信号&#xff0c;预处理后的信号构建的协方差矩阵变为单位阵&#xff0c;为后续的联合对角化奠定基础&#xff1b;其次&#xff0c;通过建立四阶累积量矩阵&#xff0c;利用高阶累积量的统计独立性等性…

uniapp获取一周日期和星期

UniApp可以使用JavaScript中的Date对象来获取当前日期和星期几。以下是一个示例代码&#xff0c;可以获取当前日期和星期几&#xff0c;并输出在一周内的每天早上和晚上&#xff1a; // 获取当前日期和星期 let date new Date(); let weekdays ["Sunday", "M…

Android Aidl跨进程通讯(四)--接口回调,服务端向客户端发送数据

学更好的别人&#xff0c; 做更好的自己。 ——《微卡智享》 本文长度为3325字&#xff0c;预计阅读9分钟 前言 前几篇介绍了AIDL通讯的基础&#xff0c;进阶和异常捕获&#xff0c;本篇就来看看服务端怎么向客户端来实现发送消息。 实现服务端往客户端发送消息&#xff0c;主要…

java版Spring Cloud+Mybatis+Oauth2+分布式+微服务+实现工程管理系统

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…

爬虫框架Scrapy学习笔记-1

前言 在现代互联网时代&#xff0c;网页数据获取和处理已经成为了重要的技能之一。无论是为了获取信息、做市场研究&#xff0c;还是进行数据分析&#xff0c;掌握网页爬取和数据处理技术都是非常有用的。本文将介绍从网页加载到数据存储的完整过程&#xff0c;包括网络请求、…

(手撕)数据结构--->堆

文章内容 目录 一&#xff1a;堆的相关概念与结构 二&#xff1a;堆的代码实现与重要接口代码讲解 让我们一起来学习:一种特殊的数据结构吧&#xff01;&#xff01;&#xff01;&#xff01; 一&#xff1a;堆的相关概念与结构 在前面我们已经简单的学习过了二叉树的链式存储结…

Linux Day17 生产者消费者

一、生产者消费者问题概述 生产者 / 消费者问题&#xff0c;也被称作有限缓冲问题。两个或者更多的线程共享同一个缓冲 区&#xff0c;其中一个或多个线程作为 “ 生产者 ” 会不断地向缓冲区中添加数据&#xff0c;另一个或者多个线程作为 “ 消费者 ” 从缓冲区中取走数据。…

【MySQL系列】- MySQL自动备份详解

【MySQL系列】- MySQL自动备份详解 文章目录 【MySQL系列】- MySQL自动备份详解一、需求背景二、Windows mysql自动备份方法2.1 复制date文件夹备份实验备份环境创建bat直接备份脚本 2 .2 mysqldump备份成sql文件创建mysqldump备份脚本 2 .3 利用WinRAR对MySQL数据库进行定时备…

【每日一题】154. 寻找旋转排序数组中的最小值 II

154. 寻找旋转排序数组中的最小值 II - 力扣&#xff08;LeetCode&#xff09; 已知一个长度为 n 的数组&#xff0c;预先按照升序排列&#xff0c;经由 1 到 n 次 旋转 后&#xff0c;得到输入数组。例如&#xff0c;原数组 nums [0,1,4,4,5,6,7] 在变化后可能得到&#xff1…