【Flink系列四】Window及Watermark

3.1、window

在 Flink 中 Window 可以将无限流切分成有限流,是处理有限流的核心组件,现在 Flink 中 Window 可以是时间驱动的(Time Window),也可以是数据驱动的(Count Window)。

Flink中的窗口可以分成:滚动窗口(Tumbling Window,无重叠),滑动窗口(Sliding Window,可能有重叠),会话窗口(Session Window,活动间隙),全局窗口(Gobal Window)

3.1.1、Tumbling Windows 滚动窗口

滚动窗口的assigner分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

// 滚动event-time窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动processing-time窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.second(5))).<windowed transformation>(<window function>);// 长度为一天的滚动event-time窗口, 偏移量为-8小时
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 等。 如果你想改变对齐方式,你可以设置一个 offset。如果设置了 15 分钟的 offset, 你会得到 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)。

3.1.2、Sliding Windows滑动窗口

滑动窗口的assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(滑动步长window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

3.1.3、Session Windows 会话窗口

会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

// 设置了固定间隔的event-time会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的event-time会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element)-> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔}))

3.1.4、Global Windows 全局窗口

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
3.1.5、Triggers窗口触发

Trigger决定了一个窗口(由window assigner定义)何时可以被window function处理。一般来说,watermark的时间戳>=window endTime并且在窗口内有数据,就会触发窗口的计算。每个WindowAssigner都有一个默认的Trigger。如果默认trigger无法满足需求,可以在trigger(...)调用中指定自定义的trigger。

  • onElement() 每次往 window 增加一个元素的时候都会触发
  • onEventTime() 当 event-time timer 被触发的时候会调用
  • onProcessingTime() 当 processing-time timer 被触发的时候会调用
  • onMerge() 对两个 trigger 的 state 进行 merge 操作
  • clear() window 销毁的时候被调用

上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:

  • CONTINUE 不做任何事情
  • FIRE 触发 window
  • PURGE 清空整个 window 的元素并销毁窗口
  • FIRE_AND_PURGE 触发窗口,然后销毁窗口

3.2、time和watermark

3.2.1、time

在 Flink 中 Time 可以分为三种Event-Time,Processing-Time 以及 Ingestion-Time,三者的关系我们可以从下图中得知:

3.2.2、watermark

Flink提出了watermark,专门处理EventTime窗口计算,其本质其实就是一个时间戳。因为对于迟到数据late element,不可能一直无限期等待,必须有一个机制来保证一个特定的时间后,必须取触发window去进行计算,这种机制就是watermark

watermark本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。 Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。 Watermark的产生和Apache Flink内部处理逻辑如下图所示: 

目前Apache Flink 有两种生产Watermark的方式,如下:

  • Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。 在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
  • Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

参阅:Apache Flink 漫谈系列(03) - Watermark-阿里云开发者社区

我们可以考虑一个这样的例子:某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了。那我们怎么保证基于 event-time 的窗口在销毁的时候,已经处理完了所有的数据呢?这就是 watermark 的功能所在。watermark 会携带一个单调递增的时间戳 t,watermark(t) 表示所有时间戳不大于 t 的数据都已经到来了,未来小于等于t的数据不会再来,因此可以放心地触发和销毁窗口了。下图中给了一个乱序数据流中的 watermark 例子

3.2.3、迟到的数据

上面的 watermark 让我们能够应对乱序的数据,但是真实世界中我们没法得到一个完美的 watermark 数值 — 要么没法获取到,要么耗费太大,因此实际工作中我们会使用近似 watermark — 生成 watermark(t) 之后,还有较小的概率接受到时间戳 t 之前的数据,在 Flink 中将这些数据定义为 “late elements”, 同样我们可以在 window 中指定是允许延迟的最大时间(默认为 0),可以使用下面的代码进行设置

设置allowedLateness之后,迟来的数据同样可以触发窗口,进行输出,利用 Flink 的 side output 机制,我们可以获取到这些迟到的数据,使用方式如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/209669.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c jpeg YUV图片帧分割成 8*8 块 ,与逆向把8*8还原为帧

1. 正向分割为若干8*8 块 下面的程序为通用程序&#xff0c;可以分割任意块 #include <stdlib.h> #include <string.h> #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdlib.h>…

如果微软20年前开发.net core,JAVA会不会和IE一样倒下了

可以跨平台&#xff0c;大量类库&#xff0c;微软亲自操刀&#xff0c;性能一流&#xff0c;因为没有做跨平台&#xff0c;.NET被 python,javascript等抢了一半以上市场。 如果微软早早的推出类似.net core这样的跨平台语言&#xff0c;.net程序猿还会出在这样的尴尬局面吗众所…

Java基础-开发流程以及HelloWorld程序

目录 1. Java的开发流程2. HelloWorld 1. Java的开发流程 开发Java程序&#xff0c;需要三个步骤&#xff1a;编写代码&#xff0c;编译代码&#xff0c;运行代码 2. HelloWorld 编写代码 public class HelloWorld {public static void main(String[] args) {System.out.pri…

Ribbon 饥饿加载

Ribbon默认是采用懒加载&#xff0c;即第一次访问时才会去创建LoadBalanceClient&#xff0c;请求时间会很长而饥饿加载则会在项目启动时创建&#xff0c;降低第一次访问的耗时&#xff0c;通过下面配置开启饥饿加载: 一、懒加载 Ribbon 默认为懒加载即在首次启动Application…

代码随想录二刷 |二叉树 | 二叉树的层序遍历

代码随想录二刷 &#xff5c;二叉树 &#xff5c; 二叉树的层序遍历 题目描述解题思路代码实现 题目描述 102.二叉树的层序遍历 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例…

Flask 最佳实践(一)

Flask是一个轻量级而强大的Python Web框架&#xff0c;它的简洁性和灵活性使其成为许多开发者的首选。然而&#xff0c;为了确保项目的可维护性和可扩展性&#xff0c;我们需要遵循一些最佳实践。本文将探讨Flask中一些关键的最佳实践。 1. 项目结构 构建一个清晰的项目结构是…

Java实现Socket聊天室

一、网络编程是什么&#xff1f; 在网络通信协议下&#xff0c;不同计算机上运行的程序&#xff0c;进行数据传输。 应用场景&#xff1a;即时通讯、网游对战、金融证券、国际贸易、邮件、等等。 不管是什么场景&#xff0c;都是计算机与计算机之间通过网络进行数据传输。 …

软件测试之接口测试自动化(详解版)

本着以和大家交流如何实现高效的接口测试为出发点&#xff0c;本文包含了我在接口测试领域的一些方法和心得&#xff0c;希望大家一起讨论和分享&#xff0c;内容包括但不仅限于&#xff1a; 服务端接口测试介绍接口测试自动化介绍接口测试自动化实践关于接口测试自动化的思考…

质量工程化,交付快速化

质量和速度之间权衡让人很难取舍&#xff0c;而通过推进质量工程&#xff0c;以系统化的方式识别和优化系统痛点&#xff0c;可以帮助团队构建既快又好的精益软件生产系统。原文: Quality Engineered, Speed Delivered 所有人都想要更快的速度。 但需要解决复杂问题: 权衡质量会…

Kotlin(十四) 扩展函数和运算符重载

目录 扩展函数 语法结构 代码示例 运算符重载 语法结构 一元操作符 二元操作符 数值类型操作符 等于和不等于操作符 比较操作符 调用操作符 扩展函数 语法结构 对于扩张函数的语法结构其实很简单&#xff0c;你想在那个类中添加扩张函数&#xff0c;那么你就用该类…

6. Zigzag Conversion

按照下标找规律注意leetcode的运行输出&#xff0c;如果其中一组用例出现死循环&#xff0c;输出结果会在一个文件&#xff0c;即部分测试用例正确&#xff0c;部分错误且出现死循环&#xff0c;则需辨别输出结果属于哪一份测试用例 class Solution { public:string convert(s…

(二)五种最新算法(SWO、COA、LSO、GRO、LO)求解无人机路径规划MATLAB

一、五种算法&#xff08;SWO、COA、LSO、GRO、LO&#xff09;简介 1、蜘蛛蜂优化算法SWO 蜘蛛蜂优化算法&#xff08;Spider wasp optimizer&#xff0c;SWO&#xff09;由Mohamed Abdel-Basset等人于2023年提出&#xff0c;该算法模型雌性蜘蛛蜂的狩猎、筑巢和交配行为&…

w3school学习笔记3(NumPy)

系列文章目录 文章目录 系列文章目录前言一、NumPy简介二、NumPy入门三、NumPy创建四、NumPy数组索引五、NumPy数组裁切六、NumPy数据类型七、NumPy副本/视图八、NumPy数据形状九、NumPy数组重塑十、NumPy数组迭代总结 前言 一、NumPy简介 1、什么是Numpy&#xff1f; NumPy是…

线上盲盒小程序,开启互联网盲盒时代

近年来&#xff0c;盲盒经济在国内非常火爆&#xff0c;各类盲盒品牌层出不穷&#xff0c;深受国内外年轻人、消费者的喜爱。 目前&#xff0c;根据数据显示&#xff0c;盲盒市场不仅在线下异常火热&#xff0c;线上盲盒也是成为了大众的新选择。各类电商平台中盲盒的成交额更…

Esxi7Esxi8设置VMFSL虚拟闪存的大小

Esxi7Esxi8设置VMFSL虚拟闪存的大小 ESXi7,8 默认安装会分配一个 VMFSL(VMFS-L)(Local VMFS)很大空间(120G), 感觉很浪费, 实际给 8G 就可以了, 最少 6G , 经实验,给2G没法安装 . Esxi7是虚拟闪存的 修改的方法是: 在安装时修改 设置 autoPartitionOSDataSize8192 在cdromBoo…

快捷切换raw页面到repo页面-Raw2Repo插件

Raw2Repo By Rick &#x1f4d6;快捷切换代码托管平台raw页面到repo页面 &#x1f517;github链接 https://github.com/rickhqh/Raw2Repo ✨Features 功能&#xff1a; ✅单击 Raw2Repo 插件按钮&#xff0c;即可跳转到相应的代码仓库页面。✅支持 GitHub、Gitee、GitCode …

spring boot整合mybatis进行部门管理管理的增删改查

部门列表查询&#xff1a; 功能实现&#xff1a; 需求&#xff1a;查询数据库表中的所有部门数据&#xff0c;展示在页面上。 准备工作&#xff1a; 准备数据库表dept&#xff08;部门表&#xff09;&#xff0c;实体类Dept。在项目中引入mybatis的起步依赖&#xff0c;mysql的…

【ET8】1.ET8入门-运行指南

主要学习网址 论坛地址为&#xff1a;https://et-framework.cn Git地址为&#xff1a;GitHub - egametang/ET: Unity3D Client And C# Server Framework 官方QQ群 : 474643097 项目检出 检出项目切换到release8.0分支 GitHub地址&#xff1a;GitHub - egametang/ET: Unity…

[足式机器人]Part2 Dr. CAN学习笔记-数学基础Ch0-5Laplace Transform of Convolution卷积的拉普拉斯变换

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记-数学基础Ch0-5Laplace Transform of Convolution卷积的拉普拉斯变换 Laplace Transform : X ( s ) L [ x ( t ) ] ∫ 0 ∞ x ( t ) e − s t d t X\left( s \right) \mathcal{L} \left[ x\lef…

基于Swin_Transformer的图像超分辨率系统

1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 研究背景与意义 随着科技的不断发展&#xff0c;图像超分辨率技术在计算机视觉领域中变得越来越重要。图像超分辨率是指通过使用计算机算法将低分辨率图像转换为高分辨率图像的过…