Flink之窗口触发机制及自定义Trigger的使用

1 窗口触发机制

窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下:
  • Trigger
  • ProcessingTimeoutTrigger
  • EventTimeTrigger
  • CountTrigger
  • DeltaTrigger
  • NeverTrigger in GlobalWindows
  • ContinuousEventTimeTrigger
  • PurgingTrigger
  • ContinuousProcessingTimeTrigger
  • ProcessingTimeTrigger
通常情况下是不需要自己重写Trigger的,使用Flink内置的就可以,除非特殊业务特殊需求.
1.1 源码解析

EventTimeTrigger源码说明如何触发窗口计算,在EventTimeTrigger源码中只需要关注onElementonEventTime两个方法即可,源码内容如下:

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}// 基于数据驱动的方法@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {// 判断当前watermark是否大于等于窗口的最大时间if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 如果大于等于窗口的最大时间触发计算return TriggerResult.FIRE;} else {// 小于窗口的最大时间首先注册定时器ctx.registerEventTimeTimer(window.maxTimestamp());// 然后等待数据继续输入,不触发计算return TriggerResult.CONTINUE;}}// 基于事件时间定时器驱动的方法@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {// 根据不断发送来的watermark判断是否触发计算return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}// ...
}

源码中将不需要的关注的代码都已省略

  • onElement

    注释中写明这个方法是基于数据进行驱动的,也就是说只有数据到达时才会执行这个方法,每一个窗口都有自己的startTimeendTime,也就是窗口的范围,判断条件中window.maxTimestamp()获取的就是当前窗口的endTime,如果当前watermark超出当前窗口的endTime就会触发这个窗口计算,TriggerResult.FIRE表示的就是窗口开始计算,如果当前watermark小于endTime就不会触发窗口计算这个窗口会继续等待数据输入,也就是TriggerResult.CONTINUE方法.

  • onEventTime

    onElement是由数据驱动的,但是Flink的实际数据处理过程是存在没有数据发送到当前窗口,但是会有watermark源源不断的发送到当前窗口的情况,在多并行度的执行条件下就会发生这种情况.在onEventTime方法中如果上游发送过来的watermark等于当前窗口的endTime就会执行TriggerResult.FIRE否则还是执行TriggerResult.CONTINUE.

Trigger的触发机制就是这样,其他的CountTrigger等大致逻辑基本是一样的,了解清楚源码中这两个方法的作用很容易理解.

1.2 代码实现

通常Flink内置的Trigger都可以满足数据处理需求,往往在实际开发中可能会存在特殊的业务需求,这时用户可以自定义Trigger,以达到控制窗口触发计算的规则. 可以仿照EventTimeTrigger来构建一个自定义Trigger,只需要将其中的部分代码简单进行修改,并在onElement方法中添加自定的触发逻辑即可.
  • 自定义Trigger

    /*** 这里首先需要继承Trigger类,并将<Object, TimeWindow>中的Object修改成自己需要的数据类型,这段代码中需要根据UserEvent2中的数据* 来控制触发窗口计算的条件,所以将Object修改成UserEvent2**/ 
    public class CustomTrigger extends Trigger<UserEvent2, TimeWindow> {public CustomTrigger() {}// 通过修改onElement方法中窗口计算的触发逻辑实现自定义方式@Overridepublic TriggerResult onElement(// 这里也要将原有的Object类型修改成上面的UserEvent2UserEvent2 element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {// 原有的判断逻辑不动,这个是为了便捷,判断逻辑可以根据实际需求进行修改,或者如同下面中添加一个新的触发逻辑if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {return TriggerResult.FIRE;// 这里增加一个判断逻辑,当用户行为时间为2700的时候也触发计算} else if (element.getTime().equals("2700")) {return TriggerResult.FIRE;// 原有的判断逻辑不动} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentWatermark()) {ctx.registerEventTimeTimer(windowMaxTimestamp);}}// 将toString中俄返回值根据用户的需要进行修改@Overridepublic String toString() {return "CustomTrigger()";}// 将返回值更改成创建的自定义Trigger类public static CustomTrigger create() {return new CustomTrigger();}
    }
    
  • 业务代码

    // ...
    SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设置滚动窗口大小为10s.trigger(new CustomTrigger()) // 传入自定义的Trigger类.allowedLateness(Time.seconds(2)) // 允许迟到数据迟到时间2s,同watermark中的forBoundedOutOfOrderness功能类似.sideOutputLateData(lateData) // 将迟到数据进行测流输出.max("time");// 获取用户行为发生事件最大的这条数据
    // ...
    

    上面这段业务代码中设置的滚动窗口的大小为10s,正常来说只有满足end - start = 10000的时候才会触发窗口计算,但是在自定义Trigger中指定了当数据中时间为2700的时候也触发窗口计算,在时间为2700的数据没到达时候还会按照原有的逻辑触发窗口计算,但是只要2700的数据到达,不管时候达到TumblingEventTimeWindows.of(Time.seconds(10))这个条件,都会触发窗口计算.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/112437.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LuatOS-SOC接口文档(air780E)-- ir - 红外遥控

ir.sendNEC(pin, addr, cmd, repeat, disablePWM)# 发送NEC数据 参数 传入值类型 解释 int 使用的GPIO引脚编号 int 用户码&#xff08;大于0xff则采用Extended NEC模式&#xff09; int 数据码 int 可选&#xff0c;引导码发送次数&#xff08;110ms一次&#xff0…

Vue3.0里为什么要用 Proxy API 替代 defineProperty API ?

一、Object.defineProperty 定义&#xff1a;Object.defineProperty() 方法会直接在一个对象上定义一个新属性&#xff0c;或者修改一个对象的现有属性&#xff0c;并返回此对象 为什么能实现响应式 通过defineProperty 两个属性&#xff0c;get及set get 属性的 getter 函…

Swift使用Embassy库进行数据采集:热点新闻自动生成器

概述 爬虫程序是一种可以自动从网页上抓取数据的软件。爬虫程序可以用于各种目的&#xff0c;例如搜索引擎、数据分析、内容聚合等。本文将介绍如何使用Swift语言和Embassy库编写一个简单的爬虫程序&#xff0c;该程序可以从新闻网站上采集热点信息&#xff0c;并生成一个简单…

GCC优化相关

文章目录 优化选项博文链接 单独设置某段代码优化等级博文链接 优化选项 -O/-O0:无优化(默认)-O1:使用能减少目标文件大小以及执行时间并且不会使编译时间明显增加的优化。该模式在编译大型程序的时候会花费更多的时间和内存。在-O1 下&#xff0c;编译会尝试减少代码体积和代码…

Sarscape5.6版本中导入外部控制点、写入精密轨道文件与GACOS用于大气相位

SARscape中导入外部GCP点用于轨道精炼 https://www.cnblogs.com/enviidl/p/16524645.html在SAR处理时&#xff0c;有时会加入GCP点文件&#xff0c;SAR处理中用到的控制点分为两类&#xff1a;用于校正地理位置的几何控制点&#xff08;Geometry GCP&#xff09;和用于轨道精炼…

多测师肖sir_高级金牌讲师___ui自动化之selenium001

一、认识selenium &#xff08;1&#xff09;selenium是什么&#xff1f; a、selenium是python中的一个第三方库 b、Selenium是一个应用于web应用程序的测试工具&#xff0c;支持多平台&#xff0c;多浏览器&#xff0c;多语言去实现ui自动化测试&#xff0c;我们现在讲的Sel…

Atlassian Confluence OGNL表达式注入RCE CVE-2021-26084

影响版本 All 4.x.x versions All 5.x.x versions All 6.0.x versions All 6.1.x versions All 6.2.x versions All 6.3.x versions All 6.4.x versions All 6.5.x versions All 6.6.x versions All 6.7.x versions All 6.8.x versions All 6.9.x versions All 6.1…

Android之播放本地视频和Url视频方法

一、播放本地视频文件 根据文件路径在浏览器中播放&#xff0c;可用于视频预览等场景 效果&#xff1a; 用浏览器播放本地视频 文件路径例子&#xff1a; /storage/emulated/0/Android/data/com.custom.jfrb/files/Movies/1697687179497.mp4 File file new File("文件…

RK3568笔记四:基于TensorFlow花卉图像分类部署

若该文为原创文章&#xff0c;转载请注明原文出处。 基于正点原子的ATK-DLRK3568部署测试。 花卉图像分类任务&#xff0c;使用使用 tf.keras.Sequential 模型&#xff0c;简单构建模型&#xff0c;然后转换成 RKNN 模型部署到ATK-DLRK3568板子上。 在 PC 使用 Windows 系统…

使用telegram机器人发送通知

文章目录 背景1 创建机器人2 与机器人的会话3 调用API让机器人发送消息 背景 在训练深度学习模型时&#xff0c;除了粗略估计外&#xff0c;很难预测训练何时结束。此外&#xff0c;我们可能还想随时随地查看训练情况&#xff0c;如果每次都需要登录回服务器的话并不方便。因此…

Kubernetes与Docker和Containerd是个什么关系

文章目录 小结描述实例参考 小结 Kubernetes 在不停地迭代演进&#xff0c;Kubernetes停止使用Docker做为Container Runtime&#xff0c;改为Containerd或者CRI-O等与与Container Runtime Interface (CRI)更兼容的Container Runtime&#xff0c;进行了小结。 容器组&#xff…

wordpress网站部署了ssl证书之后就排版混乱了

刚给自己的小网站部署了SSL证书&#xff0c;之后就发现https访问主页竟然乱套了。在手机上访问却是正常的。 直接上解决方案&#xff1a; 编辑网站根目录下的wp-config.php文件 在自定义文本处添加以下代码&#xff1a; if ($_SERVER[HTTP_X_FORWARDED_PROTO] https) $_SE…

PHP-FFMpeg 操作音视频

✨ 目录 &#x1f388; 安装PHP-FFMpeg&#x1f388; 视频中提取一张图片&#x1f388; 视频中提取多张图片&#x1f388; 调整视频大小&#x1f388; 视频添加水印&#x1f388; 生成音频波形&#x1f388; 音频转换&#x1f388; 给音频添加元数据&#x1f388; 拼接多个音视…

利用ArcGIS获取每一个冰川的中心位置经纬度坐标:要素转点和要素折点转点的区别

问题概述&#xff1a;下图是天山地区的冰川的分布&#xff0c;我们可以看到每一条冰川是一个面要素&#xff0c;要求得到每一个冰川&#xff08;面要素&#xff09;的中心经纬度坐标。 1.采用要素转点功能 选择工具箱的【数据管理工具】-【要素】-【要素转点】。完成之后再采用…

计算机基础知识36

数据库数据的演变史 ATM&#xff1a;1. 把数据都存在了文件中&#xff0c;文件名不规范 kevin|123 kevin123 kevin*123 2. 存储数据的文件越来越多&#xff0c;放在db文件夹&#xff0c;占用空间&#xff0c;查询存储不方便&#xff0c;速度慢 # 数据库软件能解…

lnmp架构部署Discuz论坛并配置重定向转发

lnmp架构部署Discuz论坛并配置重定向转发 文章目录 lnmp架构部署Discuz论坛并配置重定向转发环境说明部署Discuz论坛系统下载Discuz论坛系统代码包&#xff0c;官网地址如下&#xff1a;部署Discuz论坛系统步骤&#xff1a;解压安装Discuz源码包配置虚拟主机进入Discuz安装界面…

Janus: 逆向思维,以数据为中心的MoE训练范式

文章链接&#xff1a;Janus: A Unified Distributed Training Framework for Sparse Mixture-of-Experts Models 发表会议: ACM SIGCOMM 2023 (计算机网络顶会) 目录 1.背景介绍all-to-allData-centric Paradigm 2.内容摘要关键技术Janus细粒度任务调度拓扑感知优先级策略预取…

Android推送问题排查

针对MobPush智能推送服务在使用过程中可能出现的问题&#xff0c;本文为各位开发者们带来了针对MobPush安卓端推送问题的解决办法。 TCP在线推送排查 排查TCP在线收不到推送时&#xff0c;我们先通过客户端的RegistrationId接口获取设备的唯一标识 示例&#xff1a; MobPush…

修改 Stable Diffusion 使 api 接口增加模型参数

参考&#xff1a;https://zhuanlan.zhihu.com/p/644545784 1、修改 modules/api/models.py 中的 StableDiffusionTxt2ImgProcessingAPI 增加模型名称 StableDiffusionTxt2ImgProcessingAPI PydanticModelGenerator("StableDiffusionProcessingTxt2Img",StableDiff…

代码随想录算法训练营第五十六天 | 1143.最长公共子序列、1035.不相交的线 、53. 最大子序和 动态规划

1143.最长公共子序列 视频讲解&#xff1a;动态规划子序列问题经典题目 | LeetCode&#xff1a;1143.最长公共子序列_哔哩哔哩_bilibili 代码随想录 &#xff08;1&#xff09;代码 1035.不相交的线 视频讲解&#xff1a;动态规划之子序列问题&#xff0c;换汤不换药 | Leet…