Flink窗口分类简介及示例代码

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. 流式计算
      • 2. 窗口
      • 3. 窗口的分类
        • ◆ 基于时间的窗口(时间驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)
          • 3) 会话窗口(Session Windows)
        • ◆ 基于元素个数的(数据驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)

1. 流式计算

  Flink作为一个流式处理引擎,被设计用来处理无限数据集,理论上来说,无限数据集是一种不断产生,源源不断的数据集,说白了就是你不知道这个数据流它啥时候结束,这就是无限数据集。

  流式计算的思想是每来一个数据我就直接处理,而不用等,因此他非常适合在实时性要求比较高的场景下使用。

2. 窗口

  在流处理的场景下,如果我们想要统计过去某个时间段或过去多少条数据的指标时,就需要用到窗口,在Flink中,窗口(window)可以将流划分为有限块进行处理,Flink将这些有限的块抽象为“存储桶(bucket)”,我们可以在这些所谓的桶上做计算,也就实现了无限数据的有限计算。

  窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。

  窗口的声明周期是:一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness (可容忍的迟到时间)”时 被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口(Global Windows)。 例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06 时,这个窗口将被摧毁。关于窗口的详细介绍查看->官方对于窗口的介绍

3. 窗口的分类

◆ 基于时间的窗口(时间驱动)

1) 滚动窗口(Tumbling Windows)

window(TumblingProcessingTimeWindows.of(Time.seconds(10))),参数是时间滚动窗口大小。10秒滚动一个窗口

  滚动窗口将元素分发到指定大小的窗口。滚动窗口的大小是固定的,且个窗口之间没有空隙,不会重叠。比如说,如果你指定了滚动窗口的大小为5分钟,那么每5分钟就会有一个窗口被计算,且一个新的窗口被创建。如下图所示:
在这里插入图片描述
示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

自定义的工具类:

public class AnqclnUtil {// 要先声明泛型public static <T>List<T> toList(Iterable<T> elements) {List<T> list = new ArrayList<>();for (T t : elements) {list.add(t);}return list;}// 将long类型的时间转换为时间字符串public static String toDateTime(long ts) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(ts);}
}

运行结果:

在这里插入图片描述

2) 滑动窗口(Sliding Windows)

window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))),参数是时间滑动窗口大小和滑动距离。5秒滑动一个窗口,每个窗口最多放10个元素

  与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

  比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

3) 会话窗口(Session Windows)

window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))),参数是会话间隔,也就是多久没有活跃就关闭当前会话。4秒不活跃就关闭窗口。

  会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))// 定义一个session窗口,时间间隔为4s  对于session窗口来说,不同的key出发时间不同,每个key都维护自己的session.window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

◆ 基于元素个数的(数据驱动)

1) 滚动窗口(Tumbling Windows)

countWindow(3),参数是个数滚动窗口大小。3个元素滚动一个窗口

  每来多少个元素就滚动一次

示例代码:

public class Flink02_Window_Count {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发.countWindow(3).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String s,Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = AnqclnUtil.toList(elements);out.collect(" key: "+s+" "+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

2) 滑动窗口(Sliding Windows)

countWindow(3,2),参数是个数滑动窗口大小和滑动步长。每两个元素产生一个新的窗口,每个窗口最多放3个元素。

  就比滚动的多了个参数,滑动步长。步长是生成新窗口的条件,而窗口大小是指这个窗口最多能放多少个元素

示例代码:

public class Flink02_Window_Count {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发
//                .countWindow(3)// 定义一个长度为3(窗口内元素的最大个数)  每来两个2个元素滑动一次,.countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String s,Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = AnqclnUtil.toList(elements);out.collect(" key: "+s+" "+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/34242.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分享一个计算器

先看效果&#xff1a; 再看代码&#xff08;查看更多&#xff09;&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>计算器</title><style>* {box-sizing: border-box;}body…

高性能跨平台网络通信框架 HP-Socket v5.9.3

项目主页 : http://www.oschina.net/p/hp-socket开发文档 : https://www.docin.com/p-4478351216.html下载地址 : https://github.com/ldcsaa/HP-SocketQQ Group: 44636872, 663903943 v5.9.3 更新 一、主要更新 问题修复&#xff1a;通过 POST/PUT 等带有请求内容的 HTTP 方…

模拟信号和数字信号的转换

此文章介绍的模拟信号与数字信号转换相关的知识有如下&#xff1a; 通信原理的PCM脉冲编码调制 数字电子技术的A/D与D/A 以及stm32的ADC与DAC 模拟信号是指-----时间和数值均连续变化的电信号&#xff0c;如正弦波、三角波等。 数字信号是指-----在时间上和数值上均是离散的…

数据结构【图的类型定义和存储结构】

数据结构之图 图的定义和概念图的定义图的术语 图的类型定义图的存储结构数组&#xff08;邻接矩阵&#xff09;表示法无向图的邻接矩阵表示法有向图的邻接矩阵表示法网&#xff08;即有权图&#xff09;的邻接矩阵表示法 邻接矩阵的ADT定义邻接表&#xff08;链式&#xff09;…

聊一下互联网开源变现

(点击即可收听) 互联网开源变现其实是指通过开源软件或者开放源代码的方式&#xff0c;实现收益或盈利。这种方式越来越被广泛应用于互联网行业 在互联网开源变现的模式中&#xff0c;最常见的方式是通过捐款、广告、付费支持或者授权等方式获利。 例如&#xff0c;有些开源软件…

内网穿透实战应用-——【如何在树莓派上安装cpolar内网穿透】

如何在树莓派上安装cpolar内网穿透 文章目录 如何在树莓派上安装cpolar内网穿透前言1.在树莓派上安装cpolar2.查询cpolar版本号3.激活本地cpolar客户端4.cpolar记入配置文件 前言 树莓派作为一个超小型的电脑系统&#xff0c;虽然因其自身性能所限&#xff0c;无法如台式机或笔…

【金融量化】Python实现根据收益率计算累计收益率并可视化

1 理论 理财产品&#xff08;本金100元&#xff09; 第1天&#xff1a;3% &#xff1a;&#xff08;13%&#xff09; ✖ 100 103 第2天&#xff1a;2% &#xff1a;&#xff08;12%&#xff09;✖ 以上 103 2.06 第3天&#xff1a;5% : &#xff08;15%&#xff09;✖ 以上…

游戏行业实战案例 5 :玩家在线分布

【面试题】某游戏数据后台设有“登录日志”和“登出日志”两张表。 「登录日志」记录各玩家的登录时间和登录时的角色等级。 「登出日志」记录各玩家的登出时间和登出时的角色等级。 其中&#xff0c;「角色 id 」字段唯一识别玩家。 游戏开服前两天&#xff08; 2022-08-13 至…

Python-组合数据类型

今天要介绍的是Python的组合数据类型 整理不易&#xff0c;希望得到大家的支持&#xff0c;欢迎各位读者评论点赞收藏 感谢&#xff01; 目录 知识点知识导图1、组合数据类型的基本概念1.1 组合数据类型1.2 集合类型概述1.3 序列类型概述1.4 映射类型概述 2、列表类型2.1 列表的…

java下载JDK

1.去官网下载 https://www.oracle.com/java/technologies/javase-downloads.html 2.点击 傻瓜式安装 注意选择版本跟电脑系统就行 下载后文件的作用

32个关于FPGA的学习网站

语言类学习网站 1、HDLbits 网站地址&#xff1a;https://hdlbits.01xz.net/wiki/Main_Page 在线作答、编译的学习Verilog的网站&#xff0c;题目很多&#xff0c;内容丰富。非常适合Verilog初学者&#xff01;&#xff01;&#xff01; 2、牛客网 网站地址&#xff1a;https:…

Flink CDC系列之:TiDB CDC 导入 Elasticsearch

Flink CDC系列之&#xff1a;TiDB CDC 导入 Elasticsearch 一、通过docker 来启动 TiDB 集群二、下载 Flink 和所需要的依赖包三、在TiDB数据库中创建表和准备数据四、启动Flink 集群&#xff0c;再启动 SQL CLI五、在 Flink SQL CLI 中使用 Flink DDL 创建表六、Kibana查看Ela…

不知道打仗之害,就不知道打仗之利

不知道打仗之害&#xff0c;就不知道打仗之利 【安志强趣讲《孙子兵法》第7讲】 【原文】 夫钝兵挫锐&#xff0c;屈力殚货&#xff0c;则诸侯乘其弊而起&#xff0c;虽有智者&#xff0c;不能善其后矣。 【注释】 屈力殚货&#xff1a;屈力&#xff0c;指力量消耗&#xff0c;…

维深(Wellsenn):2023中国消费端VR内容开发商调研报告(附下载

关于报告的所有内容&#xff0c;公众【营销人星球】获取下载查看 核心观点 国内互联网大厂商入局VR&#xff0c;字节跳动、网易表态明确。字节跳动2021年收购国内头部VR硬件厂商PICO后&#xff0c;加速构建VR内容生态&#xff0c;2021年 成立海南创见未来当前已推出VR视频应用…

大语言模型之三 InstructGPT训练过程

大语言模型 GPT历史文章中简介的大语言模型的的发展史&#xff0c;并且简要介绍了大语言模型的训练过程&#xff0c;本篇文章详细阐述训练的细节和相关的算法。 2020年后全球互联网大厂、AI创业公司研发了不少AI超大模型&#xff08;百亿甚至千亿参数&#xff09;&#xff0c;…

【学习日记】【FreeRTOS】手动任务切换详解

前言 本文是关于 FreeRTOS 中实现两个任务轮流切换并执行的代码详解。目前不支持优先级&#xff0c;仅实现两个任务轮流切换。 一、任务的自传 任务从生到死的过程究竟是怎么样的呢&#xff1f;&#xff08;其实也没死&#xff09;&#xff0c;这个问题一直困扰着我&#xf…

LeetCode[164]最大间距

难度&#xff1a;Hard 题目&#xff1a; 给定一个无序的数组 nums&#xff0c;返回 数组在排序之后&#xff0c;相邻元素之间最大的差值 。如果数组元素个数小于 2&#xff0c;则返回 0 。 您必须编写一个在「线性时间」内运行并使用「线性额外空间」的算法。 示例 1: 输入: …

CSDN 直播:腾讯云大数据 ES 结合 AI 大模型与向量检索的新一代云端检索分析引擎 8月-8号 19:00-20:30

本次沙龙围绕腾讯云大数据ES产品展开&#xff0c;重点介绍了腾讯云ES自研的存算分离技术&#xff0c;以及能与AI大模型和文本搜索深度结合的高性能向量检索能力。同时&#xff0c;本次沙龙还将为我们全方位介绍腾讯云ES重磅推出的Elasticsearch Serverless服务&#xff0c;期待…

基于亚奈奎斯特采样和SOMP算法的平板脉冲响应空间插值matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ...................................................................... %fine regular gr…

本地构建包含java和maven的镜像

目录 1.前提条件 2.下载 2.1.创建Dockerfile 3.构建镜像 参考文章 1.前提条件 本地环境需要的系统和软件 win10 Docker Desktop Powershell 图1 Win10安装Docker后&#xff0c;直接在Powershell使用Docker命令 有些Developer不习惯win10系统&#xff0c;却想要使用Lin…