Flink Watermark和时间语义

Flink 中的时间语义

[点击并拖拽以移动] ​

时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间Event Time。数据生成的时候就会自动注入时间戳,Event Time可以从日志数据的时间戳timestamp)中提取。

设置 Event Time

我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性。具体的时间,还需要从数据中提取时间戳timestamp

val env = StreamExecutionEnvironment.getExecutionEnvironment
//从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

乱序数据的影响

[点击并拖拽以移动] ​

FlinkEvent Time模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。由于网络、分布式等原因,会导致乱序数据的产生。如上图所示,理想情况与实际情况会存在差异,乱序数据会让窗口计算不准确。解决方案是让窗口等几分钟。

水位线 Watermark

怎么避免乱序数据带来计算不正确?
遇到一个时间戳到达了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。Watermark是一种衡量Event Time进展的机制,可以设置延迟触发。Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经达到了,因此,window的执行也是由Watermark触发的。Watermark用来让程序自己延迟和结果正确性。

Watermark 的特点: Watermark是一条特殊的数据记录,必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark与数据的时间戳有关。
[点击并拖拽以移动] ​

watermark 的传递、引入和设定

watermark的传递: 一个Task输入可以并行多个,如下有4个并行度,输出也可能存在多个并行,如下有3个。每个任务Task内部都有一个事件时钟,且每个分区也维护了对应的WM,如下的Partition WM。当事件流流进Partition时会判断新事件流的WM是否大于当前的Partition WM,当大于时就更新Partition的时间戳WM为新流入的WM(取最大值),如下1->2象限Partition WM的变化。同时,如下Task也维护了一个全局的WM表示事件时钟,该值取分区中最小的WM作为输出的时间戳,如下第二象限的输出选择最小的WM=3向下传递。当第二个(横线)分区Partition WM流进来WM=7的事件流时,就会出现第三象限的情景,但是最小的WM还是=3,因此不更新Task全局的WM。当第三个分区Partition WM流进来WM=6的事件流时,就会出现第四象限的情景,此时分区Partition WM的最小值=4,因此Task全局WM=4
[点击并拖拽以移动] ​

watermark的引入: Event Time的使用一定要指定数据源中的时间戳。对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发。

import org.apache.flink.streaming.api.windowing.time.Time
//同时分配时间戳和水位线
dataStream.assignTimestampsAndWatermarks(
//无序数据       Time.milliseconds(1000)=延迟时间
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {//提取事件戳 = timestamp * 1000是因为出入的毫秒override def extractTimestamp(t: SensorReading): Long = {t.timestamp * 1000}
})

【1】对于排好序的数据,不需要延迟触发,可以只指定事件戳就行了

dataStream.assignTimestampsAndWatermarks(_.timestamp * 1000)

【2】Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成 watermarkMyAssigner可以有两种类型,都继承自TimestampAssigner

dataStream.assignTimestampsAndWatermarks(new MyAssigner())

TimestampAssigner:定义了抽取时间戳,以及生成watermark的方法,有两种类型:
【1】AssignerWithPeriodicWatermarks 系统会周期性的将Watermark插入到流中。默认周期是200毫秒(如果是processingTimeWatermark = 0 ),可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。升序和前面乱序的处理BoundedOutOfOrderness,都是基于周期性watermark的。举例:如下产生watermark的逻辑:每隔5秒,Flink调用AssignerWithPeriodicWatermarksgetCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的water会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于之前水位的时间戳,则不会产生新的watermark

//方案一:
//EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每隔 5秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);//方案二
//自定义一个周期性的时间戳
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading]{val bound: Long = 60 * 1000 //延时为 1 分钟var maxTs: Long = Long.MinValue //观察到的最大时间戳//生成水位线override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {maxTs = maxTs.max(t.timestamp)t.timestamp}
}

【2】AssignerWithPunctuatedWatermarks 没有时间周期规律,可打断的生成watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading]{val bound: Long = 60 * 1000//获取水位线,根据数据触发override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {if(t.id == "sensor_1"){new Watermark(l - bound)}else{null}}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {t.timestamp}
}

watermark 的设定:
【1】在Flink中,watermark由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
【2】如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
【3】而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/594836.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据分析基础之《numpy(6)—IO操作与数据处理》

了解即可,用panads 一、numpy读取 1、问题 大多数数据并不是我们自己构造的,而是存在文件当中,需要我们用工具获取 但是numpy其实并不适合用来读取和处理数据,因此我们这里了解相关API,以及numpy不方便的地方即可 2…

【JavaFX】JavaFX11开发踩坑记录

文章目录 技术栈踩坑记录 技术栈 JavaFX 11MavenJDK 11 踩坑记录 这些坑对于初学者很容易踩,JavaFX经常会报错空指针异常遇到其中一个问题可能就会消耗好几天的时间。 JavaFX 采用的是MVC架构设计,页面设计使用 fxml文件;业务逻辑采用Con…

【实用工具】FFmpeg常用的命令

前言 FFmpeg是一个强大的多媒体处理工具,可以用于处理音频、视频和图像。 命令格式 ffmpeg {1} {2} -i {3} {4} {5} 上面命令中,五个部分的参数依次如下。 1.全局参数 2.输入文件参数 3.输入文件 4.输出文件参数 5.输出文件 常见命令行参数 -c&…

漏洞复现-天融信TOPSEC static_convert 远程命令执行漏洞(附漏洞检测脚本)

免责声明 文章中涉及的漏洞均已修复,敏感信息均已做打码处理,文章仅做经验分享用途,切勿当真,未授权的攻击属于非法行为!文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直接或者间接的…

2016年AMC8数学竞赛中英文真题典型考题、考点分析和答案解析

今天我们来看2016年的AMC8竞赛真题的典型考题和解析,最后利用碎片化时间冲刺,查漏补缺,提高成绩。温馨提示:2024年AMC8比赛现在还可以报名,自由报名截止到1月7日,我这里有官方自由报名通道。后续官方模拟题…

美国地质调查局历史地形图

简介 美国地质调查局地形图的历史可以追溯到 19 世纪末,当时美国地质调查局开始着手绘制整个美国的详细地图。1:24,000 比例尺,也称为 7.5 分四边形地图,成为最广泛使用的比例尺之一。每张地图覆盖 7.5 分经纬度的区域,从而详细呈…

计算机基础面试题 |07.精选计算机基础面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

HackTheBox - Medium - Linux - BroScience

BroScience BroScience 是一款中等难度的 Linux 机器,其特点是 Web 应用程序容易受到“LFI”的攻击。通过读取目标上的任意文件的能力,攻击者可以深入了解帐户激活码的生成方式,从而能够创建一组可能有效的令牌来激活新创建的帐户。登录后&a…

canvas绘制椭圆形示例

查看专栏目录 canvas示例教程100专栏,提供canvas的基础知识,高级动画,相关应用扩展等信息。canvas作为html的一部分,是图像图标地图可视化的一个重要的基础,学好了canvas,在其他的一些应用上将会起到非常重…

Liunx(CentOS)安装Nacos(单机启动,绑定Mysql)

Liunx安装Nacos(单机启动,绑定Mysql) 一,准备安装包 github下载点 二,在/usr/local/目录下创建一个文件夹用于上传和解压Nacos cd /usr/local/ #这里创建文件夹名字可随意,解压后会生成一个名为nacos的文件夹,后续…

❀记忆冒泡、选择和插入排序算法思想在bash里运用❀

目录 冒泡排序算法:) 选择排序算法:) 插入排序算法:) 冒泡排序算法:) 思想:依次比较相邻两个元素,重复的进行直到没有相邻元素需要交换,排序完成。 #!/bin/bash arr(12 324 543 213 65 64 1 3 45) #定义一个数组 n${#arr[*]} #获取数组…

海外静态IP和动态IP有什么区别?推荐哪种?

什么是静态ip、动态ip,二者有什么区别?哪种好?关于这个问题,不难发现,在知道、知乎上面的解释有很多,但据小编的发现,这些回答都是关于静态ip和动态ip的专业术语解释,普通非专业人事…

一、初识Redis与分布式系统

目录 一、Redis应用 二、实现方式 三、Redis应用 四、分布式系统 五、分布式系统实现 1、应用服务和数据库服务分离 2、引入负载均衡,应用服务器集群(解决高并发) 3、引入读写分离,数据库主从结构(解决高并发&a…

Spark---RDD算子(单值类型Value)

文章目录 1.RDD算子介绍2.转换算子2.1 Value类型2.1.1 map2.1.2 mapPartitions2.1.3 mapPartitionsWithIndex2.1.4 flatMap2.1.5 glom2.1.6 groupBy2.1.7 filter2.1.8 sample2.1.9 distinct2.1.10 coalesce2.1.11 repartition2.1.12 sortBy 1.RDD算子介绍 RDD算子是用于对RDD进…

【UEFI基础】EDK网络框架(基础说明)

基础说明 UEFI中的网络框架大致如下: 红框部分是实现UEFI的EDK2开源项目中网络框架自带的实现,红框之外的部分需要网卡设备商提供驱动。UEFI下通常推荐使用最右边的形式,即网卡设备商提供实现了UNDI的网卡驱动。因此UEFI网络框架的另一个形式…

线性代数_对称矩阵

对称矩阵是线性代数中一种非常重要的矩阵结构,它具有许多独特的性质和应用。下面是对称矩阵的详细描述: ### 定义 对称矩阵,即对称方阵,是指一个n阶方阵A,其转置矩阵等于其本身,即A^T A。这意味着方阵A中的…

python 知识点

ping ping 不能带协议,如:ping baidu.com 引入包顺序 分三级,第一级是 Python 的内置库,第二级是第三方库,第三级是自己的代码。每一级用一个空行间隔 运算符 keyError:key不存在 列表推导式 创建字典 字…

git 如何撤销历史某次merge

git,如何 撤销某一次历史提交或merge,并保留该版本的后续提交? 场景1: 你有两个功能迭代版本的分支,一个是 15 号上线,一个是25号上线。5号的时候产品突然说,这两个版本一起上,然后…

【REST2SQL】03 GO读取JSON文件

REST2SQL需要一些配置信息,用JSON文件保存,比如config.json 1 创建config.json配置文件 {"hostPort":"localhost:5217","connString":"oracle://blma:5217127.0.0.1:1521/CQYH","_oracle":"ora…

ubuntu 执行apt-get update报错

系统是Ubuntu22.04 执行apt-get update 遇到如下情况 E: 无法下载 https://mirrors.tuna.tsinghua.edu.cn/ubuntu/dists/jammy/main/binary-arm64/Packages 404 Not Found [IP: 101.6.15.130 443] E: 无法下载 https://mirrors.tuna.tsinghua.edu.cn/ubuntu/dists/jammy-upda…