flink的AggregateFunction,merge方法作用范围

背景

AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现

AggregateFunction.merge方法调用时机

AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
在这里插入图片描述

对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并

public void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows =windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);// if element is handled by none of assigned elementWindowsboolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {MergingWindowSet<W> mergingWindows = getMergingWindowSet();for (W window : elementWindows) {// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == windowW actualWindow =mergingWindows.addWindow(window,new MergingWindowSet.MergeFunction<W>() {@Overridepublic void merge(W mergeResult,Collection<W> mergedWindows,W stateWindowResult,Collection<W> mergedStateWindows)throws Exception {triggerContext.key = key;triggerContext.window = mergeResult;triggerContext.onMerge(mergedWindows);for (W m : mergedWindows) {triggerContext.window = m;triggerContext.clear();deleteCleanupTimer(m);}// 合并窗口的状态windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);}});

继续查看AbstractHeapMergingState.mergeNamespaces方法,

public void mergeNamespaces(N target, Collection<N> sources) throws Exception {if (sources == null || sources.isEmpty()) {return; // nothing to do}final StateTable<K, N, SV> map = stateTable;SV merged = null;// merge the sourcesfor (N source : sources) {// get and remove the next source per namespace/keySV sourceState = map.removeAndGetOld(source);if (merged != null && sourceState != null) {//此处合并状态并调用AggregateFunction.merge方法merged = mergeState(merged, sourceState);} else if (merged == null) {merged = sourceState;}}// merge into the target, if neededif (merged != null) {map.transform(target, merged, mergeTransformation);}
}//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {return aggregateTransformation.aggFunction.merge(a, b);
}

这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/133879.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据技术之集群数据迁移

文章目录 数据治理之集群迁移数据 数据治理之集群迁移数据 准备两套集群&#xff0c;我这使用apache集群和CDH集群。 启动集群 启动完毕后&#xff0c;将apache集群中&#xff0c;hive库里dwd,dws,ads三个库的数据迁移到CDH集群 在apache集群里hosts加上CDH Namenode对应域名并…

bff层解决了什么痛点

bff层 -- 服务于前端的后端 什么是bff&#xff1f; Backend For Frontend&#xff08;服务于前端的后端&#xff09;&#xff0c;也就是服务器设计API的时候会考虑前端的使用&#xff0c;并在服务端直接进行业务逻辑的处理&#xff0c;又称为用户体验适配器。BFF只是一种逻辑…

【hcie-cloud】【2】华为云Stack解决方案介绍、缩略语整理 【下】

文章目录 华为文档获取方式、云计算发展背景、坚实基座华为云Stack&#xff0c;政企只能升级首选智能数据湖仓一体&#xff0c;让业务洞见更准&#xff0c;价值兑现更快MRS&#xff1a;一个架构可构建三种数据湖&#xff0c;业务场景更丰富离线数据湖&#xff1a;提供云原生、湖…

服务器感染了.locked勒索病毒,如何确保数据文件完整恢复?

引言&#xff1a; 网络安全威胁的不断演变使得恶意软件如.locked勒索病毒成为当今数字时代的一大挑战。.locked勒索病毒能够加密您的文件&#xff0c;然后要求支付赎金以解锁它们。本文将深入探讨.locked勒索病毒的特点&#xff0c;以及如何应对感染&#xff0c;以及预防这种类…

3款免费又好用的 Docker 可视化管理工具

前言 Docker提供了命令行工具&#xff08;Docker CLI&#xff09;来管理Docker容器、镜像、网络和数据卷等Docker组件。我们也可以使用可视化管理工具来更方便地查看和管理Docker容器、镜像、网络和数据卷等Docker组件。今天我们来介绍3款免费且好用的 Docker 可视化管理工具。…

网络安全深入学习第八课——反向代理(工具:frp)

文章目录 一、实验环境二、实验要求三、开始模拟1、攻击机配置frp文件2、攻击拿下跳板机&#xff0c;并且上传frpc.ini、frpc.exe、frpc_full.ini文件3、把frps.ini、、frps.exe、frps_full.ini文件放到VPS主机上4、VPS机开启frp5、跳板机开启frp6、验证 一、实验环境 攻击机&…

云尘 命令执行系列

第一题 system <?php include "flag.php";if (isset($_POST[cmd])) {system($_POST[cmd]); }show_source(__FILE__);代码如上 system($_POST[cmd]); POST请求发送一个名为 cmd 的参数&#xff0c;然后将该参数的值传递给系统命令执行函数 system()&#xff0c…

NVIDIA大模型平台软件全家桶开启云智能第二曲线

第二曲线由英国管理思想大师查尔斯汉迪提出&#xff0c;讲的是在企业第一曲线达到巅峰的时候&#xff0c;找到驱动企业二次腾飞的第二曲线。而如果企业想实现基业长青&#xff0c;就需要通过主动式破局式创新&#xff0c;跨越到第二曲线中。对于当下的云智能产业以及基于云智能…

【电源专题】PSE如何与PD握手协商功率等级?

在文章:【电源专题】POE连接方式与功率等级划分 中我们讲到POE协议对不同的PD设备进行划分,比如根据不同的供电标准又可以细分成好几种不同的类型(Type1~Type4)和功率等级。 那么有这么多功率等级,PSE怎么知道PD是哪种类型呢?怎么能进行握手协商呢? 下图为PSE与PD设备在…

python 之生成器表达式,以及与列表推导式的区别

文章目录 生成器表达式基本结构示例生成一个简单的生成器遍历生成器并获取值使用条件过滤 优点 生成器表达式与列表推导式的区别1. 返回类型2. 生成方式3. 内存占用4. 访问方式示例总结 生成器表达式是一种在 Python 中用来创建生成器的高效方法。生成器表达式和列表推导式类似…

jstat虚拟机统计信息监控工具

jstat虚拟机统计信息监控工具 1、jstat&#xff08;JVM Statistics Monitorning Tool&#xff09; 用于监控虚拟机各种运行状态信息的命令行工具。 它可以显示本地或远程虚拟机进程中的类装载、内存、垃圾收集、JIT编译等运行数据&#xff0c;它是运行期定位虚拟机 性能问题…

数据结构——基于顺序表实现通讯录

一、. 基于动态顺序表实现通讯录 1.1 功能要求 1&#xff09;⾄少能够存储100个⼈的通讯信息 2&#xff09;能够保存⽤⼾信息&#xff1a;名字、性别、年龄、电话、地址等 3&#xff09;增加联系⼈信息 4&#xff09;删除指定联系⼈ 5&#xff09;查找制定联系⼈ 6&…

vuepress 打包后左侧菜单链接 404 问题解决办法

背景 上周看到一本开源书 《深入架构原理与实践》&#xff0c;是基于 vuepress 搭建的&#xff0c;下载了源码&#xff0c;本地部署了一下&#xff0c;本文记录如何打包该源码遇到的路径问题及思考。 结论&#xff1a; vuepress 插件的 sideBar 的菜单路径默认是相对 / 的&am…

FPGA高端项目:图像缩放+GTP+UDP架构,高速接口以太网视频传输,提供2套工程源码加QT上位机源码和技术支持

目录 1、前言免责声明本项目特点 2、相关方案推荐我这里已有的 GT 高速接口解决方案我这里已有的以太网方案我这里已有的图像处理方案 3、设计思路框架设计框图视频源选择ADV7611 解码芯片配置及采集动态彩条跨时钟FIFO图像缩放模块详解设计框图代码框图2种插值算法的整合与选择…

【Java 进阶篇】MVC 模式

欢迎来到本篇详细解释 MVC&#xff08;Model-View-Controller&#xff09;设计模式的教程。MVC 是一种用于组织应用程序的设计模式&#xff0c;有助于将应用程序分成不同的部分&#xff0c;以提高代码的可维护性和可扩展性。在本文中&#xff0c;我们将深入研究 MVC 模式&#…

SpringBoot整合定时任务遇到的多实例问题

唠嗑部分 是这样&#xff0c;前几日完善了定时任务的日志记录&#xff0c;今日切换了服务器&#xff0c;多部署了一个节点&#xff0c;使用nginx负载均衡&#xff0c;但是查看日志却发现了如下情况 那糟糕了&#xff0c;传说中的多实例问题出现了&#xff0c;今天我们就来聊聊…

虚幻引擎 5.1 中全新的增强型输入操作系统

教程链接 https://www.youtube.com/watch?vCYiHNbAIp4s 前提 虚幻引擎5.1之后&#xff0c;项目设置里的input选项&#xff0c;默认会有一条警告&#xff0c;告知旧的input系统已经不能用了。 做法 在content文件夹下新建一个input按钮 input文件夹里面分成两部分内容 1.…

【ARMv8 SIMD和浮点指令编程】浮点加减乘除指令——四则运算

浮点指令有专门的加减乘除四则运算指令,比如 FADD、FSUB、FMUL、FDIV 等。 1 FADD (scalar) 浮点加法(标量)。该指令将两个源 SIMD&FP 寄存器的浮点值相加,并将结果写入目标 SIMD&FP 寄存器。 该指令可以产生浮点异常。根据 FPCR 中的设置,异常会导致在 FPSR 中…

Pytest系列(16)- 分布式测试插件之pytest-xdist的详细使用

前言 平常我们功能测试用例非常多时&#xff0c;比如有1千条用例&#xff0c;假设每个用例执行需要1分钟&#xff0c;如果单个测试人员执行需要1000分钟才能跑完当项目非常紧急时&#xff0c;会需要协调多个测试资源来把任务分成两部分&#xff0c;于是执行时间缩短一半&#…

AVL树性质和实现

AVL树 AVL是两名俄罗斯数学家的名字&#xff0c;以此纪念 与二叉搜索树的区别 AVL树在二叉搜索树的基础上增加了新的限制&#xff1a;需要时刻保证每个树中每个结点的左右子树高度之差的绝对值不超过1 因此&#xff0c;当向树中插入新结点后&#xff0c;即可降低树的高度&…