7.2、如何理解Flink中的水位线(Watermark)

目录

0、版本说明

1、什么是水位线?

2、水位线使用场景?

3、设计水位线主要为了解决什么问题?

4、怎样在flink中生成水位线?

4.1、自定义标记 Watermark 生成器

4.2、自定义周期性 Watermark 生成器

4.3、内置Watermark生成器 - 有序流水位线生成器

4.4、内置Watermark生成器 - 乱序流水位线生成器

4.5、在 读取数据源时 添加水位线

5、水位线和窗口的关系?

6、水位线在各个算子间的传递

6.1、测试用例 - 不设置 withIdleness 超时时间

6.2、测试用例 - 设置 withIdleness 超时时间


0、版本说明

        开发语言:java1.8

        Flink版本:1.17

        官网链接:官网链接

1、什么是水位线?

        Flink中水位线是一条特殊的数据(long timestamp)

        它会以时间戳的形式作为一条标识数据插入到数据流中


2、水位线使用场景?

        使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)

        通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识


3、设计水位线主要为了解决什么问题?

        设计水位线主要是为了解决实时流中数据乱序和迟到的问题

        思考:什么原因造成了数据流的乱序呢?

                如今数据采集、数据传输大多都在分布式系统中完成

                各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到


4、怎样在flink中生成水位线?

        Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)

4.1、自定义标记 Watermark 生成器

标记 Watermark 生成器特点:

        每条数据到来后,都会为其生成一条 Watermark

适用场景:

        数据量小且数据有序

代码示例:        

Step1:自定义 标记水位线生成器 实现类

// 自定义 标记水位线生成器 实现类
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {// 每进入一条数据,都会调用一次 onEvent 方法@Override/** 参数说明:*   @event : 进入到该方法的事件数据*   @eventTimestamp : 时间戳提取器提取的时间戳* */public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {//发射水位线output.emitWatermark(new Watermark(eventTimestamp));}// 不需要实现@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}
}

Step2:自定义 标记性水位线生成策略 实现类

// TODO 自定义 标记性水位线生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {// TODO 实例化一个 事件时间提取器@Overridepublic TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}};return timestampAssigner;}// TODO 实例化一个 watermark 生成器@Overridepublic WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new PeriodWatermarkGenerator<>();}
}

Step3:使用 标记性水位线生成策略

// TODO 使用 自定义标记 Watermark 生成器
public class UserPeriodWatermarkStrategy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 5.触发程序执行env.execute();}
}

查看运行结果:


4.2、自定义周期性 Watermark 生成器

标记 Watermark 生成器特点:

        基于处理时间,周期性生成 Watermark

适用场景:

        数据量大且可能存在一定程度数据延迟(乱序)

代码示例:        

Step1:自定义 周期性水位线生成器 实现类

// 自定义 周期性水位线生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {// 设置变量,用来保存 当前最大的事件时间private long currentMaxTimestamp;// 设置变量,指定最大的乱序时间(等待时间)private final long maxOutOfOrderness = 0000; // 3 秒@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 只更新当前最大时间戳,不再发生水位线if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;}// 周期性 生成水位线// 每个 setAutoWatermarkInterval 时间,调用一次该方法@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发出的 watermark = 当前最大时间戳 - 最大乱序时间output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));}
}

Setp2:自定义 周期性水位线生成策略 实现类

// 自定义 周期性水位线生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {// TODO 实例化一个 事件时间提取器@Overridepublic TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}};return timestampAssigner;}// TODO 实例化一个 watermark 生成器@Overridepublic WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new PunctuatedWatermarkGenerator<>();}}

Step3:周期性水位线生成策略

// TODO 使用 自定义周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();// TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>()).withTimestampAssigner((event, timestamp) -> event.f1);// 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.3、内置Watermark生成器 - 有序流水位线生成器

有序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,最大乱序时间为0

适用场景:

        大数量有序流

代码示例:

// TODO 内置Watermark生成器 - 有序流水位线生成器
public class UserForMonotonousTimestamps {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 创建 内置水位线生成策略WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner((element,recordTimestamp) -> element.f1);// 3.使用 内置水位线生成策略SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.4、内置Watermark生成器 - 乱序流水位线生成器

乱序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,可以这是最大乱序时间

适用场景:

        大数量乱序流

代码示例:

// TODO 内置Watermark生成器 - 乱序流水位线生成器
public class UserForBoundedOutOfOrderness {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 获取 WatermarkStrategy实例WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s.withTimestampAssigner((element,recordTimestamp) -> element.f1);// 3.使用 内置水位线生成策略SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.5、在 读取数据源时 添加水位线

// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.创建 Source 对象
Source source = DataGeneratorSource、KafkaSource...// 3.读取 source时添加水位线
env.fromSource(source, WatermarkStrategy实例, "source name")   .print()
;// 4.触发程序执行
env.execute();

5、水位线和窗口的关系?

窗口什么时候创建?

        当窗口内的第一条数据到达时

窗口什么时候触发计算?

        当阈值水位线到达窗口时


6、水位线在各个算子间的传递

        下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值

测试代码:

// TODO 测试水位线的传递
public class TransmitWaterMark {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); // 2.将socket作为数据源(开启socket端口: nc -lk 9999)DataStreamSource<String> source = env.socketTextStream("localhost", 9999);source.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {if (key.equals("a")) {return 0;} else if (key.equals("b")) {return 1;} else {return 2;}}}, value -> value.split(",")[0]).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy//.<Tuple2<String, Long>>forMonotonousTimestamps().<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy()).withTimestampAssigner((element,recordTimestamp) -> element.f1).withIdleness(Duration.ofSeconds(5))  //空闲等待5s).process(new ShowProcessFunction()).setParallelism(1).print();env.execute();}
}

6.1、测试用例 - 不设置 withIdleness 超时时间

现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化


6.2、测试用例 - 设置 withIdleness 超时时间

现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/85117.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软考网络工程师华为配置考点总结

华为交换机配置基础 1.vlan的配置 华为设备中划分VLAN的方式有&#xff1a; 静态的划分&#xff1a;基于接口动态划分&#xff1a;基于MAC地址、基于IP子网、基于协议、基于策略&#xff08;MAC地址、Ip地址&#xff09;。 其中基于接口划分VLAN&#xff0c;是最简单&#x…

AI视频剪辑:批量智剪技巧大揭秘

对于许多内容创作者来说&#xff0c;视频剪辑是一项必不可少的技能。然而&#xff0c;传统的视频剪辑方法需要耗费大量的时间和精力。如今&#xff0c;有一种全新的剪辑方式正在改变这一现状&#xff0c;那就是批量AI智剪。这种智能化的剪辑方式能够让你在短时间内轻松剪辑大量…

Fourier变换的积分性质及其证明过程

Fourier变换的积分性质及其证明过程 一、积分性质 如果当 t → ∞ t \to \infty t→∞时&#xff0c; g ( t ) ∫ − ∞ t f ( t ) d t → 0 g(t) \int_{ - \infty }^t {f(t){\rm{d}}t \to 0} g(t)∫−∞t​f(t)dt→0&#xff0c;则&#xff1a; F [ ∫ − ∞ t f ( t ) …

2591. 将钱分给最多的儿童(Java)

给你一个整数 money &#xff0c;表示你总共有的钱数&#xff08;单位为美元&#xff09;和另一个整数 children &#xff0c;表示你要将钱分配给多少个儿童。 你需要按照如下规则分配&#xff1a; 所有的钱都必须被分配。 每个儿童至少获得 1 美元。 没有人获得 4 美元。 请你…

代码随想录算法训练营第23期day3| 203.移除链表元素 ,707.设计链表,206.反转链表

目录 一、链表 基础操作 二、&#xff08;leetcode 203&#xff09;移除链表元素 1.使用原来的链表 2.设置虚拟头结点 三、&#xff08;leetcode 707&#xff09;设计链表 四、&#xff08;leetcode 206&#xff09;反转链表 1.双指针法 2.递归法 一、链表 单链表定义…

大型集团借力泛微搭建语言汇率时区统一、业务协同的国际化OA系统

国际化、全球化集团&#xff0c;业务遍布全世界&#xff0c;下属公司众多&#xff0c;集团对管理方式和企业文化塑造有着很高的要求。不少大型集团以数字化方式助力全球统一办公&#xff0c;深化企业统一管理。 面对大型集团全球化的管理诉求&#xff0c;数字化办公系统作为集…

观测云产品更新 | 优化日志数据转发、索引绑定、基础设施自定义等

观测云更新 日志 数据转发&#xff1a;新增外部存储转发规则数据查询&#xff1b;支持启用/禁用转发规则&#xff1b;绑定索引&#xff1a;日志易新增标签绑定&#xff0c;从而实现更细颗粒度的数据范围查询授权能力。 基础设施 > 自定义 【默认属性】这一概念更改为【必…

指夹式脉搏血氧仪方案

随着科技的进步&#xff0c;家庭医疗器械的需求已经从简单测量到智能健康管理转变&#xff0c;比如能够对不同家庭成员的健康分别记录管理&#xff0c;将监测数据同步给家庭成员&#xff0c;专属家庭医生提供线上医疗服务等等。当前&#xff0c;在全球范围内&#xff0c;对更好…

9.19 校招 实习 内推 面经

绿泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 9.19 校招 实习 内推 面经 1、校招丨中联重科2024届校园招聘&#xff08;内推&#xff09; 校招丨中联重科2024届校园招聘&#xff08;内推&#xff09; 2、2023校招总结--SLAM/C开发 - 8 2…

接口自动化测试框架postman tests常用方法

【软件测试面试突击班】如何逼自己一周刷完软件测试八股文教程&#xff0c;刷完面试就稳了&#xff0c;你也可以当高薪软件测试工程师&#xff08;自动化测试&#xff09; postman常用方法集合&#xff1a; 1.​​​​​​设置环境变量 postman.setEnvironmentVariable("…

IntelliJ IDEA使用——Debug操作

文章目录 版本说明图标和快捷键查看变量计算表达式条件断点多线程调试 版本说明 当前的IntelliJ IDEA 的版本是2021.2.2&#xff08;下载IntelliJ IDEA&#xff09; ps&#xff1a;不同版本一些图标和设置位置可能会存在差异&#xff0c;但应该大部分都差不多。 图标和快捷键…

anaconda:Env creation from python 3.7 not working M1 Apple Silicon Mac

在anaconda上执行如下命令&#xff0c;想创建python3.7的环境 conda create -n myenv python3.7 发现无法创建 Collecting package metadata (current_repodata.json): done Solving environment: failed with repodata from current_repodata.json, will retry with next re…

DHCP与静态IP:哪种适合你的网络需求?

​如今&#xff0c;大多数网络设备&#xff08;如路由器或网络交换机&#xff09;都使用IP协议作为通过网络进行通信的标准。在IP协议中&#xff0c;网络上的每个设备都有一个唯一的标识符&#xff0c;称为IP地址。实现这一点的最简单方法是配置固定IP地址或静态IP地址。由于静…

elementUI elfrom表单验证无效、不起作用常见原因

今天遇到一个变态的问题&#xff0c;因页面比较复杂&#xff0c;出现几组条件判断&#xff0c;每个template内部又包含很多表单&#xff01;&#xff01; <template v-if"transformTypeValue 1"></template><template v-else-if"transformTypeV…

C语言-扫雷游戏的实现

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

如何实现线程池之间的数据透传 ?

如何实现线程池之间的数据透传 &#xff1f; 引言transmittable-thread-local概览capture如何 capture如何保存捕获的数据 save 和 replayrestore 小结 引言 当我们涉及到数据的全链路透传场景时&#xff0c;通常会将数据存储在线程的本地缓存中&#xff0c;如: 用户认证信息透…

​校园学习《乡村振兴战略下传统村落文化旅游设计》许少辉八一新著

​校园学习《乡村振兴战略下传统村落文化旅游设计》许少辉八一新著

【Stm32】【Lin通信协议】Lin通信点亮灯实验

Lin通信点亮灯实验 通过STM32的串口发送数据&#xff0c;然后通过串口转换模块将数据转换成LIN&#xff08;Local Interconnect Network&#xff09;协议&#xff0c;最终控制点亮灯。需要工程和入门资料的可以私信我&#xff0c;看到了马上回。 入门书本推荐&#xff1a; 一…

【C语言】数组和指针刷题练习

指针和数组我们已经学习的差不多了&#xff0c;今天就为大家分享一些指针和数组的常见练习题&#xff0c;还包含许多经典面试题哦&#xff01; 一、求数组长度和大小 普通一维数组 int main() {//一维数组int a[] { 1,2,3,4 };printf("%d\n", sizeof(a));//整个数组…

lettuce利用stream实现消息推送

1、消息推送 /*** Auther: pshdhx* Date: 2023/02/22/10:38* Description: 往同一个stream队列里边塞值&#xff0c;同一队列的所有消费者组&#xff0c;都会收到消息* 模拟 消息推送到服务器*/ public class TestPubStream {public static void main(String[] args) {// 创建…