0基础学习PyFlink——水位线(watermark)触发计算

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》和《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》中,我们发现如果窗口中元素个数没有把窗口填满,则不会触发计算。
在这里插入图片描述

为了解决长期不计算的问题,我们引入了在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》和《0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)》的方案。但是这个方案引入另外一个问题,就是每次处理数据可能不尽相同。这是因为它们使用了“处理时间”(Processing Time)来作为窗口划分的参考系,而每次程序处理时间会根据当前负载情况有很大的不同。这样我们对同一批数据做处理时,可能会得出不同的Window切分方案。
在这里插入图片描述
于是我们引入《0基础学习PyFlink——事件时间和运行时间的窗口》方案。它可以使用源自数据本身的“事件时间”(Event Time)作为Time Window的参考系,这样在不同负载、不同时间,相同数据的时间参考系是一样的,进而可以得出一致的结果。
在这里插入图片描述
但是现实中,我们没法保证上述数据是按照上面的顺序到达Flink的。
比如下面这个例子,红色部分都是乱序的,那么Flink如何处理这些数据呢?
在这里插入图片描述
只有两种可能性:

  1. 直接抛弃;
  2. 等待一段时间统一处理,超过等待的时间直接抛弃。因为不可能一直等下去,否则什么时候处理呢?

这些即有别于Count Window,也有别于Time Window。这个时候就要引入水位线(watermark)技术来解决这个问题。
在详细讲解之前,我们需要明确一些基本知识:

  1. EventTime就是Timestamp,即我们可以通过制定Timestamp函数设定元素的EventTime。
  2. EventTime从属于元素。
  3. Watermark源于EventTime和max_out_of_orderness(等待无序数据的时间),即Watermark=EventTime-max_out_of_orderness。
  4. Watermark从属于流。
  5. Window的Start源于EventTime;End源于Start和窗口时间,即End=Start+WindowTme;这是一个左闭右开的区间,即[Start, End)。
  6. Window从属于流,只有Watermark>=Window End时才会触发计算(且窗口中要有元素)。
  7. Window在单向递增前进,比如从[0,10)变成[10,20)、[20,30)……[90,100)。
  8. Wartermark单向递增前进,它不会因为新进入的元素EventTime较小,而导致Wartermark向变小的趋势发展。
    在这里插入图片描述
    上图中,第一个元素(A,1)的EventTime通过自定义公式可以得到101,于是初始的Window的Start值是该值向下取可以被Window Size整除的最大值,即100;这个进一步确认了第一个窗口是[100,105)。
    watermark是通过eventtime计算出来的,上例中我们希望如果事件在窗口时间之外到来则抛弃,即不等待任何时间,即Window End+0,即Eventtime-0。
    (A,0)数据来到的时候,watermark不会因为其Eventtime为100,比流中的watermark值(101)小而改变,依然维持watermark单调递增。这个在(A,2)和(A,5)到来时也是如此。
    (A,8)元素的到来,会让流的watermark变成108。这个值会越过当前窗口[100,105),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,1)、(A,0)、(A,3)和(A,4);
    (A,10)元素的到来,会让流的watermark变成110。这个值会越过当前窗口[100,110),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,8)、(A,6)、(A,7)和(A,9);而(A,2)因为不在这个区间内,就被抛弃了。我们也可以认为(A,2)迟到而被抛弃。
    为了更好讲述原理,上述例子存在一个假设:watertime更新是随着元素一个个进入而改变的。而实际元素进入个数不太确定,比如可能会两个两个进入,那么就会变成如下。主要区别就是(A,5)参与了第二次窗口计算,虽然它迟到了,而且watermark计算方法也不打算等待任何一个迟到的数据,但是它和(A,10)一起进入时间戳计算逻辑,导致触发的时机被滞后,从而“幸运”的赶上了最后一轮窗口计算。如果它稍微再晚一点到来,它也会被抛弃。
    在这里插入图片描述

测试代码

import time
from pyflink.common import Duration, WatermarkStrategy, Time, Types
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
from pyflink.datastream.functions import AllWindowFunction, ProcessFunction, ProcessAllWindowFunction, KeyedProcessFunction
from pyflink.table.expressions import lit, col
from pyflink.table.window import Tumble
from pyflink.common.time import Instant
from pyflink.table.udf import udf
from pyflink.common import Rowclass WindowFunc(AllWindowFunction[tuple, tuple, TimeWindow]):def apply(self, window, inputs):out = "**************************WindowFunc**************************" \"\nwindow: start:{} end:{} \ninputs: {}" \"\n**************************WindowFunc**************************" \.format(Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end), inputs)print(out)for value in inputs:yield (value, Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end))class TimestampAssignerAdapter(TimestampAssigner):def extract_timestamp(self, value, record_timestamp: int):return value[1] * 1000class TimestampAssignerProcessFunctionAdapter(ProcessFunction):def process_element(self, value, ctx: 'ProcessFunction.Context'):out_put = "-----------------------TimestampAssignerProcessFunctionAdapter {}-----------------------" \"\nvalue: {} \ttimestamp: {} \tcurrent_processing_time: {} \tcurrent_watermark: {}" \"\n-----------------------TimestampAssignerProcessFunctionAdapter-----------------------" \.format(int(time.time()), value, Instant.of_epoch_milli(ctx.timestamp()),Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),Instant.of_epoch_milli(ctx.timer_service().current_watermark()))print(out_put)yield (value, Instant.of_epoch_milli(ctx.timestamp()), Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),Instant.of_epoch_milli(ctx.timer_service().current_watermark()))def gen_random_int_and_timestamp():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()# stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_execute_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)stream_execute_env.set_parallelism(1)stream_execute_env.get_config().set_auto_watermark_interval(2)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)ordinal_num_start = 0ordinal_num_end = 10rows_per_second = 1schame = Schema.new_builder().column('in_ord', DataTypes.INT()) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.in_ord.kind', 'sequence') \.option('fields.in_ord.start', str(ordinal_num_start)) \.option('fields.in_ord.end', str(ordinal_num_end)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')@udf(result_type=DataTypes.ROW([DataTypes.FIELD("in_ord", DataTypes.INT()), DataTypes.FIELD("calc_order", DataTypes.INT())]), input_types=[DataTypes.INT()])def colFunc(oneCol):ordinal_num_data_map = {0: 1, 1: 0, 2: 3, 3: 4, 4: 8, 5: 6, 6: 7, 7: 2, 8: 9, 9: 10, 10: 5}# ordinal_num_data_map = {0: 16, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9,#                       10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 0, 17: 17, 18: 18, 19: 19,#                       20: 20, 21: 121, 22: 122, 23: 123, 24: 124, 25: 125, 26: 126, 27: 127, 28: 128, 29: 129,}data = ordinal_num_data_map[oneCol] + 100return Row(oneCol, data)input_table=table.map(colFunc(col('in_ord')))datastream = stream_table_env.to_data_stream(input_table)###############################################################################################    # datastream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(10))) \#                     .apply(WindowFunc())################################################################################################ watermark_strategy = WatermarkStrategy.no_watermarks().with_timestamp_assigner(TimestampAssignerAdapter())# datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)# datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())# datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.milliseconds(10))) \#                     .apply(WindowFunc())        ################################################################################################ watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(TimestampAssignerAdapter())watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(0)).with_timestamp_assigner(TimestampAssignerAdapter())datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \.apply(WindowFunc())###############################################################################################stream_execute_env.execute()if __name__ == '__main__':gen_random_int_and_timestamp()

-----------------------TimestampAssignerProcessFunctionAdapter 1699856800-----------------------
value: Row(in_ord=0, calc_order=101) timestamp: Instant<101, 0> current_processing_time: Instant<1699856800, 705000000> current_watermark: Instant<-9223372036854776, 192000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=1, calc_order=100) timestamp: Instant<100, 0> current_processing_time: Instant<1699856802, 700000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=2, calc_order=103) timestamp: Instant<103, 0> current_processing_time: Instant<1699856802, 702000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=3, calc_order=104) timestamp: Instant<104, 0> current_processing_time: Instant<1699856804, 700000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=4, calc_order=108) timestamp: Instant<108, 0> current_processing_time: Instant<1699856804, 709000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<100, 0> end:Instant<105, 0>
inputs: [Row(in_ord=0, calc_order=101), Row(in_ord=1, calc_order=100), Row(in_ord=2, calc_order=103), Row(in_ord=3, calc_order=104)]
WindowFunc
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=5, calc_order=106) timestamp: Instant<106, 0> current_processing_time: Instant<1699856806, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=6, calc_order=107) timestamp: Instant<107, 0> current_processing_time: Instant<1699856806, 705000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=7, calc_order=102) timestamp: Instant<102, 0> current_processing_time: Instant<1699856808, 700000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=8, calc_order=109) timestamp: Instant<109, 0> current_processing_time: Instant<1699856808, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=9, calc_order=110) timestamp: Instant<110, 0> current_processing_time: Instant<1699856809, 440000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=10, calc_order=105) timestamp: Instant<105, 0> current_processing_time: Instant<1699856809, 441000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<105, 0> end:Instant<110, 0>
inputs: [Row(in_ord=4, calc_order=108), Row(in_ord=5, calc_order=106), Row(in_ord=6, calc_order=107), Row(in_ord=8, calc_order=109), Row(in_ord=10, calc_order=105)]
WindowFunc
WindowFunc
window: start:Instant<110, 0> end:Instant<115, 0>
inputs: [Row(in_ord=9, calc_order=110)]

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/142850.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Interactive Analysis of CNN Robustness

Interactive Analysis of CNN Robustness----《CNN鲁棒性的交互分析》 摘要 虽然卷积神经网络&#xff08;CNN&#xff09;作为图像相关任务的最先进模型被广泛采用&#xff0c;但它们的预测往往对小的输入扰动高度敏感&#xff0c;而人类视觉对此具有鲁棒性。本文介绍了 Pert…

微信小程序display常用属性和子元素排列方式介绍

wxss中display常用显示属性与css一致&#xff0c;介绍如下&#xff1a; 针对元素本身显示的属性&#xff1a; displayblock&#xff0c;元素显示换行displayinline&#xff0c;元素显示换行&#xff0c;但不可设置固定的宽度和高度&#xff0c;也不可设置上下方向的margin和p…

Python | 机器学习之聚类算法

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《人工智能奇遇记》&#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 目录结构 1. 机器学习之聚类算法概念 1.1 机器学习 1.2 聚类算法 2. 聚类算法 2.1 实验目的…

Riskified: 2023年电商政策滥用问题恶化,正严重挑战商家盈利底线

2023年11月14日&#xff0c;中国上海 —— 近日&#xff0c;由全球领先的电子商务欺诈和风险智能解决方案提供商 Riskified 发布的《政策滥用及其对商家的影响&#xff1a;2023年全球参考基准》报告显示&#xff0c;政策滥用问题正进一步恶化&#xff0c;超过九成电商商家正在承…

七个优秀微服务跟踪工具

随着微服务架构复杂性的增加&#xff0c;在问题出现时确定问题的根本原因变得更具挑战性。日志和指标为我们提供了有用的信息&#xff0c;但并不能提供系统的完整概况。这就是跟踪的用武之地。通过跟踪&#xff0c;开发人员可以监控微服务之间的请求进度&#xff0c;从而使他们…

WebSocket真实项目总结

websocket websocket是什么? websocket是一种网络通讯协议。 websocket 是HTML5开始提供的一种在单个TCP链接上进行全双工通讯的协议。 为什么需要websocket? 初次接触websocket&#xff0c;都会带着疑惑去学习&#xff0c;既然已经有了HTTP协议&#xff0c;为什么还需要另一…

【数据结构】——单链表(增删查改)

目录 前言&#xff1a; 一&#xff1a;单链表的特点 ​编辑 二&#xff1a;单链表实现 单链表定义 2.1申请节点&#xff08;初始化&#xff09; 2.2单链表尾插 ​编辑 2.3单链表打印 2.4单链表头插 2.5单链表尾删 2.6单链表头删 2.7单链表查找 2.8在目标位置后面插入…

通用结构化剪枝DepGraph

文章目录 0. 前言一. 第一部分: Torch-Pruning1.1 传统的剪枝流程 - ResNet-18结构化剪枝1.2 Torch-Pruning剪枝 - ResNet-18结构化剪枝1.3 Torch-Pruning剪枝 - 遍历所有分组1.4 Torch-Pruning剪枝 - 剪枝器 High-level Pruners1.5 Torch-Pruning剪枝 - 拓展到更复杂的神经网…

基于入侵杂草算法优化概率神经网络PNN的分类预测 - 附代码

基于入侵杂草算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于入侵杂草算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于入侵杂草优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…

Python机器学习、深度学习提升气象、海洋、水文领域实践应用

Python是功能强大、免费、开源&#xff0c;实现面向对象的编程语言&#xff0c;能够在不同操作系统和平台使用&#xff0c;简洁的语法和解释性语言使其成为理想的脚本语言。除了标准库&#xff0c;还有丰富的第三方库&#xff0c;Python在数据处理、科学计算、数学建模、数据挖…

【Apifox】国产测试工具雄起

在开发过程中&#xff0c;我们总是避免不了进行接口的测试&#xff0c; 而相比手动敲测试代码&#xff0c;使用测试工具进行测试更为便捷&#xff0c;高效 今天发现了一个非常好用的接口测试工具Apifox 相比于Postman&#xff0c;他还拥有一个非常nb的功能&#xff0c; 在接…

vue-组件通信(动态组件)

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Vue篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来vue篇专栏内容:vue-组件通信|动态组件 目录 组件通信 1.父传子 2.子传父 3.ref 4.兄弟组件 5.跨层级 provid…

Xilinx Zynq 7000系列中端FPGA解码MIPI视频,基于MIPI CSI-2 RX Subsystem架构实现,提供5套工程源码和技术支持

目录 1、前言免责声明 2、我这里已有的 MIPI 编解码方案3、本 MIPI CSI2 模块性能及其优缺点4、详细设计方案设计原理框图OV5640及其配置权电阻硬件方案MIPI CSI-2 RX SubsystemSensor Demosaic图像格式转换Gammer LUT伽马校正VDMA图像缓存AXI4-Stream toVideo OutHDMI输出 5、…

【JavaEE】Servlet(创建Maven、引入依赖、创建目录、编写及打包、部署和验证、smart Tomcat)

一、什么是Servlet&#xff1f; Servlet 是一种实现动态页面的技术. 是一组 Tomcat 提供给程序猿的 API, 帮助程序猿简单高效的开发一个 web app 1.1 Servlet能干什么&#xff1f; &#x1f695;允许程序猿注册一个类, 在 Tomcat 收到某个特定的 HTTP 请求的时候, 执行这个类…

P3371 【模板】单源最短路径(弱化版)

【模板】单源最短路径&#xff08;弱化版&#xff09; 题目背景 本题测试数据为随机数据&#xff0c;在考试中可能会出现构造数据让SPFA不通过&#xff0c;如有需要请移步 P4779。 题目描述 如题&#xff0c;给出一个有向图&#xff0c;请输出从某一点出发到所有点的最短路…

Kotlin基础——接口和类

接口 使用 : 表示继承关系&#xff0c;只能继承一个类&#xff0c;但可以实现多个接口override修饰符表示重写可以有默认方法&#xff0c;若父类的默认方法冲突&#xff0c;则需要子类重写&#xff0c;使用super<XXX>.xxx()调用某一父类方法 interface Focusable {fun …

SQL学习之增删改查

文章目录 数据库数据类型建表create table插入数据insert into查询数据select from修改数据update set删除数据delete from备份ctas结果插入iis截断表 truncate table修改表结构alter table添加注释 注&#xff1a;本文的SQL语法是基于Oracle数据库操作的&#xff0c;但是基本的…

开源软件 FFmpeg 生成模型使用图片数据集

本篇文章聊聊&#xff0c;成就了无数视频软件公司、无数在线视频网站、无数 CDN 云服务厂商的开源软件 ffmpeg。 分享下如何使用它将各种视频或电影文件&#xff0c;转换成上万张图片数据集、壁纸集合&#xff0c;来让下一篇文章中的模型程序“有米下锅”&#xff0c;这个方法…

⑨【MySQL事务】事务开启、提交、回滚,事务特性ACID,脏读、幻读、不可重复读。

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ MySQL事务 ⑨【事务】1. 事务概述2. 操作事务3…

若依Linux与Docker集群部署

若依Linux集群部署 1. 若依2.MYSQL Linux环境安装2.1 MYSQL数据库部署和安装2.2 解压MYSQL安装包2.3 创建MYSQL⽤户和⽤户组2.4 修改MYSQL⽬录的归属⽤户2.5 准备MYSQL的配置⽂件2.6 正式开始安装MYSQL2.7 复制启动脚本到资源⽬录2.8 设置MYSQL系统服务并开启⾃启2.9 启动MYSQL…