聊聊Flink:Flink中的时间语义和Watermark详解

该篇主要讲Flink中的时间语义、Flink 水印机制以及Flink对乱序数据的三重保障。

一、Flink的三种时间语义

在这里插入图片描述
1.1 Event Time

Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。

使用Event Time时,最理想的情况下,我们可以一直等待所有的事件到达后再进行时间窗口的处理。假设一个时间窗口内的所有数据都已经到达,基于Event Time的流处理会得到正确且一致的结果:无论我们是将同一个程序部署在不同的计算环境还是在相同的环境下多次计算同一份数据,都能够得到同样的计算结果。我们根本不同担心乱序到达的问题。但这只是理想情况,现实中无法实现,因为我们既不知道究竟要等多长时间才能确认所有事件都已经到达,更不可能无限地一直等待下去。在实际应用中,当涉及到对事件按照时间窗口进行统计时,Flink会将窗口内的事件缓存下来,直到接收到一个Watermark,以确认不会有更晚数据的到达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上降低了计算结果的绝对准确性,而且增加了系统的延迟。在流处理领域,比起其他几种时间语义,使用Event Time的好处是某个事件的时间是确定的,这样能够保证计算结果在一定程度上的可预测性。

一个基于Event Time的Flink程序中必须定义Event Time,以及如何生成Watermark。我们可以使用元素中自带的时间,也可以在元素到达Flink后人为给Event Time赋值。

使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。

1.2 Processing Time

对于某个算子来说,Processing Time指算子使用当前机器的系统时钟来定义时间。在Processing Time的时间窗口场景下,无论事件什么时候发生,只要该事件在某个时间段达到了某个算子,就会被归结到该窗口下,不需要Watermark机制。对于一个程序在同一个计算环境来说,每个算子都有一定的耗时,同一个事件的Processing Time,第n个算子和第n+1个算子不同。如果一个程序在不同的集群和环境下执行时,限于软硬件因素,不同环境下前序算子处理速度不同,对于下游算子来说,事件的Processing Time也会不同,不同环境下时间窗口的计算结果会发生变化。因此,Processing Time在时间窗口下的计算会有不确定性。

Processing Time只依赖当前执行机器的系统时钟,不需要依赖Watermark,无需缓存。Processing Time是实现起来非常简单也是延迟最小的一种时间语义。

1.3 Ingestion Time

Ingestion Time是事件到达Flink Souce的时间。从Source到下游各个算子中间可能有很多计算环节,任何一个算子的处理速度快慢可能影响到下游算子的Processing Time。而Ingestion Time定义的是数据流最早进入Flink的时间,因此不会被算子处理速度影响。

Ingestion Time通常是Event Time和Processing Time之间的一个折中方案。比起Event Time,Ingestion Time可以不需要设置复杂的Watermark,因此也不需要太多缓存,延迟较低。比起Processing Time,Ingestion Time的时间是Souce赋值的,一个事件在整个处理过程从头至尾都使用这个时间,而且后续算子不受前序算子处理速度的影响,计算结果相对准确一些,但计算成本稍高。

注:Ingestion Time1.13 版本已经不再提了,这也是为啥官网的图没看到Ingestion Time的原因。目前推荐Event Time的时间语义。

1.4 Flink如何设置时间域

调用 setStreamTimeCharacteristic 设置时间域,枚举类 TimeCharacteristic 预设了三种时间域,不显式设置的情况下,默认使用 TimeCharacteristic.EventTime(1.12 版本以前默认是 TimeCharacteristic.ProcessingTime)。

env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //过期方法

在 1.12 以后版本默认是使用 EventTime,如果要显示使用 ProcessingTime,可以关闭 watermark(自动生成 watermark 的间隔设置为 0),设置

env.getConfig().setAutoWatermarkInterval(0);

二、Flink 水印机制

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的,为了保证计算结果的正确性,需要让窗口等待延迟数据到达后再进行计算,但是不能无限期地等待下去,必须有一种机制来确定何时触发窗口计算,这种机制就是水印(Watermark)。

稍稍总结一下水位线的引入原因:

  • 分布式系统的网络传输的不确定性;
  • 数据是乱序的;
  • 支持事件时间的流处理器需要一种测量事件时间进度的方法,用以正确的处理窗口等操作;

水位线的物理意义有两点:

  • 水位线本质是一个基于数据生成的、单调递增的时间戳;
  • 水位线 W(t)表示当前数据流中的所有 t 时刻前的数据都已经到了。

水印是一种用于衡量事件时间进度的机制,其表示某个时刻(事件时间)以前的数据将不再产生,因此水印指的是一个时间点。水印作为数据流的一部分流动,并带有时间戳t。t表示该流中不应再有时间戳小于等于t的元素(即时间戳早于或等于水印的事件)。

下图显示了带有时间戳和嵌入式水印的事件流,事件是按顺序排列的(相对于其时间戳),这意味着水印只是流中的周期性标记。
在这里插入图片描述
水印对于乱序流至关重要,如下图所示,其中事件不是按其时间戳排序的。通常,水印是数据流中一个点的声明,表示水印之前的所有事件都应该到达。一旦水印到达算子,算子则认为某个时间周期,所有事件已经被收到,不会再有更多符合条件的事件。
在这里插入图片描述
水印是直接通过Source Function生成的或在后续的DataStream API中生成的。在实际的流计算中,一个作业往往会同时处理多个源的数据,多个源的数据按照key分组后进行Shuffle处理,数据会汇聚到同一个处理节点。而每个并行子任务通常独立生成水印,这样就容易导致汇聚到一起的水印不是单调递增的。对于这种情况,Flink会选择所有流入的水印中事件时间最小的一个发往下游,如下图所示。

在这里插入图片描述

多个流的水印流入算子后,由于当前算子也有自己的水印,因此算子会综合计算得出最终水印,计算规则为:取多个流中事件时间最小的水印与当前算子的水印进行对比,如果大于当前算子水印,则更新当前算子水印,并发往下游。例如抽象类AbstractStreamOperator中的源码如下:

在这里插入图片描述

三、分布式环境下Watermark的传播

在实际计算过程中,Flink的算子一般分布在多个并行的分区(或者称为实例)上,Flink需要将Watermark在并行环境下向前传播。如下图所示,Flink的每个并行算子子任务会维护针对该子任务的Event Time时钟,这个时钟记录了这个算子子任务Watermark处理进度,随着上游Watermark数据不断向下发送,算子子任务的Event Time时钟也要不断向前更新。由于上游各分区的处理速度不同,到达当前算子的Watermark也会有先后快慢之分,每个算子子任务会维护来自上游不同分区的Watermark信息,这是一个列表,列表内对应上游算子各分区的Watermark时间戳等信息。
在这里插入图片描述
当上游某分区有Watermark进入该算子子任务后,Flink先判断新流入的Watermark时间戳是否大于Partition Watermark列表内记录的该分区的历史Watermark时间戳,如果新流入的更大,则更新该分区的Watermark。例如,某个分区新流入的Watermark时间戳为4,算子子任务维护的该分区Watermark为1,那么Flink会更新Partition Watermark列表为最新的时间戳4。接着,Flink会遍历Partition Watermark列表中的所有时间戳,选择最小的一个作为该算子子任务的Event Time。同时,Flink会将更新的Event Time作为Watermark发送给下游所有算子子任务。算子子任务Event Time的更新意味着该子任务将时间推进到了这个时间,该时间之前的事件已经被处理并发送到下游。例如,图中第二步和第三步,Partition Watermark列表更新后,导致列表中最小时间戳发生了变化,算子子任务的Event Time时钟也相应进行了更新。整个过程完成了数据流中的Watermark推动算子子任务Watermark的时钟更新过程。Watermark像一个幕后推动者,不断将流处理系统的Event Time向前推进。我们可以将这种机制总结为:

  • Flink某算子子任务根据各上游流入的Watermark来更新Partition Watermark列表。
  • 选取Partition Watermark列表中最小的时间作为该算子的Event Time,并将这个时间发送给下游算子。

这样的设计机制满足了并行环境下Watermark在各算子中的传播问题,但是假如某个上游分区的Watermark一直不更新,Partition Watermark列表其他地方都在正常更新,唯独个别分区的时间停留在很早的某个时间,这会导致算子的Event Time时钟不更新,相应的时间窗口计算也不会被触发,大量的数据积压在算子内部得不到处理,整个流处理处于空转状态。这种问题可能出现在使用数据流自带的Watermark,自带的Watermark在某些分区下没有及时更新。针对这种问题,一种解决办法是根据机器当前的时钟周期性地生成Watermark。

此外,在union等多数据流处理时,Flink也使用上述Watermark更新机制,那就意味着,多个数据流的时间必须对齐,如果一方的Watermark时间较老,那整个应用的Event Time时钟也会使用这个较老的时间,其他数据流的数据会被积压。一旦发现某个数据流不再生成新的Watermark,我们要在SourceFunction中的SourceContext里调用markAsTemporarilyIdle设置该数据流为空闲状态。

四、Flink对乱序数据的三重保障

我们思考一个问题:怎样避免乱序数据带来计算不正确性?

常用的解决办法是:当最大的事件时间maxEventTime达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。

但是,我们应该等待多久的时间呢?由于网络、分布式等原因造成的延时,一般大多数迟到的数据都会在最近一段时间到来,这个最近一段时间一般是毫秒级的,Watermark就是做到了这样的保障。还有很少的一部分数据会迟到很久,我们可以通过allowedLateness和sideOutputLateData来兜底。

处理乱序数据,三重保证机制:

3.1 Watermark
能够保证迟到很短的时间的数据到来后(一般是迟到毫秒级别内的数据,最大不超过1s),触发窗口关闭并输出。(即能够hold住短时间内迟到的数据)

3.2 allowedLateness
allowedLateness(lateness: Time):设置允许的延迟时间,默认为0,该方法仅对事件时间窗口有效。在水印通过窗口结尾后(即水印>=窗口结束时间),该方法指定的允许延迟时间才开始生效。该延迟时间与水印指定的允许延迟时间不冲突,相当于在水印延迟时间的基础上进行累加。落入该方法指定的允许延迟时间范围内的元素可能会导致窗口再次触发(例如EventTimeTrigger)。为了使这些元素正常被计算,Flink会保持窗口的状态,直到允许的延迟过期为止。一旦延迟过期,Flink将删除该窗口并删除其状态。

3.3 sideOutputLateData
sideOutputLateData(outputTag: OutputTag[T]):将延迟到达的数据保存到outputTag对象中,OutputTag是一种类型化的命名标签,用于标记算子的侧道输出,单独收集延迟数据。后面可通过DataStream的getSideOutput(outputTag)方法得到被丢弃数据组成的数据流。

当指定的允许延迟大于0时,在水印通过窗口结尾后,将保留窗口及其内容。在这种情况下,当一个迟到但未被丢弃的元素到达时,它可能会导致该窗口的另一次触发。这次触发称为延迟触发,因为是由延迟事件触发的,与主触发(即窗口的第一次触发)相反。对于会话窗口,后期触发会进一步导致窗口合并,因为可能缩小两个预先存在的未合并窗口之间的间隙。当使用全局窗口时,没有数据是延迟的,因为全局窗口的结束时间戳是Long.MAX_VALUE。

注意:

后期触发的元素应更新先前计算的结果,即数据流将包含同一计算的多个结果。根据你的应用程序,需要考虑这些重复的结果或对它们进行重复数据删除。

在水印的基础上设置允许延迟机制后,数据可以延迟的时间范围是多少?在只设置了水印的情况下,如果满足当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间,则触发窗口计算,发射计算结果并销毁窗口。在水印的基础上设置了允许延迟机制后,如果满足当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间(水印指定的),则触发窗口计算,发射计算结果,但不会销毁窗口,窗口会保留计算状态并继续等待延迟数据;每条延迟数据到达后,如果落入窗口内,都会再次触发窗口计算,更新计算状态,发射出最新计算结果,直到满足条件:当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间,则关闭并销毁窗口。此后到达的延迟数据,由于窗口已经关闭,数据将进入侧道输出流进行单独存放,后期根据业务单独处理即可。

指定允许延迟时间可以使用如下代码片段:

在这里插入图片描述
使用Flink的侧道输出机制可以获得一个后来被丢弃的数据组成的数据流。使用时首先需要使用sideOutputLateData(OutputTag)方法指定要在窗口化流上获取后期数据。然后可以使用getSideOutput(lateOutputTag)方法得到后期数据组成的数据流,代码如下:

在这里插入图片描述
为了更好地理解允许延迟和侧道输出机制,假设有乱序数据按照ABCDEFG的顺序依次到达Flink应用程序,并且设置了水印允许的最大延迟时间为3分钟,在水印的基础上又通过allowedLateness(Time.minutes(3))方法设置了允许的延迟时间为3分钟,使用sideOutputLateData(lateOutputTag)方法设置侧道输出,如下图所示。

在这里插入图片描述
当数据A到达时,由于窗口开始时间<=数据A的事件时间<窗口结束时间,因此数据A落入窗口内。

当数据B到达时,由于其事件时间>=窗口结束时间,因此数据B不属于该窗口。此时Watermark=进入Flink的当前最大事件时间‒允许的最大延迟时间=9:11‒3分钟=9:08。水印在窗口内,不会触发窗口计算。

当数据C到达时,由于窗口开始时间<=数据C的事件时间<窗口结束时间,因此数据C落入窗口内。

当数据D到达时,由于其事件时间>=窗口结束时间,因此数据D不属于该窗口。此时Watermark=进入Flink的当前最大事件时间‒允许的最大延迟时间=9:15‒3(分钟)=9:12>=窗口结束时间。水印在窗口外,触发窗口计算并发射计算结果。由于设置了允许延迟机制的延迟时间为3分钟,此时的窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间=9:10+3(分钟)+3(分钟)=9:16>9:15(进入Flink的当前最大事件时间),不满足窗口关闭的条件,因此窗口会继续等待延迟数据,并保留计算状态(此处的计算状态指的就是计算结果,例如窗口内数据的聚合结果)。

当数据E到达时,由于进入Flink的当前最大事件时间没有改变,窗口不会关闭,而是继续等待。窗口开始时间<=数据E的事件时间<窗口结束时间,因此数据E落入窗口内,并触发窗口计算,与上次计算的结果进行合并,发射出新的计算结果,如下图所示。

在这里插入图片描述
当数据F到达时,此时的窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间=9:10+3(分钟)+3(分钟)=9:16<=9:16(进入Flink的当前最大事件时间),满足窗口关闭的条件,因此窗口会关闭并销毁。

当数据G到达时,窗口开始时间<=数据G的事件时间<窗口结束时间,但是窗口已经关闭了,因此数据G将进入侧道输出流进行单独存放。通过侧道输出API可从侧道输出流中取出延迟严重的数据进行相应的业务处理。

这样分析下来应该豁然开朗了吧?如果还有啥疑问欢迎和我一起交流,我们下一篇再见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61182.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS基础选择器与div布局

基础选择器一 全局选择器 可以与任何元素匹配&#xff0c;优先级最低&#xff0c;不推荐使用 *{margin: 0;padding: 0;}元素选择器 HTML文档中的元素&#xff0c;p、b、div、a、img、body等。 标签选择器&#xff0c;选择的是页面上所有这种类型的标签&#xff0c;所以经常…

npm上传自己封装的插件(vue+vite)

一、npm账号及发包删包等命令 若没有账号&#xff0c;可在npm官网&#xff1a;https://www.npmjs.com/login 进行注册。 在当前项目根目录下打开终端命令窗口&#xff0c;常见命令如下&#xff1a; 1、登录命令&#xff1a;npm login&#xff08;不用每次都重新登录&#xff0…

ODC 如何精确呈现SQL耗时 | OceanBase 开发者工具解析

前言 在程序员或DBA的日常工作中&#xff0c;编写并执行SQL语句如同日常饮食中的一餐一饭&#xff0c;再寻常不过。然而&#xff0c;在使用命令行或黑屏客户端处理SQL时&#xff0c;常会遇到编写难、错误排查缓慢以及查询结果可读性不佳等难题&#xff0c;因此&#xff0c;图形…

华为USG5500防火墙配置NAT

实验要求&#xff1a; 1.按照拓扑图部署网络环境&#xff0c;使用USG5500防火墙&#xff0c;将防火墙接口加入相应的区域&#xff0c;添加区域访问规则使内网trust区域可以访问DMZ区域的web服务器和untrust区域的web服务器。 2.在防火墙上配置easy-ip&#xff0c;使trust区域…

三角波生成函数

% 设置时间范围和采样频率 t 0:0.01:2; % 时间从0到2秒&#xff0c;步长为0.01秒% 定义频率 f 和角频率 theta f 5; % 频率为5Hz theta 2 * pi * f * t;% 初始化输出向量 y zeros(size(t));% 根据给定的公式计算 y for k 1:fy y (-1)^(k-1)*(2 /(k * pi)) * sin(k * the…

Lc70--319.两个数组的交集(二分查找)---Java版

1.题目描述 2.思路 用集合求交集&#xff0c;因为集合里面的元素要满足不重复、无序、唯一。使得集合在去重、查找和集合操作&#xff08;如交集、并集、差集等&#xff09;中非常高效和方便。 3.代码实现 class Solution {public int[] intersection(int[] nums1, int[] nu…

操作系统实验 C++实现生产者-消费者问题

实验目的 1、进一步加深理解进程同步的概念 2、加深对进程通信的理解 3、了解Linux下共享内存的使用方法 实验内容 1、按照下面要求&#xff0c;写两个c程序&#xff0c;分别是生产者producer.c以及customer.c 2、一组生产者和一组消费者进程共享一块环形缓冲区 使用共…

无人机在森林中的应用!

一、森林资源调查 无人机可以利用遥感技术快速获取所需区域高精度的空间遥感信息&#xff0c;对森林图斑进行精确区划。相较于传统手段&#xff0c;无人机调查具有低成本、高效率、高时效的特点&#xff0c;尤其在地理环境条件不好的区域&#xff0c;调查人员无法或难以到达的…

Android学生信息管理APP的设计与开发

1. 项目布局设计 页面1&#xff1a;学生信息添加页面 采用线性布局&#xff0c;页面中控件包含TextView、editView、Button等。 布局核心代码如下&#xff1a; <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http…

AI(12)-飘带

1.【钢笔工具】画第一条曲线 2.【钢笔工具】画第二条曲线 3-全选两条曲线-【对象】-【混合】-【混合选项】-【指定的步数】-【15】 3-1-【对象】-【混合】-【建立】 4-双击打开【渐变工具】 4-1-【类型&#xff1a;线性】 4-2-点击切换【描边】在上方 4-3-关闭【填色】 4-4-点…

智能指针原理、使用和实现——C++11新特性(三)

目录 一、智能指针的理解 二、智能指针的类型 三、shared_ptr的原理 1.引用计数 2.循环引用问题 3.weak_ptr处理逻辑 四、shared_ptr的实现 五、定制删除器 六、源码 一、智能指针的理解 问题&#xff1a;什么是智能指针&#xff1f;为什么要有智能指针&#xff1f;智…

NIST 发布后量子密码学转型战略草案

美国国家标准与技术研究所 (NIST) 发布了其初步战略草案&#xff0c;即内部报告 (IR) 8547&#xff0c;标题为“向后量子密码标准过渡”。 该草案概述了 NIST 从当前易受量子计算攻击的加密算法迁移到抗量子替代算法的战略。该草案于 2024 年 11 月 12 日发布&#xff0c;开放…

使用uniapp开发微信小程序使用uni_modules导致主包文件过大,无法发布的解决方法

在使用uniapp开发微信小程序时候&#xff0c;过多的引入uni_modules的组件库&#xff0c;会导致主包文件过大&#xff0c;导致无法上传微信小程序&#xff0c;主包要求大小不超过1.5MB.分包大小每个不能超过2M。 解决方法&#xff1a;分包。 1.对每个除了主页面navbar的页面进…

WPF窗体基本知识-笔记-命名空间

窗体程序关闭方式 命名空间:可以理解命名空间的作用为引用下面的控件对象 给控件命名:一般都用x:Name,也可以用Name但是有的控件不支持 布局控件(容器)的类型 布局控件继承于Panel的控件,其中下面的border不是布局控件,panel是抽象类 在重叠的情况下,Zindex值越大的就在上面 Z…

【android USB 串口通信助手】stm32 源码demo 单片机与手机通信 Android studio 20241118

android 【OTG线】 接 下位机STM32【USB】 通过百度网盘分享的文件&#xff1a;USBToSerialPort.apk 链接&#xff1a;https://pan.baidu.com/s/122McdmBDUxEtYiEKFunFUg?pwd8888 提取码&#xff1a;8888 android 【OTG线】 接 【USB转TTL】 接 【串口(下位机 SMT32等)】 需…

SpringBoot源码解析(四):解析应用参数args

SpringBoot源码系列文章 SpringBoot源码解析(一)&#xff1a;SpringApplication构造方法 SpringBoot源码解析(二)&#xff1a;引导上下文DefaultBootstrapContext SpringBoot源码解析(三)&#xff1a;启动开始阶段 SpringBoot源码解析(四)&#xff1a;解析应用参数args 目录…

使用IDEA+Maven实现MapReduced的WordCount

使用IDEAMaven实现MapReduce 准备工作 在桌面创建文件wordfile1.txt I love Spark I love Hadoop在桌面创建文件wordfile2.txt Hadoop is good Spark is fast上传文件到Hadoop # 启动Hadoop cd /usr/local/hadoop ./sbin/start-dfs.sh # 删除HDFS的hadoop对应的input和out…

Spring Cloud Ribbon 实现“负载均衡”的详细配置说明

1. Ribbon 介绍 Ribbon 是什么 &#xff1f; 1.Spring Cloud Ribbon 是基于Netflix Ribbon 实现的一套客户端&#xff0c;负载均衡的工具 2.Ribbon 主要功能是提供客户端负载均衡算法和服务调用 3.Ribbon 客户端组件提供一系列完善的配置项如“连接超时&#xff0c;重试” 4…

TSMC12nm工艺数字IC后端实现难点都有哪些?

大家知道咱们社区近期TSMC 12nm ARM Cortexa-A72(1P9M 6Track Metal Stack)即将开班。这里小编要强调一点:不要认为跑了先进工艺的项目就会很有竞争力&#xff01;如果你仅仅是跑个先进工艺的flow&#xff0c;不懂先进工艺在数字IC后端实现上的不同点&#xff0c;为何有这样的不…

使用微信小程序调用飞桨PaddleX平台自行训练的模型——微信小程序用训练的牡丹花模型Demo测试

&#x1f3bc;个人主页&#xff1a;【Y小夜】 &#x1f60e;作者简介&#xff1a;一位双非学校的大二学生&#xff0c;编程爱好者&#xff0c; 专注于基础和实战分享&#xff0c;欢迎私信咨询&#xff01; &#x1f386;入门专栏&#xff1a;&#x1f387;【MySQL&#xff0…