深入理解Flink IntervalJoin源码

IntervalJoin基于connect实现,期间会生成对应的IntervalJoinOperator。

@PublicEvolving
public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,TypeInformation<OUT> outputType) {Preconditions.checkNotNull(processJoinFunction);Preconditions.checkNotNull(outputType);// 检查用户自定义Functionfinal ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);// 构建IntervalJoin对应的IntervalJoinOperatorfinal IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =new IntervalJoinOperator<>(lowerBound,upperBound,lowerBoundInclusive,upperBoundInclusive,left.getType().createSerializer(left.getExecutionConfig()),right.getType().createSerializer(right.getExecutionConfig()),cleanedUdf);// (基于connect实现)使用给定的自定义Function,对每个元素进行连接操作return left.connect(right)// 根据k1、k2,为s1、s2分配k,实际就是构建ConnectedStreams,以便后续构建IntervalJoinOperator对应的Transformation.keyBy(keySelector1, keySelector2)// 构建IntervalJoinOperator对应的TwoInputTransformation.transform("Interval Join", outputType, operator);
}

并且会根据给定的自定义Function构建出对应的TwoInputTransformation,以便能够参与Transformation树的构建。

/*** 创建StreamOperator对应的Transformation,以便能参与Transformation树的构建*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String functionName,TypeInformation<R> outTypeInfo,TwoInputStreamOperator<IN1, IN2, R> operator) {inputStream1.getType();inputStream2.getType();// 创建IntervalJoinOperator对应的TwoInputTransformationTwoInputTransformation<IN1, IN2, R> transform = new TwoInputTransformation<>(inputStream1.getTransformation(),inputStream2.getTransformation(),functionName,operator,outTypeInfo,environment.getParallelism());if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) {KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;TypeInformation<?> keyType1 = keyedInput1.getKeyType();TypeInformation<?> keyType2 = keyedInput2.getKeyType();if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {throw new UnsupportedOperationException("Key types if input KeyedStreams " +"don't match: " + keyType1 + " and " + keyType2 + ".");}transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());transform.setStateKeyType(keyType1);}@SuppressWarnings({ "unchecked", "rawtypes" })SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, transform);// 将IntervalJoinOperator对应的TwoInputTransformation,添加到Transformation树上getExecutionEnvironment().addOperator(transform);return returnStream;
}

作为ConnectedStreams,一旦left or right流中的StreamRecord抵达,就会被及时处理:

@Override
public void processElement1(StreamRecord<T1> record) throws Exception {/**处理left*/processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}@Override
public void processElement2(StreamRecord<T2> record) throws Exception {/**处理right*/processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

两者的处理逻辑是相同的:

/*** 处理Left和Right中的数据*/
@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(final StreamRecord<THIS> record,final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,final long relativeLowerBound,final long relativeUpperBound,// 当前Join上的数据是否为leftfinal boolean isLeft) throws Exception {// 当前left or right的StreamRecordfinal THIS ourValue = record.getValue();// 当前left or right的StreamRecord中的时间戳final long ourTimestamp = record.getTimestamp();if (ourTimestamp == Long.MIN_VALUE) {throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +"interval stream joins need to have timestamps meaningful timestamps.");}// 是否迟到:当前StreamRecord中的时间戳是否小于当前Watermarkif (isLate(ourTimestamp)) {return;}// 将当前StreamRecord写入到它所对应的“己方MapState”中(left归left,right归right)addToBuffer(ourBuffer, ourValue, ourTimestamp);/*** 遍历当前StreamRecord的“对方MapState”,判断哪个StreamRecord被Join上了*/for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {// “对方MapState”中的Key,即时间戳final long timestamp  = bucket.getKey();// 如果遍历到的MapState的这个元素的时间戳不在(以当前StreamRecord的时间戳为基准的)Join的范围内,// 说明没Join上,那就跳过本次循环。这是判断哪个StreamRecord是否Join上的核心!if (timestamp < ourTimestamp + relativeLowerBound ||timestamp > ourTimestamp + relativeUpperBound) {continue;}// 反之,说明已经Join上了,那就取出这个元素的Value,即时间戳所对应的List<BufferEntry<T1>>for (BufferEntry<OTHER> entry: bucket.getValue()) {// 将Join上的left和right分发下游(回调用户自定义函数中的processElement()方法)if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}}// 经历双层for循环并分发下游后,计算清理时间(当前StreamRecord的时间戳+上界值)long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;// 注册Timer来清理保存在MapState中的过期数据if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);}
}

先取出当前StreamRecord中的Timestamp检查它是否已经迟到了,判断依据为:当前StreamRecord中的Timestamp是否小于当前Watermark。

/*** 判断当前StreamRecord是否迟到:当前StreamRecord中的时间戳是否小于当前Watermark*/
private boolean isLate(long timestamp) {// 获取当前的Watermarklong currentWatermark = internalTimerService.currentWatermark();// 迟到判定条件return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
}

接着将当前StreamRecord写入到对应的MapState中。需要注意的是,left和right都有各自的MapState,这个MapState将Timestamp作为Key,将List集合作为Value(考虑到同一时刻可能会有多条数据)

/*** 将当前StreamRecord写入到它所对应的MapState中(left归left,right归right)*/
private static <T> void addToBuffer(final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,final T value,final long timestamp) throws Exception {// 先拿着时间戳作为key去MapState中取List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);if (elemsInBucket == null) {elemsInBucket = new ArrayList<>();}// 将StreamRecord包装成BufferEntry(默认未被Join上),add到List集合中elemsInBucket.add(new BufferEntry<>(value, false));// 将List集合put到MapState中(时间戳作为Key)buffer.put(timestamp, elemsInBucket);
}

接着会经历嵌套for循环,判断哪些StreamRecord是满足Join条件的:以当前StreamRecord的Timestamp和指定的上、下界组成时间过滤条件,对当前StreamRecord的“对方MapState”内的每个Timestamp(作为Key)进行比对。

/*** 遍历当前StreamRecord的“对方MapState”,判断哪个StreamRecord被Join上了*/
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {// “对方MapState”中的Key,即时间戳final long timestamp  = bucket.getKey();// 如果遍历到的MapState的这个元素的时间戳不在(以当前StreamRecord的时间戳为基准的)Join的范围内,// 说明没Join上,那就跳过本次循环。这是判断哪个StreamRecord是否Join上的核心!if (timestamp < ourTimestamp + relativeLowerBound ||timestamp > ourTimestamp + relativeUpperBound) {continue;}// 反之,说明已经Join上了,那就取出这个元素的Value,即时间戳所对应的List<BufferEntry<T1>>for (BufferEntry<OTHER> entry: bucket.getValue()) {// 将Join上的left和right分发下游(回调用户自定义函数中的processElement()方法)if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}
}

一旦某个Key符合时间过滤条件,那就将它所对应的List集合(作为Value)取出来,逐条将其发送给下游,本质就是将其交给自定义Function处理

/*** 将满足IntervalJoin条件的StreamRecord发送给下游,本质就是将其交给自定义Function处理*/
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);collector.setAbsoluteTimestamp(resultTimestamp);context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);// 将Join上的StreamRecord交给自定义Function,执行开发者的处理逻辑userFunction.processElement(left, right, context, collector);
}

整个过滤筛选过程,也是IntervalJoin的核心所在!

最后,会计算保存在MapState中的StreamRecord的过期清理时间,因为StreamRecord不能一直被保存。本质就是基于InternalTimerService注册Timer,触发时间为:当前StreamRecord的Timestamp + 给定的上界值。

// 经历双层for循环并分发下游后,计算清理时间(当前StreamRecord的时间戳+上界值)
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
// 注册Timer来清理保存在MapState中的过期数据
if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}

由于IntervalJoinOperator实现了Triggerable接口,因此一旦注册的Timer被触发,就会将对应MapState中对应的Timestamp进行remove

/*** 基于InternalTimerService注册的Timer,会定时对MapState执行clean操作*/
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {long timerTimestamp = timer.getTimestamp();String namespace = timer.getNamespace();logger.trace("onEventTime @ {}", timerTimestamp);switch (namespace) {case CLEANUP_NAMESPACE_LEFT: {long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;logger.trace("Removing from left buffer @ {}", timestamp);// clean leftleftBuffer.remove(timestamp);break;}case CLEANUP_NAMESPACE_RIGHT: {long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;logger.trace("Removing from right buffer @ {}", timestamp);// clean rightrightBuffer.remove(timestamp);break;}default:throw new RuntimeException("Invalid namespace " + namespace);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/33112.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

​可视化绘图技巧100篇进阶篇(四)-三维簇状柱形图(3D Clustered Bar Chart)

目录 前言 适用场景 图例 柱形图 一、柱形图的特点 二、柱形图的类型

2021年12月 C/C++(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题:输出整数部分 输入一个双精度浮点数f, 输出其整数部分。 时间限制:1000 内存限制:65536 输入 一个双精度浮点数f(0 < f < 100000000)。 输出 一个整数,表示浮点数的整数部分。 样例输入 3.8889 样例输出 3 下面是一个使用C语言编写的输出双精度浮点数整数部分…

HTML详解连载(1)

HTML详解连载&#xff08;1&#xff09; HTML定义HTML 超文本标记语言标签语法注意拓展 HTML基本骨架解释VS Code 快速生成骨架&#xff1a;标签的关系父子关系&#xff08;嵌套关系&#xff09;兄弟关系&#xff08;并列关系&#xff09; 代码格式注释 标题标签标签名:h1-h6(双…

csrf跨站请求的相关装饰器、Auth模块(模块的使用、相关方法、退出系统、修改密码功能、注册功能)、扩展默认的auth_user表

一、csrf跨站请求的相关装饰器 django.middleware.csrf.CsrfViewMiddlewareDjango中有一个中间件对csrf跨站做了验证&#xff0c;我只要把csrf的这个中间件打开&#xff0c; 那就意味着所有的方法都要被验证 在所有的视图函数中&#xff1a;只有几个视图函数做验证只有几个函数…

制造业企业数字化转型之设备数据采集

导 读 ( 文/ 1894 ) 随着信息技术的快速发展和制造业的转型升级&#xff0c;企业数字化转型已成为保持竞争力和实现可持续发展的关键。在数字化转型过程中&#xff0c;设备数据采集作为重要的一环&#xff0c;发挥着关键的作用。设备数据采集通过收集、分析和利用设备所产生的数…

【资讯速递】AI与人类思维的融合;OpenAI在中国申请注册“GPT-5”商标;移动大模型主要面向to B 智能算力是未来方向

2023年8月11日 星期五 癸卯年六月廿五 第000001号 欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于IT资讯速递专栏,本专栏主要用于发布各种IT资讯&#xff0c;为大家可以省时省力的就能阅读和了解到行业的一些新资讯 资…

Effective Java笔记(28)列表优于数组

数组与泛型相比&#xff0c;有两个重要的不同点 。 首先&#xff0c;数组是协变的&#xff08; covariant &#xff09; 。 这个词听起来有点吓人&#xff0c;其实只是表示如果 Sub 为 Super 的子类型&#xff0c;那么数组类型 Sub[ ]就是Super[ ]的子类型。 相反&#xff0c;泛…

无涯教程-Perl - link函数

描述 此函数创建一个新文件名NEWFILE,链接到文件OLDFILE。该函数创建一个硬链接&#xff1b;如果需要符号链接,请使用符号链接功能。 语法 以下是此函数的简单语法- link OLDFILE,NEWFILE返回值 如果失败,此函数返回0,如果成功,则返回1。 例 以下是显示其基本用法的示例…

开发一个RISC-V上的操作系统(六)—— 中断(interrupt)和异常(exception)

目录 往期文章传送门 一、控制流 &#xff08;Control Flow&#xff09;和 Trap 二、Exceptions, Traps, and Interrupts Contained Trap Requested Trap Invisible Trap Fatal Trap 异常和中断的异同 三、RISC-V的异常处理 mtvec&#xff08;Machine Trap-Vector Ba…

从零学算法154

154.已知一个长度为 n 的数组&#xff0c;预先按照升序排列&#xff0c;经由 1 到 n 次 旋转 后&#xff0c;得到输入数组。例如&#xff0c;原数组 nums [0,1,4,4,5,6,7] 在变化后可能得到&#xff1a; 若旋转 4 次&#xff0c;则可以得到 [4,5,6,7,0,1,4] 若旋转 7 次&#…

前端必学的CSS3波浪效果演示

以下是这三种CSS3波浪效果的使用说明 使用translateX和translateZ属性创建波浪效果&#xff1a; 使用场景&#xff1a; 适用于需要在X轴上平移和在Z轴上应用3D变换的波浪效果。可以用于创建具有起伏效果的海浪、水面波纹等效果。 优点&#xff1a; 通过3D变换&#xff0c;…

内生安全构建数据存储

一、数据安全成为防护核心&#xff0c;存储安全防护不容有失 1、数据作为企业的核心资产亟需重点保护&#xff0c;数据安全已成网络空间防护核心 2、国家高度重视关键信息基础设施的数据安全&#xff0c;存储安全已成为审核重点 二、存储安全是数据安全的关键一环&#xff0c;应…

AIGC技术揭秘:探索火热背后的原因与案例

文章目录 什么是AIGC技术&#xff1f;为何AIGC技术如此火热&#xff1f;1. 提高效率与创造力的完美结合2. 拓展应用领域&#xff0c;创造商业价值3. 推动技术创新和发展 AIGC技术案例解析1. 艺术创作&#xff1a;生成独特的艺术作品2. 内容创作&#xff1a;实时生成各类内容3. …

SolidWorks不能使用选择如允许此选择将生成有冲突的前后关系

SolidWorks不能使用选择如允许此选择将生成有冲突的前后关系 1 SolidWorks不能使用选择如允许此选择将生成有冲突的前后关系 1 SolidWorks不能使用选择如允许此选择将生成有冲突的前后关系 https://www.swrjzxw.com/1556.html SolidWorks装配体时 显示 不能使用选择如允许此选…

哪些CRM的报价公开且透明?

企业在选型时&#xff0c;会发现很多品牌的CRM系统价格并不透明&#xff0c;往往都是需要跟产品顾问沟通后才能了解。下面推荐一款价格实在的CRM系统&#xff0c;所有报价公开透明&#xff0c;那就是Zoho CRM。 Zoho CRM是什么&#xff1f; Zoho CRM是一款在线CRM软件&#x…

NAS相关

Debian11 更换软件源 备份 #备份软件源列表 cp /etc/apt/sources.list /etc/apt/sources.list.bak编辑sources.list nano /etc/apt/sources.list替换文件内容 deb http://mirrors.163.com/debian/ bullseye main non-free contrib deb http://mirrors.163.com/debian/ bull…

SAP BAPI 创建/修改MD61/MD62计划独立需求预测

MD61 创建&#xff1a; BAPI: BAPI_REQUIREMENTS_CREATE CLEAR: lv_error,ls_requirements_item,lt_requirements_schedule_in,ls_requirements_schedule_in,lt_return_n,ls_return_n,lv_reqmtsplannumber."工厂ls_requirements_item-plant lv_werks."MRP AR…

pytorch模型加载caffe模型的权重

一、将caffe模型的权重转成dict格式 caffe库的编译可以参考我之前写的一篇博客&#xff1a;ImportError: dynamic module does not define module export function (PyInit__caffe)问题解决记录_chen_zn95的博客-CSDN博客 安装好后使用以下脚本便可将caffe模型的参数名和参数…

分布式测试插件 pytest-xdist 使用详解

目录 使用背景&#xff1a; 使用前提&#xff1a; 使用快速入门&#xff1a; 使用小结&#xff1a; 使用背景&#xff1a; 大型测试套件&#xff1a;当你的测试套件非常庞大&#xff0c;包含了大量的测试用例时&#xff0c;pytest-xdist可以通过并行执行来加速整体的测试过…

js中的break和continue中的区别

js中break和continue有着一些差别。 首先&#xff0c;虽然break和continue都有跳出循环的作用&#xff0c;但break是完全跳出循环&#xff0c;而continue则是跳出一次循环&#xff0c;然后开启下一次的循环。 下面我就来举几个例子吧。 var num 0;for(var i 1;i < 10;i){i…