【大数据】详解 Flink 中的 WaterMark

详解 Flink 中的 WaterMark

  • 1.基础概念
    • 1.1 流处理
    • 1.2 乱序
    • 1.3 窗口及其生命周期
    • 1.4 Keyed vs Non-Keyed
    • 1.5 Flink 中的时间
  • 2.Watermark
    • 2.1 案例一
    • 2.2 案例二
    • 2.3 如何设置最大乱序时间
    • 2.4 延迟数据重定向
  • 3.在 DDL 中的定义
    • 3.1 事件时间
    • 3.2 处理时间

1.基础概念

1.1 流处理

流处理,最本质的是在处理数据的时候,接受一条处理一条数据。

批处理,则是累积数据到一定程度在处理。这是他们本质的区别。

在设计上 Flink 认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。

  • 有界数据对应批处理,API 对应 DateSet
  • 无界数据对应流处理,API 对应 DataStream

1.2 乱序

什么是乱序呢?

可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的。虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order 或者说 late element)。

✅ 某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有 5 秒的延时,也就是在实际时间的第 1 秒产生的数据有可能在第 5 秒中产生的数据之后到来(比如到 Window 处理节点)。例如,有 1 ~ 10 个事件,乱序到达的序列是:2, 3, 4, 5, 1, 6, 3, 8, 9, 10, 7。

1.3 窗口及其生命周期

对于 Flink,如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于 Spark 和 Flink 都产生了窗口计算。

比如,因为我们想看到过去一分钟或过去半小时的访问数据,这时候我们就需要窗口。

  • Window:Window 是处理无界流的关键,Window 将流拆分为一个个有限大小的 buckets,可以在每一个 buckets 中进行计算。
  • 当 Window 是时间窗口的时候,每个 Window 都会有一个开始时间(start_time)和结束时间(end_time)(左闭右开),这个时间是系统时间。

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

窗口有如下组件:

  • Window Assigner:用来决定某个元素被分配到哪个或哪些窗口中去。
  • Trigger:触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于 “当窗口中的元素数量大于 4” 时,或 “当水位线通过窗口结束时”。
  • Evictor:驱逐器。Evictor 提供了在使用 WindowFunction 之前或者之后从窗口中删除元素的能力。

窗口还拥有函数,比如 ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。

1.4 Keyed vs Non-Keyed

在定义窗口之前,要指定的第一件事是流是否需要 Keyed,使用 keyBy(...) 将无界流分成逻辑的 keyed stream。如果未调用 keyBy(...),则表示流不是 keyed stream

  • 对于 Keyed 流,可以将传入事件的任何属性用作 key。拥有 keyed stream 将允许窗口计算由多个任务并行执行,因为每个逻辑 Keyed 流可以独立于其余任务进行处理。相同 Key 的所有元素将被发送到同一个任务。
  • 在 Non-Keyed 流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为 1。

1.5 Flink 中的时间

Flink 在流处理程序支持不同的时间概念。分别为是 事件时间Event Time)、处理时间Processing Time)、提取时间Ingestion Time)。

从时间序列角度来说,发生的先后顺序是:事件时间提取时间处理时间

  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入 Apache Flink 流处理系统的时间,也就是 Flink 读取数据源时间。
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是 Flink 程序处理该事件时当前系统时间。

2.Watermark

Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。Watermark 是用于处理乱序事件或延迟数据的,这通常用 Watermark 机制结合 Window 来实现(Watermarks 用来触发 Window 窗口计算)。

2.1 案例一

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxOutOfOrderness = 3000; // 3.0 secondsprivate long currentMaxTimestamp;@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp = element.getCreationTime();currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness bound// 生成 watermarkreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
}

在这里插入图片描述
上图中是一个 10s 大小的窗口,1000020000 为一个窗口。当 EventTime 为 23000 的数据到来,生成的 WaterMark 的时间戳为 20000,大于等于 window_end_time,会触发窗口计算。

2.2 案例二

public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxTimeLag = 3000; // 3 seconds@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lagreturn new Watermark(System.currentTimeMillis() - maxTimeLag);}
}

在这里插入图片描述
只是简单的用当前系统时间减去最大延迟时间生成 Watermark ,当 WaterMark 为 20000 时,大于等于窗口的结束时间,会触发 1000020000 窗口计算。再当 EventTime 为 19500 的数据到来,它本应该是属于窗口 1000020000 窗口的,但这个窗口已经触发计算了,所以此数据会被丢弃。

2.3 如何设置最大乱序时间

虽说水位线表明着早于它的事件不应该再出现,接收到水位线以前的的消息是不可避免的,这就是所谓的 迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有 3 种:

  • 重新激活已经关闭的窗口并重新计算以修正结果。将迟到事件收集起来另外处理。将迟到事件视为错误消息并丢弃。Flink 默认的处理方式是直接丢弃,其他两种方式分别使用 Side OutputAllowed Lateness
  • Side Output 机制 可以将迟到事件单独放入一个数据流分支,这会作为 Window 计算结果的副产品,以便用户获取并对其进行特殊处理。
  • Allowed Lateness 机制 允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间迟到的事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

这里总结机制为:

  • 窗口 Window 的作用是为了周期性的获取数据。
  • WaterMark 的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess 是将窗口关闭时间再延迟一段时间。
  • sideOutPut 是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。
public class TumblingEventWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);
//        env.getConfig().setAutoWatermarkInterval(100);DataStream<String> socketStream = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Long>> resultStream = socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {@Overridepublic long extractTimestamp(String element) {long eventTime = Long.parseLong(element.split(" ")[0]);System.out.println(eventTime);return eventTime;}}).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return Tuple2.of(value.split(" ")[1], 1L);}}).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许延迟处理2秒.reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});resultStream.print();env.execute();}
}

在这里插入图片描述
watermark 为 21000 时,触发了 [10000, 20000) 窗口计算,由于设置了 allowedLateness(Time.seconds(2)),即允许两秒延迟处理,watermark < window_end_time + lateTime 公式得到满足,因此随后 10000 和 12000 进入窗口时,依然能触发窗口计算;随后 watermark 增加到 22000,watermark < window_end_time + lateTime 不再满足,因此 11000 再次进入窗口时,窗口不再进行计算。

2.4 延迟数据重定向

流的返回值必须是 SingleOutputStreamOperator,其是 DataStream 的子类。通过 getSideOutput 方法获取延迟数据。可以将延迟数据重定向到其他流或者进行输出。

public class TumblingEventWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);DataStream<String> socketStream = env.socketTextStream("localhost", 9999);//保存被丢弃的数据OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};//注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = socketStream// Time.seconds(3)有序的情况修改为0.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {@Overridepublic long extractTimestamp(String element) {long eventTime = Long.parseLong(element.split(" ")[0]);System.out.println(eventTime);return eventTime;}}).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return Tuple2.of(value.split(" ")[1], 1L);}}).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).sideOutputLateData(outputTag) // 收集延迟大于2s的数据.allowedLateness(Time.seconds(2)) //允许2s延迟.reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});resultStream.print();//把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中DataStream<Tuple2<String, Long>> sideOutput = resultStream.getSideOutput(outputTag);sideOutput.print();env.execute();}
}

3.在 DDL 中的定义

3.1 事件时间

事件时间属性是通过 CREATE TABLE DDL 语句中的 WATERMARK 语句定义的。水印语句在现有事件时间字段上定义 水印生成表达式,将事件时间字段标记为事件时间属性。

Flink SQL 支持在 TIMESTAMPTIMESTAMP_LTZ 列上定义事件时间属性。如果源中的时间戳数据以 年-月-日-时-分-秒 表示,通常是不含时区信息的字符串值,例如 2020-04-15 20:13:40.564,建议将事件-时间属性定义为 TIMESTAMP 列。

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- Declare the user_action_time column as an event-time attribute-- and use a 5-seconds-delayed watermark strategy.WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

如果数据源中的时间戳数据以纪元时间表示,通常是一个长值,例如 1618989564564,建议将事件时间属性定义为 TIMESTAMP_LTZ 列。

CREATE TABLE user_actions (user_name STRING,data STRING,ts BIGINT,time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- Declare the time_ltz column as an event-time attribute-- and use a 5-seconds-delayed watermark strategy.WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

3.2 处理时间

处理时间能让表格程序根据本地机器的时间产生结果。这是最简单的时间概念,但会产生非确定性结果。处理时间不需要提取时间戳或生成水印。

CREATE TABLE DDL 语句中,使用系统 PROCTIME() 函数将处理时间属性定义为计算列。函数返回类型为 TIMESTAMP_LTZ

CREATE TABLE user_actions (user_name STRING,data STRING,-- Declare an additional field as a processing-time attribute.user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/652784.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度推荐模型之DeepFM

一、FM 背景&#xff1a;主要解决大规模稀疏数据下的特征组合遇到的问题&#xff1a;1. 二阶特征参数数据呈指数增长 怎么做的&#xff1a;对每个特征引入大小为k的隐向量&#xff0c;两两特征的权重值通过计算对应特征的隐向量内积 而特征之间计算点积的复杂度原本为 实际应…

幻兽帕鲁的搭建和幻兽帕鲁需要什么配置的服务器

前言 大家好&#xff0c;今天教大家如何快速搭建幻兽帕鲁&#xff0c;并能满足8-32人游玩 第一步 购买服务器 1.CPU&#xff1a;4核&#xff08;最低需要4核起&#xff0c;当然可以选择更高的&#xff09;CPU的选择更看重单核性能&#xff0c;尽量选择主频2.5GHz以上的&#…

OpenTCS IDEA 全流程搭建和运行指南

OpenTCS IDEA 全流程搭建和运行指南 JDK安装下载JDK版本 openTCS源码下载IDEA 打开运行查看下载源码中gradle版本号下载gradle 二进制文件配置IDEA Gradle本地仓库IDEA打开openTCS项目运行顺序 JDK安装 下载JDK版本 JDK网址 注意&#xff1a; 请下载官方文档标准的java13JDK o…

4G模块Air724如何访问天气信息

1.这是获得json数据&#xff1a; 左边是标准官方api说明中的&#xff0c;右边是我用串口获取的&#xff1a; 2.首先找一个天气服务器&#xff0c;我的&#xff1a;YY天气&#xff0c;直接百度&#xff0c;注册&#xff0c;然后找api即可&#xff1a; 3.用接口工具测试接口是否…

AV Foundation 视频播放中的可视拖拽进度条

引言 在视频播放软件中&#xff0c;通过拖拽进度条来调整播放进度几乎已成为不可或缺的功能。这一功能使用户能够精确指定视频播放的时间点。近年来&#xff0c;视频播放器在原有的拖拽进度条基础上进行了更加人性化的性能提升&#xff0c;引入了可视化拖拽条。这一创新为用户…

Ps:根据 HSB 调色(以可选颜色命令为例)

在数字色彩中&#xff0c;RGB 和 HSV&#xff08;又称 HSB&#xff09;是两种常用的颜色表示方式&#xff08;颜色模型&#xff09;。 在 RGB 颜色模式下&#xff0c;Photoshop 的红&#xff08;Red&#xff09;、绿&#xff08;Green&#xff09;、蓝&#xff08;Blue&#xf…

基于SpringBoot微信小程序的宠物美容预约系统设计与实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行交流合作✌ 主要内容&#xff1a;SpringBoot、Vue、SSM、HLM…

rabbitmq基础-java-3、Fanout交换机

1、简介 Fanout&#xff0c;英文翻译是扇出。 2、 特点 1&#xff09; 可以有多个队列 2&#xff09; 每个队列都要绑定到Exchange&#xff08;交换机&#xff09; 3&#xff09; 生产者发送的消息&#xff0c;只能发送到交换机 4&#xff09; 交换机把消息发送给绑定过的…

应用机器学习的建议

一、决定下一步做什么 在你得到你的学习参数以后&#xff0c;如果你要将你的假设函数放到一组新的房屋样本上进行测试&#xff0c;假如说你在预测房价时产生了巨大的误差&#xff0c;你想改进这个算法&#xff0c;接下来应该怎么办&#xff1f;实际上你可以考虑先采用下面的几种…

防御保护--第一次实验

目录 一&#xff0c;vlan的划分及在防火墙上创建单臂路由 二&#xff0c;创建安全区域 三&#xff0c;配置安全策略 四&#xff0c;配置认证策略 五&#xff0c;配置NAT策略 1.将内网中各个接口能够ping通自己的网关 2..生产区在工作时间内可以访问服务器区&#xff0c;仅…

AI大模型开发架构设计(6)——AIGC时代,如何求职、转型与选择?

文章目录 AIGC时代&#xff0c;如何求职、转型与选择&#xff1f;1 新职场&#xff0c;普通人最值钱的能力是什么?2 新职场成长的3点建议第1点&#xff1a;目标感第2点&#xff1a;执行力第3点&#xff1a;高效生产力 3 新职场会产生哪些新岗位机会?如何借势?4 新职场普通人…

数据结构-线性表

文章目录 数据结构—线性表1.线性表的定义和基本操作线性表的定义线性表的特点线性表的基本操作 2.线性表的顺序存储和链式存储表示顺序存储链式存储单链表循环链表双向链表 数据结构—线性表 1.线性表的定义和基本操作 线性表的定义 定义&#xff1a;线性表是具有相同数据类…

数据结构篇-03:堆实现优先级队列

本文着重在于讲解用 “堆实现优先级队列” 以及优先级队列的应用&#xff0c;在本文所举的例子中&#xff0c;可能使用优先级队列来解并不是最优解法&#xff0c;但是正如我所说的&#xff1a;本文着重在于讲解“堆实现优先级队列” 堆实现优先级队列 堆的主要应用有两个&…

sqli-lbs靶场搭建

目录 环境小皮源码下载 1.源码解压&#xff1a; 2.搭建网站 2.1点击创建网站 2.2修改sql-connections\db-creds.inc 2.3重新启动 3.访问你设置的域名 3.1点击启动数据库配置 3.2返回第一个页面&#xff08;开启题目&#xff09; sqlilbs靶场搭建 环境小皮源码下载 下载地址&am…

【服务器】宝塔面板的使用手册

目录 &#x1f337;概述 &#x1f33c;1. 绑定域名 &#x1f33c;2. 添加端口 &#x1f33c;3. 安装docker配置docker​​​​​​​ &#x1f33c;4. 软件商店 &#x1f33c;5. 首页 &#x1f337;概述 宝塔面板的安装教程&#xff1a;【服务器】安装宝塔面板 &#x1f…

golang封装业务err(结合iris)

golang封装业务err 我们有时在web开发时&#xff0c;仅凭httpStatus以及msg是不方便维护和体现我们的业务逻辑的。所以就需要封装我们自己的业务错误。 自定义biz_err维护err map&#xff1a;errorResponseMap、errorHttpStatusMap 注意&#xff1a;本文主要以演示为主&#xf…

uniapp 用css animation做的鲤鱼跃龙门小游戏

第一次做这种小游戏&#xff0c;刚开始任务下来我心里是没底的&#xff0c;因为我就一个‘拍黄片’的&#xff0c;我那会玩前端的动画啊&#xff0c;后面尝试写了半天&#xff0c;当即我就给我领导说&#xff0c;你把我工资加上去&#xff0c;我一个星期给你做出来&#xff0c;…

Vulnhub靶场DC-9

攻击机192.168.223.128 靶机192.168.223.138 主机发现 nmap -sP 192.168.223.0/24 端口扫描 nmap -sV -p- -A 192.168.223.138 开启了22 80端口 访问一下web页面 有个查询界面 测试发现存在post型的sql注入 用sqlmap跑一下&#xff0c;因为是post型的&#xff0c;这里…

用于不对称卷积的验证参数的小程序

非对称卷积的特征图尺寸计算 此处只例举输入图像是正方形的情况。设输入图像尺寸为WxW&#xff0c;卷积核尺寸为ExF&#xff0c;步幅为S&#xff0c;Padding为P&#xff0c;卷积后的特征图尺寸为&#xff1a; 矩形卷积 如果输入图像是正方形&#xff0c;尺寸为WxW&#xff0c…

C++二叉搜索树详解

文章目录 1. 前言2. 二叉搜索树的概念3. 二叉搜索树的操作4. 二叉搜索树的实现5. 二叉搜索树的应用6. 二叉搜索树的性能分析 1. 前言 当涉及到组织和管理数据时&#xff0c;二叉搜索树是一种常用的数据结构。它不仅可以快速插入和删除元素&#xff0c;还可以高效地搜索和查找特…