聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透

一、触发器(Trigger)

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 trigger。

1.1 Flink中预置的Trigger

窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发计算。flink内部定义多种触发器,每种触发器对应于不同的WindowAssigner。常见的触发器如下:

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ProcessingTimeoutTrigger:可以将任何触发器转变为超时触发器。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算

1.2 Trigger的抽象类

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement() 方法在每个元素被加入窗口时调用。
  • onEventTime() 方法在注册的 event-time timer 触发时调用。
  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  • canMerge() 方法判断是否可以合并。
  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  • clear() 方法处理在对应窗口被移除时所需的逻辑。

触发器接口的源码如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;/*** Called for every element that gets added to a pane. The result of this will determine whether* the pane is evaluated to emit results.** @param element The element that arrived.* @param timestamp The timestamp of the element that arrived.* @param window The window to which the element is being added.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)throws Exception;/*** Called when a processing-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception;/*** Called when an event-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)throws Exception;/*** Returns true if this trigger supports merging of trigger state and can therefore be used with* a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.** <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,* OnMergeContext)}*/public boolean canMerge() {return false;}/*** Called when several windows have been merged into one window by the {@link* org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.** @param window The new window that results from the merge.* @param ctx A context object that can be used to register timer callbacks and access state.*/public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}/*** Clears any state that the trigger might still hold for the given window. This is called when* a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and* {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as* state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.*/public abstract void clear(W window, TriggerContext ctx) throws Exception;// ------------------------------------------------------------------------/*** A context object that is given to {@link Trigger} methods to allow them to register timer* callbacks and deal with state.*/public interface TriggerContext {// ...}/*** Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,* OnMergeContext)}.*/public interface OnMergeContext extends TriggerContext {<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);}
}

关于上述方法,需要注意三件事:

(1)前三个方法返回TriggerResult枚举类型,其包含四个枚举值:

  • CONTINUE:表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
  • FIRE:触发窗口计算,但是保留窗口元素。
  • PURGE:不触发窗口计算,丢弃窗口,并且删除窗口的元素。
  • FIRE_AND_PURGE:触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。

源码如下:

public enum TriggerResult {// 不触发,也不删除元素CONTINUE(false, false),// 触发窗口,窗口出发后删除窗口中的元素FIRE_AND_PURGE(true, true),// 触发窗口,但是保留窗口元素FIRE(true, false),// 不触发窗口,丢弃窗口,并且删除窗口的元素PURGE(false, true);// ------------------------------------------------------------------------private final boolean fire;private final boolean purge;TriggerResult(boolean fire, boolean purge) {this.purge = purge;this.fire = fire;}public boolean isFire() {return fire;}public boolean isPurge() {return purge;}
}

(2) 每一个窗口分配器都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除,当定时器触发后,会调用对应的回调返回,返回TriggerResult。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。

1.3 ProcessingTimeTrigger源码分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/** Creates a new trigger that fires once system time passes the end of the window. */public static ProcessingTimeTrigger create() {return new ProcessingTimeTrigger();}
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。

需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。

EventTimeTriggerr在onElement设置的定时器:

在这里插入图片描述
EventTime通过registerEventTimeTimer注册定时器,在内部Watermark达到或超过Timer设定的时间戳时触发。

二、移除器(Evictor)

2.1 Evictor扮演的角色
在这里插入图片描述
当一个元素进入stream中之后,一般要经历Window(开窗)、Trigger(触发器)、Evitor(移除器)、Windowfunction(窗口计算操作),具体过程如下:

  • Window中的WindowAssigner(窗口分配器)定义了数据应该被分配到哪个窗口中,每一个 WindowAssigner都会有一个默认的Trigger,如果用户在代码中指定了窗口的trigger,默认的 trigger 将会被覆盖。
  • Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
  • 当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给WindowFunction进行计算。
  • WindowFunction收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口计算操作有很多,比如预定义的sum(),min(),max(),还有 ReduceFunction,WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

现在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪个位置,让我们继续看为何使用Evictor。

Evictor接口定义如下:

在这里插入图片描述
evictBefore()包含要在窗口函数之前应用的清除逻辑,而evictAfter()包含要在窗口函数之后应用的清除逻辑。应用窗口函数之前清除的元素将不会被窗口函数处理。

窗格是具有相同Key和相同窗口的元素组成的桶,即同一个窗口中相同Key的元素一定属于同一个窗格。一个元素可以在多个窗格中(当一个元素被分配给多个窗口时),这些窗格都有自己的清除器实例。

注:window默认没有evictor,一旦把window指定Evictor,该window会由EvictWindowOperator类来负责操作。

2.2 Flink内置的Evitor

  • CountEvictor:保留窗口中用户指定的元素数量,并丢弃窗口缓冲区剩余的元素。
  • DeltaEvictor:依次计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,若delta值大于等于指定的阈值,则该元素会被移除。使用DeltaEvictor清除器需要指定两个参数,一个是double类型的阈值;另一个是DeltaFunction接口的实例,DeltaFunction用于指定具体的delta值计算逻辑。
  • TimeEvictor:传入一个以毫秒为单位的时间间隔参数(例如以size表示),对于给定的窗口,取窗口中元素的最大时间戳(例如以max表示),使用TimeEvictor清除器将删除所有时间戳小于或等于max-size的元素(即清除从窗口开头到指定的截止时间之间的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (size <= maxCount) {// 小于最大数量,不做处理return;} else {int evictedCount = 0;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){iterator.next();evictedCount++;if (evictedCount > size - maxCount) {break;} else {// 移除前size - maxCount个元素,只剩下最后maxCount个元素iterator.remove();}}}
}

2.2.2 DeltaEvictor

DeltaEvictor通过计算DeltaFunction的值(依次传入每个元素和最后一个元素),并将其与threshold进行对比,如果DeltaFunction计算结果大于等于threshold,则该元素会被移除。DeltaEvictor的实现如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {// 获取最后一个元素TimestampedValue<T> lastElement = Iterables.getLast(elements);for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){TimestampedValue<T> element = iterator.next();// 依次计算每个元素和最后一个元素的delta值,同时和threshold的值进行比较// 若计算结果大于threshold值或者是相等,则该元素会被移除if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {iterator.remove();}}
}

2.2.3 TimeEvictor

TimeEvictor以时间为判断标准,决定元素是否会被移除。TimeEvictor会获取窗口中所有元素的最大时间戳currentTime,currentTime减去窗口大小(windowSize) 可得到能保留最久的元素的时间戳evictCutoff,然后再遍历窗口中的元素,如果元素的时间戳小于evictCutoff,就执行移除操作,否则不移除。具体逻辑如下图所示:

在这里插入图片描述
TimeEvictor的代码实现如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {// 如果element没有timestamp,直接返回if (!hasTimestamp(elements)) {return;}// 获取elements中最大的时间戳(到来最晚的元素的时间)long currentTime = getMaxTimestamp(elements);// 截止时间为: 到来最晚的元素的时间 - 窗口大小(可以理解为保留最近的多久的元素)long evictCutoff = currentTime - windowSize;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();// 清除所有时间戳小于截止时间的元素if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/61478.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用Python做数据分析环境搭建及工具使用(Jupyter)

目录 一、Anaconda下载、安装 二、Jupyter 打开 三、Jupyter 常用快捷键 3.1 创建控制台 3.2 命令行模式下的快捷键 3.3 运行模式下快捷键 3.4 代码模式和笔记模式 3.5 编写Python代码 一、Anaconda下载、安装 【最新最全】Anaconda安装python环境_anaconda配置python…

基于51单片机的电子秤设计

本设计以STC89C52RC芯片作为主要的控制芯片&#xff1b;通过电阻应变式传感器实现物品的测量功能&#xff1b;通过HX711型A/D转换器完成模拟信号到数字信号之间的转换&#xff1b;矩阵按键实现单片机复位、物品单价输入等系列操作&#xff1b;LCD1602液晶显示屏可以实现测量结果…

原子类、AtomicLong、AtomicReference、AtomicIntegerFieldUpdater、LongAdder

原子类 JDK提供的原子类&#xff0c;即Atomic*类有很多&#xff0c;大体可做如下分类&#xff1a; 形式类别举例Atomic*基本类型原子类AtomicInteger、AtomicLong、AtomicBooleanAtomic*Array数组类型原子类AtomicIntegerArray、AtomicLongArray、AtomicReferenceArrayAtomic…

ClickHouse数据迁移(远程)

一、背景 公司最近买了新的服务器&#xff0c;旧的服务器上面安装了ClickHouse22.2.2.1&#xff0c;新的服务器上面安装了ClickHouse24.9.2.42&#xff0c;两个版本之间要做历史数据迁移 旧服务器&#xff1a;80(IP最后一段&#xff0c;以下代称)&#xff0c;ClickHouse版本&am…

Spring Boot日志总结

文章目录 1.我们的日志2.日志的作用3.使用日志对象打印日志4.日志框架介绍5.深入理解门面模式(外观模式)6.日志格式的说明7.日志级别7.1日志级别分类7.2配置文件添加日志级别 8.日志持久化9.日志文件的拆分9.1官方文档9.2IDEA演示文件分割 10.日志格式的配置11.更简单的日志输入…

「Qt Widget中文示例指南」如何为窗口实现流程布局?(二)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 本文将展示如何为不…

阿里云服务器(centos7.6)部署前后端分离项目(MAC环境)

mysql安装和部署 下载前准备 确定一下系统的glibc版本&#xff0c;可以使用以下命令进行查看&#xff0c;当前系统glibc版本&#xff1a;2.17 rpm -qa | grep glibclinux系统会自动携带一个数据库&#xff0c;需要把它给卸载掉&#xff0c;通过以下代码可以查看mariadb 并卸…

道路机器人识别交通灯,马路,左右转,黄线,人行道,机器人等路面导航标志识别-使用YOLO标记

数据集分割 train组66% 268图片 validation集22% 91图片 test集12&#xff05; 48图片 预处理 没有采用任何预处理步骤。 增强 未应用任何增强。 数据集图片&#xff1a; 交通灯 马路 右转 向右掉头 机器人识别 人行横道 黄线 直行或右转 数据集下载&#xff1a; 道路…

偏差-方差权衡(Bias–Variance Tradeoff):理解监督学习中的核心问题

偏差-方差权衡&#xff08;Bias–Variance Tradeoff&#xff09;&#xff1a;理解监督学习中的核心问题 在机器学习中&#xff0c;我们希望构建一个能够在训练数据上表现良好&#xff0c;同时对未见数据也具有强大泛化能力的模型。然而&#xff0c;模型的误差&#xff08;尤其…

Linux服务器安装mongodb

因为项目需要做评论功能&#xff0c;领导要求使用mongodb&#xff0c;所以趁机多学习一下。 在服务器我们使用docker安装mongodb 1、拉取mongodb镜像 docker pull mongo &#xff08;默认拉取最新的镜像&#xff09; 如果你想指定版本可以这样 docker pull mongo:4.4&#…

STM32 使用ARM Compiler V6 编译裸机 LWIP协议栈报错的解决方法

在lwip 的cc.h 中使用以下宏定义&#xff0c;来兼容 V5 和 V6编译器 #if defined (__ARMCC_VERSION) && (__ARMCC_VERSION > 6010050) /* ARM Compiler V6 */ #define __CC_ARM /* when use v6 compiler define this */ #endifV6编译的速度确实比V5块了好多倍。 …

使用ESP32通过Arduino IDE点亮1.8寸TFT显示屏

开发板选择 本次使用开发板模块丝印为ESP32-WROOM-32E 开发板库选择 Arduino IDE上型号选择为ESP32-WROOM-DA Module 显示屏选择 使用显示屏为8针SPI接口显示屏 驱动IC为ST7735S 使用库 使用三个Arduino平台库 分别是 Adafruit_GFXAdafruit_ST7735SPI 代码详解 首…

图像显示的是矩阵的行和列,修改为坐标范围。

x 3; y 3; f1x x^2 y^2; guance1 f1x; F (x, y) sqrt((x.^2 y.^2 - guance1).^2); % 使用点乘 [x, y] meshgrid(0:1:5, 0:1:5); Z F(x, y); figure; imagesc(Z); % 由于 imagesc 使用矩阵索引作为坐标&#xff0c;我们需要手动添加刻度 % 这里我们假设 x 和 y 的范围…

【K230 CanMV】图像识别-摄像头获取图像 Sensor 函数全解析

引言&#xff1a;随着图像处理技术的不断发展&#xff0c;摄像头在嵌入式系统中的应用越来越广泛&#xff0c;尤其是在智能监控、自动驾驶、机器人视觉等领域。K230作为一款高性能的嵌入式处理器&#xff0c;提供了强大的图像处理能力&#xff0c;支持多种类型的摄像头接入与图…

基于FPGA的FM调制(载波频率、频偏、峰值、DAC输出)-带仿真文件-上板验证正确

基于FPGA的FM调制-带仿真文件-上板验证正确 前言一、FM调制储备知识载波频率频偏峰值个人理解 二、代码分析1.模块分析2.波形分析 总结 前言 FM、AM等调制是学习FPGA信号处理一个比较好的小项目&#xff0c;通过学习FM调制过程熟悉信号处理的一个简单流程&#xff0c;进而熟悉…

论文笔记(五十九)A survey of robot manipulation in contact

A survey of robot manipulation in contact 文章概括摘要1. 引言解释柔顺性控制的概念&#xff1a;应用实例&#xff1a; 2. 需要接触操控的任务2.1 环境塑造2.2 工件对齐2.3 关节运动2.4 双臂接触操控 3. 接触操控中的控制3.1 力控制3.2 阻抗控制3.3 顺应控制 4. 接触操控中的…

拥抱 OpenTelemetry:阿里云 Java Agent 演进实践

作者&#xff1a;陈承 背景 在 2018 年的 2 月&#xff0c;ARMS Java Agent 的第一个版本正式发布&#xff0c;为用户提供无侵入的的可观测数据采集服务。6 年后的今天&#xff0c;随着软件技术的迅猛发展、业务场景的逐渐丰富、用户规模的快速增长&#xff0c;我们逐渐发现过…

学习ASP.NET Core的身份认证(基于Session的身份认证3)

开源博客项目Blog中提供了另一种访问控制方式&#xff0c;其基于自定义类及函数的特性类控制访问权限。本文学习并测试开源博客项目Blog的访问控制方式&#xff0c;测试程序中直接复用开源博客项目Blog中的相关类及接口定义&#xff0c;并在其上调整判断逻辑。   首先是接口A…

电子应用设计方案-31:智能AI音响系统方案设计

智能 AI 音响系统方案设计 一、引言 智能 AI 音响作为一种新兴的智能家居设备&#xff0c;通过融合语音识别、自然语言处理、音频播放等技术&#xff0c;为用户提供便捷的语音交互服务和高品质的音乐体验。本方案旨在设计一款功能强大、性能稳定、用户体验良好的智能 AI 音响系…

可变电阻和电位器

1.可变电阻和电位器 &#xff08;1&#xff09;可变电阻&#xff1a;阻值可以调整的电阻。 &#xff08;2&#xff09;电位器&#xff1a;为了获得某个电位&#xff08;电势、电压&#xff09;的器件。其本质就是在一个固定阻值的电阻中间增 加一个触点&#xff0c;滑动电阻的中…