目录 flink sql常用配置 kafka source to mysql sink 窗口函数 开窗 datagen 自动生成数据表 tumble 滚动窗口 hop 滑动窗口 cumulate 累积窗口 grouping sets 多维分析 over 函数 TopN
flink sql常用配置
设置输出结果格式
-- Display query results in 'tableau' (console table) mode in the Flink SQL Client.
-- Option key must be a single token: sql-client.execution.result-mode
SET 'sql-client.execution.result-mode' = 'tableau';
kafka source to mysql sink
kafka
topic: bop_log_realtime
数据结构:
{"timestamp": "2023-10-31 14:26:02.528", "serverip": "10.13.177.209", "level": "INFO", "servicename": "bop-fms-query-info", "traceid": "", "spanid": "", "parent": "", "message": "Resolving eureka endpoints via configuration"}
mysql表:
库名:flink_test
-- Step 1 (run in MySQL, database flink_test): target table for warning-level logs.
-- NOTE: backticked identifiers must not contain spaces; `id` and ` id` are different names.
CREATE TABLE `bop_log_realtime_warning` (
    `id`          bigint(20)   NOT NULL AUTO_INCREMENT,
    `serverip`    varchar(255) NOT NULL DEFAULT '',
    `timestamp`   varchar(255) NOT NULL DEFAULT '',
    `level`       varchar(255) NOT NULL DEFAULT '',
    `servicename` varchar(255) NOT NULL DEFAULT '',
    `traceid`     varchar(255) NOT NULL DEFAULT '',
    `spanid`      varchar(255) NOT NULL DEFAULT '',
    `parent`      varchar(255) NOT NULL DEFAULT '',
    `message`     varchar(255) NOT NULL DEFAULT '',
    `updateTime`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

-- Step 2 (Flink): source table reading JSON log records from the Kafka topic.
-- Backticks are required for column names that are Flink reserved words
-- (timestamp, level, ...).
CREATE TABLE kafka_log_realtime_json (
    `serverip`    STRING,
    `timestamp`   STRING,
    `level`       STRING,
    `servicename` STRING,
    `traceid`     STRING,
    `spanid`      STRING,
    `parent`      STRING,
    `message`     STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'bop_log_realtime',
    'properties.bootstrap.servers' = '10.2.25.221:9092,10.2.25.221:9093',
    'properties.group.id' = 'testGroup2',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- Step 3 (Flink): sink table mapped onto the MySQL table via the JDBC connector.
-- `id` and `updateTime` are omitted: MySQL fills them (AUTO_INCREMENT / DEFAULT).
CREATE TABLE bop_log_realtime_warning (
    `serverip`    STRING,
    `timestamp`   STRING,
    `level`       STRING,
    `servicename` STRING,
    `traceid`     STRING,
    `spanid`      STRING,
    `parent`      STRING,
    `message`     STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://m3309i.hebe.grid.xx.com.cn:3309/flink_test?zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai',
    'username' = 'super_mis',
    'password' = 'mis_password',
    'table-name' = 'bop_log_realtime_warning'
);

-- Step 4 (Flink): continuous streaming insert, Kafka -> MySQL.
INSERT INTO bop_log_realtime_warning
SELECT
    `serverip`,
    `timestamp`,
    `level`,
    `servicename`,
    `traceid`,
    `spanid`,
    `parent`,
    `message`
FROM kafka_log_realtime_json;
窗口函数 开窗
datagen 自动生成数据表
-- Datagen source: 10 random rows/second, id in [1,3], vc in [1,100].
-- pt is a processing-time attribute; et is the event-time column with a
-- 5-second out-of-orderness watermark (required by the window TVFs below).
CREATE TABLE ws (
    id INT,
    vc INT,
    pt AS PROCTIME(),
    et AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR et AS et - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.id.min' = '1',
    'fields.id.max' = '3',
    'fields.vc.min' = '1',
    'fields.vc.max' = '100'
);

-- Print sink: writes rows to stdout for inspection.
CREATE TABLE sink (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'print'
);
tumble 滚动窗口
滚动窗口 窗口大小5 秒
-- 5-second tumbling window: sum of vc per id per window.
-- (Original text had "window_endfrom" fused into one token — a syntax error.)
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND)
)
GROUP BY id, window_start, window_end;
hop 滑动窗口
滑动窗口 滑动步长5 秒 窗口大小10 秒
注意:窗口大小= 滑动步长的整数倍(底层会优化成多个小滚动窗口)
-- Sliding (hopping) window: 5-second slide, 10-second window size.
-- HOP argument order is (data, time column, slide, size); size must be an
-- integer multiple of the slide.
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)
)
GROUP BY id, window_start, window_end;
cumulate 累积窗口
注意:窗口大小= 累积步长的整数倍
-- Cumulative window: 5-second step, 20-second max window size.
-- FIX: CUMULATE requires BOTH a step and a max size (the original passed only
-- one interval, which is invalid); the max size must be a multiple of the step.
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '20' SECOND)
)
GROUP BY id, window_start, window_end;
grouping sets 多维分析
-- Multi-dimensional aggregation over tumbling windows via GROUPING SETS.
-- With the single set (id) this is equivalent to grouping by id; add more
-- sets (e.g. ()) for additional roll-up dimensions.
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND)
)
GROUP BY window_start, window_end, GROUPING SETS ((id));
over 函数
TopN