Flink的时间语义和Watermark

1 时间语义

   数据迟到的概念是:数据先产生,但是处理的时候滞后了

   在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
在这里插入图片描述

   Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

   Ingestion Time:是数据进入Flink的时间。

   Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

   在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。引入EventTime的时间属性如下:

val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](topic, schema, props))stream.keyBy( _.getUser ).timeWindow(Time.hours(1)).reduce( (a, b) => a.add(b) ).addSink(...)

   设置了EventTime后后面处理底层会判断

在这里插入图片描述

   注意:设置了事件时间,但是并不知道事件时间,Event Time 的使用一定要指定数据源中的时间戳,通过assignTimestampsAndWatermarks指定,时间戳要是ms单位。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter())val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.filter( _.severity == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>)withTimestampsAndWatermarks.keyBy( _.getGroup ).timeWindow(Time.seconds(10)).reduce( (a, b) => a.add(b) ).addSink(...)

   对于排序好的数据,不需要延迟触发,可以只指定时间戳就行了

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(_.timestamp)

   对于乱序数据调用 assignTimestampAndWatermarks 方法,传入一个 BoundedOutOfOrdernessTimestampExtractor,就可以指定 watermark

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[WC](Time.milliseconds(1000)){override def extractTimestamp(element: WC): Long = {element.timestamp * 1000}}

2 WaterMark

2.1 什么是WaterMark

   我们的数据从采集经过kafka,etl等操作要耗时的,再到流经source,到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生。

   迟到数据是因为有延迟,简单的想法就多等一下。不要5秒的事件到了就关闭窗口,多等一会。我们要考虑的是当前事件的时间进展到底要按照什么时间算,也就是说假设现在5秒的窗口要关闭,设置延迟为2秒,那么5秒的数据来了就多等2秒,5秒的事件来了就相当于还没有进展到5秒,是进展到了5-2=3秒,也就是时间才进展到3秒。按照这种多等2秒的方式的话要等到时间戳是7的数据来了之后7-2=5才关闭5秒的窗口。这就提出了Watermark

   乱序,其实就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

   Watermark可以从以下几个方面理解:①Watermark是一种衡量Event Time进展的机制。②Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。③数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。④Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行

   Watermark延迟时间的设置一般根据数据的乱序情况定义,通常设置成最大乱序程度

2.2 Watermark传递

   真正的Watermark其实就是一条特殊的记录,可以认为是插入数据流里面的一个特殊数据,Watermark可以理解为是一个有时间戳的特殊数据结构,就和数据一样一条一条来,后面处理数据如果是正常数据就正常处理,如果是Watermark就按照对于时间的操作该关闭窗口就关闭窗口。

   Watermark必须单调递增,既然表示当前事件时间的进展,时间只能朝前不停的推进,另外总和当前数据的时间戳相关,数据的时间戳就应该是当前的事件时间。

   当Flink接收到数据时,会按照一定的规则去生成Watermark。Watermark要求单调递增的话就选取所有当前已经来的数据里面最大的时间戳作为当前的事件时间,要多等一会的话在当前最大的时间戳基础上再减去一个延迟时间就可以了,即maxEventTime - 延迟时长。所以Watermark是基于数据携带的时间戳生成的,如果Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

   有序流的Watermarker(最大延迟时间为0)如下图所示:

在这里插入图片描述

   乱序流的Watermarker(最大延迟时间为4)如下图所示:
在这里插入图片描述

   上图中,采用周期性插入Watermark的生成策略,默认每200ms系统插入Watermark。我们设置的允许最大延迟到达时间为4s,当系统要插入第一个Watermark时查看此时数据中的最大事件时间为15,所以插入的Watermark是11s。过了200ms后到了第二次插入watermark的时候,此时数据中的最大事件时间为22,所以插入Watermark是18s。果我们的窗口1是1s-10s,窗口2是10s-20s,那么Watermarker为11到达之后需要触发窗口1。一旦触发以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。

2.3 Watermark的传递

在这里插入图片描述

   Watermark的传递如上图所示。

   Flink 的传递策略基本上遵循三点:①watermark 会以广播的形式在算子之间进行传播。并行任务没有数据交互不考虑,只要考虑上游有多少个任务给他发数据,下游要发送多少个数据到别的任务。②如果在程序里面收到了一个 Long.MAX_VALUE这个数值的 watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就是一个终止的标志。③单流输入取其大,多流输入取小。不同的上游任务发来的Watermark不一样,不能按照上游所有的Watermark中最大的Watermark来判定当前的事件时间,而是应该按照最小的那个来判定,因为Watermark代表的数据是他之前的数据都到期了,如果只接收到一个分区的Watermark是29表示这个分区29之前数据已经到齐了,但是不能保证当前任务不在接收29之前的数据,因为之前别的Watermark可能还没进展到29,所以应该按照最小的。

   底层实现:上游有2个分区就会对每一个分区都去创建一个分区的Watermark(PARTITION Watermark),分别是29,14所以当前任务的事件时间是14,那么下游的子任务广播出去也是14,14之前的数据都到齐了。接下来一个分区来了一个新的Watermark是17,相当于这个分区的时间进展为17之前的都到齐,那么首先更新当前的Watermark,然后观察现在所有分区的Watermark最小值是否改变,如果改变那么事件时间就朝前进展,事件时间更新就往下游广播。

2.4 WaterMark使用

   watermark对于有序数据,最常见的引用方式如下:

dataStream.assignTimestampsAndWatermarks(_.timestamp)

   升序数据不用管Watermark,本身数据来就带有时间戳

   watermark对于乱序数据,最常见的引用方式如下:

dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {override def extractTimestamp(element: Element): Long = element.ds})

   Watermark就是在assignTimestampsAndWatermarks里面定义出来的,BoundedOutOfOrdernessTimestampExtractor 是Flink内置提供的允许乱序最大延时的watermark生成方式,只需要重写其extractTimestamp方法。现在kafka源也支持直接生成Watermark,所以etl的时候可以把Watermark也产生。不过我们一般是在Flink把数据读进来做了转换之后马上分配一个Watermark。Watermark要保证正确性,延迟时间一般定义成最大的乱序程度(从数据里面提炼出来的参数)。同个分区数据可能会乱序,Watermark不会乱序(单调递增,取最大的时间戳减去延迟时间)

2.5 自定义WaterMark

   watermark的生成策略有两种:一种是AssignerWithPeriodicWatermarks周期性生成(隔一段时间系统自动插入),另外一种是AssignerWithPunctuatedWatermarks根据特定标记生成。这两个接口都是Flink暴露了TimestampAssigner接口的子类型。实际生成中大量密集数据比较多,稀疏较少,所以一般使用周期性AssignerWithPeriodicWatermarks方式。

   周期性的生成watermark系统会周期性的将watermark插入到流中。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval(watermarkInterval)方法进行设置。每隔watermarkInterval,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark(watermarkInterval)方法。如果方法返回的watermark大于之前的watermark,新的watermark会被插入到流中。这个检查保证了watermark是单调递增的。如果方法返回的时间戳小于等于之前watermark,则不会产生新的watermark。

   自定义一个周期性的时间戳抽取:

class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[Element] {val bound: Long = 60 * 1000 // 延时为1分钟var maxTs: Long = Long.MinValue // 观察到的最大时间戳override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}override def extractTimestamp(r: Element, previousTS: Long) = {maxTs = maxTs.max(r.timestamp)r.timestamp}
}

   间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理,自定义一个间断式地生成watermar:

class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Element] {val bound: Long = 60 * 1000override def checkAndGetNextWatermark(r: Element, extractedTS: Long): Watermark = {if (r.status == "sucess") {new Watermark(extractedTS - bound)} else {null}}override def extractTimestamp(r: Element, previousTS: Long): Long = {r.timestamp}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473562.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据分析案例:亚洲国家人口数据计算

数据截图: 数据下载地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1dGHwAC5 密码&#xff1a;nfd2 该数据包含了2006年-2015年10年间亚洲地区人口数量数据&#xff0c;共10行50列数据。我们需要使用Numpy完成如下数据任务: 计算2015年各个国家人口数据计算朝鲜历…

LeetCode 1646. 获取生成数组中的最大值

文章目录1. 题目2. 解题1. 题目 给你一个整数 n 。按下述规则生成一个长度为 n 1 的数组 nums &#xff1a; nums[0] 0nums[1] 1当 2 < 2 * i < n 时&#xff0c;nums[2 * i] nums[i]当 2 < 2 * i 1 < n 时&#xff0c;nums[2 * i 1] nums[i] nums[i 1]…

TotoiseSVN的基本使用方法

一、签入源代码到SVN服务器 假如我们使用Visual Studio在文件夹StartKit中创建了一个项目&#xff0c;我们要把这个项目的源代码签入到SVN Server上的代码库中里&#xff0c;首先右键点击StartKit文件夹&#xff0c;这时候的右键菜单如下图所示&#xff1a; 图2-2-1 点击Import…

LeetCode 1647. 字符频次唯一的最小删除次数(贪心)

文章目录1. 题目2. 解题1. 题目 如果字符串 s 中 不存在 两个不同字符 频次 相同的情况&#xff0c;就称 s 是 优质字符串 。 给你一个字符串 s&#xff0c;返回使 s 成为 优质字符串 需要删除的 最小 字符数。 字符串中字符的 频次 是该字符在字符串中的出现次数。 例如&am…

Flink中的状态管理

1 Flink中的状态 当数据流中的许多操作只查看一个每次事件(如事件解析器)&#xff0c;一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护&#xff0c;并且用来计算某个结果的所有数据&#xff0c;都属于这个任务的状态。可以简单的任务状态就是…

Python之日志处理(logging模块)

主要内容 日志相关概念logging模块简介使用logging提供的模块级别的函数记录日志logging模块日志流处理流程使用logging四大组件记录日志配置logging的几种方式向日志输出中添加上下文信息参考文档 一、日志相关概念 日志是一种可以追踪某些软件运行时所发生事件的方法。软件开…

LeetCode 514. 自由之路(记忆化递归 / DP)

文章目录1. 题目2. 解题1. 题目 电子游戏“辐射4”中&#xff0c;任务“通向自由”要求玩家到达名为“Freedom Trail Ring”的金属表盘&#xff0c;并使用表盘拼写特定关键词才能开门。 给定一个字符串 ring&#xff0c;表示刻在外环上的编码&#xff1b;给定另一个字符串 ke…

Flink中的容错机制

1 checkpoint Flink 故障恢复机制的核心&#xff0c;就是应用状态的一致性检查点checkpoint。 在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint&#xff0c;处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下…

LeetCode 698. 划分为k个相等的子集(回溯)

文章目录1. 题目2. 解题1. 题目 给定一个整数数组 nums 和一个正整数 k&#xff0c;找出是否有可能把这个数组分成 k 个非空子集&#xff0c;其总和都相等。 示例 1&#xff1a; 输入&#xff1a; nums [4, 3, 2, 3, 5, 2, 1], k 4 输出&#xff1a; True 说明&#xff1a;…

MySQL中的表中增加删除字段

1增加两个字段&#xff1a; mysql> create table id_name(id int,name varchar(20)); Query OK, 0 rows affected (0.13 sec)mysql> alter table id_name add age int,add address varchar(11); Query OK, 0 rows affected (0.13 sec) Records: 0 Duplicates: 0 Warnin…

Ubuntu下svn 版本管理客户端工具及常用方法

Ubuntu16.04系统下安装RapidSVN版本控制器及配置diff,editor,merge和exploer工具&#xff0c;在Window下我们使用TortoiseSVN(小乌龟)&#xff0c;可以很方便地进行查看、比较、更新、提交、回滚等SVN版本控制操作。 在Linux下我们可以使用RapidSVN。RapidSVN是一款轻量级的免费…

Flink的Table API 与SQL的流处理

1 流处理与SQL的区别 Table API和SQL&#xff0c;本质上还是基于关系型表的操作方式&#xff1b;而关系型表、SQL本身&#xff0c;一般是有界的&#xff0c;更适合批处理的场景。所以在流处理的过程中&#xff0c;有一些特殊概念。 SQL流处理处理对象字段元组的有界集合字段元…

LeetCode 833. 字符串中的查找与替换(排序,replace)

文章目录1. 题目2. 解题1. 题目 某个字符串 S 需要执行一些替换操作&#xff0c;用新的字母组替换原有的字母组&#xff08;不一定大小相同&#xff09;。 每个替换操作具有 3 个参数&#xff1a;起始索引 i&#xff0c;源字 x 和目标字 y。 规则是&#xff1a;如果 x 从原始…

Json.NET

我前面的一篇博客 Metro应用Json数据处理 介绍了如何使用 DataContractJsonSerializer 类将对象的实例序列化为JSON字符串以及将JSON字符串反序列化为对象的实例的处理方式。而此种处理方式的一个很大的缺点就是要求JSON字符串格式是约定好的&#xff0c;而在很多情况下我们无法…

天池 在线编程 最大得分(DP)

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744275 2. 解题 class Solution { public:/*** param matrix: the matrix* return: the maximum score you can get*/int maximumScore(vector<vector<i…

天池 在线编程 LR String

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744276 2. 解题 class Solution { public:/*** param s: a string* param t: a string* param n: max times to swap a l and a r.* return: return if s can …

天池 在线编程 音乐组合

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744274 2. 解题 对60求余后&#xff0c;0, 30的为 Cn2C_n^2Cn2​&#xff0c;其余的相加等于60的&#xff0c;种类相乘 class Solution { public:/*** param …

java之NIO(Channel,Buffer,Selector)

java之NIO 1 什么是NIO Java NIO (New IO&#xff0c;Non-Blocking IO)是从Java 1.4版本开始引入的一套新的IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO的三大核心部分&#xff1a;通道(Channel)&#xff0c;缓冲区(Buffer), 选择器(Selector)&#xff0c;数据总是从…

LeetCode 1652. 拆炸弹(前缀和)

文章目录1. 题目2. 解题1. 题目 你有一个炸弹需要拆除&#xff0c;时间紧迫&#xff01;你的情报员会给你一个长度为 n 的 循环 数组 code 以及一个密钥 k 。 为了获得正确的密码&#xff0c;你需要替换掉每一个数字。所有数字会 同时 被替换。 如果 k > 0 &#xff0c;将…

MYSQL从入门到精通

SQL是数据库的查询语言&#xff0c;语法结构简单&#xff0c;相信本文会让你从入门到熟练。 掌握SQL后&#xff0c;不论你是产品经理、运营人员或者数据分析师&#xff0c;都会让你分析的能力边界无限拓展。别犹豫了&#xff0c;赶快上车吧&#xff01; SQL最小化的查询结构如下…