flink-触发器Trigger和移除器Evictor

窗口原理与机制

图片链接:https://blog.csdn.net/qq_35590459/article/details/132177154

  1. 数据流进入算子前,被提交给WindowAssigner,决定元素被放到哪个或哪些窗口,同时可能会创建新窗口或者合并旧的窗口。
  2. 每一个窗口都拥有一个属于自己的触发器Trigger,每当有元素被分配到该窗口,或者之前注册的定时器超时时,Trigger都会被调用。
  3. Trigger被触发后,窗口中的元素集合就会交给Evictor(如果指定了),遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。
  4. 窗口函数计算结果值,发送给下游;

Trigger 触发器

触发器作用:控制窗口什么时候除法计算。即执行窗口函数;基于WindowStream调用trigger()方法,传入自定义触发器(trigger);

每一个窗口分配器(windowAssigner) 都会对应一个默认的触发器;

 源码样例

  @PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner);}@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder =new WindowOperatorBuilder<>(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}==============默认触发器===
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create();}

Triger类有4个方法

  1.  onElement:窗口中每来一个元素调用该方法。
    onProcessingTime:当注册的处理时间定时器触发时,将调用这个方法。onEventTime:当注时的事件时间定时器触发时,将调用这个方法。clear:窗口关闭冰销毁时调用这个方法,一般用来清除自定义状态。onElement() ,onProcessingTime(),onEventTime()方法的返回类型都是 TriggerResult;TriggerResult中包含四个枚举值:
    CONTINUE:表示对窗口不执行任何操作。
    FIRE:触发计算并输出结果。注意计算完成后,窗口中的数据并不会被清除,将会被保留。
    PURGE:表示将窗口中的数据和窗口清除。
    FIRE_AND_PURGE:表示先将数据进行计算,输出结果,然后将窗口中的数据和窗口进行清除。
    

源码

/** No action is taken on the window. */
CONTINUE(false, false),
/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
FIRE_AND_PURGE(true, true),
/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/
FIRE(true, false),
/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/
PURGE(false, true);

flink提供的触发器 

flink提供触发器

  • EventTimeTrigger通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessingTimeoutTrigger:当内置触发器满足设置的超时时间时,触发窗口的计算。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算,全局窗口触发器,

原文链接:https://blog.csdn.net/qq_37555071/article/details/122514061

水印触发一般是窗口关闭时间

flink提供的触发器是与窗口对应,当有水印时,如果水印时间大于等于窗口结束时间会触发计算;window.maxTimestamp()获取的是窗口end-1; EventTimeTrigger 的源码可以很明确可以看到注册时注册了触发时间为window.maxTimestamp(),这也是窗口关闭的触发机制。

如果在窗口关闭前触发计算设置多个触发计算时间,这样实现一些特定的需求。例如,每10s输出一次当天的累计数据;

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {// 限定触发条件为窗口关闭时间,否则就继续窗口 return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}
.....
 

自定义触发器

继承Triger,重写抽象方法,案例

.window(TumblingEventTimeWindows.of(Time.hours(24))).trigger(new MyTrigger()).process(new WindowResult()).print();窗口长24小时,每十秒触发一次计算
===================public static class MyTrigger extends Trigger<Event, TimeWindow> {@Overridepublic TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {//定义状态,记录该状态 触发器第一个元素进来时注册全部的触发器ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));//第一次注册,右面全部跳过if (isFirstEvent.value() == null) {for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 10000L) {//注册触发器  间隔10striggerContext.registerEventTimeTimer(i);}isFirstEvent.update(true);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {//使用的事件时间,因此触发窗口的计算return TriggerResult.FIRE;}@Overridepublic TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));isFirstEvent.clear();}}

移除器Evictor

作用:主要用来定义移除某些数据的逻辑。基于windowedStream调用evictor()方法,就可以传入一个自定义得移除器(Evictor)。不同窗口类型都有各自预测实现的移除器。

stream.keyby().window().evictor(new MyEvictor)

evictBefore():定义窗口执行函数之前移除的数据操作,移除后的数据不参与窗口计算;

evictAfter():定义执行窗口函数后移除数据的操作;

默认情况下预实现的移出弃都是在执行窗口函数之前移除数据

flink 提供的移除器

CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除; CountEvictor在countwindow中有明确定义引用。
DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。(暂时不清楚作用)
TimeEvictor:  接受窗口inteval时间,它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - inteval小的所有元素。TimeEvictor.of() 方法来构建; inteval 不是窗口时间,如果为0,窗口没有数据输出

//TimeEvictor  部分源码 @Overridepublic void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {if (!doEvictAfter) {evict(elements, size, ctx);}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {if (doEvictAfter) {evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (!hasTimestamp(elements)) {return;}long currentTime = getMaxTimestamp(elements);long evictCutoff = currentTime - windowSize;//移除时间窗口时间之前的数据,注意:获取的并不是窗口end时间for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}}
// 获取当前元素中最大的时间private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {long currentTime = Long.MIN_VALUE;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();currentTime = Math.max(currentTime, record.getTimestamp());}return currentTime;}// 保留多长时间的数据public static <W extends Window> TimeEvictor<W> of(Time windowSize) {return new TimeEvictor<>(windowSize.toMilliseconds());}/*** Creates a {@code TimeEvictor} that keeps the given number of elements. Eviction is done* before/after the window function based on the value of doEvictAfter.** @param windowSize The amount of time for which to keep elements.* @param doEvictAfter Whether eviction is done after window function.*/public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) {return new TimeEvictor<>(windowSize.toMilliseconds(), doEvictAfter);}
例如
stream.keyBy(r -> r.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.evictor(TimeEvictor.of(Time.seconds(3)))  // 只输出窗口关闭前3s的数据
.process( new WindowResult())
.print();

注意:如果在evict中使用了iterable.iterator(),后面再次使用时不能直接使用

 .keyBy(r -> r.user).window(TumblingEventTimeWindows.of(Time.seconds(10)));window.evictor(new Evictor<Event, TimeWindow>() {@Overridepublic void evictBefore(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {Iterator<TimestampedValue<Event>> iterator = elements.iterator();while (iterator.hasNext()){TimestampedValue<Event> next = iterator.next();if(next.getValue().url.equals("./prod?id=1")){iterator.remove();}}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {return;}}).process(new ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {AtomicInteger i= new AtomicInteger();elements.forEach(v-> i.getAndIncrement());out.collect(new UrlViewCount(s+"====",// 获取迭代器中的元素个数  不能再使用iterable.spliterator().getExactSizeIfKnown(),否侧获取数据一一直为-1i.longValue(),context.window().getStart(),context.window().getEnd()));} }).print();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/863543.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pc端多功能视频混剪工具/便携版打开即用

PC便携版 视频批量剪辑大师&#xff0c;全自动剪辑神器&#xff0c;会打字就能做视频 多功能&#xff0c;视频混剪&#xff0c;视频配音&#xff0c;文字生成语音&#xff0c;图片合成视频&#xff0c;自动识别音频并生成字幕等功能 链接&#xff1a;https://pan.baidu.com/…

文件操作与管理

程序经常需要访问文件和目录&#xff0c;读取文件信息或写入文件信息&#xff0c;在Python语言中对文件的读写是通过文件对象&#xff08;file object&#xff09;实现的。Python的文件对象也称为类似文件对象或流&#xff08;stream&#xff09;&#xff0c;因为Python提供一种…

<电力行业> - 《第9课:输电(二)》

4 输送电能流程 输送电能总共有&#xff1a;发电站→升压变压器→高压输电线→降压变压器→用电单位等五个流程。 电力工业初期&#xff0c;发电厂建在电力用户附近&#xff0c;直接向用户送电&#xff0c;所以那个时候只有发电和用电两个环节。 随着电力生产规模和负荷中心规…

烧结刚玉砂轮片 磨具用晶谷低温陶瓷结合剂玻璃粉

晶谷CBN 砂轮磨具用低温陶瓷结合剂玻璃粉的一些特点如下&#xff1a; - 软化点&#xff1a;通常为450~650度&#xff1b; - 膨胀系数&#xff1a;50~12010-7&#xff1b; - 粒径&#xff1a;300~3000目&#xff08;可按要求订做&#xff09;&#xff1b; - 外观颜色&#xff…

h5兼容table ,如何实现h5在app内使用h5渲染table表格而且实现横屏预览?

压图地址 横屏div 通过css 实现 transform: rotate(90deg); transformOrigin: 50vw 50vw ; height: 100vw; width: 100vh;<divclass"popup-box":style"{transform: originSet 0 ? rotate(90deg) : ,transformOrigin: originSet 0 ? 50vw 50vw : ,height…

GuLi商城-商品服务-API-三级分类-删除-逻辑删除

注意&#xff1a;官方文档说logic配置可以省略&#xff0c;代码中直观些&#xff0c;配上吧 逻辑删除注解&#xff1a; 实体类字段上加逻辑删除注解&#xff1a; 启动nacos&#xff1a; 启动商品服务&#xff1a; postman测试&#xff1a; 数据库字段值改成了0&#xff0c;说明…

Linux----> tail、cat、more、head、less的用法详解

1.tail命令&#xff1a;用于查看文件的最后几行内容。 基本用法&#xff1a;tail [选项] [文件] 常用选项&#xff1a; -n <行数>&#xff1a;显示最后的 <行数> 行。-f&#xff1a;实时显示文件新增内容&#xff0c;通常用于查看日志文件。 示例&#xff1a;…

数据恢复篇:如何在没有备份的情况下从恢复已删除的照片

许多用户更喜欢将他们的私人照片保存在他们的 Android 设备上的一个单独的安全空间中&#xff0c;以确保他们的记忆不仅被存储&#xff0c;而且受到保护。这就是“安全文件夹”功能派上用场的地方。您可以使用 PIN 码、密码、指纹或图案锁定此文件夹&#xff0c;即使您的设备落…

[小试牛刀-习题练]《计算机组成原理》之数据信息的表示、运算方法与运算器

【数据信息的表示运算方法与运算器】 1、【机器码转换】X-0.11111111&#xff0c;X的补码是 1.00000001 。 最高位符号位为负值&#xff1a; 反码法——绝对值按位取反末位加一&#xff0c;1.000000000.000000011.00000001扫描法——从右往左找到第一个为1的&#xff…

常用字符串方法<python>

导言 在python中内置了许多的字符串方法&#xff0c;使用字符串方法可以方便快捷解决很多问题&#xff0c;所以本文将要介绍一些常用的字符串方法。 目录 导言 string.center(width[,fillchar]) string.capitalize() string.count(sub[,start[,end]]) string.join(iterabl…

ffmpeg编码图象时报错Invalid buffer size, packet size * < expected frame_size *

使用ffmpeg将单个yuv文件编码转为jpg或其他图像格式时&#xff0c;报错&#xff1a; Truncating packet of size 11985408 to 3585 [rawvideo 0x1bd5390] Packet corrupt (stream 0, dts 1). image_3264_2448_0.yuv: corrupt input packet in stream 0 [rawvideo 0x1bd7c60…

在本地和Linux之间传输文件

1.打开本地的cmd窗口 2. 然后按这个链接的说法在cmd中远程连接Linux&#xff08;技术|如何在 Linux 中使用 sFTP 上传或下载文件与文件夹&#xff09; 3. 看这个链接里面的sftp命令进行本地和Linux之间的文件互传 &#xff08;https://www.cnblogs.com/niuben/p/13324099.htm…

【嵌入式CLion】进阶调试——WSL下的Linux体验

说明&#xff1a; 1&#xff0c;这里所指的嵌入式其实是指嵌入式微控制器MCU&#xff0c;即单片机 2&#xff0c;万事开头难&#xff0c;本文目前提供了WSL工具链的搭建&#xff0c;后面会持续更新 一、启用RTOS集成 在搭建WSL工具链之前&#xff0c;先讲一下集成的RTOS功能&a…

D-MAX纠偏软件Fife MAX Terminal软件MAX-Oi软件

D-MAX纠偏软件Fife MAX Terminal软件MAX-Oi软件

SpringBoot的自动配置核心原理及拓展点

Spring Boot 的核心原理几个关键点 约定优于配置&#xff1a; Spring Boot 遵循约定优于配置的理念&#xff0c;通过预定义的约定&#xff0c;大大简化了 Spring 应用程序的配置和部署。例如&#xff0c;它自动配置了许多常见的开发任务&#xff08;如数据库连接、Web 服务器配…

通过Python脚本实现字符画

效果 讲解&#xff1a; 用于将3D视图的帧缓冲区转换为字符画&#xff0c;并将字符画输出到文本编辑器中。 首先&#xff0c;获取当前绑定的帧缓冲区、视口信息和视图像素。 然后&#xff0c;将像素矩阵转化为字符串&#xff0c;并将字符串写入到文本编辑器中。 设置文本编辑…

bmob Harmony快速开发手机号一键登录功能

最近用Bmob的鸿蒙SDK尝试了Harmony开发&#xff0c;做了一个几乎每个应用都会有的功能&#xff1a;手机号码短信验证码一键注册登录的功能&#xff0c;感觉简直爽的不要不要的&#xff0c;ArkUI可见即可得的UI交互设计体验&#xff0c;配合Bmob后端云一如既往简单易用的风格&am…

使用Perplexity打造产品的27种方式

ChatGPT和Perplexity等聊天机器人正迅速成为产品经理的首选助手。以下是一份全面的指南&#xff0c;介绍PM如何在日常工作中使用Perplexity&#xff0c;该指南基于300多份回复和30次电话后的总结。 理解并制定增长战略&#xff1a;例如&#xff0c;解释增长会计的基本原理&…

【Vue】——组件之间数据的传递

&#x1f4bb;博主现有专栏&#xff1a; C51单片机&#xff08;STC89C516&#xff09;&#xff0c;c语言&#xff0c;c&#xff0c;离散数学&#xff0c;算法设计与分析&#xff0c;数据结构&#xff0c;Python&#xff0c;Java基础&#xff0c;MySQL&#xff0c;linux&#xf…

【proteus经典实战】16X192点阵程序

一、简介 6X192点阵程序通常用于表示高分辨率图像或文字&#xff0c;其中16X表示像素阵列的宽度&#xff0c;192表示每个像素阵列中的点阵数&#xff0c;16X192点阵程序需要一定的编程知识和技能才能编写和调试&#xff0c;同时还需要考虑硬件设备的兼容性和性能等因素。 初始…