大数据Flink(一百一十八):Flink SQL水印操作(Watermark)

文章目录

Flink SQL水印操作(Watermark)

一、为什么要有WaterMark

二、​​​​​​​​​​​​​​Watermark解决的问题

三、​​​​​​​​​​​​​​代码演示


Flink SQL水印操作(Watermark)

一、​​​​​​​为什么要有WaterMark

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:

假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据

二、​​​​​​​​​​​​​​Watermark解决的问题

上面的问题在于如何将迟来的EventTime 为11的元素正确处理?

当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下: 

通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。

watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。

三、​​​​​​​​​​​​​​代码演示

  • 使用Socket模拟接收数据
  • 设置WaterMark
    • 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
  • 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

若输入第一条数据:hello,2022-03-25 16:39:45

那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示

1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线

2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000

3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算

4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算

根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50

 Flink输出结果:

Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。

数据乱序的场景

上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:

其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink结果:

从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃

乱序时间的设置:

为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据

修改最大乱序时间(新建的表仅水印与之前不同)

CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

在监听终端中,输入数据

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink输出结果:  

可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/54354.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

43.常用C++编译器推荐——《跟老吕学C++》

43.常用C编译器推荐——《跟老吕学C》 常用C编译器推荐一、C编译器介绍1. GCC (GNU Compiler Collection)2. Clang2.1 Clang的特点2.2 Clang的应用场景2.3 Clang与GCC的比较 3. Microsoft Visual C (MSVC)MSVC的特点MSVC的使用场景MSVC与其他编译器的比较 4. Intel C Compiler4…

【Midjourney中文版】

Midjourney中文版打破了传统创作工具的界限,无需具备专业的艺术技能或复杂的软件操作能力,即可轻松创作出高质量的图片。它支持多种创作模式,包括文生图、图生图、图片混图融合等,满足多样化的创作需求。 打开Midjourney中文版后…

istio中如何使用serviceentry引入外部服务

假设需要引入一个外部服务,外部服务ip为10.10.102.90,端口为32033. 引入到istio中后,我想通过域名gindemo.test.ch:9090来访问这个服务。 serviceentry yaml内容如下: apiVersion: networking.istio.io/v1beta1 kind: ServiceEn…

【Pycharm】Pycharm创建Django提示pip版本需要升级

目录 1、现象 2、分析 3、本质 前言:经常使用pycharm创建django、flask等项目时候提示pip版本需要升级,解决方案 1、现象 使用Pycharm创建Django项目提示安装Django超时,报错建议pip升级22升级到24 2、分析 之前使用命令升级了pip到了24…

VS code EXPLORER 中不显示指定文件及文件夹设置(如.pyc, __pycache__, .vscode 文件)

VS code EXPLORER 中不显示指定文件及文件夹设置 引言正文方法1打开方式1打开方式2 方法2 引言 VS code 号称地表最强轻量级编译器,其最大的优势在于用户可以根据自己的需求下载适合自己的 extension。从而定制个性化的编译器。然而,本人今天遇到了一个…

出厂非澎湃OS手机解BL锁

脚本作者:酷安mlgmxyysd 脚本项目链接:https://github.com/MlgmXyysd/Xiaomi-HyperOS-BootLoader-Bypass/ 参考 B站作者:蓝空穹 https://www.bilibili.com/read/cv33210124/ 其他参考:云墨清风、水墨青竹、Magisk中文网 决定解BL…

设计模式 组合模式(Composite Pattern)

组合模式简绍 组合模式(Composite Pattern)是一种结构型设计模式,它允许你将对象组合成树形结构来表示“部分-整体”的层次结构。组合模式使得客户端可以用一致的方式处理单个对象和组合对象。这样,可以在不知道对象具体类型的条…

通信工程学习:什么是ONT光网络终端

ONT:光网络终端 ONT(Optical Network Terminal,光网络终端)是光纤接入网络(FTTH)中的关键设备,用于将光纤信号转换为电信号或将电信号转换为光信号,以实现用户设备与光纤网络的连接。…

Koa (下一代web框架) 【Node.js进阶】

koa (中文网) 是基于 Node.js 平台的下一代 web 开发框架,致力于成为应用和 API 开发领域中的一个更小、更富有表现力、更健壮的基石; 利用 async 函数 丢弃回调函数,并增强错误处理,koa 没有任何预置的中间件,可快速…

计算机组成原理(笔记3)

IEEE754浮点数标准 这里只讲32位单精度 S——尾数符号,0正1负; M——尾数, 纯小数表示, 小数点放在尾数域的最前面。 一般采用原码或补码表示。 E——阶码,采用“移码”表示; 阶符采用隐含方式,即采用“移码”方法来表示正负指数…

Python 之数据库操作(Python Database Operations)

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…

基于SSM的在线家用电器销售系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于JavaSSMVueMySQL的在线家…

统信服务器操作系统【1050e版】安装手册

统信服务器操作系统1050e版本的安装 文章目录 功能概述一、准备环境二、安装方式介绍安装步骤步骤一:制作启动盘步骤二:系统的安装步骤三:安装引导界面步骤四:图形化界面安装步骤五:选择安装引导程序语言步骤六:进入安装界面步骤七:设置键盘步骤八:设置系统语言步骤九:…

链接升级:Element UI <el-link> 的应用

链接升级&#xff1a;Element UI 的应用 一 . 创建文字链接1.1 注册路由1.2 创建文字链接 二 . 文字链接的属性2.1 文字链接的颜色2.2 是否显示下划线2.3 是否禁用状态2.4 填写跳转地址2.5 加入图标 在本篇文章中&#xff0c;我们将深入探索Element UI中的<el-link>组件—…

Elasticsearch基础(七):Logstash如何开启死信队列

文章目录 Logstash如何开启死信队列 一、确保 Elasticsearch 输出插件启用 DLQ 支持 二、配置 Logstash DLQ 设置 三、查看死信队列 四、排查 CSV 到 Elasticsearch 数据量不一致的问题 Logstash如何开启死信队列 在 Logstash 中&#xff0c;死信队列&#xff08;Dead Le…

C++ nullptr 和NULL的区别

个人主页&#xff1a;Jason_from_China-CSDN博客 所属栏目&#xff1a;C系统性学习_Jason_from_China的博客-CSDN博客 所属栏目&#xff1a;C知识点的补充_Jason_from_China的博客-CSDN博客 概念概述&#xff1a; 在C中&#xff0c;nullptr 和 NULL 都是用来表示空指针&#xf…

微波无源器件 功分器3 一种用于多端口辐射单元的紧凑四路双极化正交模功分器的设计

摘要&#xff1a; 一种有着双极化能力并且能作为一个Fabry-Perot谐振腔天线的馈源包含四个输入端口的新型紧凑功分器的概念和设计被提出了。在四个圆波导中的双同相极化通过使用四个5端口十字转门结合两个8by1&#xff08;八合一&#xff09; 功分网络。功分器末端接了两个端口…

【RabbitMQ】工作模式

工作模式概述 简单模式 简单模式中只存在一个生产者&#xff0c;只存在一个消费者。生产者生产消息&#xff0c;消费者消费消息。消息只能被消费一次&#xff0c;也称为点对点模式。 简单模式适合在消息只能被单个消费者处理的场景下存在。 工作队列模式&#xff08;Work Qu…

项目管理 | 一文读懂什么是敏捷开发管理

在快速变化的商业环境中&#xff0c;项目管理方式也在不断演进&#xff0c;其中敏捷开发管理因其高效、灵活和适应性强的特点&#xff0c;逐渐成为众多企业和团队的首选。本文将详细解析敏捷开发管理的定义、具体内容及其核心角色&#xff0c;帮助读者全面理解这一先进的项目管…

心觉:不能成事的根本原因

很多人一直都很努力&#xff0c;每天都很忙 每天都学习很多东西&#xff0c;学习各种道&#xff0c;各种方法论 但是许多年过去了依然一事无成 自己的目标没有达成&#xff0c;梦想没有实现 为什么呢 关键是没有开悟 那么什么是开悟呢 现在很多人都在讲开悟 貌似开悟很…