0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

大纲

  • 滑动(Sliding)和滚动(Tumbling)的区别
  • 样例
    • 窗口为2,滑动距离为1
    • 窗口为3,滑动距离为1
    • 窗口为3,滑动距离为2
    • 窗口为3,滑动距离为3
  • 完整代码
  • 参考资料

在 《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。

滑动(Sliding)和滚动(Tumbling)的区别

正如其名,“滑动”是指这个窗口沿着一定的方向,按着一定的速度“滑行”。
在这里插入图片描述
而滚动窗口,则是一个个“衔接着”,而不是像上面那样交错着。
在这里插入图片描述
它们的相同之处就是:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。

样例

我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例,我们只看Key为E的元素的滑动。

word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

窗口为2,滑动距离为1

count_window会根据传入的第二参数决定是构建滚动(CountTumblingWindowAssigner)窗口还是滑动(CountSlidingWindowAssigner)窗口。

    def count_window(self, size: int, slide: int = 0):"""Windows this KeyedStream into tumbling or sliding count windows.:param size: The size of the windows in number of elements.:param slide: The slide interval in number of elements... versionadded:: 1.16.0"""if slide == 0:return WindowedStream(self, CountTumblingWindowAssigner(size))else:return WindowedStream(self, CountSlidingWindowAssigner(size, slide))

我们只要给count_window第二个参数传递一个不为0的值,即可达到滑动效果。

    # reducingwindows_size = 2sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(E,2)
(E,2)
(E,2)
(E,2)
(E,2)

在这里插入图片描述

窗口为3,滑动距离为1

    # reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)
(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为2

    # reducingwindows_size = 3sliding_size = 2reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为3

这个就等效于滚动窗口了,因为“滑”过了窗口大小。

    # reducingwindows_size = 3sliding_size = 3reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/129461.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用vscode实现远程开发,并通过内网穿透在公网环境下远程连接

文章目录 前言1、安装OpenSSH2、vscode配置ssh3. 局域网测试连接远程服务器4. 公网远程连接4.1 ubuntu安装cpolar内网穿透4.2 创建隧道映射4.3 测试公网远程连接 5. 配置固定TCP端口地址5.1 保留一个固定TCP端口地址5.2 配置固定TCP端口地址5.3 测试固定公网地址远程 前言 远程…

学习笔记三十四:Ingress和 Ingress Controller概述

Ingress和 Ingress Controller概述 回顾service四层负载在k8s中为什么要做负载均衡Service不足之处四层负载和七层负载的区别OSI七层模型: Ingress介绍Ingress Controller介绍Ingress-controller 作用Ingress和Ingress Controller总结使用Ingress Controller代理k8s…

设备接入服务组件->微服务and容器化改造说明文档

SVN路径 https://192.0.0.241/USTA-dac/branches/dev/V1.10.500/dac 目录结构 das为设备接入服务,负责驱动管理,资源同步,订阅下发。下面有两个文件夹分别对应了openssl1.0的版本和后面更换接口后openssl1.1的版本。das_proxy为设备信令下发…

家用NAS上的Linux虚拟机上安装Domino

大家好,才是真的好。 此篇不是广告,毕竟没有任何人给广告费。 就是我个人入手了一台NAS设备,一开始用途比较淳朴,仅仅存储和家庭有关的各种照片和视频,但用着用着,就发现了NAS设备的拓展性之强&#xff0…

kubernetes集群编排——k8s存储

configmap 字面值创建 kubectl create configmap my-config --from-literalkey1config1 --from-literalkey2config2kubectl get cmkubectl describe cm my-config 通过文件创建 kubectl create configmap my-config-2 --from-file/etc/resolv.confkubectl describe cm my-confi…

提示3D标题编辑器仍在运行怎么解决,以及3D标题编辑器怎么使用

在进行视频剪辑时,尤其是剪辑一些带有文字的开场视频,一般都会使用具有立体效果的3D标题,这样制作出来的视频效果不仅好看,还非常的炫酷,但是对于一些刚刚开始接触视频剪辑的小伙伴来说,可能对3D标题还不是…

【李群李代数】【manif 】基于固定信标的2D机器人定位 (Error State Kalman Filter)...

demo演示 运行结果 我们考虑一个机器人在平面上被少量的准时地标或_信标 包围。 机器人以轴向速度和角速度的形式接收控制动作,并且能够测量信标相对于其自身参考系的位置。 机器人位姿 X 在 SE(2) 中,信标位置 b_k 在 R^2 中, | cos th -si…

Android系统Launcher启动流程学习(二)launcher启动

Zygote(孵化器)进程启动 在init进程中有解析.rc文件,在这个rc文件中配置了一个重要的服务service–zygote,这是app程序的鼻祖 zygote进程主要负责创建Java虚拟机,加载系统资源,启动SystemServer进程&#…

Postgresql批量按照顺序更新某一个字段

如批量更新采购订单行sequence字段,按照订单行id的顺序赋值1,2,3,4...: UPDATE purchase_order_line_copy1 SET sequence subquery.new_sequence FROM (SELECT id, ROW_NUMBER() OVER (ORDER BY id) AS new_sequence…

Pytest-Allure及Allure命令使用

一、Allure介绍 Allure是Pytest用于生成测试报告的框架,提供丰富的测试报告功能; 二、Allure安装 Allure安装分为2块,分别是pytest-Allure库安装,本地生成报告并导出的命令行allure安装; 1、pytest-Allure库安装 …

时序预测 | Python实现ARIMA-CNN-LSTM差分自回归移动平均模型结合卷积长短期记忆神经网络时间序列预测

时序预测 | Python实现ARIMA-CNN-LSTM差分自回归移动平均模型结合卷积长短期记忆神经网络时间序列预测 目录 时序预测 | Python实现ARIMA-CNN-LSTM差分自回归移动平均模型结合卷积长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 时序预测 …

【vtk学习笔记4】基本数据类型

一、可视化数据的基本特点 可视化数据有以下特点: 离散型 计算机处理的数据是对无限、连续的空间进行采样,生成的有限采样点数据。在某些离散点上有精确的值,但点与点之间值不可知,只有通过插值方式获取数据具有规则或不规则的结…

机器学习 - 加油站数据分析

一、实验数据 数据集:“加油站数据.xls” 数据集介绍:该表记录了用户在11月和12月一天24小时内的加油信息,包括:持卡人标识(cardholder)、卡号(cardno)、加油站网点号(n…

Simulink的To Workspace

To Workspace模块将Simulink产生的数据存储到matlab的工作区。 用To Workspace模块中的数据进行绘图。 参见Matlab/simulink/simscape multibody-to wotkspace模块使用_to workspace模块_五VV的博客-CSDN博客To workspace模块入门详解_哔哩哔哩_bilibili(很好&#…

c++之类和对象

首先我们要理解cin,cout只能自动识别内置类型,原因就是因为cin,cout里面的函数重载。 那么如果我想输入非内置类型,就要进行运算符重载。 但是会发生如下的情况。 友元函数可以访问对象的私有。 运算符重载的总结 成员初始化既可以用函数体内初始化也可…

月报总结|Moonbeam 10月份大事一览

万圣节快乐!时间一晃眼,10月已经迈入尾声,也即将迎来寒冷的冬天。但与季节相反,加密产业近期的发展可以说是高潮起伏,热度不断攀升。Moonbeam在10月中也发布了许多重大的更新,如Uniswap V3前段上线、众贷DO…

antd的Table组件使用rowSelection属性实现多选时遇到的bug

前言 前端样式框架采用AntDesign时,经常会使用到Table组件,如果要有实现多选或选择的需求时往往就会用到rowSelection属性,效果如下 rowSelection属性属性值如下 问题 文档中并没有说明选择时以数据中的哪个属性为准,看官方案例…

python第一课 变量

1.离线的情况下首选txt文档 2.有道云笔记 3.思维导图 xmind mindmaster 4.博客 5.wps流程图 # 变量的命名规则 1.变量名只能由数字字母下划线组成 2.变量名不能以数字开头 3.变量名不能与关键字重名 快捷键 撤销:Ctrl/Command Z 新建:Ctrl/Com…

VScode + opencv + c++ + win配置教程

准备: 1、下载opencv 2、下载MinGw 3、 3、下载CMake 下载完解压放到一个文件夹里面,便于环境管理,文件夹我重命名了,解压出来文件名不一样正常 环境变量配置 C:\Users\wuxulong\cpp_env\MinGw\mingw64\bin C:\Users\wuxulon…