【FLink】水位线(Watermark)

目录

1、关于时间语义

1.1事件时间

1.2处理时间​编辑

2、什么是水位线

2.1 顺序流和乱序流

2.2乱序数据的处理

2.3 水位线的特性

3 、水位线的生成

3.1 生成水位线的总体原则

3.2 水位线生成策略

3.3 Flink内置水位线

3.3.1 有序流中内置水位线设置

3.4.2 断点式水位线生成器(Punctuated Generator)

3.4.3 在数据源中发送水位线

4、水位线的传递

5、迟到数据的处理


1、关于时间语义

1.1事件时间

        一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

1.2处理时间

2、什么是水位线

在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。说白了就是事件时间戳。

2.1 顺序流和乱序流

有序流就是指数据按照生成的先后顺序,每条数据产生一个有先后顺序的水位线

这是一种理想的状态(数据量较小),而在实际中,我们产生的数据量往往非常庞大,而数据之间的时间间隔非常之小,所以为了提高效率,一般会每隔一段时间生成一个水位线

在实际生产中,由于多服务之间网络传输等的因素,往往我们的数据流,并不是我们所想的顺序结果,而是数据先后错乱,这就是乱序流

2.2乱序数据的处理

由于数据是乱序的,我们无法正确处理“迟到”的数据,为了让窗口能够正确的收集到迟到的数据,我们也可以让窗口等上一段时间,比如2秒。也就是说,我们可以在数据的时间戳基础上加上一些延迟来尽量保证不丢数据。

2.3 水位线的特性

3

3 、水位线的生成

3.1 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

3.2 水位线生成策略

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。

DataStream<Event> stream = env.addSource(new ClickSource());DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

3.3 Flink内置水位线

3.3.1 有序流中内置水位线设置

对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}

3.3.2 乱序流中内置水位线设置

调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。

这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值

public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}

3.4 自定义水位线生成器

3.4.1 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。

import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element,long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}

如果想修改默认周期时间,可以通过下面方法修改。

//修改默认周期为400ms
env.getConfig().setAutoWatermarkInterval(400L);

3.4.2 断点式水位线生成器(Punctuated Generator

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。

3.4.3 在数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

4、水位线的传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟

也就是说:水位线的传递是以最小事件时间为准则。

5、迟到数据的处理

5.1 推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

5.2 设置窗口延迟关闭

当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

5.3 使用侧流接收迟到的数据

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)

完整示例:

public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/156483.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【shell】条件语句

一、测试 1.1文件测试test test命令是内部命令 test的语法 test 条件表达式 [ 条件表达式 ] test 选项 文件 -d &#xff1a;判断是否是目录 -f &#xff1a;判断是否是普通文件 -b &#xff1a;判断是否是块设备 -c &#xff1a;判断是否是字符设备 -e &#xff1a;判断是否…

【Python】给出一个包含n个整数的数列,问整数a在数列中的第一次出现是第几个。

问题描述 给出一个包含n个整数的数列&#xff0c;问整数a在数列中的第一次出现是第几个。 输入格式 第一行包含一个整数n。 第二行包含n个非负整数&#xff0c;为给定的数列&#xff0c;数列中的每个数都不大于10000。 第三行包含一个整数a&#xff0c;为待查找的数。 输出格式…

rv1126-rv1109-openssh

这是一个工具&#xff0c;可以通过ssh远程登录来操作&#xff0c;非常逆天&#xff01; 于是rv1109代码自身自带有openssh 所以只需要打开config即可 diff --git a/buildroot/configs/rockchip_rv1126_rv1109_spi_nand_defconfig b/buildroot/configs/rockchip_rv1126_rv1109…

Android codec2 视频框架之输出端的内存管理

文章目录 前言setSurfacestart从哪个pool中申请buffer解码后框架的处理流程renderOutbuffer 输出显示 前言 输出buffer整体的管理流程主要可以分为三个部分&#xff1a; MediaCodc 和 应用之间的交互 包括设置Surface、解码输出回调到MediaCodec。将输出buffer render或者rele…

手机照片一键去水印轻松摆脱不需要的旁观者

是什么让照片中的意外客人成为挑战&#xff1f;我们都经历过这种情况——在热门地标或繁忙的城市街道拍照&#xff0c;不可避免地会在画面中捕捉到陌生人。有时他们会无意中抢尽风头&#xff0c;转移观众的注意力。 这些水印不仅影响了照片的美观度&#xff0c;还给我们的观赏体…

京东数据分析软件(京东平台数据分析):2023年Q3扫地机器人行业消费报告

随着90后、00后逐渐成为消费主力军&#xff0c;他们对生活品质更加关注、健康意识进一步增强&#xff0c;再加上“懒人经济”的盛行&#xff0c;人们对扫地机器人的使用率和关注热情也不断增长。 根据鲸参谋电商数据分析平台的相关数据显示&#xff0c;今年7月份-9月份&#xf…

AI大爆发的时代,未来的年轻人怎样获得机会和竞争力?

文章目录 引言AI与教育工作者教育资源不平衡 这次&#xff0c;狼真的来了。 引言 AI正迅猛地改变着我们的生活。 根据高盛发布的一份报告&#xff0c;AI有可能取代3亿个全职工作岗位&#xff0c;影响全球18%的工作岗位。在欧美&#xff0c;或许四分之一的工作可以用AI完成。另…

第四代智能井盖传感器,更迭智能井盖监测办法

人工检查井盖是一项耗时且效率低下的工作&#xff0c;需要工作人员逐个进行检查。由于这种方式无法实时监测井盖的状态&#xff0c;当井盖出现故障时无法及时将信息反馈给相关人员&#xff0c;从而影响了井盖的维修效率。此外人工检查还受到天气、光线等环境因素的影响较大&…

卷积神经网络(ResNet-50)鸟类识别

文章目录 卷积神经网络&#xff08;CNN&#xff09;mnist手写数字分类识别的实现卷积神经网络&#xff08;CNN&#xff09;多种图片分类的实现卷积神经网络&#xff08;CNN&#xff09;衣服图像分类的实现卷积神经网络&#xff08;CNN&#xff09;鲜花的识别卷积神经网络&#…

基于社交网络算法优化概率神经网络PNN的分类预测 - 附代码

基于社交网络算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于社交网络算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于社交网络优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…

一、MySQL-Replication(主从复制)

1.1、MySQL Replication 主从复制&#xff08;也称 AB 复制&#xff09;允许将来自一个MySQL数据库服务器&#xff08;主服务器&#xff09;的数据复制到一个或多个MySQL数据库服务器&#xff08;从服务器&#xff09;。 根据配置&#xff0c;您可以复制数据库中的所有数据库&a…

Flowable工作流基础篇

文章目录 一、Flowable介绍二、Flowable基础1.创建ProcessEngine2.部署流程定义3.启动流程实例4.查看任务5.完成任务6.流程的删除7.查看历史信息 三、Flowable流程设计器1.Eclipse Designer1.1 下载安装Eclipse1.2 安装Flowable插件1.3 创建项目1.4 创建流程图1.5 部署流程 2.F…

Maven工程继承关系,多个模块要使用同一个框架,它们应该是同一个版本,项目中使用的框架版本需要统一管理。

1、父工程pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/PO…

KMP——字符串匹配

朴素匹配的逻辑&#xff1a; 将原串的指针移动至本次发起点的下一个位置&#xff08;b字符处&#xff09;&#xff1b;匹配串的指针移动至起始位置。尝试匹配&#xff0c;发现对不上&#xff0c;原串的指针会一直往后移动&#xff0c;直到能够与匹配串对上位置。 如图&#x…

(02)vite环境变量配置

文章目录 将开发环境和生产环境区分开环境变量vite处理环境变量loadEnv 业务代码需要使用环境变量.env.env.development.env.test修改VITE_前缀 将开发环境和生产环境区分开 分别创建三个vite 的配置文件&#xff0c;并将它们引入vite.config.js vite.base.config.js import…

深入探讨软件测试技术:方法、工具与最佳实践

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 引言 软件测试是软件开发生命周期中至关重要的…

SO3 与so3 SE3与se3 SIM3

文章目录 1 旋转*叉乘1.1 旋转矩阵的导数1.2 物理意义1.3 实例1.4 角轴与反对称矩阵 2 SO3 与so32.1 so3 2 SO32.2 SO3 2 so3 3 SE3 与se33.1 se3 2 SE3:3.2 SE3 2 se3 4 SIM3 与sim35 Adjoint Map 1 旋转*叉乘 1.1 旋转矩阵的导数 根据旋转矩阵的性质&#xff1a; R R T I …

2023年以就业为目的学习Java还有必要吗?

文章目录 1活力四射的 Java2从零开始学会 Java3talk is cheap, show me the code4结语写作末尾 现在学 Java 找工作还有优势吗&#xff1f; 在某乎上可以看到大家对此问题的热议&#xff1a;“2023年以就业为目的学习Java还有必要吗&#xff1f;” 。有人说市场饱和&#xff0c…

基于白冠鸡算法优化概率神经网络PNN的分类预测 - 附代码

基于白冠鸡算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于白冠鸡算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于白冠鸡优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神经网络…

django ModelSerializer自定义显示字段

文章目录 前言一、问题二、解决 前言 最近在复习django的时候&#xff0c;发现了一个有趣的问题&#xff0c;解决了之后特意记录下来&#xff0c;以供以后参考。 一、问题 相信大家使用django的时候&#xff0c;被其DRF的强大功能所折服&#xff0c;因为它能通过简单的代码就…