大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动基于事件驱动

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink DataSet
  • Flink DataSet 转换操作
  • Flink DataSet 输出
  • 容错机制、对比、发展方向

在这里插入图片描述

Flink Window 背景

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。

通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。

Flink Window 总览

基本概念

  • Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
  • Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
  • 而Window窗口是从Streaming到Batch的一个桥梁。
  • Flink提供了非常完善的窗口机制
  • 在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
  • 当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
  • 在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
  • 窗口可以基于时间驱动、也可以基于事件驱动
  • 同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
  • Flink要操作窗口,先要将StreamSource转换成WindowedStream

转换步骤

  • 获取流数据源
  • 获取窗口
  • 操作窗口数据
  • 输出窗口数据

滚动时间窗口

在这里插入图片描述

类型特点

将数据依据固定的窗口长度对数据进行切分:

  • 时间对齐
  • 窗口长度固定,没有重叠

Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。

在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。

关键点

  • 事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
  • 窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
  • 触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));timeWindow.apply(new MyTimeWindowFunction()).print();env.execute("TumblingWindow");}}

我们实现一个 MyTimeWindowFunction,滚动时间窗口:

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {/*** 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)* @author wzk* @date 16:58 2024/7/26**/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}out.collect("key: " + tuple.getField(0) + ", value: " + sum  +", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));}
}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream.countWindow(3);globalWindow.apply(new MyCountWindowFuntion());env.execute("TumblingWindow");}}

编写一个事件驱动的类:MyCountWindowFuntion

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {/*** 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。* @author wzk* @date 17:11 2024/7/26**/@Overridepublic void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}// 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间long maxTimestamp = window.maxTimestamp();out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"+ maxTimestamp + "," + format.format(maxTimestamp));}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/53471.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二、主流的架构方法论

在企业软件开发和系统设计中&#xff0c;架构方法论提供了指导原则、最佳实践和框架来帮助架构师和开发团队设计和实施高质量的软件系统。以下是一些主流的架构方法论及其特点分析&#xff1a; 1. TOGAF&#xff08;The Open Group Architecture Framework&#xff09; 特点&…

损坏SD数据恢复的8种有效方法

SD卡被用于许多不同的产品来存储重要数据&#xff0c;如图片和重要的商业文件。如果您的SD卡坏了&#xff0c;您需要SD数据恢复来获取您的信息。通过从损坏的SD卡中取回数据&#xff0c;您可以确保重要文件不会永远丢失&#xff0c;这对于工作或个人原因是非常重要的。 有许多…

【Qt笔记】QTableWidget控件详解

目录 引言 一、QTableWidget的特点 二、QTableWidget基础 2.1 引入QTableWidget 2.2 基本属性 三、代码示例&#xff1a;初始化QTableWidget 四、编辑功能 4.1 设置单元格为只读 4.2 响应内容更改 五、选择模式 六、样式定制 七、与其他控件的交互 7.1 在单元格…

[SUCTF 2018]annonymous1

知识点&#xff1a; 匿名函数创建其实有自己的名字&#xff08;%00lambda_%d&#xff09; 进入页面开始代码审计. <?php // 使用 create_function 创建一个匿名函数&#xff0c;该函数调用 die() 函数并执行 cat flag.php 命令&#xff08;在服务器上执行&#xff0c;如果…

破解资源利用困境!麒麟信安支撑吉林市中医院完成云化转型

吉林市中医院在数字化转型的过程中&#xff0c;面临着一系列挑战。此前&#xff0c;该医院采用传统数据中心物理服务器部署模式&#xff0c;虽然在初期为医院的信息化建设提供了基础&#xff0c;但随着时间的推移&#xff0c;其局限性逐渐显现。每个应用系统都占用了大量的新旧…

如何在 DigitalOcean Droplet 云服务器上部署 Next.js 应用

Next.js 是一个流行的 React 框架&#xff0c;可轻松构建服务器渲染的 React 应用程序。在本教程中&#xff0c;我们将介绍如何使用 Nginx 作为反向代理&#xff0c;在 DigitalOcean 的 droplet 云主机上部署 Next.js 应用程序。以下是逐步指南&#xff0c;假设你已经准备好部署…

基于SpringBoot+Vue+MySQL的牙科医就诊管理系统

系统展示 用户前台界面 管理员后台界面 系统背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xff0c;规范化管理。这样的…

HTML/CSS/JS学习笔记 Day2(HTML--网页标签 上)

跟着该视频学习&#xff0c;记录笔记&#xff1a;【黑马程序员pink老师前端入门教程&#xff0c;零基础必看的h5(html5)css3移动端前端视频教程】https://www.bilibili.com/video/BV14J4114768?p12&vd_source04ee94ad3f2168d7d5252c857a2bf358 Day2 内容梳理&#xff1a;…

儿童孤独症康复学校:打破孤岛,关爱与成长

在世界的某个角落&#xff0c;有一群孩子&#xff0c;他们如同夜空中最亮的星&#xff0c;却往往因孤独症的屏障&#xff0c;而难以与周围的世界建立连接。这些孩子&#xff0c;如同被无形的岛屿环绕&#xff0c;渴望着被理解、被接纳。而正是在这样的背景下&#xff0c;星贝育…

[C++11#48][智能指针] RAII原则 | 智能指针的类型 | 模拟实现 | shared_ptr | 解决循环引用

目录 一.引入 1. 为什么需要智能指针&#xff1f; 2. 什么是内存泄漏&#xff1f; 内存泄漏分类 3.回忆 this 二. 原理 1. RAII 资源获取即初始化 2.像指针一样 三. 使用 1. 问题&#xff1a; string 的浅拷贝 2.解决 auto_ptr 自定义 auto_ptr unique_ptr - 独占…

原生 iOS 引入 Flutter 报错 kernel_blob.bin 找不到

情况 在一次原生 iOS 项目中引入 Flutter 的过程中&#xff0c;在模拟器中运行出现报错&#xff1a; 未能打开文件“kernel_blob.bin”&#xff0c;因为它不存在。 如下图&#xff1a; 模拟器中一片黑 原因&解决方案 这个是因为 Flutter 的打包 iOS framework 命令中…

ES之三:springboot集成ES

一.选择版本很重要&#xff0c;不然会找不到好多方法 明明有Timeout方法&#xff0c;不报红&#xff0c;运行时&#xff0c;报错&#xff0c;找不到该类 ClassNotFoundException 为了避免使用的Elasticsearch版本和SpringBoot采用的版本不一致导致的问题&#xff0c;尽量使用…

统计学习方法与实战——统计学习方法概论

统计学习方法概论 文章目录 统计学习方法概论前言章节目录导读 实现统计学习方法的步骤统计学习方法三要素模型模型是什么? 策略损失函数与风险函数常用损失函数ERM与SRM 算法 模型评估与模型选择过拟合与模型选择 正则化与交叉验证泛化能力生成模型与判别模型生成方法判别方法…

高校大模型实验室大模型应用平台

大模型应用平台是一款专为高校大模型应用场景教学和科研打造的知识库问答系统。该平台易于使用&#xff0c;知识库支持常见的txt、doc、pdf、md等数据文件上传&#xff0c;同时提供了简洁易懂的操作配置界面&#xff0c;使用户可以轻松地搭建和训练AI应用&#xff0c;并快速进行…

arm64高速缓存基础知识

高速缓存的替换策略 随机法&#xff1a;随机地确定替换的高速缓存行&#xff0c;由一个随机数产生器产生随机数来确认替换行 FIFO法&#xff1a;选择最先调入的高速缓存行进行替换 LRU法&#xff1a;最少使用的行优先替换。 高速缓存的共享属性 内部共享的高速缓存通常指的…

时间序列处理方法

对于时间序列数据进行多分类任务&#xff0c;RNN对于顺序建模不理想&#xff0c;可以考虑以下模型和改进方法&#xff1a; Transformers&#xff08;时序数据版&#xff09; 优势: 相较于RNN&#xff08;如GRU、LSTM&#xff09;&#xff0c;Transformers 擅长捕捉长距离依赖关…

Python 多线程访问数据库正确使用dbutils PooledDB数据库连接池

1.安装DBUtils pip install DBUtils 2.db_helper.py的代码如下 import pymysql from dbutils.pooled_db import PooledDB from config import configclass DBHelper:def __init__(self):""":param mincached:连接池中空闲连接的初始数量:param maxcached:连接…

Flutter基本组件Text使用

Text是一个文本显示控件&#xff0c;用于在应用程序界面中显示单行或多行文本内容。 Text简单Demo import package:flutter/material.dart;class MyTextDemo extends StatelessWidget {const MyTextDemo({super.key});overrideWidget build(BuildContext context) {return Sca…

飞速了解Conda的作用和安装使用教程

当我们想要在github上克隆不同的项目下来运行时&#xff0c;会发现项目的语言环境或包的版本不同&#xff0c;出现版本冲突问题会导致程序无法运行、兼容性问题频出。我们常常需要管理多个项目&#xff0c;每个项目可能依赖于不同的包版本或编程语言环境。如果不加以管理&#…

在HTML5中使用Noto Sans CJK字体的详细指南

在HTML5中使用Noto Sans CJK字体的详细指南 方法一&#xff1a;通过Google Fonts在线加载Noto Sans CJK步骤 1&#xff1a;访问Google Fonts并获取字体链接步骤 2&#xff1a;获取字体的<link>标签步骤 3&#xff1a;在HTML文件中引入字体 方法二&#xff1a;下载并本地托…