0基础学习PyFlink——事件时间和运行时间的窗口

大纲

  • 定制策略
  • 运行策略
  • Reduce
  • 完整代码
  • 滑动窗口案例
  • 参考资料

在 《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》一文中,我们使用的是运行时间(Tumbling ProcessingTimeWindows)作为窗口的参考时间:

    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

而得到的结果也是不稳定的。
在这里插入图片描述
这是因为每次运行时,CPU等系统资源的繁忙程度是不一样的,这就影响了最后的运行结果。
为了让结果稳定,我们可以不依赖运行时间(ProcessingTime),而使用不依赖于运行环境,只依赖于数据的事件时间(EventTime)。
一般,我们需要大数据处理的数据,往往存在一个字段用于标志该条数据的“顺序”。这个信息可以是单调递增的ID,也可以是不唯一的时间戳。我们可以将这类信息看做事件发生的时间。
那如何让输入的数据中的“事件时间”参与到窗口时长的计算中呢?这儿就要引入Watermark(水印)的概念。
假如我们把数据看成一张纸上的内容,水印则是这张纸的背景。它并不影响纸上内容的表达,只是系统要用它来做更多的事情。
将数据中表达“顺序”的数据转换成“时间”,我们可以使用水印单调递增时间戳分配器

定制策略

class ElementTimestampAssigner(TimestampAssigner):def extract_timestamp(self, value, record_timestamp)-> int:return int(value[1])……       # define the watermark strategywatermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \.with_timestamp_assigner(ElementTimestampAssigner())

for_monotonous_timestamps会分配一个水印单调递增时间戳分配器,然后使用with_timestamp_assigner告知输入数据中“顺序”字段的值。这样系统就会根据这个字段的值生成一个单调递增的时间戳。这个时间戳相对顺序就和输入数据一样,是稳定的。
比如上图中,会分别用2,1,4,3……来计算时间戳。

运行策略

然后对原始数据使用该策略,这样source_with_wartermarks中的数据就包含了时间戳。

source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)

Reduce

这次我们使用TumblingEventTimeWindows,即事件时间(EventTime)窗口,而不是运行时间(ProcessingTime)窗口。

     # keyingkeyed=source_with_wartermarks.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(‘E’, 1) TimeWindow(start=0, end=2)
(‘E’, 3) (‘E’, 2) TimeWindow(start=2, end=4)
(‘E’, 4) (‘E’, 5) TimeWindow(start=4, end=6)
(‘E’, 6) (‘E’, 7) TimeWindow(start=6, end=8)
(‘E’, 8) (‘E’, 9) TimeWindow(start=8, end=10)
(‘E’, 10) TimeWindow(start=10, end=12)
(E,1)
(E,2)
(E,2)
(E,2)
(E,2)
(E,1)

多运行几次,结果是稳定输出的。
我们再多关注下TimeWindow中的start和end,它们是不重叠的、步长为2、左闭右开的区间。这个符合滚动窗口特性。

完整代码

from typing import Iterablefrom pyflink.common import Types, Time, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows, SlidingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssignerclass ElementTimestampAssigner(TimestampAssigner):def extract_timestamp(self, value, record_timestamp)-> int:return int(value[1])class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5),("E",7),("E",8),("E",9),("E",10)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# define the watermark strategywatermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \.with_timestamp_assigner(ElementTimestampAssigner())source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)# keyingkeyed=source_with_wartermarks.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

滑动窗口案例

from typing import Iterablefrom pyflink.common import Types, Time, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow
from pyflink.common.watermark_strategy import TimestampAssignerclass ElementTimestampAssigner(TimestampAssigner):def extract_timestamp(self, value, record_timestamp)-> int:return int(value[1])class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5),("E",7),("E",8),("E",9),("E",10)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# define the watermark strategywatermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \.with_timestamp_assigner(ElementTimestampAssigner())source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)# keyingkeyed=source_with_wartermarks.key_by(lambda i: i[0]) # reducingreduced=keyed.window(SlidingEventTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

(‘E’, 1) TimeWindow(start=0, end=2)
(‘E’, 1) (‘E’, 2) TimeWindow(start=1, end=3)
(‘E’, 3) (‘E’, 2) TimeWindow(start=2, end=4)
(‘E’, 3) (‘E’, 4) TimeWindow(start=3, end=5)
(‘E’, 4) (‘E’, 5) TimeWindow(start=4, end=6)
(‘E’, 6) (‘E’, 5) TimeWindow(start=5, end=7)
(‘E’, 6) (‘E’, 7) TimeWindow(start=6, end=8)
(‘E’, 7) (‘E’, 8) TimeWindow(start=7, end=9)
(‘E’, 8) (‘E’, 9) TimeWindow(start=8, end=10)
(‘E’, 9) (‘E’, 10) TimeWindow(start=9, end=11)
(‘E’, 10) TimeWindow(start=10, end=12)
(E,1)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,1)

通过TimeWindow的信息,我们看到这是一个步长为1、长度为2左闭右开的窗口。这个符合滑动窗口特点。

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/event-time/built_in/
  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/126609.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第4章_运算符

文章目录 1. 算术运算符1.1 加法与减法运算符1.2 乘法与除法运算符1.3 求模运算符 2. 比较运算符2.1 等号运算符2.2 安全等于运算符2.3 不等于运算符2.4 空运算符2.5 非空运算符2.6 最小值运算符2.7 最大值运算符2.8 BETWEEN AND运算符2.9 IN运算符2.10 NOT IN运算符2.11 LIKE运…

代码随想录算法训练营第三十九天丨 动态规划part02

62.不同路径 思路 动态规划 机器人从(0 , 0) 位置出发,到(m - 1, n - 1)终点。 按照动规五部曲来分析: 确定dp数组(dp table)以及下标的含义 dp[i][j] :表示从(0 ,0)出发&#…

利用win32的GetLastInputInfo函数实现锁屏(C#)

前两天看到群里面讨论这个问题,刚好我们上一家公司的系统也有这个功能,就研究了一下,我们这边实现这个功能的目的如下:当用户长时间不操作系统时,自动退出系统并退回到登录界面,想要使用系统,就…

【数据结构】数组和字符串(十三):链式字符串的基本操作(串长统计、查找、复制、插入、删除、串拼接)

文章目录 4.3 字符串4.3.1 字符串的定义与存储4.3.2 字符串的基本操作(链式存储)1. 结构体2. 初始化3. 判空4. 串尾添加5. 打印6. 串长统计7. 查找8. 复制9. 插入10. 删除11. 串拼接12. 销毁13. 主函数14. 代码整合 4.3 字符串 字符串(String)是由零个或…

Latex排版SIGGRAPH总结(持续总结中...)

本文学习总结自:How to use the ACM SIGGRAPH / TOG LaTeX template 相关文件:百度网盘 首先解压 “my paper” 中的文件,并用Latex打开mypaper.tex. 多行连等公式 \begin{equation}表示编号公式,\[ \]表示无编号公式 无编号\b…

双轮差速模型机器人通过线速度、角速度计算机器人位姿

已知上一时刻机器人位置P_OLD (x,y,),机器人当前时刻的线速度和角速度(v,),短时间内t内,机器人在线性部分和非线性部分的增量为 线性部分: 非线性部分: 由于可能非常小,导致非线性部分数值不稳定&#xf…

【R统计】各式各样的插补法解决数据缺失的问题!

💂 个人信息:酷在前行👍 版权: 博文由【酷在前行】原创、需要转载请联系博主👀 如果博文对您有帮助,欢迎点赞、关注、收藏 订阅专栏🔖 本文收录于【R统计】,该专栏主要介绍R语言实现统计分析的…

【计算机视觉】对极几何

文章目录 一、极线约束(Epipolar Constraint)二、相机标定过的情况三、相机没有标定过的情况四、八点算法(eight-point algorithm) 我的《计算机视觉》系列参考UC Berkeley的CS180课程,PPT可以在课程主页看到。 在上一…

关于preempt count的疑问

Linux中的preempt_count - 知乎 https://www.cnblogs.com/hellokitty2/p/15652312.html LWN:关于preempt_count()的四个小讨论!-CSDN博客 主要是参考这些文章 之前一直认为只要是in_interrupt()返回非0值,那么就可以认为当前在中断上下文。即…

阿昌教你如何优雅的数据脱敏

阿昌教你如何优雅的数据脱敏 Hi,我是阿昌,最近有一个数据脱敏的需求,要求用户可自定义配置数据权限,并对某种类型数据进行脱敏返回给前端 一、涉及知识点 SpringMVCJava反射Java自定义注解Java枚举 二、方案选择 1、需求要求…

Webpack打包图片-js-vue

文章目录 一、Webpack打包图片1.加载图片资源的准备2.认识asset module type3.asset module type的使用4.url-loader的limit效果 二、babel1.为什么需要babel2.babel命令行的使用3.babel插件的使用4.babel的预设preset5.babel-loader6.babel-preset 三、加载Vue文件1.编写App.v…

使用Ansible中的playbook

目录 1.Playbook的功能 2.YAML 3.YAML列表 4.YAML的字典 5.playbook执行命令 6.playbook的核心组件 7.vim 设定技巧 示例 1.Playbook的功能 playbook 是由一个或多个play组成的列表 Playboot 文件使用YAML来写的 2.YAML #简介# 是一种表达资料序列的格式,类似XML #特…

开关电源测试过压保护的测试标准及其方法

过压保护的原理 过压保护是电压超过预定值时降低电压的一种方式,原理是通过电路中的电压检测电路来检测电路中的电压是否超过了设定的阈值,如果超过了阈值,就会触发过压保护器件,使电源断开或使受控设备电压降低,保护电…

网络协议--TCP的交互数据流

19.1 引言 前一章我们介绍了TCP连接的建立与释放,现在来介绍使用TCP进行数据传输的有关问题。 一些有关TCP通信量的研究如[Caceres et al. 1991]发现,如果按照分组数量计算,约有一半的TCP报文段包含成块数据(如FTP、电子邮件和U…

使用Fiddler进行Mock测试

1、接口抓包 找到要mock的接口,打开fiddler抓包 以某某接口为例,找到下面的接口 http://XXX/SYSTEMS 2、复制该接口数据到本地 在接口上进行右键点击,选择save -> …and Open as Local File -> 默认会保存至桌面,示例中的数…

uniapp的启动页、开屏广告

uniapp的启动页、开屏广告 启动页配置广告开屏 启动页配置 在manifest.json文件中找到APP启动界面配置,可以看到有Android和iOS的启动页面的配置 ,选择自定义启动图即可配置 广告开屏 在pages中新建一个广告开屏文件并在pases.json的最顶部配置这个页…

开发商城系统的一些小建议

电子商务的迅猛发展,商城系统已经成为了企业推广产品和服务、吸引更多消费者的重要工具。然而,要想在竞争激烈的市场中脱颖而出,提升用户体验成为了至关重要的一环。下面就商城系统的开发作一些简单分享,以帮助企业更好地满足用户…

跨国文件传输为什么要用专业的大文件传输软件?

跨国文件传输是许多跨国企业需要的基础工作,对于传输的质量和速度要求也是很严格的,随着数据量的不断增加,寻常传统的传输方式肯定是不行,需要新的技术和方式来进行传输,大文件传输软件应运而出,那它有什么…

联想百应:构建“生态资源池”,打造中小企业转型第一服务平台

与3800多家服务商和100多家SaaS生态伙伴携手,累计支持超过20万中小企业智能化转型……在近日由工业和信息化部和安徽省举办的2023全国中小企业数字化转型大会上,联想集团首次公布供应链、平台、技术、生态与绿色赋能五大赋能能力和助力中小企业“链式”成…

sqlite3 关系型数据库语言 SQL 语言

SQL(Structured Query Language)语言是一种结构化查询语言,是一个通用的,功能强大的关系型数据库操作语言. 包含 6 个部分: 1.数据查询语言(DQL:Data Query Language) 从数据库的二维表格中查询数据,保留字 SELECT 是 DQL 中用的最多的语句 2.数据操作语言(DML) 最主要的关…