Flink 窗口触发器Triggers

Triggers

定义:触发器决定了窗口何时被触发。在Flink中,窗口的触发是通过设置定时器来实现的。

作用:控制窗口数据的聚合时机,确保数据在适当的时间点被处理和输出。

Trigger关键方法

onElement: 当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。

onElement(T element, long timestamp, W window, TriggerContext ctx);

onEventTime:当事件时间计时器触发时调用,用于处理事件时间相关的触发逻辑。

onEventTime(long time, W window, TriggerContext ctx);

onProcessingTime :当处理时间计时器触发时调用,这里时间指机器处理时间,而不考虑时间本身的时间。见后文ProcessingTimeTrigger实现

onProcessingTime(long time, W window, TriggerContext ctx);

clear 当窗口被删除时调用,用于清理窗口的状态和定时器。

clear(W window, TriggerContext ctx);

内置Trigger

Flink提供了多种内置的触发器,以下为几种常用类型:

  • EventTimeTrigger 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立即触发窗口计算。

  • ProcessingTimeTrigger 工作原理:基于处理时长(即机器的系统时间)来触发窗口计算。当处理时间达到窗口的结束时间时,触发窗口计算。

    @Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE;}
  • CountTrigger 工作原理:根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。

  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);count.add(1L);if (count.get() >= maxCount) {count.clear();return TriggerResult.FIRE;}return TriggerResult.CONTINUE;
  • ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的结束时间小于当前的时间(事件时间或处理时间)时触发计算。适用场景:适用于需要周期性处理数据的场景,如实时监控、周期性报表等。

       if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());}

  • DeltaTrigger 工作原理:根据接入数据计算出的Delta指标是否超过指定的阈值来触发窗口计算。适用场景:适用于需要基于数据变化量进行处理的场景,如异常检测、趋势分析等。

    if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {lastElementState.update(element);return TriggerResult.FIRE;}

自定义一个Trigger

实现一个CountTrigger 窗口元素数量达到阈值时,触发计算

package com.codetonight.datastream.trigger;import org.apache.flink.streaming.api.windowing.triggers.Trigger;import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;  
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;  public class CountTrigger<T> extends Trigger<T, GlobalWindow> {  private final long countThreshold;  private long count = 0L;  public CountTrigger(long countThreshold) {  this.countThreshold = countThreshold;  }  @Override  public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {  count++;  if (count >= countThreshold) {  // 触发窗口并清除计数器count = 0;return TriggerResult.FIRE_AND_PURGE;}  return TriggerResult.CONTINUE;  }@Overridepublic TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}// 其他方法(onEventTime, onProcessingTime, onMerge, clear)可以留空或实现特定的逻辑  @Override  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {count = 0L;  }  }
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;  
import org.apache.flink.streaming.api.windowing.windows.Window;  public class FlinkGlobalWindowExample {  public static void main(String[] args) throws Exception {  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);  // 应用全局窗口和自定义触发器  DataStream<Long> result = source.keyBy(value -> 1).windowAll(GlobalWindows.create()).trigger(new CountTrigger<>(5)) // 当接收到5个元素时触发.reduce(new ReduceFunction<Long>() {  @Override  public Long reduce(Long value1, Long value2) {  return value1 + value2;  }  });  // 打印结果  result.print();  // 执行作业  env.execute("Flink Global Window Example");  }  
}

Evictor

Flink 的窗口模型允许在指定 WindowAssigner 和 Trigger 之外,还可以选择性地指定一个 Evictor。

Evictor 的功能是在触发器触发后,且在窗口函数应用之前和/或之后,从窗口中移除元素。为了实现这一功能,Evictor 接口定义了两个方法:

public interface Evictor<T, W extends Window> extends Serializable {void evictBefore(Iterable<TimestampedValue<T>> elements,int size,W window,EvictorContext evictorContext);void evictAfter(Iterable<TimestampedValue<T>> elements,int size,W window,EvictorContext evictorContext);}

通过这两个方法,Evictor 提供了在窗口生命周期中灵活控制元素保留与移除的能力。

内置Evictor

这些 Evictor 可以单独使用,也可以与 Flink 的 WindowAssigner 和 Trigger 一起使用, 以创建复杂而强大的窗口处理逻辑。通过灵活组合这些组件, Flink 用户可以处理各种实时数据流场景,包括滑动窗口、滚动窗口、会话窗口等。

CountEvictor: 功能:保留窗口中用户指定的元素数量,并从窗口缓冲区的开头丢弃剩余的元素。应用场景:当你只需要保留窗口中最新的 N 个元素时,这个 Evictor 非常有用。


DeltaEvictor: 移除逻辑代码比较清晰:

  1. 取窗口最后一个元素lastElement

  2. 所有元素与lastElement 比较计算出差值( Delta )

  3. 差值( Delta ) 超过阈值则移除

DeltaFunction用于计算两个元素之间的差值

    private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {TimestampedValue<T> lastElement = Iterables.getLast(elements);for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {TimestampedValue<T> element = iterator.next();if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())>= this.threshold) {iterator.remove();}}}

TimeEvictor: 功能:基于时间戳来移除窗口中的元素。它接受一个时间间隔(以毫秒为单位),对于给定的窗口,它会找到元素中的最大时间戳 max_ts,并移除所有时间戳小于 max_ts 减去指定时间间隔的元素。应用场景:当你希望基于时间戳来过滤窗口中的旧元素时,这个 Evictor 非常有用。

TimeEvictor evcit 方法代码逻辑,方法命名很清晰。

  1. 取窗口元素最大的时间戳 currentTime,

  2. 保留的时间戳阈值evictCutoff = currentTime -windowSize

  3. 循环遍历移除不在evictCutoff 之前的元素

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (!hasTimestamp(elements)) {return;}long currentTime = getMaxTimestamp(elements);long evictCutoff = currentTime - windowSize;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/56865.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

幂律分布笔记

一、幂律分布的数据拟合 数据分箱&#xff1a; 所谓分箱就是对原始数据进行分组&#xff0c;然后对每一组内的数据进行平滑处理。常见的分箱方式主要有等深分箱、等宽分箱、用户自定义等 对数分箱&#xff1a; 对原数据进行分箱&#xff0c;第i个箱的宽度为bi&#xff0c;b…

客户案例 | Ansys与台积电和微软合作加速光子仿真

Ansys与台积电和微软展开合作&#xff0c;将硅光子器件的仿真和分析速度提高10倍以上 主要亮点 借助使用NVIDIA图形处理单元&#xff08;GPU&#xff09;的Microsoft Azure虚拟机&#xff0c;Ansys Lumerical™ FDTD 3D电磁仿真的光子器件仿真速度实现了10倍提升 凭借Azure云…

CRMEB标准版Mysql修改sql_mode

数据库配置 1.宝塔控制面板-软件商店-MySql-设置 2.点击配置修改&#xff0c;查找sql-mode或sql_mode &#xff08;可使用CtrlF快捷查找&#xff09; 3.复制 NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION 然后替换粘贴&#xff0c;保存 注&#xff1a;MySQL8.0版本的 第三步用…

Vulkan 开发(三):Vulkan 物理设备

Vulkan 物理设备 图片来自《 Vulkan 应用开发指南》 上一节了解了 Vulkan 实例&#xff0c;一旦有了实例&#xff0c;就可以查找系统里安装的与 Vulkan 兼容的物理设备。 Vulkan 物理设备&#xff08;PhysicalDevice&#xff09;一般是指支持 Vulkan 的物理硬件&#xff0c;通…

基于模型设计的智能平衡移动机器人-基础实验eCAP

目录 eCAP基本介绍 捕捉模式或者是APWM模式 捕获模块功能 CCS中打开模型 eCAP基本介绍 TMS320F28069的捕获单元模块能够捕获外部输入引脚的逻辑状态&#xff08;电平的高或低、电平翻转时的上升沿或下降沿&#xff09;&#xff0c;并利用内部定时器对外部事件或者引脚状态变…

关于网络接口监测工具ifstat命令的功能详解以及Linux下lsof命令的使用详解

一、关于网络接口监测工具ifstat命令的应用 ifstat工具是个网络接口监测工具,比较简单看网络流量&#xff0c;像VMSTAT那样一行行显示着浏览信息&#xff0c;可以设置显示某个或所有网卡流量数据。ifstat默认不监控回环接口&#xff0c;显示的流量单位是KB。系统默认未安装&…

【目标检测---旋转框标注】roLabelImg安装与使用

在目标检测领域&#xff0c;数据标注是至关重要的一环。为了提升模型的准确率和泛化能力&#xff0c;高质量的标注数据集是必不可少的。而roLabelImg作为一款专门用于标注旋转框的工具&#xff0c;为处理复杂场景下的目标检测提供了极大的便利。本文将详细介绍roLabelImg的安装…

电力变压器故障诊断数据集(猫脸码客 第219期)

电力变压器故障诊断数据集 电力变压器作为电力系统中不可或缺的重要设备&#xff0c;其稳定性和可靠性直接关系到整个电网的安全运行。然而&#xff0c;由于运行环境复杂、负载多变以及设备老化等因素&#xff0c;变压器在运行过程中难免会出现各种故障。这些故障若不能及时发…

【解决Docker无剩余存储磁盘空间问题】

【解决Docker无剩余存储磁盘空间问题】 目录 【解决Docker无剩余存储磁盘空间问题】一、问题概述二、问题原因三、解决方案1、方案一&#xff1a;清除Docker磁盘空间2、方案二&#xff1a;更换Docker磁盘存储目录 一、问题概述 执行Docker build -t [镜像名] [源目录] 命令报错…

基于Neo4j的推理知识图谱展示:智能系统与图谱可视化

还在找毕业设计项目吗&#xff1f;试试我们基于Neo4j打造的推理知识图谱展示系统&#xff01;这是一个兼具前沿技术与实战经验积累的项目&#xff0c;完美适合作为你的毕业设计。 &#x1f449; 什么是知识图谱&#xff1f; 简单来说&#xff0c;它是通过连接的节点&#xff0…

线性代数基础02

目录 1.向量 1.1向量的定义 1.2向量的运算 1.2.1向量加法 1.2.2向量数乘 1.2.3向量点积 1.3矩阵的特征值和特征向量 1.4向量的模 1.4.1向量的模的定义 1.4.2向量的模的几何解释 1.4.3向量的模的性质 1.5向量的内积 1.5.1向量的内积的定义 1.5.2向量的内积的几何解…

STMicroelectronics 意法半导体芯片选型表

意法半导体作为全球知名的半导体厂商&#xff0c;其产品广泛应用于各个领域&#xff0c;从消费电子到工业控制&#xff0c;从汽车电子到通信设备&#xff0c;都能看到意法半导体芯片的身影。在电子硬件设计领域&#xff0c;芯片的选型至关重要。亿配芯城&#xff08;ICgoodFind…

WPF常见容器全方位介绍

Windows Presentation Foundation (WPF) 是微软的一种用于构建Windows桌面应用程序的UI框架。WPF的布局系统基于容器&#xff0c;帮助开发者以灵活、响应的方式组织用户界面 (UI) 元素。本篇文章将详细介绍WPF中几种常见的容器&#xff0c;包括Grid、StackPanel、WrapPanel、Do…

Aspose.PDF功能演示:使用 JavaScript 从 PDF 中提取文本

在数据提取、业务文档自动化和文本挖掘方面&#xff0c;使用 JavaScript 从PDF中提取文本非常有用。它允许开发人员自动执行从 PDF 收集信息的过程&#xff0c;从而显著提高处理大量文档的生产力和效率。在这篇博文中&#xff0c;我们将学习如何使用 JavaScript 从 PDF 中提取文…

python-django-mysql原生sql增删改查搭建搭建web项目

先看我本地的项目结构 1 设置虚拟环境 python -m venv venv .\venv\Scripts\activate 2 在虚拟环境中安装Django 执行 pip install -r requirements.txt asgiref3.8.1 backports.zoneinfo0.2.1 Django3.2 mysqlclient2.2.4 pytz2024.2 sqlparse0.5.1 typing-extensions4.1…

数据结构--二叉树随记

二叉树主要分为四类&#xff1a;满二叉树、完全二叉树、二叉搜索树、平衡二叉搜索树。 高度,深度,层 满二叉树 满二叉树就是每一层节点都是满的&#xff0c;整棵树像一个正三角形&#xff1a; 满二叉树有个优势&#xff0c;就是它的节点个数很好算。假设深度为 h&#xff0c;那…

讲一讲Redis五大数据类型的底层实现

讲一讲Redis五大数据类型的底层实现 Redis五大数据类型的底层实现 Redis的五大数据类型分别是字符串&#xff08;String&#xff09;、列表&#xff08;List&#xff09;、哈希&#xff08;Hash&#xff09;、集合&#xff08;Set&#xff09;和有序集合&#xff08;Zset&…

Fake Location 限制解除(运动世界校园,keep......)

一觉起来成绩还是正常的&#xff0c;运动世界校园的审核是非常严格的&#xff0c;因为在这之前&#xff0c;我帮助同学登入别的账号进行跑步&#xff0c;发现过来几天全被检测到了异常,成绩也是直接无效了哈&#xff0c;我们今天再搞一个关于keep的&#xff0c;因为当时关于kee…

pikachu靶场SSRF-curl测试报告

目录 一、测试环境 1、系统环境 2、使用工具/软件 二、测试目的 三、操作过程 1、实现ssrf攻击 四、源代码分析 五、结论 一、测试环境 1、系统环境 渗透机&#xff1a;本机(127.0.0.1) 靶 机&#xff1a;本机(127.0.0.1) 2、使用工具/软件 测试网址&#xff1a;…

DNS 与 ICMP

DNS(Domain Name System)快速了解 DNS 是一整套从域名映射到 IP 的系统 DNS 背景 TCP/IP 中使用 IP 地址和端口号来确定网络上的一台主机的一个程序. 但是 IP 地址不方便记忆 于是人们发明了一种叫主机名的东西, 是一个字符串, 并且使用 hosts 文件来描述主机 名和 IP 地…