【PostgreSQL内核学习 —— (WindowAgg(一))】

WindowAgg

  • 窗口函数介绍
  • WindowAgg
    • 理论层面
    • 源码层面
      • WindowObjectData 结构体
      • WindowStatePerFuncData 结构体
      • WindowStatePerAggData 结构体
      • eval_windowaggregates 函数
      • update_frameheadpos 函数

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书

窗口函数介绍

  首先,我将提供一个简单的 SQL 用例,并逐步解读窗口函数的使用过程。假设我们有一个名为 sales 的销售数据表,表结构如下:

CREATE TABLE sales (id SERIAL PRIMARY KEY,salesperson_id INT,sale_date DATE,sale_amount NUMERIC
);

  假设 sales 表包含以下数据:

idsalesperson_idsale_datesale_amount
112024-01-011000
212024-01-021200
322024-01-01800
422024-01-021100
532024-01-011500
632024-01-021300

SQL 用例:使用窗口函数计算每个销售人员的累计销售金额
  我们希望计算每个销售人员在每个销售记录的日期上的累计销售金额。为了实现这一目标,我们可以使用 SUM() 函数,它会对每个销售人员的数据进行累计。
SQL 查询如下:

SELECT id,salesperson_id,sale_date,sale_amount,SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
FROM sales
ORDER BYsalesperson_id, sale_date;

详细解读:

  1. SUM(sale_amount)

  这是一个聚合函数,通常用于对某个列的值进行汇总。在这个查询中,SUM(sale_amount) 用于计算销售额的累计值。

  1. OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

  这是一个窗口函数的关键部分,指定了如何对结果进行分区、排序和聚合。具体来说:

  • PARTITION BY salesperson_id:这是窗口函数的分区操作,将数据按 salesperson_id(销售人员 ID)分区。也就是说,每个销售人员的数据将分别计算,不同销售人员的累计销售是独立的。
  • ORDER BY sale_date:对每个分区内的数据按销售日期 (sale_date) 进行排序,确保累计计算是按时间顺序进行的。
  • ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:这是一个窗口帧的定义,意味着每个分区的累计值从该分区的第一行开始计算,一直到当前行。UNBOUNDED PRECEDING 表示从分区的第一行开始,CURRENT ROW 表示包括当前行。
  1. 结果分析:

  查询结果将会返回每个销售人员的每笔销售记录,并在 cumulative_sales 列显示该销售人员的累计销售金额。例如:

idsalesperson_idsale_datesale_amountcumulative_sales
112024-01-0110001000
212024-01-0212002200
322024-01-01800800
422024-01-0211001900
532024-01-0115001500
632024-01-0213002800
  • 对于销售人员 1,第一个销售记录的累计销售金额为 1000,第二个销售记录的累计销售金额为 1000 + 1200 = 2200
  • 对于销售人员 2,第一个销售记录的累计销售金额为 800,第二个销售记录的累计销售金额为 800 + 1100 = 1900
  • 对于销售人员 3,第一个销售记录的累计销售金额为 1500,第二个销售记录的累计销售金额为 1500 + 1300 = 2800

窗口函数的工作机制:

  • 分区:窗口函数首先会根据 PARTITION BY 子句将数据分成不同的分区。这里,数据按 salesperson_id 分区,每个销售人员的记录组成一个分区。
  • 排序:在每个分区内,数据会根据 ORDER BY 子句进行排序。在这个例子中,按 sale_date 对每个销售人员的销售记录按时间顺序进行排序。
  • 累计ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 确保了每个销售人员从分区的第一行开始,直到当前行的所有销售记录都会被累加,形成一个累积的结果。

更多详细的窗口函数使用教程可以参阅:GaussDB(DWS) SQL进阶之SQL操作之窗口函数

WindowAgg

理论层面

  下面我们来了解一下 WindowAgg 算子,先看看书中的描述:
在这里插入图片描述
在这里插入图片描述
  书中详细描述了 WindowAgg 节点在 PostgreSQL 中处理窗口函数时的执行过程,包括如何管理分区、排序、聚合等。通过 WindowAggState 和相关的数据结构,窗口聚合可以高效地计算多个窗口函数,同时保持对数据的完整性。性能优化方面,窗口函数的排序和缓存机制也起到了关键作用,帮助提升计算效率。

源码层面

WindowObjectData 结构体

  WindowObjectData 结构体用于在窗口函数调用过程中保存与窗口聚合操作相关的状态信息。在 PostgreSQL 中,窗口函数用于基于窗口进行计算,而每个窗口函数可能需要不同的上下文状态来处理其数据。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c

/** 所有窗口函数的API都通过这个对象进行调用,该对象会作为fcinfo->context传递给窗口函数。*/
typedef struct WindowObjectData
{NodeTag		type;				/* 类型标识符,用于区分不同的节点类型 */WindowAggState *winstate;		/* 指向父级窗口聚合状态的指针,用于获取窗口聚合的上下文状态 */List	   *argstates;			/* 窗口函数参数的表达式状态树 */void	   *localmem;			/* 当前窗口函数在执行过程中使用的局部内存,由WinGetPartitionLocalMemory分配 */int			markptr;			/* 用于标记当前窗口函数状态的tuplestore标记指针 */int			readptr;			/* 读取指针,指向当前正在处理的行位置 */int64		markpos;			/* 标记指针所指向的行号 */int64		seekpos;			/* 读取指针所指向的行号 */
} WindowObjectData;

WindowStatePerFuncData 结构体

  WindowStatePerFuncData 结构体用于存储与窗口函数和窗口聚合操作相关的工作状态和数据。它包含了窗口函数执行时需要的各种信息,如参数数量排序规则结果类型是否为聚合函数等。这些信息对于在窗口函数计算过程中正确管理和执行窗口函数非常重要。在 PostgreSQL 中,窗口函数的执行涉及多次状态保存和计算,而这个结构体便用于管理这些窗口函数的具体执行细节。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c

/** 为每个由该节点处理的窗口函数和窗口聚合创建一个 WindowStatePerFunc 结构体。*/
typedef struct WindowStatePerFuncData
{/* 链接到与此工作状态相关的 WindowFunc 表达式和状态节点 */WindowFuncExprState *wfuncstate;   /* 当前窗口函数的表达式状态 */WindowFunc *wfunc;                 /* 当前窗口函数的定义(结构体) */int			numArguments;		/* 窗口函数的参数数量 */FmgrInfo	flinfo;				/* 用于窗口函数的 fmgr 查找数据,存储有关函数的信息 */Oid			winCollation;		/* 窗口函数的排序规则,由当前函数派生 *//** 我们需要窗口函数结果的长度和 byval 信息,以便知道如何复制/删除值。*/int16		resulttypeLen;		/* 窗口函数返回值类型的长度 */bool		resulttypeByVal;	/* 窗口函数返回值类型是否为按值传递 */bool		plain_agg;			/* 是否仅为普通的聚合函数? */int			aggno;				/* 如果是,指明其对应的 WindowStatePerAggData 的索引 */WindowObject winobj;			 /* 用于窗口函数 API 的对象 */
} WindowStatePerFuncData;

WindowStatePerAggData 结构体

  WindowStatePerAggData 结构体主要用于保存窗口聚合过程中普通聚合函数的工作状态。它包含了有关过渡函数最终函数初始值当前帧的聚合结果过渡值等详细信息。通过这些信息,系统可以正确地计算窗口聚合函数的结果,处理每个聚合操作的中间状态,确保聚合计算按预期执行。此外,该结构体还考虑了内存管理和函数调用的效率,使得聚合操作在处理大数据量时能够高效执行。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c

/** 对于普通的聚合窗口函数,我们也有一个这样的结构体。*/
typedef struct WindowStatePerAggData
{/* 聚合函数的过渡函数 OID */Oid			transfn_oid;			/* 聚合函数的过渡函数的 OID */Oid			invtransfn_oid;		/* 反向过渡函数的 OID,可能是 InvalidOid */Oid			finalfn_oid;			/* 最终函数的 OID,可能是 InvalidOid *//** 聚合过渡函数的 fmgr 查找数据 --- 只有当对应的 OID 不为 InvalidOid 时才有效。* 特别注意,函数的 fn_strict 标志在这里保存。*/FmgrInfo	transfn;				/* 聚合函数的过渡函数的 fmgr 查找数据 */FmgrInfo	invtransfn;				/* 反向过渡函数的 fmgr 查找数据 */FmgrInfo	finalfn;				/* 最终函数的 fmgr 查找数据 */int			numFinalArgs;			/* 传递给最终函数的参数个数 *//** 来自 pg_aggregate 入口的初始值*/Datum		initValue;				/* 初始值 */bool		initValueIsNull;		/* 初始值是否为 NULL *//** 当前帧边界的缓存值*/Datum		resultValue;			/* 当前计算帧的结果值 */bool		resultValueIsNull;		/* 结果值是否为 NULL *//** 需要输入、结果和过渡数据类型的长度和 byval 信息,* 以便知道如何复制/删除值。*/int16		inputtypeLen,			/* 输入类型的长度 */resulttypeLen,			/* 结果类型的长度 */transtypeLen;			/* 过渡数据类型的长度 */bool		inputtypeByVal,			/* 输入类型是否按值传递 */resulttypeByVal,		 /* 结果类型是否按值传递 */transtypeByVal;		 /* 过渡数据类型是否按值传递 */int			wfuncno;				/* 关联的 WindowStatePerFuncData 的索引 *//* 持有过渡值和可能的其他附加数据的上下文 */MemoryContext aggcontext;			/* 聚合上下文,可能是私有的,或 winstate->aggcontext *//* 当前的过渡值 */Datum		transValue;			/* 当前过渡值 */bool		transValueIsNull;		/* 过渡值是否为 NULL */int64		transValueCount;		/* 当前聚合的行数 *//* eval_windowaggregates() 函数中使用的数据 */bool		restart;				/* 是否需要在本轮聚合中重新启动此聚合? */
} WindowStatePerAggData;

eval_windowaggregates 函数

  eval_windowaggregates 函数主要用于窗口聚合的计算,特别是普通聚合函数(如 SUM()COUNT() 等)。它在处理窗口时,根据窗口帧的位置和聚合的需求,优化了聚合操作。在帧起始位置为 UNBOUNDED_PRECEDING 时,采用增量计算策略,在窗口帧发生变化时,使用反向过渡函数或重新聚合数据。同时,它通过复用已计算的结果来提高性能,在需要时重启聚合并重置相应的状态。
  此外,它还管理了不同聚合函数的上下文,确保在窗口帧的不同部分对每个聚合函数都进行正确的计算,并在计算结束后保存结果。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c

/** eval_windowaggregates* 评估作为窗口函数的普通聚合函数** 这与 nodeAgg.c 不同的地方在于:首先,如果窗口的帧开始位置发生变化,我们使用反向过渡函数(如果存在)从过渡值中删除行。其次,我们希望在将更多数据聚合到同一过渡值后,可以多次调用聚合最终函数。这是 nodeAgg.c 中不要求的行为。*/
static void
eval_windowaggregates(WindowAggState *winstate)
{WindowStatePerAgg peraggstate;		/* 用于存储每个聚合函数的状态 */int			wfuncno,					/* 窗口函数的索引 */numaggs,					/* 聚合函数的数量 */numaggs_restart,			/* 需要重启的聚合函数数量 */i;							/* 循环变量 */int64		aggregatedupto_nonrestarted;	/* 尚未聚合的行数 */MemoryContext oldContext;				/* 内存上下文的备份 */ExprContext *econtext;					/* 当前表达式上下文 */WindowObject agg_winobj;				/* 窗口函数对象 */TupleTableSlot *agg_row_slot;			/* 用于存储聚合数据的行槽 */TupleTableSlot *temp_slot;				/* 临时槽,用于存储中间结果 */numaggs = winstate->numaggs;	/* 获取窗口聚合函数的数量 */if (numaggs == 0)return;		/* 如果没有聚合函数,直接返回 *//* 获取执行上下文 */econtext = winstate->ss.ps.ps_ExprContext;agg_winobj = winstate->agg_winobj;agg_row_slot = winstate->agg_row_slot;temp_slot = winstate->temp_slot_1;/** 如果窗口的帧起始位置为 UNBOUNDED_PRECEDING 且没有排除子句,* 那么窗口帧由从分区开始处向前延伸的一组连续的行组成,随着当前行向前推进,行只进入帧内,而不会退出帧。* 这样就可以使用增量策略来计算聚合值:我们为每个加入帧的行运行过渡函数,并在需要时运行最终函数来获取当前聚合值。* 这种方法比每次处理当前行时都重新运行整个聚合计算更高效。前提是假设最终函数不会破坏正在运行的过渡值,这一点在 nodeAgg.c 中也有类似的假设。** 如果帧起始位置有时会移动,我们仍然可以优化相邻的行,尽可能使用增量聚合策略,但如果帧头超出了上一个头,我们将尝试使用反向过渡函数删除这些行。* 反向过渡函数会恢复聚合的当前状态,仿佛被移除的行从未被聚合过。如果反向过渡函数无法删除该行,或者根本没有反向过渡函数,我们需要重新计算所有位于新帧边界内的元组的聚合结果。** 如果存在排除子句,我们可能需要在一个不连续的行集上聚合,因此需要重新计算每行的聚合。*//** 更新帧头位置** 窗口的帧头位置不应该向后移动,如果发生这种情况,代码将无法处理,因此在安全起见,我们会检查并报告错误。*/update_frameheadpos(winstate);if (winstate->frameheadpos < winstate->aggregatedbase)elog(ERROR, "window frame head moved backward");/** 如果帧没有变化,我们可以重用之前保存的结果值。* 如果帧结束模式是 UNBOUNDED FOLLOWING 或 CURRENT ROW 且没有排除子句,并且当前行位于前一行的帧内,那么当前帧和前一帧的结束位置必须重合。* 这意味着我们可以复用结果值。*/if (winstate->aggregatedbase == winstate->frameheadpos &&(winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |FRAMEOPTION_END_CURRENT_ROW)) &&!(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&winstate->aggregatedbase <= winstate->currentpos &&winstate->aggregatedupto > winstate->currentpos){for (i = 0; i < numaggs; i++){peraggstate = &winstate->peragg[i];wfuncno = peraggstate->wfuncno;econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;}return;}/* 初始化重启标志 */numaggs_restart = 0;for (i = 0; i < numaggs; i++){peraggstate = &winstate->peragg[i];/* 判断是否需要重启聚合函数 */if (winstate->currentpos == 0 ||(winstate->aggregatedbase != winstate->frameheadpos &&!OidIsValid(peraggstate->invtransfn_oid)) ||(winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||winstate->aggregatedupto <= winstate->frameheadpos){peraggstate->restart = true;numaggs_restart++;}elseperaggstate->restart = false;}/** 如果有任何可能需要移动的聚合函数,尝试通过删除从帧顶部掉落的输入行来将 aggregatedbase 向前推进。* 如果失败(即 advance_windowaggregate_base 返回 false),则需要重启聚合。*/while (numaggs_restart < numaggs &&winstate->aggregatedbase < winstate->frameheadpos){/** 获取要删除的元组。这应该永远不会失败,因为我们应该已经处理过这些行。*/if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,temp_slot))elog(ERROR, "could not re-fetch previously fetched frame row");/* 设置元组上下文,用于计算聚合函数的参数 */winstate->tmpcontext->ecxt_outertuple = temp_slot;/** 为每个聚合函数执行反向过渡,除非该聚合已经标记为需要重启。*/for (i = 0; i < numaggs; i++){bool		ok;peraggstate = &winstate->peragg[i];if (peraggstate->restart)continue;wfuncno = peraggstate->wfuncno;ok = advance_windowaggregate_base(winstate,&winstate->perfunc[wfuncno],peraggstate);if (!ok){/* 如果反向过渡函数失败,则需要重启聚合 */peraggstate->restart = true;numaggs_restart++;}}/* 重置每个输入元组的上下文 */ResetExprContext(winstate->tmpcontext);/* 进展到下一个聚合行 */winstate->aggregatedbase++;ExecClearTuple(temp_slot);}/** 如果我们成功推进了所有聚合的基准行,aggregatedbase 现在应该等于 frameheadpos;* 如果失败了,我们必须强制更新 aggregatedbase。*/winstate->aggregatedbase = winstate->frameheadpos;/** 如果为聚合函数创建了标记指针,则将其推进到帧头,以便 tuplestore 可以丢弃不必要的行。*/if (agg_winobj->markptr >= 0)WinSetMarkPosition(agg_winobj, winstate->frameheadpos);/** 现在重启需要重启的聚合函数。** 如果任何聚合函数需要重启,我们假设使用共享上下文的聚合函数也需要重启,* 并且在这种情况下我们会清理共享的 aggcontext。*/if (numaggs_restart > 0)MemoryContextResetAndDeleteChildren(winstate->aggcontext);for (i = 0; i < numaggs; i++){peraggstate = &winstate->peragg[i];/* 如果共享上下文的聚合函数需要重启,则重启所有需要重启的聚合 */Assert(peraggstate->aggcontext != winstate->aggcontext ||numaggs_restart == 0 ||peraggstate->restart);if (peraggstate->restart){wfuncno = peraggstate->wfuncno;initialize_windowaggregate(winstate,&winstate->perfunc[wfuncno],peraggstate);}else if (!peraggstate->resultValueIsNull){if (!peraggstate->resulttypeByVal)pfree(DatumGetPointer(peraggstate->resultValue));peraggstate->resultValue = (Datum) 0;peraggstate->resultValueIsNull = true;}}/** 非重启的聚合现在包含 aggregatedbase 和 aggregatedupto 之间的行,* 而重启的聚合不包含任何行。如果有重启的聚合,我们必须从 frameheadpos 开始重新聚合,* 否则可以从 aggregatedupto 开始继续聚合。*/aggregatedupto_nonrestarted = winstate->aggregatedupto;if (numaggs_restart > 0 &&winstate->aggregatedupto != winstate->frameheadpos){winstate->aggregatedupto = winstate->frameheadpos;ExecClearTuple(agg_row_slot);}/** 继续聚合直到遇到帧外的行(或分区结束)。*/for (;;){int			ret;/* 如果没有获取行,获取下一行 */if (TupIsNull(agg_row_slot)){if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,agg_row_slot))break;			/* 到达分区结束 */}/** 如果当前行不在帧内,跳过聚合。*/ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);if (ret < 0)break;if (ret == 0)goto next_tuple;/* 设置元组上下文 */winstate->tmpcontext->ecxt_outertuple = agg_row_slot;/* 将行累加到聚合中 */for (i = 0; i < numaggs; i++){peraggstate = &winstate->peragg[i];/* 跳过未重启的聚合 */if (!peraggstate->restart &&winstate->aggregatedupto < aggregatedupto_nonrestarted)continue;wfuncno = peraggstate->wfuncno;advance_windowaggregate(winstate,&winstate->perfunc[wfuncno],peraggstate);}next_tuple:/* 重置每个输入元组的上下文 */ResetExprContext(winstate->tmpcontext);/* 进展到下一个聚合行 */winstate->aggregatedupto++;ExecClearTuple(agg_row_slot);}/* 确保帧的结束位置不会向后移动 */Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);/** 最终化聚合并填充结果和空值字段*/for (i = 0; i < numaggs; i++){Datum	   *result;bool	   *isnull;peraggstate = &winstate->peragg[i];wfuncno = peraggstate->wfuncno;result = &econtext->ecxt_aggvalues[wfuncno];isnull = &econtext->ecxt_aggnulls[wfuncno];finalize_windowaggregate(winstate,&winstate->perfunc[wfuncno],peraggstate,result, isnull);/** 如果下一个行共享同一帧,保存结果值*/if (!peraggstate->resulttypeByVal && !*isnull){oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);peraggstate->resultValue =datumCopy(*result,peraggstate->resulttypeByVal,peraggstate->resulttypeLen);MemoryContextSwitchTo(oldContext);}else{peraggstate->resultValue = *result;}peraggstate->resultValueIsNull = *isnull;}
}

  让我们通过一个具体的例子来分析 eval_windowaggregates 函数的每一步操作。假设我们有一个销售数据表 sales,包含以下数据:

salesperson_idsale_datesale_amount
12024-01-01100
12024-01-02200
12024-01-03300
22024-01-01150
22024-01-02250
22024-01-03350

  假设我们希望计算每个销售人员的累计销售额,并且使用的是窗口聚合函数,按日期顺序(ORDER BY sale_date)来计算累计销售额。我们的窗口框架将从 UNBOUNDED PRECEDING 开始,直到当前行结束。

1. 初始化和设置

  在开始时,窗口函数会为每个聚合函数(在这个例子中是 SUM(sale_amount))创建一个 WindowStatePerAggData 结构体来保存当前的聚合状态。假设我们有两个销售人员的销售数据。对于每个销售人员,eval_windowaggregates 将会处理每个销售记录,维护其当前的聚合状态。
  初始化:numaggs = 1,因为只有一个聚合函数 SUM(sale_amount)aggregatedbaseaggregatedupto 变量分别用于跟踪当前已聚合和尚未聚合的行。

2. 更新帧头位置

  在窗口聚合中,frameheadpos 表示窗口帧的起始位置。update_frameheadpos 会根据窗口的当前状态更新这一位置。例如,假设当前处理的销售人员是销售员 1,并且当前销售记录是 2024-01-03
  帧头位置更新:frameheadpos 会根据查询的 PARTITION BYORDER BY 规则进行调整。这里,frameheadpos 会指向销售员 12024-01-03 的行。

3. 优化增量计算

  如果当前的窗口帧没有发生变化,我们就可以复用之前保存的聚合结果,而不必重新计算。例如,在销售员 1 的数据中,假设前两天(2024-01-012024-01-02)已经聚合完成。
  复用结果:假设当前帧的结束位置是 2024-01-03,且没有排除子句(EXCLUSION),那么程序会检查窗口帧是否变化。如果没有变化(即当前行仍然在上一帧内),则复用先前的聚合结果。

4. 处理帧的变化

  如果窗口帧的头位置发生变化,我们需要做以下几步:

  1. 检查是否需要重启聚合:如果帧的头移动,或者窗口的范围发生变化(例如,加入了 EXCLUSION 子句),我们就需要重新聚合数据。eval_windowaggregates 会为每个聚合函数设置重启标志。
  2. 更新聚合函数的状态:在此过程中,advance_windowaggregate_base 函数会根据新的帧头位置和数据,调整聚合的基准状态(aggregatedbase)。

  例如,如果帧的起始位置从 2024-01-01 移动到 2024-01-02eval_windowaggregates 将使用反向过渡函数(invtransfn)删除帧头之前的行。

5. 重新聚合数据

  如果 advance_windowaggregate_base 无法成功移动聚合的基准行(即删除掉帧头之前的行),或者没有反向过渡函数,系统就会重新开始聚合。例如,在 2024-01-02 之后的帧头位置,可能需要从新的帧开始重新计算聚合结果。

  • 重启聚合:如果需要重启聚合(例如因为反向过渡失败),restart 标志会被设置为 true,然后聚合函数的状态会被重新初始化。

6. 计算新行的聚合结果

  如果当前的聚合状态已经准备好,且没有出现需要重启的情况,eval_windowaggregates 会开始将新的一行数据添加到聚合中

  • 逐行聚合:每次计算新的聚合值时,advance_windowaggregate 函数会根据当前行的数据更新聚合结果。例如,在 2024-01-03,销售员 1 的累计销售额将是 100 + 200 + 300 = 600

7. 最终化聚合结果

  当所有的行都被处理完后,finalize_windowaggregate 会被调用来计算窗口聚合的最终结果。例如,计算销售员 1 和销售员 2 的最终累计销售额。

  • 保存和返回结果:最终,eval_windowaggregates保存每个聚合函数的结果,并更新相应的输出字段。如果存在共享上下文(即多个聚合函数使用同一个上下文),它会进行清理,以确保没有内存泄漏。

8. 返回结果

  函数会返回每个窗口聚合函数的在这里插入代码片最终结果,在每一行的输出中返回正确的累计销售额。

示例执行:
假设我们在销售员 1 上执行上述操作:

初始时,销售员 1 在 2024-01-01 的销售额为 100,聚合值为 100。
接着,销售员 1 在 2024-01-02 的销售额为 200,聚合值为 100 + 200 = 300。
最后,在 2024-01-03,销售员 1 的销售额为 300,最终累计值为 100 + 200 + 300 = 600。

update_frameheadpos 函数

  update_frameheadpos 函数的主要功能是更新窗口聚合的帧头位置 frameheadpos,确保其对于当前行有效。帧头的位置是窗口聚合计算的关键,因为它决定了每个窗口函数计算时所依据的数据范围。下面是详细的逐行注释和对每个步骤的解释。(路径:src\backend\executor\nodeWindowAgg.c

/** update_frameheadpos* 使 frameheadpos 对当前行有效** 注意,frameheadpos 计算时不考虑任何窗口排除子句;当前行和/或其同组行即使在后续需要被排除时,也会被视为帧的一部分。** 可能会覆盖 winstate->temp_slot_2。*/
static void
update_frameheadpos(WindowAggState *winstate)
{WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;   /* 获取窗口聚合节点 */int			frameOptions = winstate->frameOptions;  /* 获取当前的帧选项 */MemoryContext oldcontext;  /* 保存当前的内存上下文 *//* 如果帧头已经有效,则不需要更新,直接返回 */if (winstate->framehead_valid)return;/* 可能会在短生命周期的上下文中被调用,因此切换到合适的内存上下文 */oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);/* 根据帧的起始选项来计算帧头 */if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING){/* 在 UNBOUNDED PRECEDING 模式下,帧头始终是分区的第一行 */winstate->frameheadpos = 0;winstate->framehead_valid = true;}else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW){/* 如果是 CURRENT ROW 模式,根据排序模式计算帧头 */if (frameOptions & FRAMEOPTION_ROWS){/* 在 ROWS 模式下,帧头与当前行相同 */winstate->frameheadpos = winstate->currentpos;winstate->framehead_valid = true;}else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)){/* 如果没有 ORDER BY,所有行是同行的 */if (node->ordNumCols == 0){winstate->frameheadpos = 0;winstate->framehead_valid = true;MemoryContextSwitchTo(oldcontext);return;}/** 在 RANGE 或 GROUPS START_CURRENT_ROW 模式下,帧头是当前行的同组中的第一行。* 我们保持帧头的最后已知位置,并根据需要前进。*/tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot)){/* 如果尚未获取第一行,则将其获取到 framehead_slot */if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))elog(ERROR, "unexpected end of tuplestore");}/* 检查当前行是否是正确的帧头 */while (!TupIsNull(winstate->framehead_slot)){if (are_peers(winstate, winstate->framehead_slot, winstate->ss.ss_ScanTupleSlot))break;		/* 该行是正确的帧头 *//* 即使获取失败,仍然推进帧头位置 */winstate->frameheadpos++;spool_tuples(winstate, winstate->frameheadpos);if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))break;		/* 到达分区末尾 */}winstate->framehead_valid = true;}elseAssert(false);  /* 如果既不是 RANGE 也不是 GROUPS,应该抛出异常 */}else if (frameOptions & FRAMEOPTION_START_OFFSET){/* 在 OFFSET 模式下,帧头相对于当前行的位置是通过偏移量来决定的 */if (frameOptions & FRAMEOPTION_ROWS){int64 offset = DatumGetInt64(winstate->startOffsetValue);if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)offset = -offset;  /* 如果是 PRECEDING,则是负偏移量 */winstate->frameheadpos = winstate->currentpos + offset;/* 帧头不能小于第一行 */if (winstate->frameheadpos < 0)winstate->frameheadpos = 0;/* 确保帧头不超出分区末尾 */else if (winstate->frameheadpos > winstate->currentpos + 1){spool_tuples(winstate, winstate->frameheadpos - 1);if (winstate->frameheadpos > winstate->spooled_rows)winstate->frameheadpos = winstate->spooled_rows;}winstate->framehead_valid = true;}else if (frameOptions & FRAMEOPTION_RANGE){/** 在 RANGE START_OFFSET 模式下,帧头是满足范围约束的第一行。* 我们保持帧头的最后已知位置,并根据需要推进。*/int sortCol = node->ordColIdx[0];bool sub, less;/* 确保有排序列 */Assert(node->ordNumCols == 1);/* 计算用于范围检查的标志 */if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)sub = true;elsesub = false;less = false;  /* 通常,帧头应满足 >= sum */if (!winstate->inRangeAsc){sub = !sub;less = true;}tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot)){/* 如果尚未获取第一行,则将其获取到 framehead_slot */if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))elog(ERROR, "unexpected end of tuplestore");}/* 逐行检查,直到找到满足范围条件的帧头行 */while (!TupIsNull(winstate->framehead_slot)){Datum headval, currval;bool headisnull, currisnull;headval = slot_getattr(winstate->framehead_slot, sortCol, &headisnull);currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, &currisnull);if (headisnull || currisnull){/* 如果其中一行的值为 NULL,按照 nulls_first 设置推进帧头 */if (winstate->inRangeNullsFirst){if (!headisnull || currisnull)break;}else{if (headisnull || !currisnull)break;}}else{if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,winstate->inRangeColl,headval,currval,winstate->startOffsetValue,BoolGetDatum(sub),BoolGetDatum(less))))break;  /* 该行是正确的帧头 */}/* 即使获取失败,仍然推进帧头位置 */winstate->frameheadpos++;spool_tuples(winstate, winstate->frameheadpos);if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))break;  /* 到达分区末尾 */}winstate->framehead_valid = true;}else if (frameOptions & FRAMEOPTION_GROUPS){/** 在 GROUPS START_OFFSET 模式下,帧头是满足偏移量约束的第一组的第一行。*/int64 offset = DatumGetInt64(winstate->startOffsetValue);int64 minheadgroup;if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)minheadgroup = winstate->currentgroup - offset;elseminheadgroup = winstate->currentgroup + offset;tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot)){/* 如果尚未获取第一行,则将其获取到 framehead_slot */if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))elog(ERROR, "unexpected end of tuplestore");}/* 逐组推进帧头 */while (!TupIsNull(winstate->framehead_slot)){if (winstate->frameheadgroup >= minheadgroup)break;  /* 找到满足条件的帧头行 */ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);winstate->frameheadpos++;spool_tuples(winstate, winstate->frameheadpos);if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))break;  /* 到达分区末尾 */if (!are_peers(winstate, winstate->temp_slot_2, winstate->framehead_slot))winstate->frameheadgroup++;}ExecClearTuple(winstate->temp_slot_2);winstate->framehead_valid = true;}elseAssert(false);}elseAssert(false);/* 恢复原内存上下文 */MemoryContextSwitchTo(oldcontext);
}

  依旧通过一个具体的例子来分析该函数的具体执行过程,案例参考函数eval_windowaggregates
案例背景:

  我们希望计算每个销售员的累计销售额。使用窗口函数 SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),即每个销售员的累计销售额是从该销售员的第一个销售日期开始,到当前行的销售额的累积。

SQL 查询:

SELECT salesperson_id, sale_date, sale_amount,SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
FROM sales;

这个查询会根据 sale_date 排序每个销售员的数据,并为每一行计算累计销售额。为了计算窗口函数,update_frameheadpos 会在内部被调用来更新每个窗口的帧头位置。

详细步骤和代码说明:
  假设我们正在处理销售员 1 的数据,查询的当前行是 2024-01-02

第一步:更新帧头位置

  当函数 update_frameheadpos 被调用时,它的作用是更新 frameheadpos,即计算当前帧的起始位置。帧头位置决定了窗口函数计算时应包括哪些行。

1. 检查是否已经计算了帧头位置:

if (winstate->framehead_valid)return;  /* 如果帧头已经有效,直接返回 */

  如果帧头已经计算过了,就跳过计算,避免重复计算。

2. 切换到合适的内存上下文:

oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);

  这里,我们切换到合适的内存上下文,以确保计算不会泄漏内存。

3. 计算帧头位置: 接下来根据帧的选项 (frameOptions),我们来决定帧头的位置。

  • 如果是 UNBOUNDED PRECEDING
if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
{winstate->frameheadpos = 0;winstate->framehead_valid = true;
}

  这里,UNBOUNDED PRECEDING 表示帧从分区的第一行开始。因此,帧头位置就是 0,即第一行。

  • 如果是 CURRENT ROW
else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
{if (frameOptions & FRAMEOPTION_ROWS){winstate->frameheadpos = winstate->currentpos;winstate->framehead_valid = true;}
}

  如果是 CURRENT ROW,那么帧头就是当前行的位置。在我们的例子中,假设当前行是 2024-01-02frameheadpos 就是当前行的位置。

第二步:处理 RANGE 或 GROUPS 模式
  如果窗口定义了 RANGEGROUPS,我们需要根据排序规则找到当前行所在的组,并确定该组的第一行作为帧头。

4. 如果没有排序列(ORDER BY):

if (node->ordNumCols == 0)
{winstate->frameheadpos = 0;winstate->framehead_valid = true;MemoryContextSwitchTo(oldcontext);return;
}

  如果没有定义排序列,那么所有行被认为是同一组,帧头位置就是 0,即分区的第一行。

5. 如果有排序列

  如果有排序列,我们会根据当前行的值和分区内其他行的值,找到与当前行同组的第一行作为帧头。例如,如果是 2024-01-02 的数据,程序会查找销售员 1 中销售额最早的那一行(即 2024-01-01)。

5. 查找同组的第一行

while (!TupIsNull(winstate->framehead_slot))
{if (are_peers(winstate, winstate->framehead_slot, winstate->ss.ss_ScanTupleSlot))break;  /* 找到当前行同组的第一行作为帧头 */winstate->frameheadpos++;spool_tuples(winstate, winstate->frameheadpos);
}

  这里,我们通过检查每一行是否与当前行同组(are_peers 函数),找到属于同组的第一行,作为帧头。

第三步:更新帧头位置和返回

7. 设置帧头有效:

winstate->framehead_valid = true;

  一旦计算出帧头位置,就将 framehead_valid 设置为 true,表示帧头计算完成。

8. 恢复内存上下文:

MemoryContextSwitchTo(oldcontext);

  最后,恢复之前的内存上下文,确保内存管理的正确性。

具体例子:

假设当前行是 2024-01-02,销售员 1。
查询的窗口帧使用的是 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW。
第一步:frameheadpos 将被设置为 0,即从 2024-01-01 开始。
第二步:在 RANGE 模式下,程序检查是否有排序列,并找到销售员 1 在 2024-01-01 的销售额作为帧头。
第三步:最终,帧头位置 frameheadpos 被设置为 0,并且标记为有效。

因此,当前行的累计销售额将从 2024-01-012024-01-02,依此类推。

  窗口模式通过不同的帧定义方式,影响了窗口函数的计算范围,从而决定了聚合计算的结果。

  • UNBOUNDED PRECEDING:帧从分区的第一行开始,适用于计算从分区开始到当前行的累计值。
  • CURRENT ROW:帧仅包含当前行,适用于每行单独计算(如排名)。
  • RANGE:帧的起始位置是当前行所在同组的第一行,适用于基于排序的聚合(如销售排名)。
  • OFFSET:帧的起始位置是当前行位置的偏移,适用于计算行之间的偏移聚合。
  • GROUPS:帧的起始位置是当前行所在组的第一行,适用于按组聚合。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/68155.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[STM32 - 野火] - - - 固件库学习笔记 - - -十二.基本定时器

一、定时器简介 STM32 中的定时器&#xff08;TIM&#xff0c;Timer&#xff09;是其最重要的外设之一&#xff0c;广泛用于时间管理、事件计数和控制等应用。 1.1 基本功能 定时功能&#xff1a;TIM定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中…

鸿蒙next 自定义日历组件

效果图预览 20250124-113957 使用说明 1.选择日期左右箭头&#xff0c;实现每月日历切换&#xff0c;示例中超出当前月份&#xff0c;禁止进入下一月&#xff0c;可在代码更改 2.日历中显示当前选择的日期&#xff0c;选中的日期颜色可自定义 3.日历中可展示历史记录作为数据…

Linux 部署 Java 项目:Tomcat、Redis、MySQL 教程

在 Linux 服务器上部署 Java 项目通常需要配置应用服务器&#xff08;如 Tomcat&#xff09;、数据库&#xff08;如 MySQL&#xff09;和缓存服务器&#xff08;如 Redis&#xff09;。本文将详细介绍如何在 Linux 环境中部署一个 Java 项目&#xff0c;涵盖 Tomcat、Redis 和…

Python数据可视化(够用版):懂基础 + 专业的图表抛给Tableau等专业绘图工具

我先说说文章标题中的“够用版”啥意思&#xff0c;为什么这么写。 按照我个人观点&#xff0c;在使用Python进行数据分析时&#xff0c;我们有时候肯定要结合到图表去进行分析&#xff0c;去直观展现数据的规律和特定&#xff0c;那么我们肯定要做一些简单的可视化&#xff0…

【C++】特殊类设计、单例模式与类型转换

目录 一、设计一个类不能被拷贝 &#xff08;一&#xff09;C98 &#xff08;二&#xff09;C11 二、设计一个类只能在堆上创建对象 &#xff08;一&#xff09;将构造函数私有化&#xff0c;对外提供接口 &#xff08;二&#xff09;将析构函数私有化 三、设计一个类只…

Jetpack Compose 和 Compose Multiplatform 还有 KMP 的关系

今天刚好看到官方发布了一篇文章&#xff0c;用于讨论 Compose Multiplatform 和 Jetpack Compose 之间的区别&#xff0c;突然想起之前评论区经常看到说 “Flutter 和 CMP 对于 Google 来说项目重叠的问题”&#xff0c;刚好可以放一起聊一聊。 最近写的几篇内容写的太干&…

uniapp 在线更新应用

在线更新应用及进度条显示 1.比较现安装手机中的apk 与线上apk的版本 getVersion(){var newVersionuni.getStorageSync("newVersion").split(".")var versionplus.runtime.version.split(".") // 获取手机安装的版本var versionNum""…

ICSE‘25 LLM Assistance for Memory Safety

不知道从什么时候开始&#xff0c;各大技术社区&#xff0c;技术群聊流行着 “用Rust重写!” &#xff0c;放一张图(笑死… 这不, 随着大模型技术的流行&#xff0c;大家都在探索如何让大模型自动完成仓库级别(全程序)的代码重构&#xff0c;代码变换&#xff08;Refactor&…

华为 Ascend 平台 YOLOv5 目标检测推理教程

1. 背景介绍 随着人工智能技术的快速发展&#xff0c;目标检测在智能安防、自动驾驶、工业检测等领域中扮演了重要角色。YOLOv5 是一种高效的目标检测模型&#xff0c;凭借其速度和精度的平衡广受欢迎。 华为 Ascend 推理框架&#xff08;ACL&#xff09;是 Ascend CANN 软件…

Linux MySQL离线安装

一、准备工作 1. 下载MySQL安装包 访问MySQL官方网站&#xff0c;选择适合您Linux系统的MySQL版本进行下载。通常推荐下载Generic Linux (glibc 2.12)版本的.tar.gz压缩包&#xff0c;例如mysql-8.0.33-linux-glibc2.12-x86_64.tar.xz。将下载好的安装包拷贝到Linux服务器的某…

TRTC实时对话式AI解决方案,助力人机语音交互极致体验

近年来&#xff0c;AI热度持续攀升&#xff0c;无论是融资规模还是用户热度都大幅增长。2023 年&#xff0c;中国 AI 行业融资规模达2631亿人民币&#xff0c;较2022年上升51%&#xff1b;2024年第二季度&#xff0c;全球 AI 初创企业融资规模为 240 亿美金&#xff0c;较第一季…

Android多语言开发自动化生成工具

在做 Android 开发的过程中&#xff0c;经常会遇到多语言开发的场景&#xff0c;尤其在车载项目中&#xff0c;多语言开发更为常见。对应多语言开发&#xff0c;通常都是在中文版本的基础上开发其他国家语言&#xff0c;这里我们会拿到中-外语言对照表&#xff0c;这里的工作难…

最新最详细的配置Node.js环境教程

配置Node.js环境 一、前言 &#xff08;一&#xff09;为什么要配置Node.js&#xff1f;&#xff08;二&#xff09;NPM生态是什么&#xff08;三&#xff09;Node和NPM的区别 二、如何配置Node.js环境 第一步、安装环境第二步、安装步骤第三步、验证安装第四步、修改全局模块…

Greenplum临时表未清除导致库龄过高处理

1.问题 Greenplum集群segment后台日志报错 2.回收库龄 master上执行 vacuumdb -F -d cxy vacuumdb -F -d template1 vacuumdb -F -d rptdb 3.回收完成后检查 仍然发现segment还是有库龄报警警告信息发出 4.检查 4.1 在master上检查库年龄 SELECT datname, datfrozen…

CAPL自动化测试

CAPL自动化测试 目录 CAPL自动化测试1. 引言2. 测试用例设计与实现2.1 测试用例设计2.2 测试用例实现3. 测试报告生成与分析3.1 测试报告生成3.2 测试报告分析4. 自动化测试框架搭建4.1 自动化测试框架设计4.2 自动化测试框架实现5. 案例说明5.1 案例1:测试用例设计与实现5.2 …

【go语言】map 和 list

一、map map 是一种无序的键值对的集合。 无序 &#xff1a;map[key]键值对&#xff1a;key - value map 最重要的一点是通过 key 来快速检索数据&#xff0c;key 类似于索引&#xff0c;指向数据的值。map 是一种集合&#xff0c;所以我们可以像迭代数组和切片那样迭代他。…

Python自动化运维:一键掌控服务器的高效之道

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 在互联网和云计算高速发展的今天,服务器数量的指数增长使得手动运维和管理变得异常繁琐。Python凭借其强大的可读性和丰富的生态系统,成为…

数据融合的经典模型:早期融合、中期融合与后期融合的对比

在多模态数据融合中&#xff0c;如何将不同模态&#xff08;如图像、文本、语音等&#xff09;的数据整合到一个统一的表示中&#xff0c;是至关重要的环节。不同的任务需求和数据特点决定了我们应该采用哪种融合策略&#xff0c;而早期融合、中期融合和后期融合是多模态数据处…

使用python调用JIRA6 进行OAuth1认证获取AccessToken

Jira配置应用程序链接 1) 创建应用程序链接 登录 JIRA 管理后台。转到 Administration > Applications > Application Links。在输入框中输入外部应用程序的 URL&#xff08;例如 GitLab 或自定义应用&#xff09;&#xff0c;然后点击 Create new link。 2) 配置 Con…

WPF基础 | WPF 基础概念全解析:布局、控件与事件

WPF基础 | WPF 基础概念全解析&#xff1a;布局、控件与事件 一、前言二、WPF 布局系统2.1 布局的重要性与基本原理2.2 常见布局面板2.3 布局的测量与排列过程 三、WPF 控件3.1 控件概述与分类3.2 常见控件的属性、方法与事件3.3 自定义控件 四、WPF 事件4.1 路由事件概述4.2 事…