Flink 1.14.* Flink窗口创建和窗口计算源码

解析Flink如何创建的窗口,和以聚合函数为例,窗口如何计算聚合函数

  • 一、构建不同窗口的build类
    • 1、全局窗口
    • 2、创建按键分流后的窗口
  • 二、在使用窗口处理数据流时,不同窗口创建的都是窗口算子WindowOperator
    • 1、聚合函数实现
    • 2、创建全局窗口(入参传的是NullByteKeySelector)
    • 3、创建按键分流后的窗口(入参传的是KeyedStream的KeySelector)
    • 3、WindowOperator

一、构建不同窗口的build类

这个是示例,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> input = env.fromElements(Tuple2.of("key1", 1),Tuple2.of("key1", 3),Tuple2.of("key2", 2),Tuple2.of("key2", 4));

1、全局窗口

下面是创建全局窗口的代码

AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowed = input.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
@PublicEvolving
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {return new AllWindowedStream(this, assigner);}

@Public
public class AllWindowedStream<T, W extends Window> {private final KeyedStream<T, Byte> input;private final WindowAssigner<? super T, W> windowAssigner;private Trigger<? super T, ? super W> trigger;private Evictor<? super T, ? super W> evictor;private long allowedLateness = 0L;private OutputTag<T> lateDataOutputTag;@PublicEvolvingpublic AllWindowedStream(DataStream<T> input, WindowAssigner<? super T, W> windowAssigner) {//这里设置input的KeySelector为null的对象this.input = input.keyBy(new NullByteKeySelector());this.windowAssigner = windowAssigner;this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());}
}

AllWindowedStream 是对整个数据流应用窗口操作的抽象,而不进行键分组。换句话说,AllWindowedStream 是对全局数据流进行窗口操作。

使用场景:

  1. 当你不需要对数据流进行键分组,而是希望对整个数据流应用窗口操作时,使用 AllWindowedStream
  2. 适用于全局统计、全局聚合等场景。

2、创建按键分流后的窗口

下面是根据第一位字段当键分流,针对键分的流数据,分别创建窗口

KeyedStream<Tuple2<String, Integer>, String> keyed = input.keyBy(value -> value.f0);
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
 @PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream(this, assigner);}
@Public
public class WindowedStream<T, K, W extends Window> {private final KeyedStream<T, K> input;//WindowOperatorBuilder 是 Flink 内部用于构建窗口操作符的工具类。它主要用于在内部构建和配置窗口操作符(WindowOperator),并不直接用于用户代码中。WindowOperatorBuilder 提供了一种灵活的方式来配置窗口操作符的各种细节,包括窗口分配器、窗口触发器、窗口合并器等。private final WindowOperatorBuilder<T, K, W> builder;@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {//这里只需要设置input,input的keyBy已经在前面设置了this.input = input;//通过input.getKeySelector()获取KeyedStream设置的函数this.builder = new WindowOperatorBuilder(windowAssigner, windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), input.getExecutionConfig(), input.getType(), input.getKeySelector(), input.getKeyType());}//调用WindowedStream的trigger实际上调用的是WindowOperatorBuilder的trigger方法@PublicEvolvingpublic WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {this.builder.trigger(trigger);return this;}}   
public class WindowOperatorBuilder<T, K, W extends Window> {private static final String WINDOW_STATE_NAME = "window-contents";private final ExecutionConfig config;private final WindowAssigner<? super T, W> windowAssigner;private final TypeInformation<T> inputType;private final KeySelector<T, K> keySelector;private final TypeInformation<K> keyType;private Trigger<? super T, ? super W> trigger;@Nullableprivate Evictor<? super T, ? super W> evictor;private long allowedLateness = 0L;@Nullableprivate OutputTag<T> lateDataOutputTag;public WindowOperatorBuilder(WindowAssigner<? super T, W> windowAssigner, Trigger<? super T, ? super W> trigger, ExecutionConfig config, TypeInformation<T> inputType, KeySelector<T, K> keySelector, TypeInformation<K> keyType) {this.windowAssigner = windowAssigner;this.config = config;this.inputType = inputType;//把KeyedStream中的keySelector赋值到WindowOperatorBuilder的keySelectorthis.keySelector = keySelector;this.keyType = keyType;this.trigger = trigger;}
}

WindowedStream 是在对数据流进行键分组后,对每个键的子流应用窗口操作的抽象。也就是说,WindowedStream 是对每个键进行独立的窗口操作。

使用场景:

  1. 当你需要对数据流按键分组,并对每个键的子流应用窗口操作时,使用 WindowedStream
  2. 适用于需要对不同键进行独立统计和聚合的场景。

二、在使用窗口处理数据流时,不同窗口创建的都是窗口算子WindowOperator

这里以聚合函数为例,看不同的窗口类型创建的算子是什么。

1、聚合函数实现

 // 定义聚合函数AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> aggregateFunction =new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> createAccumulator() {return new Tuple2<>("", 0);}@Overridepublic Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {return new Tuple2<>(value.f0, value.f1 + accumulator.f1);}@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {return new Tuple2<>(a.f0, a.f1 + b.f1);}};//聚合函数接口 public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2);
}

2、创建全局窗口(入参传的是NullByteKeySelector)

根据上面知道,此时

@Public
public class AllWindowedStream<T, W extends Window> {@PublicEvolvingpublic AllWindowedStream(DataStream<T> input, WindowAssigner<? super T, W> windowAssigner) {//这里设置input的KeySelector为null的对象this.input = input.keyBy(new NullByteKeySelector());this.windowAssigner = windowAssigner;this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());}@PublicEvolvingpublic <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, AllWindowFunction<V, R, W> windowFunction, TypeInformation<ACC> accumulatorType, TypeInformation<R> resultType) {//根据AllWindowedStream的构造函数,知道此时this.input.getKeySelector()=new NullByteKeySelectorKeySelector<T, Byte> keySel = this.input.getKeySelector();//省略干扰代码AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor("window-contents", aggregateFunction, accumulatorType.createSerializer(this.getExecutionEnvironment().getConfig()));operator = new WindowOperator(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueAllWindowFunction(windowFunction), this.trigger, this.allowedLateness, this.lateDataOutputTag);//省略干扰代码   return this.input.transform(opName, resultType, (OneInputStreamOperator)operator).forceNonParallel();}}
@PublicEvolving
public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {private final AggregateFunction<IN, ACC, OUT> aggFunction;public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeSerializer<ACC> typeSerializer) {super(name, typeSerializer, (Object)null);this.aggFunction = (AggregateFunction)Preconditions.checkNotNull(aggFunction);}
}

3、创建按键分流后的窗口(入参传的是KeyedStream的KeySelector)

public class WindowedStream<T, K, W extends Window> {@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder = new WindowOperatorBuilder(windowAssigner, windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), input.getExecutionConfig(), input.getType(), input.getKeySelector(), input.getKeyType());}public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType, TypeInformation<R> resultType) {//删除干扰代码aggregateFunction = (AggregateFunction)this.input.getExecutionEnvironment().clean(aggregateFunction);String opName = this.builder.generateOperatorName(aggregateFunction, windowFunction);OneInputStreamOperator<T, R> operator = this.builder.aggregate(aggregateFunction, windowFunction, accumulatorType);return this.input.transform(opName, resultType, operator);}}

通过上面我们知道builder指的是WindowOperatorBuilder,并且构造函数入参中的keySelector实际上是keyedStreamkeySelector

public class WindowOperatorBuilder<T, K, W extends Window> {public WindowOperatorBuilder(WindowAssigner<? super T, W> windowAssigner, Trigger<? super T, ? super W> trigger, ExecutionConfig config, TypeInformation<T> inputType, KeySelector<T, K> keySelector, TypeInformation<K> keyType) {this.windowAssigner = windowAssigner;this.config = config;this.inputType = inputType;//这个keySelector = keyedStream的keySelectorthis.keySelector = keySelector;this.keyType = keyType;this.trigger = trigger;}public <ACC, V, R> WindowOperator<K, T, ?, R, W> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType) {//删除干扰代码AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor("window-contents", aggregateFunction, accumulatorType.createSerializer(this.config));return this.buildWindowOperator(stateDesc, new InternalSingleValueWindowFunction(windowFunction));}private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc, InternalWindowFunction<ACC, R, K, W> function) {return new WindowOperator(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config), stateDesc, function, this.trigger, this.allowedLateness, this.lateDataOutputTag);}}  
}    

两种窗口最后都是构建WindowOperator,只是传的参数不一样,其中全局窗口的keySelectornull对象,按键建窗口的keySelector是取的KeyedStream

3、WindowOperator

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {private final KeySelector<IN, K> keySelector;private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor, InternalWindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, long allowedLateness, OutputTag<IN> lateDataOutputTag) {//删除干扰代码    this.windowStateDescriptor = windowStateDescriptor;this.keySelector = (KeySelector)Preconditions.checkNotNull(keySelector);}public void open() throws Exception {if (this.windowStateDescriptor != null) {this.windowState = (InternalAppendingState)this.getOrCreateKeyedState(this.windowSerializer, this.windowStateDescriptor);}}//数据到的执行方法public void processElement(StreamRecord<IN> element) throws Exception {//它遍历名为elementWindows的迭代器Collection<W> elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);//这里有判断窗口是否是像会话窗口那种需要动态合并窗口的逻辑,为了不干扰理解,这里删除了那一块代码逻辑,有兴趣的可以专门去看一下//删除干扰代码Iterator var12 = elementWindows.iterator();label59:while(true) {Window window;TriggerResult triggerResult;while(true) {//在每次迭代中,它会检查窗口是否已经过期(isWindowLate方法)do {if (!var12.hasNext()) {break label59;}window = (Window)var12.next();} while(this.isWindowLate(window));//更新窗口的状态,将元素值添加到窗口状态中,并在触发器上下文中设置键和窗口isSkippedElement = false;this.windowState.setCurrentNamespace(window);//add方法this.windowState.add(element.getValue());this.triggerContext.key = key;this.triggerContext.window = window;//调用onElement方法对元素进行处理并检查触发器结果triggerResult = this.triggerContext.onElement(element);if (!triggerResult.isFire()) {//如果触发结果不需要触发(isFire() 返回 false),则跳出内部循环。break;}//如果窗口内容不为空,它将发出窗口内容并终止内部循环ACC contents = this.windowState.get();if (contents != null) {this.emitWindowContents(window, contents);break;}}//如果触发器结果要求清除窗口(isPurge()返回true),则会清除窗口状态if (triggerResult.isPurge()) {this.windowState.clear();}this.registerCleanupTimer(window);}}//水位线判断逻辑protected boolean isWindowLate(W window) {return this.windowAssigner.isEventTime() && this.cleanupTime(window) <= this.internalTimerService.currentWatermark();}}    

这里又发现了熟悉的接口,OneInputStreamOperator<IN, OUT>processElement方法实际上是父类接口Input<IN>的processElement方法

下面是WindowOperator的类关系图,和Flink 1.14.*中flatMap,filter等基本转换函数源码中RichFlatMapFunctionRichFilterFunction一样的父类AbstractUdfStreamOperator ,接口新增了特性
在这里插入图片描述
通过这些,大家心里应该有数了,不管是FlatMap还是Filter还是窗口,都是基于这个类关系图扩展下来的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/53193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringFrameWork学习笔记

本笔记基于【尚硅谷新版SSM框架全套视频教程&#xff0c;Spring6SpringBoot3最新SSM企业级开发】https://www.bilibili.com/video/BV1AP411s7D7?vd_sourcea91dafe0f846ad7bd19625e392cf76d8 总结 资料获取网址&#xff1a;https://www.wolai.com/v5Kuct5ZtPeVBk4NBUGBWF 技术…

Java项目: 基于SpringBoot+mysql房产销售系统 (含源码+数据库+开题报告+答辩PPT+毕业论文)

一、项目简介 本项目是一套基于SpringBootmysql房产销售系统 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操作简单、功能齐…

Halcon基于灰度值的模板匹配

Halcon基于灰度值的模板匹配 基于灰度值的模板匹配是最经典的模板匹配算法&#xff0c;也是最早提出来的模板匹配算法。这种算法的根本思想是&#xff0c;计算模板图像与检测图像之间的像素灰度差值的绝对值总和&#xff08;SAD方法&#xff09;或者平方差总和&#xff08;SSD…

ico格式怎么转换?5个软件让你轻松转换文件格式

ico格式怎么转换&#xff1f;5个软件让你轻松转换文件格式 ICO格式是常用于网站图标和应用程序图标的文件格式&#xff0c;虽然它很常见&#xff0c;但并非所有图像编辑软件都支持直接保存为ICO格式。如果你需要将其他格式的图片&#xff08;如PNG、JPG等&#xff09;转换为IC…

读书学习笔记入门 # Datawhale X 李宏毅苹果书 AI夏令营

文章目录 学习目标&#xff1a;学习内容&#xff1a;Task 1 通过案例了解机器学习机器学习&#xff08;Machine Learning&#xff0c;ML&#xff09;和深度学习&#xff08;Deep Learning&#xff0c;DL&#xff09;的基本概念什么是回归&#xff08;regression&#xff09;什么…

深入解析Linux轻量级进程:线程的概念、原理、优缺点及其与进程的关系与区别

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;Linux从入门到进阶 欢迎大家点赞收藏评论&#x1f60a; 目录 &#x1f4da;Linux线程&#x1f4d5;什么是线程*可以使用多进程去并发的执行一个进程的代码&#xff0c;那为什么要由线程呢&#x…

基于CloudflareSpeedTest项目实现git clone加速

1.网络测速 「自选优选 IP」测试 Cloudflare CDN 延迟和速度&#xff0c;获取最快 IP 更多内容参考项目&#xff1a;https://github.com/XIU2/CloudflareSpeedTest 国外很多网站都在使用 Cloudflare CDN&#xff0c;但分配给中国内地访客的 IP 并不友好&#xff08;延迟高、丢…

Pixelmator Pro for Mac 专业图像处理软件【媲美PS的修图软件】

Mac分享吧 文章目录 效果一、下载软件二、开始安装1、双击运行软件&#xff0c;将其从左侧拖入右侧文件夹中&#xff0c;等待安装完毕2、应用程序显示软件图标&#xff0c;表示安装成功 三、运行测试安装完成&#xff01;&#xff01;&#xff01; 效果 一、下载软件 下载软件…

【STM32+HAL库】---- 通用定时器输入捕获PWM信号

硬件开发板&#xff1a;STM32G0B1RET6 软件平台&#xff1a;cubemaxkeilVScode1 新建cubemax工程 1.1 配置系统时钟RCC 1.2 配置定时器 1.2.1 配置输入捕获 选择通用定时器TIM2-Channel 1为输入捕获引脚&#xff0c;对应IO口是PA0,时钟源选择内部时钟源Internal clock,工作模…

Unity实战案例 2D小游戏HappyGlass(模拟水珠)

本案例素材和教程都来自Siki学院&#xff0c;十分感谢教程中的老师 本文仅作学习笔记分享交流&#xff0c;不作任何商业用途 预制体 在这个小案例中&#xff0c;水可以做成圆形但是带碰撞体&#xff0c;碰撞体比图形小一圈&#xff0c;顺便加上Trail renderer组件 材质 将碰撞…

Win11 / Win10 系统极化工具,降低游戏延迟效果明显

Win11 / Win10 系统优化工具,降低游戏延迟效果明显 Windows 系统优化就是精简系统一些功能组件、对一些系统功能进行设置等&#xff0c;这样可以减少不必要的硬件资源占用。 全面的系统优化功能外&#xff0c;据不少网友表示通过优化后 CS GO 游戏降低输入延迟效果明显。 免费…

沃飞长空联合极氪亮相2024世界动力电池大会

9月1日至2日&#xff0c;2024世界动力电池大会在四川宜宾举办&#xff0c;沃飞长空与同属吉利控股集团旗下的新时代豪华科技品牌极氪汽车一同亮相。 现场&#xff0c;双方携手展出了AE200电动垂直起降航空器、极氪009光辉版、极氪001&#xff0c;以及极氪能源、金砖电池、威睿…

开源 AI 智能名片 O2O 商城小程序在营销中的应用

摘要&#xff1a;本文探讨了开源 AI 智能名片 O2O 商城小程序在营销中的应用&#xff0c;重点分析了喜好原则、互惠互利和高度认可三个方面对小程序推广和用户忠诚度提升的重要性。通过融入这些原则&#xff0c;开源 AI 智能名片 O2O 商城小程序能够更好地满足用户需求&#xf…

HTML静态网页成品作业(HTML+CSS)——动漫大耳朵图图网页(4个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有4个页面。 二、作品演示 三、代…

VUE2.0 elementUI el-input-number 数据更新,视图不更新——基础积累

今天遇到一个问题&#xff0c;是关于el-input-number组件的&#xff0c;发现数据明明已经更改了&#xff0c;但是页面上组件输入框中还是之前的值。 比如上方输入框中&#xff0c;我输入120.5&#xff0c;就会出现下面的诡异现象 回显此值是120.779&#xff0c;但是页面上输入…

协同开发工具Git

网上对于Git的使用方法介绍的很多&#xff0c;在日常工作中&#xff0c;Git是团队开发必不可少的工具之一&#xff0c;我想为一些刚使用Git的小伙伴们介绍一下常遇到的小问题。 1&#xff1a;拼写错误。这应该是每个初学者都会犯得错误&#xff0c;当出现这种错误还是比较好排…

供应链管理平台开发指南:从食堂采购系统源码开始

本篇文章&#xff0c;小编将围绕如何从食堂采购系统源码出发&#xff0c;构建一个完整的供应链管理平台进行详细解读&#xff0c;帮助开发人员掌握实现技术要点&#xff0c;并为企业打造高效的供应链系统提供技术参考。 一、供应链管理平台的核心功能概述 供应链管理平台的核心…

RK3568 Android 11 蓝牙BluetoothA2dpSink 获取用于生成频谱的PCM

Android 中的 A2DP Sink A2DP Sink 在 Android 系统中主要用于 接收 其他蓝牙设备&#xff08;如手机、平板、电脑等&#xff09;发送过来的 高质量的立体声音频。简单来说&#xff0c;它让你的 Android 设备可以充当一个 蓝牙音箱 或 耳机 的角色。 核心功能&#xff1a; 接…

中国农村政策与改革统计年报(2015-2022年)

中国农村经营管理统计年报、政策与改革统计年报&#xff08;2015-2022年&#xff09; 数据年限&#xff1a;2015-2022年&#xff0c;目前最新 数据格式&#xff1a;pdf 数据范围&#xff1a;全国各省市自治区&#xff08;不含港澳台&#xff09; 数据内容&#xff1a;《中国农村…

win系统安装mysql,使用mysqldump,pycharm使用mysqldump,避坑

文章目录 下载mysql的win客户端设置系统环境变量验证是否可用pycharm使用mysqldump异常问题排查 下载mysql的win客户端 官网下载地址如果下载旧版本&#xff0c;需自行到Archives里面找 本人使用的是mysql5.7&#xff0c;找到相应版本后&#xff0c;点击Download下载 设置系统…