flink watermark 实例分析

WATERMARK 定义了表的事件时间属性,其形式为:

 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:
在这里插入图片描述

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

WATERMARK rowtime_column_name 取值两种方式

rowtime_column_name为计算列

CREATE TABLE pageviews (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

rowtime_column_name为事件时间属性

CREATE TABLE dataGen(uuid VARCHAR(20),name INT,age INT,ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)WATERMARK FOR ts AS ts
)with('connector' = 'datagen','rows-per-second' = '10','number-of-rows' = '100','fields.age.kind' = 'random','fields.age.min' = '1','fields.age.max' = '10','fields.name.kind' = 'random','fields.name.min' = '1','fields.name.max' = '10');

watermark使用demo

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

watermark在over聚合中的使用

--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
selectgroup_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (PARTITION BY group_nameORDER BY event_timeRANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)

watermark在windows聚合中的使用

--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

参考:
Window Aggregation
Over Aggregation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/238382.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023年12月GESP认证图形化编程四级真题试卷

2023年12月GESP认证Scratch图形化等级考试&#xff08;四级&#xff09;真题试卷 题目总数&#xff1a;27 总分数&#xff1a;100 选择题 第 1 题 单选题 现代计算机是指电子计算机&#xff0c;它所基于的是&#xff08; &#xff09;体系结构 A. 艾伦图灵 B. …

Valentina Studio Pro for Mac:高效数据库管理工具

作为一款强大而高效的数据库管理工具&#xff0c;Valentina Studio Pro for Mac在Mac平台上的表现无疑是令人印象深刻的。无论您是初学者还是专业数据库管理员&#xff0c;Valentina Studio Pro都能够满足您的需要&#xff0c;并提供一流的工具和功能来简化数据库管理的过程。 …

KBU808-ASEMI适配高端电源KBU808

编辑&#xff1a;ll KBU808-ASEMI适配高端电源KBU808 型号&#xff1a;KBU808 品牌&#xff1a;ASEMI 封装&#xff1a;KBU-4 最大平均正向电流&#xff1a;8A 最大重复峰值反向电压&#xff1a;800V 产品引线数量&#xff1a;4 产品内部芯片个数&#xff1a;4 产品内…

Docker 编译OpenHarmony 4.0 release

一、背景介绍 1.1、环境配置 编译环境&#xff1a;Ubuntu 20.04OpenHarmony版本&#xff1a;4.0 release平台设备&#xff1a;RK3568 OpenHarmony 3.2更新至OpenHarmony 4.0后&#xff0c;公司服务器无法编译通过&#xff0c;总是在最后几十个文件时报错,错误码4000&#xf…

C#电源串口调试

目的 记录串口调试的遇到的一些问题以及相应的解决方法 1.串口定义:串口是计算机与其他硬件传输数据的通道&#xff0c;在计算机与外设通信时起到重要作用 2.串口通信的基础知识 C#中的串口通信类 C#使用串口通信类是SerialPort(),该类使用方法是 new 一个 SerialPort对象 为S…

【C语言】自定义类型:结构体深入解析(二)结构体内存对齐宏offsetof计算偏移量结构体传参

文章目录 &#x1f4dd;前言&#x1f320; 结构体内存对齐&#x1f309;内存对齐包含结构体的计算&#x1f320;宏offsetof计算偏移量&#x1f309;为什么存在内存对⻬?&#x1f320; 结构体传参&#x1f6a9;总结 &#x1f4dd;前言 本小节&#xff0c;我们学习结构的内存对…

R语言【cli】——通过cli_abort用 cli 格式的内容显示错误、警告或信息,内部调用cli_bullets和inline-makeup

cli_abort(message,...,call .envir,.envir parent.frame(),.frame .envir ) 先从那些不需要下大力气理解的参数入手&#xff1a; 参数【.envir】&#xff1a;进行万能表达式编译的环境。 参数【.frame】&#xff1a;抛出上下文。默认用于参数【.trace_bottom】&#xff…

Redis实现日榜|直播间榜单|排行榜|Redis实现日榜01

前言 直播间贡献榜是一种常见的直播平台功能&#xff0c;用于展示观众在直播过程中的贡献情况。它可以根据观众的互动行为和贡献值进行排名&#xff0c;并实时更新&#xff0c;以鼓励观众积极参与直播活动。 在直播间贡献榜中&#xff0c;每个观众都有一个对应的贡献值&#…

力扣日记12.21【二叉树篇】98. 验证二叉搜索树

力扣日记&#xff1a;【二叉树篇】98. 验证二叉搜索树 日期&#xff1a;2023.12.21 参考&#xff1a;代码随想录、力扣 98. 验证二叉搜索树 题目描述 难度&#xff1a;中等 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义…

啥?你还不道数据库?赶紧进来看吧!

操作系统&#xff1a; windows&#xff1a;win10、win11、win7、windows Server2016 Linux/Unix &#xff1a;红帽&#xff08;RedHat&#xff09;、Bebian、SUSE MacOS Linux系统&#xff1a;CantOS&#xff08;yum、dnf&#xff09;、Ubuntu&#xff08;apt、apt—get&am…

Ubuntu 常用命令之 df 命令用法介绍

&#x1f4d1;Linux/Ubuntu 常用命令归类整理 在Ubuntu系统下&#xff0c;df命令是用来查看文件系统的磁盘空间占用情况的。df是disk free的缩写&#xff0c;这个命令可以获取硬盘被占用了多少空间&#xff0c;还有多少空间是可用的&#xff0c;硬盘的挂载点等信息。 df命令的…

【Python】matplotlib画图_饼状图

柱状图主要使用pie()函数&#xff0c;基本格式如下&#xff1a; plt.pie(x,explodeNone,labelsNone,colorsNone,autopctsNone,pctdistance0.6,shadowFalse,labeldistance1.1,staatangleNone,radiusNone,counterclockTrue,wedgepropsNone,textpropsNone,center(0,0),frameFalse…

PIC单片机项目(7)——基于PIC16F877A的智能灯光设计

1.功能设计 使用PIC16F877A单片机&#xff0c;检测环境关照&#xff0c;当光照比阈值低的时候&#xff0c;开灯。光照阈值可以通过按键进行设置&#xff0c;同时阈值可以保存在EEPROM中&#xff0c;断电不丢失。使用LCD1602进行显示&#xff0c;第一行显示测到的实时光照强度&a…

代码随想录算法训练营Day7 | 344.反转字符串、541.反转字符串||、替换数字、151.反转字符串中的单词、右旋字符串

LeetCode 344 反转字符串 本题思路&#xff1a;反转字符串比较简单&#xff0c;定义两个指针&#xff0c;一个 i 0, 一个 j s.length-1。然后定义一个临时变量 tmp&#xff0c;进行交换 s[i] 和 s[j]。 class Solution {public void reverseString(char[] s) {int i 0;int …

华为二层交换机与防火墙配置实例

二层交换机与防火墙对接上网配置示例 组网图形 图1 二层交换机与防火墙对接上网组网图 二层交换机简介配置注意事项组网需求配置思路操作步骤配置文件相关信息 二层交换机简介 二层交换机指的是仅能够进行二层转发&#xff0c;不能进行三层转发的交换机。也就是说仅支持二层…

OceanMind海睿思入选中国信通院首批“高质量智能审计工具目录”,获多项认证

近日&#xff0c;由中国信息通信研究院&#xff08;以下简称“中国信通院”&#xff09;、中国通信标准化协会支持的“2023 GOLF IT新治理领导力论坛”在北京顺利举行。 中新赛克海睿思作为国内领先的审计数字化代表企业受邀参会。 在内部审计数字化转型走深向实以及智能化演进…

【Spring Boot】面试题汇总,带答案的那种

继上次的文章【MySQL连环炮&#xff0c;你抗的住嘛&#xff1f;】爆火之后&#xff0c;越来越多的小伙伴后台留言&#xff0c;要求阿Q总结下其他的“连环炮”知识点&#xff0c;想在金九银十的面试黄金期轻松对线面试官。 同样为了节省大家的时间&#xff0c;阿Q最近对【Sprin…

性能优化之资源优化

性能优化之资源优化 资源优化性能关键检测流程。浅析一下基于Unity3D 美术规则约束一、模型层面二、贴图层面三、动画层面四、声音层面&#xff1a;&#xff08;音频通用设置&#xff09;五、UI层面&#xff1a; 题外点&#xff1a;诚然在优化中&#xff0c;美术占比是很重要的…

搭建接口自动化测试框架python+requests+pytest

安装python&#xff08;最好是比较新比较稳定的版本&#xff09;&#xff0c;然后是python的解释器或者叫编译器pycharm安装后新建一个项目&#xff0c;以此项目为基础&#xff0c;安装依赖搭建框架。打开pycharm&#xff0c;点击左上角的File->New project->弹出如下界面…

通过navcat的ssh连接 将一个服务器当作跳板连接远程mysql

文章目录 通过ssh连接一个服务器当作跳板连接远程mysql 通过ssh连接一个服务器当作跳板连接远程mysql 简单来说 一共三台机器 windows Linux&#xff08;入口&#xff09; Linux&#xff08;mysql&#xff09; windows 可以通过ssh 私钥连接Linux&#xff08;入口&#xff09;…