0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

大纲

  • Tumbling Count Windows
    • map
    • reduce
      • Window Size为2
      • Window Size为3
      • Window Size为4
      • Window Size为5
      • Window Size为6
  • 完整代码
  • 参考资料

之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。
在这里插入图片描述
对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。

Tumbling Count Windows

Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)
在这里插入图片描述
个数为3的窗口
在这里插入图片描述
我们用代码探索下这个概念

map

word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。
在这里插入图片描述

reduce

我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]

Window Size为2

    # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)

  • A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
  • B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
  • C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
  • D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
  • E有6个,正好被3个窗口承载。我们就看到3个(E,2)。
    在这里插入图片描述

Window Size为3

    # reducingreduced=keyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

在这里插入图片描述

Window Size为4

    # reducingreduced=keyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(C,4)
(D,4)
(E,4)

在这里插入图片描述

Window Size为5

    # reducingreduced=keyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(D,5)
(E,5)

在这里插入图片描述

Window Size为6

    # reducingreduced=keyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,6)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/127941.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】set和multiset

文章目录 关联式容器键值对一、set介绍二、set的使用multiset 关联式容器 STL中的部分容器,比如:vector、list、deque、forward_list(C11)等,这些容器统称为序列式容器,因为其底层为线性序列的数据结构,里面存储的是元…

【JAVA学习笔记】58 - 泛型

项目代码 https://github.com/yinhai1114/Java_Learning_Code/tree/main/IDEA_Chapter15/src/com/yinhai/generic_ https://github.com/yinhai1114/Java_Learning_Code/tree/main/IDEA_Chapter15/src/com/yinhai/customgeneric_ 一、泛型的入门和好处 1)请编写程序,…

人工智能基础_机器学习016_BGD批量梯度下降求解多元一次方程_使用SGD随机梯度下降计算一元一次方程---人工智能工作笔记0056

然后上面我们用BGD计算了一元一次方程,那么现在我们使用BGD来进行计算多元一次方程 对多元一次方程进行批量梯度下降. import numpy as np X = np.random.rand(100,8) 首先因为是8元一次方程,我们要生成100行8列的X的数据对应x1到x8 w = np.random.randint(1,10,size = (8…

B端企业形象设计的正确姿势,你学会了吗?

如今,企业形象设计在B端市场中变得越来越重要。它是企业与客户之间建立联系的桥梁,也是吸引目标客户的重要方式。为了帮助您打造一个独特而专业的企业形象设计,我将为您提供十个步骤。 步骤1:了解企业定位和目标 在设计B端企业形…

完美解决:Nginx安装后,/etc/nginx/conf.d下面没有default.conf文件

目录 1 问题: 2 解决方法 方法一: 方法二: 3 查看 1 问题: /etc/nginx/conf.d下面没有default.conf文件。 2 解决方法 方法一: 自己创建default.conf文件。 vi /etc/nginx/conf.d/default.conf 添加如下内容&…

恒驰服务 | 华为云数据使能专家服务offering之大数据建设

恒驰大数据服务主要针对客户在进行智能数据迁移的过程中,存在业务停机、数据丢失、迁移周期紧张、运维成本高等问题,通过为客户提供迁移调研、方案设计、迁移实施、迁移验收等服务内容,支撑客户实现快速稳定上云,有效降低时间成本…

js字符串支持多个分隔符分割

js字符串支持多个分隔符分割 场景代码 场景 用户输入内容后,支持多个分隔符(比如:中英文逗号,分号以及换号)对字符串进行分割,之后提交给后台同学解析。 代码 function splitString(inputString, separat…

「专题速递」数据驱动赋能、赛事直播优化、RTC技术、低延时传输引擎、多媒体处理框架、GPU加速...

点击文末阅读原文, 免费报名【抖音背后的体验增长实战揭秘】专场 随着全行业视频化的演进,营销、知识、商业和空间的交互体验正在被重塑。这种变化不仅仅是一种抽象的趋势,更是关系到用户留存和业务增长的关键因素。面对这样的挑战&#xff0…

【音视频 | wav】wav音频文件格式详解——包含RIFF规范、完整的各个块解析、PCM转wav代码

😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀 🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C、数据结构、音视频🍭 🤣本文内容🤣&a…

Websocket传输协议是什么

WebSocket 是一种网络通信协议,属于 HTML5 规范的一部分。它提供了在单个长期连接上进行全双工通信的能力,使得数据可以从客户端发送到服务器,也可以从服务器发送到客户端,这与传统的 HTTP 请求和响应模型不同。 WebSocket 协议定…

Latex安装使用教程

在论文投稿时有些期刊要求使用Latex格式,比如博主现在就遇到了这个问题,木有办法,老老实实的学呗。大家可以去官网下载,但官网的界面设计属实有些一言难尽,因此我们可以使用国内的镜像。 LaTeX 基于 TeX,主…

输入几个数,分别输出其中的奇数和偶数

这个问题我们只需要设计几个循环嵌套在一起就可以解决&#xff0c;话不多说&#xff0c;我们直接上代码 目录 1.运行代码 2.运行结果 1.运行代码 #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h> #include<string.h>int main() {int arr[10] {1,2,3,4,5,6,…

测试用例设计方法 —— 场景法详解

场景法是通过运用场景来对系统的功能点或业务流程的描述&#xff0c;从而提高测试效果的一种方法。 场景法一般包含基本流和备用流&#xff0c;从一个流程开始&#xff0c;通过描述经过的路径来确定的过程&#xff0c;经过遍历所有的基本流和备用流来完成整个场景。 场景主要…

AQS面试题总结

一&#xff1a;线程等待唤醒的实现方法 方式一&#xff1a;使用Object中的wait()方法让线程等待&#xff0c;使用Object中的notify()方法唤醒线程 必须都在synchronized同步代码块内使用&#xff0c;调用wait&#xff0c;notify是锁定的对象&#xff1b; notify必须在wait后执…

【Python语言速回顾】——数据可视化基础

目录 引入 一、Matplotlib模块&#xff08;常用&#xff09; 1、绘图流程&常用图 ​编辑 2、绘制子图&添加标注 ​编辑 3、面向对象画图 4、Pylab模块应用 二、Seaborn模块&#xff08;常用&#xff09; 1、常用图 2、代码示例 ​编辑 ​编辑 ​编辑 ​…

一个基于Excel模板快速生成Excel文档的小工具

介绍 DocumentGenerator是一个Excel快速生成工具&#xff0c;目标以后还能实现Word、pdf等的文件的生成。该程序独立运行&#xff0c;可通过HTTP接口调用其生成接口。 典型使用场景为如下&#xff1a; 使用者编写模板文件使用者准备模板文件的填充JSON数据内容使用者通过网络…

网络套接字编程(二)

网络套接字编程(二) 文章目录 网络套接字编程(二)简易TCP网络程序服务端创建套接字服务端绑定IP地址和端口号服务端监听服务端运行服务端网络服务服务端启动客户端创建套接字客户端的绑定和监听问题客户端建立连接并通信客户端启动程序测试单执行流服务器的弊端 多进程版TCP网络…

CCF_A 计算机视觉顶会CVPR2024投稿指南以及论文模板

目录 CVPR2024官网&#xff1a; CVPR2024投稿链接&#xff1a; CVPR2024 重要时间节点&#xff1a; CVPR2024投稿模板: WORD: LATEX : CVPR2024_AuthorGuidelines CVPR2024投稿Topics&#xff1a; CVPR2024官网&#xff1a; https://cvpr.thecvf.com/Conferences/2024CV…

【Linux】常见指令以及具体其使用场景

君兮_的个人主页 即使走的再远&#xff0c;也勿忘启程时的初心 C/C 游戏开发 Hello,米娜桑们&#xff0c;这里是君兮_&#xff0c;随着博主的学习&#xff0c;博主掌握的技能也越来越多&#xff0c;今天又根据最近的学习开设一个新的专栏——Linux&#xff0c;相信Linux操作系…

【嵌入式开发学习02】esp32cam烧录human_face_detect实现人脸识别

Ubuntu20.04系统为esp32cam烧录human_face_detect 1. 下载esp-dl2. 安装esp-idf3. 烧录human_face_detect 如果使用ubuntu 16.04在后续的步骤中会报错如下&#xff0c;因为ubuntu 16.04不支持glibc2.23以上的版本&#xff08;可使用strings /lib/x86_64-linux-gnu/libc.so.6 | …