多维分析
需求:有一张test表,表的字段为:A, B, C, amount, 其中A, B, C为维度字段,求以三个维度任意组合,统计sum(amount)
- Union方案:
- A, B, C的任意组合共有8种,分别为(A, B,C,AB,AC,BC,ABC,空集)
- 然后每种类型的个数也不一样,需要补足空白的字段
- 将每种类型进行group by+ sum 求和后Union在一起
- Flink方案
- 前面的语法一样
- group by grouping sets (A,B,C,(A,B),(A,C),(B,C),(A,B,C),())
- group by cube(A,B,C)
- roll up (A,B,C) ⇒ ((A,B,C), (A,B),(A),())
- Hive提供的Grouping Sets:
窗口
-
分组窗口 groupWindow
- 分类
- 滚动窗口
- 滑动窗口Hop Windows
- 会话窗口
- 分类
-
窗口表值函数 window TVF(支持topN)
- 滚动窗口
- 滑动窗口Hop Windows
- 累积窗口Cumulate Windows
- 会话窗口Sesssion Windows(不支持)
-
开窗函数 over
-
API的用法:
- 计数窗口(SQL中不支持计数窗口)
- 计数滚动:
Tumble.over(rowInterval(5L)).on($("处理时间")).as("w")
- 计数滑动(窗口的首次计算必须达到窗口大小):
Slide.over(rowInterval(5L)).every(rowInterval(3L)).on($("pt")).as("w")
- 使用窗口:
table.window(w1).groupBy($("w"), $("id")).select($("id"), $("vc")).execute().print();
- 计数滚动:
- 时间窗口:
- 滚动Tumble.over(lit(5).seconds()).on($(“pt”)).as(""w);
- 滑动Slide.over(lit(5).seconds()).every(lit(3).seconds()).as(“w”);
- 会话:Sessino.withGap(lit(3).seconds()).on($(“pt”)).as(“w”);
- 使用窗口: table.window(w7).groupBy( ( " w " ) , ("w"), ("w"),(“id”)).select()
- 计数窗口(SQL中不支持计数窗口)
-
SQL的用法:
//滚动时间窗口
select id,tumble_start(pt,interval '5' second) as wStart,tumble_end(pt,interval '5' second) as wEnd,sum(vc) sumvc
from t1
group by tumble(et,interval '5' second),id;//滑动时间窗口
selectid,hop_start(pt,interval '3' second, interval '5' second) as wStart,hop_end(pt,interval '3' second, interval '5' second) as wEnd,sum(vc) svc
from t1
group by hop(et,interval '5' second,)
- WindowTVF窗口表值函数(只有SQL形式)
//滚动窗口
select window_start,window_end,SUM(price)
FromTable(tumble(table t1,descriptor(pt)),//事件时间改为etinterval '5' second)group by window_start, window_end, id;//滑动窗口(窗口大小必须是滑动步长的整数倍)
select window_start,window_end,SUM(price)
FromTable(hop(table t1,descriptor(pt)),interval '3' second,//滑动步长interval '6' second//窗口大小)group by window_start, window_end, id;//累积窗口(统计类似0~1,0~2,0~3这样的窗口/)
select window_start,window_end,SUM(price)
FromTable(cumulate(table t1,descriptor(pt)),interval '2' second,//步长,一般为小时interval '10' second//每一轮的大小,一般为一天)group by window_start, window_end, id;
- Over聚合函数(划定一个范围,对窗口内的每条数据都做统计)
- SQL语法:over(partition by t1 order by t2 )
- API语法
- 定义窗口(无法指定下无边界,流式数据无法明确下边界)
Over.partitionBy($("id")).orderBy( $ ("pt")).preceding(unbounded_row).follow(current_row).as("w");
- 定义上两行到当前行:
Over.partitionBy( $ ("id")).orderBy( $ ("pt")).preceding(rowinterval(2L)).follow(current_row).as("w");
- 基于时间,上无边界到当前时间:
Over.partitionBy($("id")).orderBy( $ ("pt")).preceding(unbounded_range).follow(current_range).as("w");
- 上两秒到当前时间:
Over.partitionBy( $ ("id")).orderBy( $ ("pt")).preceding(lit(2).second()).follow(current_range).as("w");
- 使用窗口
sum().over( $ ("w1"))
- 定义窗口(无法指定下无边界,流式数据无法明确下边界)
- SQL 语法
//上无边界到当前行
selectid,vc,sum(vc) over (partition by id order by pt rows between unbounded preceding andcurrent row ) sumvc
from t1;//上两行到当前行//上无边界到当前时间//上两秒到当前时间
TopN
窗口表值函数 + over窗口实现
- 统计用来排名的数值(点击次数)和窗口时间信息
//统计每个user的点击次数
select user,count(*) cnt,window_start,window_end
from Table(tumble(talbe t1, descriptor(et), interval '10' second)
)
group by window_start, window_end,user;
- 按照点击次数排名(按照窗口结束时间分区,再排名,目前Flink1.17只支持row_number函数)
- 原本order by 后面只能是时间字段,且只能是升序
- 如果FLink能够识别当前操作是TopN的情况下,支持在order by后面出现非时间字段
(selectuser,cnt,row_number() over(partition by window_start,window_end order by cnt desc ) rk
from t2) t3
- 取TopN,进行where过滤
where row_num <= N
, 这段代码是识别为TopN查询的关键.
selectuser,cnt,rk
from t3
where rk <= 3;
实际上,所有代码可以合并为一个整体:
去重
TopN的特殊写法,根据主键开窗,只取where row_num = 1的数据,即能达到对重复数据进行去重的效果。
需求:统计每个窗口中每个url最后到达的数据
(select url,ts,window_start,window_end
from ) as t1//按照窗口的开始时间和结束时间,url进行分区,通过时间排序,求排名
(selecturl,ts,window_start,window_end,row_number(partition by window_start, window_end, url order by ts desc) rk
from t1;) as t2// 取rk = 1
select url,ts,window_end
from t2
where rk = 1;