在了解了窗口函数实现原理 spark、hive中窗口函数实现原理复盘 和 sparksql比hivesql优化的点(窗口函数)之后,今天又撸了一遍hive sql 中窗口函数的源码实现,写个笔记记录一下。
简单来说,窗口查询有两个步骤:将记录分割成多个分区;然后在各个分区上调用窗口函数。
传统的 UDAF 函数只能为每个分区返回一条记录,而我们需要的是不仅仅输入数据是一张表,输出数据也是一张表(table-in, table-out),因此 Hive 社区引入了分区表函数 Partitioned Table Function (PTF)。
1、代码流转图
PTF 运行在分区之上、能够处理分区中的记录并输出多行结果的函数。
hive会把QueryBlock,翻译为执行操作树OperatorTree,其中每个operator都会有三个重要的方法:
initializeOp() --初始化算子
process() --执行每一行数据
forward() --把处理好的每一行数据发送到下个Operator
当遇到窗口函数时,会生成PTFOperator,PTFOperator 依赖PTFInvocation读取已经排好序的数据,创建相应的输入分区:PTFPartition inputPart;
WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。
2、其它细节
PTFOperator.process(Object row, int tag)-->PTFInvocation.processRow(row)
void processRow(Object row) throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.processRow(row)); } else { inputPart.append(row); //主要操作就是把数据 append到 ptfpartition中,这里的partition与map-reduce中的分区不同,map-reduce分区是按照key的hash分,而这里是要把相同的key要放在同一个ptfpartition,方便后续的windowfunction操作 }}
真正对数据的操作是当相同的key完全放入同一个ptfpartition之后,时机就是finishPartition:
void finishPartition() throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.finishPartition()); } else { if ( tabFn.canIterateOutput() ) { outputPartRowsItr = inputPart == null ? null : tabFn.iterator(inputPart.iterator()); } else { outputPart = inputPart == null ? null : tabFn.execute(inputPart); //这里TableFunctionEvaluator outputPartRowsItr = outputPart == null ? null : outputPart.iterator(); } if ( next != null ) { if (!next.isStreaming() && !isOutputIterator() ) { next.inputPart = outputPart; } else { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { next.processRow(outputPartRowsItr.next()); } } } } } if ( next != null ) { next.finishPartition(); } else { if (!isStreaming() ) { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { forward(outputPartRowsItr.next(), outputObjInspector); } } } }}
还有一个雷区,PTFPartition append():
public void append(Object o) throws HiveException { if ( elems.rowCount() == Integer.MAX_VALUE ) { //当一个ptfpartition加入的条数等于Integer.MAX_VALUE时会抛异常 throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition", Integer.MAX_VALUE)); } @SuppressWarnings("unchecked") List<Object> l = (List<Object>) ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE); elems.addRow(l);}
需要把相同key的数据完全放入一个ptfPartition进行操作,这时对加入的的条数做了限制,不能>=Integer.MAX_VALUE(21亿),这块需要注意。
我是小萝卜算子 在成为最厉害最厉害最厉害的道路上 很高兴认识你 ~~ enjoy ~~ |