1、简介
在上一节中已经介绍过 JDFrame,文章链接stream流太难用了看看JDFrame 没看过的朋友可以先看看,
这次主要讲讲窗口函数相关API的使用
在各种数据库mysql, hive、spark中都有非常好用的开窗函数使用, 但是java却没好用的JVM层级的窗口函数使用,于是乎写了这个,如果能熟练使用开窗函数相信能在业务代码中大大减少我们的统计计算逻辑代码。
本文不会介绍每个开窗函数是什么,它的语义与其他语言的窗口函数一模一样,在这里仅作简单介绍,后续会出相关实战的数据分析案例。
2、Maven依赖
<dependency><groupId>io.github.burukeyou</groupId><artifactId>jdframe</artifactId><version>0.0.4</version>
</dependency>
3、窗口函数的API使用
测试代码
static List<WebPvDto> dataList = new ArrayList<>();static {dataList.add(new WebPvDto("a",0,1));dataList.add(new WebPvDto("a",1,5));dataList.add(new WebPvDto("a",2,7));dataList.add(new WebPvDto("a",3,3));dataList.add(new WebPvDto("a",4,2));dataList.add(new WebPvDto("a",5,4));dataList.add(new WebPvDto("a",6,4));dataList.add(new WebPvDto("b",7,1));dataList.add(new WebPvDto("b",8,4));dataList.add(new WebPvDto("b",7,6));dataList.add(new WebPvDto("b",8,2));
}@Data
public static class WebPvDto {private String type;private Integer score;private Integer pvCount;public Object value;
}
ROW_NUMBER 窗口函数
生成行号,从1开始
// 等价于 select ROW_NUMBER() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overRowNumberS(WebPvDto::setValue).show(30);
输出结果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 4
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
RANK 窗口函数
生成排名号,相同值排名一样,排名不连续 。 如: 1 2 2 2 5 6 7
// 等价于 select rank() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overRankS(WebPvDto::setValue).show(30);
输出结果
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 3
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
DENSE_RANK 窗口函数
生成排名号,相同值排名一样,排名连续 如 1 2 2 2 3 4 5
// 等价于 select DENSE_RANK() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overDenseRankS(WebPvDto::setValue).show(30);
输出结果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 3
a 3 3 4
a 4 2 5
a 0 1 6
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
PERCENT_RANK 窗口函数
// 等价于 select PERCENT_RANK() over(partition by type order pv_count desc)
SDFrame.read(dataList).defaultScale(6).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overPercentRankS(WebPvDto::setValue).show(30);
输出结果
type score pvCount value
a 2 7 0
a 1 5 0.166667
a 5 4 0.333333
a 6 4 0.333333
a 3 3 0.666667
a 4 2 0.833333
a 0 1 1.000000
b 7 6 0
b 8 4 0.333333
b 8 2 0.666667
b 7 1 1.000000
Count窗口函数
// 等价于SQL: select count(*) over(partition by type order by pv_count desc rows between UNBOUNDED PRECEDING and CURRENT ROW)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount).roundStartRow2CurrentRow()).overCountS(WebPvDto::setValue).show(30);
输出结果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 4
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
Sum窗口函数
// 等价于 select sum(pv_count) over(rows between 1 PRECEDING and 2 FOLLOWING)
JDFrame.read(dataList).window(Window.roundBetweenBy(Range.BEFORE(1),Range.AFTER(2))).overSumS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
输出结果:
type score pvCount value
a 0 1 13
a 1 5 16
a 2 7 17
a 3 3 16
a 4 2 13
a 5 4 11
a 6 4 13
b 7 1 15
b 8 4 13
b 7 6 12
b 8 2 8
Avg窗口函数
// 等价于 select avg(pv_count) over(partition by type )
SDFrame.read(dataList).defaultScale(4).window(Window.groupBy(WebPvDto::getType)).overAvgS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
输出结果
type score pvCount value
a 0 1 3.7143
a 1 5 3.7143
a 2 7 3.7143
a 3 3 3.7143
a 4 2 3.7143
a 5 4 3.7143
a 6 4 3.7143
b 7 1 3.2500
b 8 4 3.2500
b 7 6 3.2500
b 8 2 3.2500
Max窗口函数
// 等价于 select max(pv_count) over(partition by type order pv_count asc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortAsc(WebPvDto::getPvCount)).overMaxValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
输出结果:
type score pvCount value
a 0 1 7
a 4 2 7
a 3 3 7
a 5 4 7
a 6 4 7
a 1 5 7
a 2 7 7
b 7 1 6
b 8 2 6
b 8 4 6
b 7 6 6
Min窗口函数
// 等价于 select min(pv_count) over(rows between CURRENT ROW and 2 FOLLOWING)
SDFrame.read(dataList).window(Window.roundCurrentRow2AfterBy(2)).overMinValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
type score pvCount value
a 0 1 1
a 1 5 3
a 2 7 2
a 3 3 2
a 4 2 2
a 5 4 1
a 6 4 1
b 7 1 1
b 8 4 2
b 7 6 2
b 8 2 2
Lag窗口函数
获取当前行的前N行数据
// 等价于 select lag(pv_count,2) over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overLagS(WebPvDto::setValue,WebPvDto::getPvCount,2).show(30);
输出结果:
type score pvCount value
a 2 7
a 1 5
a 5 4 7
a 6 4 5
a 3 3 4
a 4 2 4
a 0 1 3
b 7 6
b 8 4
b 8 2 6
b 7 1 4
Lead窗口函数
获取当前行的后N行数据
// 等价于 select lead(pv_count,3) over()
SDFrame.read(dataList).window().overLeadS(WebPvDto::setValue,WebPvDto::getPvCount,3).show(30);
输出结果:
type score pvCount value
a 0 1 3
a 1 5 2
a 2 7 4
a 3 3 4
a 4 2 1
a 5 4 4
a 6 4 6
b 7 1 2
b 8 4
b 7 6
b 8 2
NthValue 窗口函数
获取窗口范围内的第N行数据
// 等价于 select NTH_VALUE(pv_count,2) over(rows between 1 PRECEDING and CURRENT ROW)
SDFrame.read(dataList).window(Window.roundBefore2CurrentRowBy(3)).overNthValueS(WebPvDto::setValue,WebPvDto::getPvCount,2).show(30);
输出结果:
type score pvCount value
a 0 1
a 1 5 5
a 2 7 5
a 3 3 5
a 4 2 7
a 5 4 3
a 6 4 2
b 7 1 4
b 8 4 4
b 7 6 1
b 8 2 4
FirstValue 窗口函数
获取窗口范围内的第1行数据
// 等价于 select FIRST_VALUE(pv_count) over(rows between 2 PRECEDING and CURRENT ROW)
SDFrame.read(dataList).window(Window.roundBetweenBy(Range.BEFORE(2), Range.CURRENT_ROW)).overFirstValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
type score pvCount value
a 0 1 1
a 1 5 1
a 2 7 1
a 3 3 5
a 4 2 7
a 5 4 3
a 6 4 2
b 7 1 4
b 8 4 4
b 7 6 1
b 8 2 4
LastValue 窗口函数
获取窗口范围内的最后一行数据
// 等价于 select LAST_VALUE(pv_count) over(rows between 2 PRECEDING and 2 FOLLOWING)
SDFrame.read(dataList).window(Window.roundBeforeAfterBy(2,2)).overLastValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
输出结果
type score pvCount value
a 0 1 7
a 1 5 3
a 2 7 2
a 3 3 4
a 4 2 4
a 5 4 1
a 6 4 4
b 7 1 6
b 8 4 2
b 7 6
b 8 2
Ntile 窗口函数
给窗口尽量均匀的分成N个桶, 每个桶的编号从1开始, 如果分布不均匀,则优先分配给最小的桶,桶之间的大小差值最多不超过1
// 等价于 select Ntile(3) over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType)).overNtileS(WebPvDto::setValue,3).show(30);
输出结果:
type score pvCount value
a 0 1 1
a 1 5 1
a 2 7 1
a 3 3 1
a 4 2 2
a 5 4 2
a 6 4 2
b 7 1 2
b 8 4 3
b 7 6 3
b 8 2 3
Cume_Dist 窗口函数
累积分布值, 统计的是 (小于等于当前排名号的行数 / 窗口行数) 的比率
// select cume_dist() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overCumeDistS(WebPvDto::setValue).show(30);
输出结果
type score pvCount value
a 2 7 0.14
a 1 5 0.29
a 5 4 0.57
a 6 4 0.57
a 3 3 0.71
a 4 2 0.86
a 0 1 1.00
b 7 6 0.25
b 8 4 0.50
b 8 2 0.75
b 7 1 1.00
4 窗口
主要是通过Window对象去构建开窗的信息,包括窗口的分区情况,窗口的排序情况,还有窗口范围。
窗口范围可以通过 Range对象去枚举指定。
如果不指定窗口信息默认窗口范围就是全部行。 众所周知而在 mysql中如果使用了order默认窗口范围就是 rows between UNBOUNDED PRECEDING and CURRENT ROW
, 如果没有使用order也没指定rows between
, 默认窗口范围才是全部。 这点要注意区分
5 最后
1、窗口函数的计算结果的存储有两种方式,一种是直接返回到FI2里, 一种是可以通过指定SetFunction 进行存储, 所有后缀带S的方法就是通过后者的方式的存储, 之所以带S后缀是为了以便于区分,并且是放到第一个方法参数里。
2、除了可以通过单独的window()的方法去指定窗口信息,在每个over方法也可以了单独设置。 没单独设置就使用window()方法里指定的窗口信息
3、在不同窗口范围内的数据计算目前用的是各种滑动窗口算法,时间复杂度基本在O(N)左右
代码地址
Maven依赖地址