二次开发Flink-coGroup算子支持迟到数据通过测输出流提取

目录

1.背景

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

2.2coGroup方法入口

2.3 CoGroupedStreams对象分析

2.4WithWindow内部类分析

2.5CoGroupWindowFunction函数分析

3.修改源码支持获取迟到数据测输出流

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

3.3新增WithWindow构造方法

3.4修改apply方法

3.5开放UnionTypeInfo类的public权限

​编辑

3.7项目中查看maven是否已经刷新为最新代码

4.测试


1.背景

coGroup算子开窗到时间关闭之后,迟到数据无法通过测输出流提取,intervalJoin算子提供了api,因为join算子底层就是coGroup算子,所以Join算子也不行。

flink版本 v1.17.1

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

    input1.coGroup(input2).where(keySelector1).equalTo(keySelector2).window(windowAssigner).trigger(trigger).evictor(evictor).allowedLateness(allowedLateness).apply(cgroupFunction)

通过上述代码可以看到没有sideOutputLateData的相关方法,用来提取窗口关闭之后的迟到数据

2.2coGroup方法入口

其中创建了一个CoGroupedStreams流对象

    /*** Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and* window can be specified.*/public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {return new CoGroupedStreams<>(this, otherStream);}

2.3 CoGroupedStreams对象分析

他可以理解为构造设计模式的一个Builder类,通过where方法配置第一条流的KeySelector,再返回一个CoGroupedStreams的内部类Where,再通过equalTo方法配置第二条流的KeySelector,再返回EqualTo内部类,window方法配置窗口划分器,返回WithWindow内部类,后续都是窗口的配置 trigger,evictor,allowedLateness配置窗口参数,最后调用apply方法传送用户业务函数

2.4WithWindow内部类分析

WithWindow是最终保存所有配置的内部类包括两条流,窗口配置,key提取器的配置,最终会用户调用apply方法触发CoGroup的业务,在apply方法中通过union联合两条流,然后通过keyby转为KeyedStream,再通过window配置窗口,最终调用窗口函数的apply方法,传入WindowFunction,做CoGroup的业务与用户业务。

具体代码如下已写好备注

/*** A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as* well as a {@link WindowAssigner}.** @param <T1> Type of the elements from the first input* @param <T2> Type of the elements from the second input* @param <KEY> Type of the key. This must be the same for both inputs* @param <W> Type of {@link Window} on which the co-group operation works.*/@Publicpublic static class WithWindow<T1, T2, KEY, W extends Window> {//第一条流private final DataStream<T1> input1;//第二条流private final DataStream<T2> input2;//第一个key提取器private final KeySelector<T1, KEY> keySelector1;//第二个Key提取器private final KeySelector<T2, KEY> keySelector2;//Key的类型private final TypeInformation<KEY> keyType;//窗口分配器private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;//窗口出发计算器private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;private final Time allowedLateness;private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;//构造函数给上面对象赋值protected WithWindow(DataStream<T1> input1,DataStream<T2> input2,KeySelector<T1, KEY> keySelector1,KeySelector<T2, KEY> keySelector2,TypeInformation<KEY> keyType,WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,Time allowedLateness) {this.input1 = input1;this.input2 = input2;this.keySelector1 = keySelector1;this.keySelector2 = keySelector2;this.keyType = keyType;this.windowAssigner = windowAssigner;this.trigger = trigger;this.evictor = evictor;this.allowedLateness = allowedLateness;}/*** Completes the co-group operation with the user function that is executed for windowed* groups.** <p>Note: This method's return type does not support setting an operator-specific* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the* {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific* parallelism.*/public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {// clean the closurefunction = input1.getExecutionEnvironment().clean(function);//创建合并两个流的公共TypeInfo,UnionTypeInfo最终会将Input1,Input2的数据通过map算子转换为该类型UnionTypeInfo<T1, T2> unionType =new UnionTypeInfo<>(input1.getType(), input2.getType());//转换成union的KeySelectorUnionKeySelector<T1, T2, KEY> unionKeySelector =new UnionKeySelector<>(keySelector1, keySelector2);//将taggedInput1的数据类容map成UnionTypeInfo<T1, T2>类型SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =input1.map(new Input1Tagger<T1, T2>());taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);taggedInput1.returns(unionType);//将taggedInput2的数据类容map成UnionTypeInfo<T1, T2>类型SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =input2.map(new Input2Tagger<T1, T2>());taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);taggedInput2.returns(unionType);//将两个流进行unionDataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);//keyBy并且开窗windowedStream =new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType).window(windowAssigner);//配置窗口触发器if (trigger != null) {windowedStream.trigger(trigger);}//配置移除器if (evictor != null) {windowedStream.evictor(evictor);}//配置allowedLatenessif (allowedLateness != null) {windowedStream.allowedLateness(allowedLateness);}//创建CoGroupWindowFunction ,并把用户函数传入进去return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);}/*** Completes the co-group operation with the user function that is executed for windowed* groups.** <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,* TypeInformation)} method has the wrong return type and hence does not allow one to set an* operator-specific parallelism** @deprecated This method will be removed once the {@link #apply(CoGroupFunction,*     TypeInformation)} method is fixed in the next major version of Flink (2.0).*/@PublicEvolving@Deprecatedpublic <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {return (SingleOutputStreamOperator<T>) apply(function, resultType);}@VisibleForTestingTime getAllowedLateness() {return allowedLateness;}//获取窗口包装流,但是标记为VisibleForTesting,用户无法调用,如果可以调用的话可以通过该方法获取包装流之后通过窗口流获取迟到数据的测输出流@VisibleForTestingWindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {return windowedStream;}}

2.5CoGroupWindowFunction函数分析

CoGroupWindowFunction也是CoGroupedStreams内部类,负责做CoGroup的业务,最终将数据封装好转发给用户函数(也就是2.1中apply中的cgroupFunction)

   private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>extends WrappingFunction<CoGroupFunction<T1, T2, T>>implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {private static final long serialVersionUID = 1L;public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {super(userFunction);}@Overridepublic void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)throws Exception {//缓存当前窗口里1号流的数据List<T1> oneValues = new ArrayList<>();//缓存当前窗口里2号流的数据List<T2> twoValues = new ArrayList<>();for (TaggedUnion<T1, T2> val : values) {if (val.isOne()) {oneValues.add(val.getOne());} else {twoValues.add(val.getTwo());}}//传入到用户函数中wrappedFunction.coGroup(oneValues, twoValues, out);}}

3.修改源码支持获取迟到数据测输出流

思路 复制CoGroupedStreams新增一个NewCoGroupedStreams,在WithWindow函数中增加方法sideOutputLateData,让用户传入outputTag,用于提取窗口关闭后的测输出流。

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

新增该方法,传入outputTag,下图WithWindow构造方法是3.3新增的

    @PublicEvolvingpublic WithWindow<T1, T2, KEY, W> sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag) {return new WithWindow<>(input1,input2,keySelector1,keySelector2,keyType,windowAssigner,trigger,evictor,allowedLateness,outputTag);}

3.3新增WithWindow构造方法

新增属性laterDataOutputTag,用来保存构造函数中传入的laterOutputTag

   protected WithWindow(DataStream<T1> input1,DataStream<T2> input2,KeySelector<T1, KEY> keySelector1,KeySelector<T2, KEY> keySelector2,TypeInformation<KEY> keyType,WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,Time allowedLateness,OutputTag<TaggedUnion<T1, T2>> laterOutputTag) {this(input1,input2,keySelector1,keySelector2,keyType,windowAssigner,trigger,evictor,allowedLateness);this.lateDataOutputTag = laterOutputTag;}

3.4修改apply方法

判断lateDataOutputTag 是否为null,如果不为null则调用windowedStream的sideOutputLateData设置迟到数据tag

 /*** Completes the co-group operation with the user function that is executed for windowed* groups.** <p>Note: This method's return type does not support setting an operator-specific* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the* {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific* parallelism.*/public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {// clean the closurefunction = input1.getExecutionEnvironment().clean(function);UnionTypeInfo<T1, T2> unionType =new UnionTypeInfo<>(input1.getType(), input2.getType());UnionKeySelector<T1, T2, KEY> unionKeySelector =new UnionKeySelector<>(keySelector1, keySelector2);SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =input1.map(new Input1Tagger<T1, T2>());taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);taggedInput1.returns(unionType);SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =input2.map(new Input2Tagger<T1, T2>());taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);taggedInput2.returns(unionType);DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);// we explicitly create the keyed stream to manually pass the key type information inwindowedStream =new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType).window(windowAssigner);if (trigger != null) {windowedStream.trigger(trigger);}if (evictor != null) {windowedStream.evictor(evictor);}if (allowedLateness != null) {windowedStream.allowedLateness(allowedLateness);}//判断lateDataOutputTag是否为NULL,如果不为NULL,则调用windowedStream//的sideOutputLateData方法,传入lateDataOutputTag让迟到数据输出到测输出流中if (lateDataOutputTag != null) {windowedStream.sideOutputLateData(lateDataOutputTag);}return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);}

3.5开放UnionTypeInfo类的public权限

该类就是union之后的公共类的类型 oneType代表Input1流的数据类型,TwoType代表Input2流的数据类型

进入到flink-streaming-java所在磁盘目录输入以下命令编译

mvn clean install -DskipTests -Dfast

编译成功

3.7项目中查看maven是否已经刷新为最新代码

编译之后,可以看到导入的maven包已经有了新增的NewCoGroupedStreams类了,注意项目中的maven依赖中的flink版本,要与编译源码的版本一致,否则无法引入到。

4.测试

新建两个流,通过new NewCoGroupedStreams创建对象,在allowedLateness之后通过sideOutputLateData设置outputTag,然后通过with方法触发业务,with底层也是调用了apply,只不过他帮我们把返回的流转为了SingleOutputStreamOperator类型,可以用于提取测输出流。最后通过with.getSideOutput(outputTag)提取测输出流,最后通过map转换为 Tuple2<Integer, WaterSensor> 类型进行打印

    OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag = new OutputTag<>("later",new NewCoGroupedStreams.UnionTypeInfo<>(Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));NewCoGroupedStreams<WaterSensor, WaterSensor> newCgroupStream = new NewCoGroupedStreams<>(ds1, ds2);SingleOutputStreamOperator<String> with = newCgroupStream.where((x) -> x.getId()).equalTo(x -> x.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(3)).sideOutputLateData(outputTag).with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void coGroup(Iterable<WaterSensor> first, Iterable<WaterSensor> second, Collector<String> out) throws Exception {out.collect(first.toString() + "======" + second.toString());}});with.print();with.getSideOutput(outputTag).map(new MapFunction<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>, Tuple2<Integer, WaterSensor>>() {@Overridepublic Tuple2<Integer, WaterSensor> map(NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value) throws Exception {return value.isOne() ? Tuple2.of(1, value.getOne()) : Tuple2.of(2, value.getTwo());}}).print();

可以看到下图结果,ts代表时间戳,第一个打印是RichCoGroupFunction打印,代表关闭了1~10s的时间窗,后面我们在输入,WaterSensor{id='a', ts=1, vc=1} 就通过测输出流打印为二元组了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/768647.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言中的联合体和枚举

联合体 联合体的创建 联合体的关键字是union union S {char a;int i; };除了关键字和结构体不一样之外&#xff0c;联合体的创建语法形式和结构体的很相似&#xff0c;如果不熟悉结构体的创建&#xff0c;可以看一下我上一篇的博客关于结构体知识的详解。 联合体的特点 联合…

移植 Zephyr 到 Art-Pi

背景 ​ 最近工作中接触到了 Zephyr&#xff0c;不由觉得 Zephyr 是个很强大、全面、优秀的实时操作系统&#xff0c;但同时是有一定的上手难度的&#xff0c;其复杂的构建系统让小编倒吸一口凉气。为了深入研究并完全掌控 Zephyr&#xff0c;小编决定把它移植到手头的开发板上…

Springboot实现合并单元格的excel文件导入到数据库(多模块)

最近做项目的时候一直在遇到excel导入导出的问题&#xff0c;本篇博文也是为了记录我这几天的血泪史&#xff0c;并做以记录&#xff0c;希望各位看完之后能有所收获。 以下是我excel文档里面的具体内容&#xff1a; excel文件中的编码信息属于另外一张表&#xff0c;所以以下…

android emulator windows bat启动

android emulator windows bat启动 先上结果 // 模拟器路径 -netspeed full -avd 模拟器名称 C:\Users\name\AppData\Local\Android\Sdk\emulator\emulator.exe -netdelay none -netspeed full -avd Pixel_3a_API_34_extension_level_7_x86_64一般来说 windows 如果不做…

2023年全国职业院校技能大赛(网络系统管理赛项)样题一

2023****年全国职业院校技能大赛 GZ073****网络系统管理赛项 赛题第1套 模块A&#xff1a;网络构建 目 录 任务清单… 1 &#xff08;一&#xff09;基础配置… 1 &#xff08;二&#xff09;有线网络配置… 1 &#xff08;三&#xff09;无线网络配置… 3 &#xff0…

初探Flink集群【持续更新】

周末下雨&#xff0c;倒杯茶&#xff0c;在家练习Flink相关。 开发工具&#xff1a;IntelliJ Idea 第一步、创建项目 打开Idea&#xff0c;新建Maven项目&#xff0c;包和项目命名 在pom.xml 文件中添加依赖 <properties><flink.version>1.13.0</flink.vers…

使用Python进行股票分析(2)

简介 我们在之前的文章《使用Python进行股票分析&#xff08;1&#xff09;》中&#xff0c;通过自动获取股票的历史数据&#xff0c;然后选择在一定时间内处于上涨的股票作为我们投资的标的。在本文中&#xff0c;我们进一步通过分析股票的短期趋势&#xff0c;选择处于短期上…

Ubuntu Desktop 安装谷歌拼音输入法

Ubuntu Desktop 安装谷歌拼音输入法 1. Installation1.1. 汉语语言包​1.2. 谷歌拼音输入法1.3. 安装语言包1.4. 键盘输入方式系统1.5. 重启电脑1.6. 输入法配置 2. configuration2.1. Text Entry Settings… 3. ExecutionReferences 1. Installation 1.1. 汉语语言包 strong…

springcloud第4季 负载均衡的介绍3

一 loadbalance 1.1 负载均衡的介绍 使用注解loadbalance&#xff0c;是一个客户端的负载均衡器&#xff1b;通过之前已经从注册中心拉取缓存到本地的服务列表中&#xff0c;获取服务进行轮询负载请求服务列表中的数据。 轮询原理 1.2 loadbalance工作流程 loadBalance工作…

再仔细品品Elasticsearch的向量检索

我在es一开始有向量检索&#xff0c;就开始关注这方面内容了。特别是在8.X之后的版本&#xff0c;更是如此。我也已经把它应用在亿级的生产环境中&#xff0c;用于多模态检索和语义检索&#xff0c;以及RAG相关。 也做过很多的优化&#xff1a;ES 8.x 向量检索性能测试 & 把…

Swift 从获取所有 NSObject 对象聊起:ObjC、汇编语言以及底层方法调用链(三)

概览 承接上一篇博文: Swift 从获取所有 NSObject 对象聊起:ObjC、汇编语言以及底层方法调用链(二)我们在其中讨论了如何使用第三方强大通用的钩子库 SwiftHook 来协助我们完成 NSObject 构造器 init 的 SWIZZ 操作。我们还讨论了为什么用 print 打印对象信息时会发生崩溃…

Unity 布局元素Layout Element

Layout Element是一种用于控制UI元素在布局组件&#xff08;如Horizontal Layout Group、Vertical Layout Group、Grid Layout Group、Content Size Fitter和Aspect Ratio Fitter&#xff09;中的大小和位置的组件。Layout Element组件可以附加到UI元素上&#xff0c;以便在布局…

opencv各个模块介绍(2)

Features2D 模块&#xff1a;特征检测和描述子计算模块&#xff0c;包括SIFT、SURF等算法。 Features2D 模块提供了许多用于特征检测和描述子匹配的函数和类&#xff0c;这些函数和类可用于图像特征的提取、匹配和跟踪。 FeatureDetector&#xff1a;特征检测器的基类&#xf…

arm 外部中断

main.c: #include"key_inc.h" //封装延时函数 void delay(int ms) {int i,j;for(i0;i<ms;i){for(j0;j<2000;j){}} } int main() {//按键中断的初始化key1_it_config();key2_it_config();key3_it_config();while(1){printf("in main pro\n");delay(1…

查看Linux系统重启的四种基本命令

目录 前言1. last2. uptime3. journalctl4. dmesg 前言 对于排查其原因推荐阅读&#xff1a;详细分析服务器自动重启原因&#xff08;涉及Linux、Window&#xff09; 在Linux中&#xff0c;有多种命令可以查看系统重启的信息 以下是其中一些常用的命令及其解释&#xff1a; …

EasyPOI操作Excel从零入门

教程介绍 我们不造轮子&#xff0c;只是轮子的搬运工。&#xff08;其实最好是造轮子&#xff0c;造比别人好的轮子&#xff09;开发中经常会遇到excel的处理&#xff0c;导入导出解析等等&#xff0c;java中比较流行的用poi&#xff0c;但是每次都要写大段工具类来搞定这事儿…

【新版】系统架构设计师 - 新版架构备考索引<附2023年11月原题回忆>

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 新版架构备考索引机考详情备考索引与方向&#xff08;个人观点&#xff0c;仅供参考&#xff09;总结附&#xff1a;2023年11月改版机试原题简单回忆 架构 - 新版架构备考索引 首先&#xff0c;此…

Spring 简介

1. Spring简介 1.1 Spring 核心设计思想 1.1.1 Spring 是什么&#xff1f; Spring 是包含了众多⼯具⽅法的 IoC 容器。Spring 指的是 Spring Framework&#xff08;Spring 框架&#xff09;&#xff0c;它是⼀个开源框架&#xff0c;Spring ⽀持⼴泛的应⽤场景&#xff0c;它…

月之暗面Kimi代码分析能力评测

最近打算重构一下PawSQL优化引擎中的OR条件的SELECT重写优化策略的代码&#xff0c;时间有点久&#xff0c;代码有点复杂&#xff0c;看到网上对新出了KIMI评价很高。于是尝试用它来理解一下代码。上传了此优化重写的代码&#xff0c;提问&#xff1a; 第一问&#xff0c;设计…

java多线程编程面试题总结

一些最基本的基础知识就不总结了&#xff0c;参考之前写的如下几篇博客&#xff0c;阅读顺序从上到下&#xff0c;依次递进。 java 多线程 多线程概述及其三种创建方式 线程的常用方法 java 线程安全问题 三种线程同步方案 线程通信&#xff08;了解&#xff09; java 线程池…