0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

大纲

  • map
  • reduce
  • 完整代码
  • 参考资料

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分
在这里插入图片描述
那么有没有办法让上图中(B,2)和(D,5)也会被计算呢?
这就可以使用本节介绍的时间滚动窗口。它不依赖于窗口中元素的个数,而是窗口的时间,即窗口时间到了,计算就会进行。
我们稍微修改下《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的例子,让元素集中在“A”上。

map

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

reduce

    # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

这儿我们的Window使用的是滚动时间窗口,其中参数Time.milliseconds(2)是指窗口时长,即2毫秒一个窗口。
我们运行多次代码可以得到不同的结果

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698771761164, end=1698771761166)
(A,12)
(‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771761166, end=1698771761168)
(A,8)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) TimeWindow(start=1698771731386, end=1698771731388)
(A,16)
(‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771731388, end=1698771731390)
(A,4)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771714992, end=1698771714994)
(A,20)

在这里插入图片描述

可以发现结果并不稳定。但是可以发现,每个元素都参与了计算,而不像个数滚动窗口那样部分数据没有被触发计算。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindowsclass SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.TumblingProcessingTimeWindows.html#pyflink.datastream.window.TumblingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/127219.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智能基础_机器学习007_高斯分布_概率计算_最小二乘法推导_得出损失函数---人工智能工作笔记0047

这个不分也是挺难的,但是之前有详细的,解释了,之前的文章中有, 那么这里会简单提一下,然后,继续向下学习 首先我们要知道高斯分布,也就是,正太分布, 这个可以预测x在多少的时候,概率最大 要知道在概率分布这个,高斯分布公式中,u代表平均值,然后西格玛代表标准差,知道了 这两个…

C++:map和set的基本使用

文章目录 搜索模型关联式容器setset的基本使用set的其他使用 multisetmapmap的基本使用map中的[] multimap 搜索模型 在实际搜索中有两个搜索模型:Key的搜索模型和Key/Value的搜索模型 Key的搜索模型: 简单来说就是在一个搜索树,搜索树中的…

汽车EDI:福特Ford EDI项目案例

项目背景 福特(Ford)是世界著名的汽车品牌,为美国福特汽车公司(Ford Motor Company)旗下的众多品牌之一。此前的文章福特FORD EDI需求分析中,我们已经了解了福特Ford EDI 的大致需求,本文将会介…

【Linux】jdk、tomcat、MySQL环境搭建的配置安装,Linux更改后端端口

一、作用 工具的组合为开发者和系统管理员提供了构建和运行Java应用程序以及存储和管理数据的完整环境。 JDK(Java Development Kit):JDK是Java开发工具包,它提供了开发和运行Java应用程序所需的工具和库。通过安装JDK&#xff0c…

SolidWorks2019安装教程(正版)

网盘资源附文末 一.简介 SolidWorks软件是世界上第一个基于Windows开发的三维CAD系统,由于技术创新符合CAD技术的发展潮流和趋势,SolidWorks公司于两年间成为CAD/CAM产业中获利最高的公司。良好的财务状况和用户支持使得SolidWorks每年都有数十乃至数百…

Mac-Java开发环境安装(JDK和Maven)

JDK安装 1、访问oracle官网,下载jdk 点击下载链接:https://www.oracle.com/java/technologies/downloads/#java11-mac 选择Mac版本,下载dmg 打勾点击下载,跳转登陆,没有就注册,输入账号密码即可下载成功…

Ubuntu20.04安装CUDA、cuDNN、tensorflow2可行流程(症状:tensorflow2在RTX3090上运行卡住)

最近发现我之前在2080ti上运行好好的代码,结果在3090上运行会卡住很久,而且模型预测结果完全乱掉,于是被迫研究了一天怎么在Ubuntu20.04安装CUDA、cuDNN、tensorflow2。 1.安装CUDA(包括CUDA驱动和CUDA toolkit,注意此…

ajax-axios发送 get请求 或者 发送post请求带有请求体参数

/* axios v0.21.1 | (c) 2020 by Matt Zabriskie */ !function(e,t){"object"typeof exports&&"object"typeof module?module.exportst():"function"typeof define&&define.amd?define([],t):"object"typeof export…

【WinForm详细教程四】WinForm中的ProgressBar 、ImageList和ListView控件

文章目录 1.ProgressBar2. ImageList3.ListView控件 1.ProgressBar 用于显示某个操作的进度。 属性: Value: 表示当前进度条的值,其范围由Min和Max决定。Step: 设置每次调用PerformStep()方法时增加的步长。MarqueeAnimationSpeed: 在Style设置为Marq…

RabbitMQ 运维 扩展

1、集群管理与配置 1.1、集群搭建 关于Rabbitmq 集群的搭建,详见以下文章。简单说来就是将多个单机rabbitmq服务,通过给到一致的密钥(.erlang.cookie)并且开放rabbitmq服务的 25672 端口,允许多节点间进行互相通讯&am…

iptables 与 firewalld

iptables 一、主机型(包过滤防火墙) 1、简介: 包过滤型防火墙是一种网络安全设备或软件,它工作在 2、3、4 层,通过检查网络数据包的源地址、目标地址、协议、端口等信息,根据预定义的规则来决定是否允许…

ip划分与私公网ip、ip的传递

报文问路:1、不知道跳转默认路由器,2、知道路径,向对应路径发出报文,3、路口路由器,下一步就是目标主机在哪。 想要通信必须同在一个局域网,其实将公网就可以看作一个大型的局域网。 在同一个局域网内发送…

正点原子嵌入式linux驱动开发——Linux USB驱动

USB是很常用的接口,目前大多数的设备都是USB接口的,比如鼠标、键盘、USB摄像 头等,在实际开发中也常常遇到USB接口的设备,本章就来学习一下如何使能Linux内核自带的USB驱动。这里不会具体学习USB的驱动开发。 USB接口简介 什么是…

413 Request Entity Too Large(nginx/1.24.0)

报错内容 <html><head><title>413 Request Entity Too Large</title></head><body><center><h1>413 Request Entity Too Large</h1></center><hr><center>nginx/1.24.0</center></body>&…

Explaining and harnessing adversarial examples

Explaining and harnessing adversarial examples----《解释和利用对抗样本》 背景&#xff1a; 早期的研究工作认为神经网络容易受到对抗样本误导是由于其非线性特征和过拟合。 创新点&#xff1a; 该论文作者认为神经网络易受对抗性扰动影响的主要原因是它的线性本质&#xf…

精通Nginx(03)-配置简述

本文主要讲述Nginx配置文件结构及调试技巧 使用nginx版本为1.24.0。 目录 Nginx目录 nginx.conf内容结构 配置片段化 配置调试技巧 Nginx目录 Nginx编译安装目录如下&#xff1a; 安装指定目录为"/usr/local"。配置目录为/usr/local/nginx/conf。 目录说明&am…

用LibreOffice在excel中画折线图

数据表格如下。假设想以x列为横坐标&#xff0c;y1和y2列分别为纵坐标画折线图。 选择插入-》图表&#xff1a; 选择折线图-》点和线&#xff0c;然后点击“下一步”&#xff1a; 选择&#xff1a;列中包含数据序列&#xff0c;然后点击完成&#xff08;因为图挡住了数据…

浏览器哪家强——PC端篇

今天的分享将围绕一个大家再熟悉不过的名称展开——浏览器。 根据百科给出的解释&#xff1a;浏览器是用来检索、展示以及传递Web信息资源的应用程序。通俗的说&#xff0c;浏览器就是一种阅读工具&#xff0c;类似记事本、word、wps&#xff0c;只不过后者阅读的是文本文档&am…

记一次 logback 没有生成独立日志文件问题

背景 在新项目发布后发现日志文件并没有按照期望的方式独立开来&#xff0c;而是都写在了 application.log 文件中。 问题展示 日志文件&#xff1a; 项目引入展示&#xff1a; <include resource"paas/sendinfo/switch/client/sendinfo-paas-switch-client-log.…

初识JavaScript(一)

文章目录 一、JavaScript介绍二、JavaScript简介1.ECMAScript和JavaScript的关系2.ECMAScript的历史3.什么是Javascript&#xff1f;4.JavaScript的作用?5.JavaScript的特点 三、JavaScript基础1.注释语法2.JavaScript的使用 四、JavaScript变量与常量变量关键字var和let的区别…