大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动基于事件驱动

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink DataSet
  • Flink DataSet 转换操作
  • Flink DataSet 输出
  • 容错机制、对比、发展方向

在这里插入图片描述

Flink Window 背景

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。

通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。

Flink Window 总览

基本概念

  • Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
  • Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
  • 而Window窗口是从Streaming到Batch的一个桥梁。
  • Flink提供了非常完善的窗口机制
  • 在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
  • 当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
  • 在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
  • 窗口可以基于时间驱动、也可以基于事件驱动
  • 同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
  • Flink要操作窗口,先要将StreamSource转换成WindowedStream

转换步骤

  • 获取流数据源
  • 获取窗口
  • 操作窗口数据
  • 输出窗口数据

滚动时间窗口

在这里插入图片描述

类型特点

将数据依据固定的窗口长度对数据进行切分:

  • 时间对齐
  • 窗口长度固定,没有重叠

Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。

在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。

关键点

  • 事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
  • 窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
  • 触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));timeWindow.apply(new MyTimeWindowFunction()).print();env.execute("TumblingWindow");}}

我们实现一个 MyTimeWindowFunction,滚动时间窗口:

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {/*** 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)* @author wzk* @date 16:58 2024/7/26**/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}out.collect("key: " + tuple.getField(0) + ", value: " + sum  +", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));}
}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream.countWindow(3);globalWindow.apply(new MyCountWindowFuntion());env.execute("TumblingWindow");}}

编写一个事件驱动的类:MyCountWindowFuntion

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {/*** 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。* @author wzk* @date 17:11 2024/7/26**/@Overridepublic void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}// 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间long maxTimestamp = window.maxTimestamp();out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"+ maxTimestamp + "," + format.format(maxTimestamp));}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/53471.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

损坏SD数据恢复的8种有效方法

SD卡被用于许多不同的产品来存储重要数据&#xff0c;如图片和重要的商业文件。如果您的SD卡坏了&#xff0c;您需要SD数据恢复来获取您的信息。通过从损坏的SD卡中取回数据&#xff0c;您可以确保重要文件不会永远丢失&#xff0c;这对于工作或个人原因是非常重要的。 有许多…

【Qt笔记】QTableWidget控件详解

目录 引言 一、QTableWidget的特点 二、QTableWidget基础 2.1 引入QTableWidget 2.2 基本属性 三、代码示例&#xff1a;初始化QTableWidget 四、编辑功能 4.1 设置单元格为只读 4.2 响应内容更改 五、选择模式 六、样式定制 七、与其他控件的交互 7.1 在单元格…

[SUCTF 2018]annonymous1

知识点&#xff1a; 匿名函数创建其实有自己的名字&#xff08;%00lambda_%d&#xff09; 进入页面开始代码审计. <?php // 使用 create_function 创建一个匿名函数&#xff0c;该函数调用 die() 函数并执行 cat flag.php 命令&#xff08;在服务器上执行&#xff0c;如果…

如何在 DigitalOcean Droplet 云服务器上部署 Next.js 应用

Next.js 是一个流行的 React 框架&#xff0c;可轻松构建服务器渲染的 React 应用程序。在本教程中&#xff0c;我们将介绍如何使用 Nginx 作为反向代理&#xff0c;在 DigitalOcean 的 droplet 云主机上部署 Next.js 应用程序。以下是逐步指南&#xff0c;假设你已经准备好部署…

基于SpringBoot+Vue+MySQL的牙科医就诊管理系统

系统展示 用户前台界面 管理员后台界面 系统背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xff0c;规范化管理。这样的…

HTML/CSS/JS学习笔记 Day2(HTML--网页标签 上)

跟着该视频学习&#xff0c;记录笔记&#xff1a;【黑马程序员pink老师前端入门教程&#xff0c;零基础必看的h5(html5)css3移动端前端视频教程】https://www.bilibili.com/video/BV14J4114768?p12&vd_source04ee94ad3f2168d7d5252c857a2bf358 Day2 内容梳理&#xff1a;…

儿童孤独症康复学校:打破孤岛,关爱与成长

在世界的某个角落&#xff0c;有一群孩子&#xff0c;他们如同夜空中最亮的星&#xff0c;却往往因孤独症的屏障&#xff0c;而难以与周围的世界建立连接。这些孩子&#xff0c;如同被无形的岛屿环绕&#xff0c;渴望着被理解、被接纳。而正是在这样的背景下&#xff0c;星贝育…

[C++11#48][智能指针] RAII原则 | 智能指针的类型 | 模拟实现 | shared_ptr | 解决循环引用

目录 一.引入 1. 为什么需要智能指针&#xff1f; 2. 什么是内存泄漏&#xff1f; 内存泄漏分类 3.回忆 this 二. 原理 1. RAII 资源获取即初始化 2.像指针一样 三. 使用 1. 问题&#xff1a; string 的浅拷贝 2.解决 auto_ptr 自定义 auto_ptr unique_ptr - 独占…

原生 iOS 引入 Flutter 报错 kernel_blob.bin 找不到

情况 在一次原生 iOS 项目中引入 Flutter 的过程中&#xff0c;在模拟器中运行出现报错&#xff1a; 未能打开文件“kernel_blob.bin”&#xff0c;因为它不存在。 如下图&#xff1a; 模拟器中一片黑 原因&解决方案 这个是因为 Flutter 的打包 iOS framework 命令中…

ES之三:springboot集成ES

一.选择版本很重要&#xff0c;不然会找不到好多方法 明明有Timeout方法&#xff0c;不报红&#xff0c;运行时&#xff0c;报错&#xff0c;找不到该类 ClassNotFoundException 为了避免使用的Elasticsearch版本和SpringBoot采用的版本不一致导致的问题&#xff0c;尽量使用…

高校大模型实验室大模型应用平台

大模型应用平台是一款专为高校大模型应用场景教学和科研打造的知识库问答系统。该平台易于使用&#xff0c;知识库支持常见的txt、doc、pdf、md等数据文件上传&#xff0c;同时提供了简洁易懂的操作配置界面&#xff0c;使用户可以轻松地搭建和训练AI应用&#xff0c;并快速进行…

arm64高速缓存基础知识

高速缓存的替换策略 随机法&#xff1a;随机地确定替换的高速缓存行&#xff0c;由一个随机数产生器产生随机数来确认替换行 FIFO法&#xff1a;选择最先调入的高速缓存行进行替换 LRU法&#xff1a;最少使用的行优先替换。 高速缓存的共享属性 内部共享的高速缓存通常指的…

Flutter基本组件Text使用

Text是一个文本显示控件&#xff0c;用于在应用程序界面中显示单行或多行文本内容。 Text简单Demo import package:flutter/material.dart;class MyTextDemo extends StatelessWidget {const MyTextDemo({super.key});overrideWidget build(BuildContext context) {return Sca…

飞速了解Conda的作用和安装使用教程

当我们想要在github上克隆不同的项目下来运行时&#xff0c;会发现项目的语言环境或包的版本不同&#xff0c;出现版本冲突问题会导致程序无法运行、兼容性问题频出。我们常常需要管理多个项目&#xff0c;每个项目可能依赖于不同的包版本或编程语言环境。如果不加以管理&#…

agentuniverse快速开始和踩坑

https://github.com/alipay/agentUniverse/tree/mastergithub地址:https://github.com/alipay/agentUniverse/tree/master 老大看了演示demo也想跟着做个agent工具,但踩坑太多,含泪写下博客 前置环节 git clone https://github.com/alipay/agentUniverse.git conda create -n…

AndroidStudio清除重置Http Proxy代理的方式

问题背景 在国内做代码开发的都知道&#xff0c;在国际互联网我们存在看不见的墙&#xff0c;导致无法访问一些代码库和资源&#xff0c;所以在使用开发工具拉取第三方库的时候总会遇到无法连接或者连接超时的情况&#xff0c;所以就会使用一些安全的网络代理工具&#xff0c;辅…

【JavaSE】Java基本数据类型缓存池

new Integer(18) 、 Integer.valueOf(18) 、Integer.valueOf(300) 的区别 new Integer(18) &#xff1a;每次都会创建一个新对象Integer.valueOf(x)&#xff1a; x in [-128, 127]&#xff1a;使用缓存池中的对象x not in [-128, 127]&#xff1a;创建新对象 Integer缓存池大…

【Qt】事件分发器

事件分发器 概述 在 Qt 中&#xff0c;事件分发器(Event Dispatcher) 是⼀个核⼼概念&#xff0c;⽤于处理 GUI 应⽤程序中的事件。事件分发器负责将事件从⼀个对象传递到另⼀个对象&#xff0c;直到事件被处理或被取消。每个继承⾃ QObject类 或 QObject类 本⾝都可以在本类中…

《纳瓦尔宝典》的核心思想在于阐述如何通过智慧和策略实现财富自由和生活幸福

《纳瓦尔宝典》概况 图书概况 《纳瓦尔宝典》是2022年5月10日由中信出版社出版的一本书籍&#xff0c;作者是美国作家埃里克乔根森。该书通过收集和整理硅谷知名天使投资人纳瓦尔拉维坎特在推特、博客和播客等平台上的智慧箴言&#xff0c;形成了一本关于财富积累和幸福人生的…

如何在红米手机中恢复已删除的照片?(6 种方式可供选择)

凭借出色的相机和实惠的价格&#xff0c;小米红米系列已成为全球知名品牌。但是&#xff0c;最近有些人抱怨他们在 红米设备上丢失了许多珍贵的图片或视频&#xff0c;并希望弄清楚如何从小米手机恢复已删除的照片。好吧&#xff0c;在小米设备上恢复已删除的视频/照片并不难。…