【入门Flink】- 09Flink水位线Watermark

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 乱序数据,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777).partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.设置迟到时间 3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperator<WaterSensor>sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/144407.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

你不懂API接口是什么?怎么和程序员做朋友

说到开发平台就一定离不开接口&#xff0c;作为PM&#xff0c;我们不需要对接口了解的特别细。只需要知道接口是什么&#xff0c;有什么用&#xff0c;有哪些要素就行。 1. 接口是什么 (1) 硬件接口 生活中我们经常会接触接口&#xff0c;最常见的就是HDMI接口和USB接口&…

计算机毕业设计选题推荐-公共浴池微信小程序/安卓APP-项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

我这些年对于自动化测试的理解

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

环境配置 | Git的安装及配置[图文详情]

Git是一个开源的分布式版本控制系统&#xff0c;可以有效、高速地处理从小到大的项目版本管理。下面介绍了基础概念及详细的用图文形式介绍一下git安装过程. 目录 1.Git基础概念 2.Git的下载及安装 3.常见的git命令 Git高级技巧 Git与团队协作 1.Git基础概念 仓库&#…

沉浸式航天vr科普馆VR太空主题馆展示

科普教育从小做起&#xff0c;现在我们的很多地方小孩子游乐体验不单单只有草坪玩耍体验&#xff0c;还有很多科普知识的体验馆和游玩馆。虽然现在我们还不能真实的上太空或者潜入海底&#xff0c;但是这些现在已经可以逼真的展示在我们面前。通过一种虚拟现实技术手段。人们带…

在3+1的方向上展开结构加法4a3+4a14

4a3 4a14 - - 1 - - - - - - - - - - - - - 1 1 1 - 1 1 - 1 - - 1 - - - 要求得到的图片只能有4个点&#xff0c;并且需要最大限度的保留4a3和4a14两张图片的内在结构特征。 4个点的结构总可以认为是3个点的结构1合成的 - - 1 - - …

只使用JS怎么给静态页面网站添加站内全局搜索功能?

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 背景 静态页面通常由HTML、CSS 和 JavaScript…

酷开系统 酷开科技,将家庭娱乐推向新高潮

在当今数字化时代&#xff0c;家庭娱乐已经成为人们日常生活中不可或缺的一部分。如果你厌倦了传统的家庭娱乐方式&#xff0c;想要一种全新的、充满惊喜的娱乐体验&#xff0c;那么&#xff0c;不妨进入到酷开科技的世界&#xff0c;作为智能电视行业领军企业&#xff0c;酷开…

销售管道管理软件推荐:提升销售业绩与效率

在企业中销售部门扮演着锐意进取的尖刀部队的角色&#xff0c;肩负着拓展公司发展领土的重要责任。销售管理是一个漫长而复杂的过程&#xff0c;需要经历潜在的商机、联系跟进、签订合同以及赢得订单等关键里程碑&#xff0c;无论是面向C端用户的销售还是面向企业复杂产品的销售…

使用jmeter+ant进行接口自动化测试(数据驱动)

本次接着介绍如何利用apache-ant执行测试用例并生成HTML格式测试报告 ①下载安装 apache-ant-1.9.9&#xff0c;配置环境变量 如下方式检验安装成功 ②安装好ant后&#xff0c;把jmeter中extras目录下的ant-jmeter-1.1.1.jar 文件copy到ant安装目录下的lib文件夹中 ③配置ant…

【cfeng-work】架构演进和漫谈

架构漫谈和入门 内容管理 intro分层架构MVC模式分层架构大数据时代的复杂架构 前端架构后端架构运维端架构持续演进变化 本文主要是自己接触架构的一些输出漫谈 cfeng 在work中某次负责了后端一个服务的上线&#xff0c;多个模块一起上&#xff0c;结果上线失败&#xff0c;幸运…

【MySQL】数据库——库操作

文章目录 1. 创建数据库[IF NOT EXISTS] 的使用 2. 删除库3. 数据库的编码问题查看系统默认支持的字符集查看系统默认支持的校验集只查看 database的校验集指定编码创建数据库修改字符集修改校验集验证规则对数据库的影响utf8_general_ci ——不区分大小写utf8_bin ——区分大小…

什么是会话固定以及如何在 Node.js 中防止它

什么是会话固定以及如何在 Node.js 中防止它 在深入讨论之前&#xff0c;我们需要了解会话是什么以及会话身份验证如何工作。 什么是会话&#xff1f; 正如我们所知&#xff0c;HTTP 请求是无状态的&#xff0c;这意味着当我们发送登录请求时&#xff0c;并且我们有有效的用…

智能售货柜:小本投资的不二之选

智能售货柜&#xff1a;小本投资的不二之选 智能售货柜的运营优势在于&#xff1a;一是降低运营成本&#xff0c;不需要大量员工&#xff1b;二是具备自动识别和智能结算功能&#xff0c;提高运营效率&#xff1b;三是提供数据分析&#xff0c;优化产品和服务。相比传统零售店&…

教育案例分享 | 安全狗云安全体系为高校提升立体化纵深防御能力

一、客户情况 某高校有服务器500台&#xff0c;对外站点200个&#xff0c;核心交换流量20G。 二、客户痛点 校园网系统分类较多&#xff0c;并且每类网站中安全级重要程度又各不相同&#xff0c;同时有多个网络出口(如&#xff1a;教育网、电信网、移动网等)&#xff0c;二级学…

论文阅读——RetNet

transformer的问题&#xff1a;计算量大&#xff0c;占用内存大&#xff0c;不好部署。 所以大家在找能解决办法&#xff0c;既能和transformer表现一样好&#xff0c;又能在推理阶段计算复杂度很低。 这些方法大概分类三类&#xff1a;一是代替transformer非线性注意力机制的…

【Linux】文件系统中inode与软硬链接以及读写权限问题

文章目录 前言一、 简单理解文件系统二、文件操作具体步骤1.新建文件2.删除文件3.查找文件 三、目录的重新理解1.目录下没有w权限&#xff0c;无法对其下的文件进行创建与删除2.目录下没有r权限&#xff0c;无法对其下的文件进行查看3.目录下没有x权限&#xff0c;无法进入这个…

内网Jenkins 部署.net(dotnet)项目

一、前置条件 内网部署Jenkins&#xff0c;并安装好所需插件 此篇内容需承接内网搭建Jenkins自动化远程部署项目到Windows服务器_jenkins内网安装-CSDN博客 &#xff0c;才更好操作与理解 二、在Jenkins中创建项目 三、配置项目 General Source Code Management Build Envi…

Docker - 容器数据卷

Docker - 容器数据卷 什么是容器数据卷 等同于挂载&#xff0c;将容器内的目录地址指向于宿主机文件系统中 直接使用命令来挂载 -v docker run -it -v 主机目录:容器内目录# 测试 docker run -it -v /root:/home centos /bin/bash [rootiZ2zeg7mctvft5renx1qvbZ ~]# docker …

什么是CMDB?为什么企业需要CMDB?

CMDB即Configuration Management Database&#xff0c;配置管理数据库&#xff0c;它是组织IT基础结构中配置项CI(Configuration Item)及其关系的数据库。 而CI是指任何需要进行管理以确保成功提供服务的条目&#xff0c;CI可以是一个具体的实体&#xff0c;如服务器、交换机&…