0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

大纲

  • map
  • reduce
  • 完整代码
  • 参考资料

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分
在这里插入图片描述
那么有没有办法让上图中(B,2)和(D,5)也会被计算呢?
这就可以使用本节介绍的时间滚动窗口。它不依赖于窗口中元素的个数,而是窗口的时间,即窗口时间到了,计算就会进行。
我们稍微修改下《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的例子,让元素集中在“A”上。

map

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

reduce

    # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

这儿我们的Window使用的是滚动时间窗口,其中参数Time.milliseconds(2)是指窗口时长,即2毫秒一个窗口。
我们运行多次代码可以得到不同的结果

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698771761164, end=1698771761166)
(A,12)
(‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771761166, end=1698771761168)
(A,8)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) TimeWindow(start=1698771731386, end=1698771731388)
(A,16)
(‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771731388, end=1698771731390)
(A,4)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771714992, end=1698771714994)
(A,20)

在这里插入图片描述

可以发现结果并不稳定。但是可以发现,每个元素都参与了计算,而不像个数滚动窗口那样部分数据没有被触发计算。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindowsclass SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.TumblingProcessingTimeWindows.html#pyflink.datastream.window.TumblingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/127219.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智能基础_机器学习007_高斯分布_概率计算_最小二乘法推导_得出损失函数---人工智能工作笔记0047

这个不分也是挺难的,但是之前有详细的,解释了,之前的文章中有, 那么这里会简单提一下,然后,继续向下学习 首先我们要知道高斯分布,也就是,正太分布, 这个可以预测x在多少的时候,概率最大 要知道在概率分布这个,高斯分布公式中,u代表平均值,然后西格玛代表标准差,知道了 这两个…

C++:map和set的基本使用

文章目录 搜索模型关联式容器setset的基本使用set的其他使用 multisetmapmap的基本使用map中的[] multimap 搜索模型 在实际搜索中有两个搜索模型:Key的搜索模型和Key/Value的搜索模型 Key的搜索模型: 简单来说就是在一个搜索树,搜索树中的…

汽车EDI:福特Ford EDI项目案例

项目背景 福特(Ford)是世界著名的汽车品牌,为美国福特汽车公司(Ford Motor Company)旗下的众多品牌之一。此前的文章福特FORD EDI需求分析中,我们已经了解了福特Ford EDI 的大致需求,本文将会介…

【ChatGPT】教我 Flyweight(享元) 模式

文章目录 设计模式中 Flyweight 模式,实际应用场景有哪些?我需要画很多的树,以这个需求为例来教我 Flyweight 模式。好的,我大概明白了,我还有点疑惑,请问外在状态和内在状态是哪些?你可以讲一下…

【Linux】jdk、tomcat、MySQL环境搭建的配置安装,Linux更改后端端口

一、作用 工具的组合为开发者和系统管理员提供了构建和运行Java应用程序以及存储和管理数据的完整环境。 JDK(Java Development Kit):JDK是Java开发工具包,它提供了开发和运行Java应用程序所需的工具和库。通过安装JDK&#xff0c…

Spring的核心概念汇总

Spring的核心概念 (一)什么是BeanDefinition BeanDefinition是Spring框架中的一个概念,用于描述和定义在Spring容器中创建的bean。在Spring中,我们可以通过配置文件或者注解的方式定义BeanDefinition,它包含了一些关键…

SolidWorks2019安装教程(正版)

网盘资源附文末 一.简介 SolidWorks软件是世界上第一个基于Windows开发的三维CAD系统,由于技术创新符合CAD技术的发展潮流和趋势,SolidWorks公司于两年间成为CAD/CAM产业中获利最高的公司。良好的财务状况和用户支持使得SolidWorks每年都有数十乃至数百…

2023-11-01 node.js-electron-环境配置-记录

摘要: 2023-11-01 node.js-electron-环境配置-记录 相关文档: Node.js Build cross-platform desktop apps with JavaScript, HTML, and CSS | Electron node.js的国内源 - Python技术站 node.js 下载地址: https://nodejs.org/dist/v20.9.0/ 说明: 最好使用最新版本当前我使…

使用JPA时设置Null值:set null

方式一:使用JPA 方式二:使用EntityManager 方式三:原生SQL private Object mergeExt(Object o) {String tableName o.getClass().getSimpleName();StringBuffer sb new StringBuffer();String sqlHeader "update " tableName …

LuatOS-SOC接口文档(air780E)--miniz - 简易zlib压缩

示例 -- 准备好数据 local bigdata "123jfoiq4hlkfjbnasdilfhuqwo;hfashfp9qw38hrfaios;hfiuoaghfluaeisw" -- 压缩之, 压缩得到的数据是zlib兼容的,其他语言可通过zlib相关的库进行解压 local cdata miniz.compress(bigdata) -- lua 的 字符串相当于有长度的cha…

Mac-Java开发环境安装(JDK和Maven)

JDK安装 1、访问oracle官网,下载jdk 点击下载链接:https://www.oracle.com/java/technologies/downloads/#java11-mac 选择Mac版本,下载dmg 打勾点击下载,跳转登陆,没有就注册,输入账号密码即可下载成功…

Ubuntu20.04安装CUDA、cuDNN、tensorflow2可行流程(症状:tensorflow2在RTX3090上运行卡住)

最近发现我之前在2080ti上运行好好的代码,结果在3090上运行会卡住很久,而且模型预测结果完全乱掉,于是被迫研究了一天怎么在Ubuntu20.04安装CUDA、cuDNN、tensorflow2。 1.安装CUDA(包括CUDA驱动和CUDA toolkit,注意此…

gradle与maven

Gradle 和 Maven 都是流行的构建工具,通常用于构建和管理 Java 和 Android 项目。它们都可以自动下载依赖库、编译代码、运行测试、打包和发布等。 以下是对 Gradle 和 Maven 的介绍: Gradle: Gradle 是一个基于 Groovy 和 Kotlin 的构建自…

(笔记)Kotlin——Android封装ViewBinding之二 优化

0. 在app模块的build.gradle文件中添加如下配置开启ViewBinding android {.......viewBinding {enabled true}} 1. 新建一个Ext.kt文件 添加两个扩展函数&#xff0c;分别对应Activity和Fragment inline fun <T : ViewBinding> AppCompatActivity.viewBinding(cross…

ajax-axios发送 get请求 或者 发送post请求带有请求体参数

/* axios v0.21.1 | (c) 2020 by Matt Zabriskie */ !function(e,t){"object"typeof exports&&"object"typeof module?module.exportst():"function"typeof define&&define.amd?define([],t):"object"typeof export…

【WinForm详细教程四】WinForm中的ProgressBar 、ImageList和ListView控件

文章目录 1.ProgressBar2. ImageList3.ListView控件 1.ProgressBar 用于显示某个操作的进度。 属性&#xff1a; Value: 表示当前进度条的值&#xff0c;其范围由Min和Max决定。Step: 设置每次调用PerformStep()方法时增加的步长。MarqueeAnimationSpeed: 在Style设置为Marq…

RabbitMQ 运维 扩展

1、集群管理与配置 1.1、集群搭建 关于Rabbitmq 集群的搭建&#xff0c;详见以下文章。简单说来就是将多个单机rabbitmq服务&#xff0c;通过给到一致的密钥&#xff08;.erlang.cookie&#xff09;并且开放rabbitmq服务的 25672 端口&#xff0c;允许多节点间进行互相通讯&am…

iptables 与 firewalld

iptables 一、主机型&#xff08;包过滤防火墙&#xff09; 1、简介&#xff1a; 包过滤型防火墙是一种网络安全设备或软件&#xff0c;它工作在 2、3、4 层&#xff0c;通过检查网络数据包的源地址、目标地址、协议、端口等信息&#xff0c;根据预定义的规则来决定是否允许…

ip划分与私公网ip、ip的传递

报文问路&#xff1a;1、不知道跳转默认路由器&#xff0c;2、知道路径&#xff0c;向对应路径发出报文&#xff0c;3、路口路由器&#xff0c;下一步就是目标主机在哪。 想要通信必须同在一个局域网&#xff0c;其实将公网就可以看作一个大型的局域网。 在同一个局域网内发送…

正点原子嵌入式linux驱动开发——Linux USB驱动

USB是很常用的接口&#xff0c;目前大多数的设备都是USB接口的&#xff0c;比如鼠标、键盘、USB摄像 头等&#xff0c;在实际开发中也常常遇到USB接口的设备&#xff0c;本章就来学习一下如何使能Linux内核自带的USB驱动。这里不会具体学习USB的驱动开发。 USB接口简介 什么是…