[尚硅谷 flink] 基于时间的合流——双流联结(Join)

文章目录

      • 8.1 窗口联结(Window Join)
      • 8.2 **间隔联结(Interval Join)**

8.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

package org.example.watermark;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key,来进行匹配关联// 3. 只能拿到匹配上的数据,类似有固定时间范围的inner joinDataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0)  // ds1的keyby.equalTo(r2 -> r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用join方法* @param first  ds1的数据* @param second ds2的数据* @return* @throws Exception*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}

其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

8.2 间隔联结(Interval Join)

  • 间隔联结的原理

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
image.png
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。

所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。

stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
  • 处理迟到数据
 //2. 调用 interval join
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag)  // 将 ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left  ks1的数据* @param right ks2的数据* @param ctx   上下文* @param out   采集器* @throws Exception*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");
process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/798103.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实现通讯录(顺序表版本)

一、功能要求 &#xff08;1&#xff09;⾄少能够存储100个⼈的通讯信息 &#xff08;2&#xff09;能够保存⽤⼾信息&#xff1a;名字、性别、年龄、电话、地址等 &#xff08;3&#xff09;增加联系⼈信息 &#xff08;4&#xff09;删除指定联系⼈ &#xff08;5&#…

X年后,ChatGPT会替代底层程序员吗?

能不能替代&#xff0c;真的很难说&#xff0c;因为机器换掉人&#xff0c;这其实是一个伦理问题。 其实说白了&#xff0c;任何行业在未来都会被AI或多或少的冲击到&#xff0c;因为ChatGPT做为一个可以持续提升智能的AI&#xff0c;在某些方面的智能程度超过人类并不是什么难…

数据结构-基本概念

1.什么是数据结构&#xff1f; 数据 数据&#xff0c;是对客观事物的符号表示&#xff0c;在计算机科学中是指所有能输入到计算机中并被计算机程序处理的符号的总称。 结构 &#xff08;1&#xff09;线性结构&#xff08;比如图书目录文件&#xff0c;一对一的关系&#x…

Unity Meta Quest MR 开发(五):空间锚点

文章目录 &#x1f4d5;教程说明 此教程相关的详细教案&#xff0c;文档&#xff0c;思维导图和工程文件会放入 Spatial XR 社区。这是一个高质量 XR 开发者社区&#xff0c;博主目前在内担任 XR 开发的讲师。该社区提供专人答疑、完整进阶教程、从零到一项目孵化保姆服务&…

文件及相关文件函数的使用

目录 前言 文件的概述 文件类型 如何用C语言来操作文件呢&#xff1f; 文件缓冲区 文件的打开与关闭 文件的打开——fopen&#xff08;&#xff09;函数 文件的关闭——fclose&#xff08;&#xff09;函数 文件结束检测——feof()函数 文件的顺序读写 文件字符输出函…

kubernetes集群添加到jumpserver堡垒机里管理

第一步、在kubernetes集群中获取一个永久的token。 jumpserver堡垒机用api的来管理kubernetes&#xff0c;所以需要kubernetes的token&#xff0c;这个token还需要是一个永久的token&#xff0c;版本变更&#xff1a;Kubernetes 1.24基于安全方面的考虑&#xff08;特性门控Le…

JVM字节码与类的加载——class文件结构

文章目录 1、概述1.1、class文件的跨平台性1.2、编译器分类1.3、透过字节码指令看代码细节 2、虚拟机的基石&#xff1a;class文件2.1、字节码指令2.2、解读字节码方式 3、class文件结构3.1、魔数&#xff1a;class文件的标识3.2、class文件版本号3.3、常量池&#xff1a;存放所…

在 K8s 上跑腾讯云 Serverless 函数,打破传统方式造就新变革

目录 目录 前言 Serverless 和 K8s 的优势 1、关于Serverless 函数的特点 2、K8s 的特点 腾讯云 Serverless 函数在 K8s 上的应用对企业服务的影响 1、弹性扩展和高可用性 2、成本优化和资源利用 3、简化部署和管理 拓展&#xff1a;腾讯云云函数 SCF on K8s 番外篇…

HarmonyOS实战开发-如何实现蓝牙设备发现、配对、取消配对功能。

介绍 蓝牙技术是一种无线数据和语音通信开放的全球规范&#xff0c;它是基于低成本的近距离无线连接&#xff0c;为固定和移动设备建立通信环境的一种特殊的近距离无线技术连接。本示例通过ohos.bluetooth 接口实现蓝牙设备发现、配对、取消配对功能。实现效果如下&#xff1a…

计算机网络 实验指导 实验9

实验9 三层交换机综合实验 1.实验拓扑图 名称相连的接口IP地址网关PC1F0/3172.1.1.2/28172.1.1.1/28PC2F0/4172.1.1.18/28172.1.1.17/28PC3F0/5172.1.1.34/28172.1.1.33/28PC4F0/3172.1.1.3/28172.1.1.1/28PC5F0/4172.1.1.19/28172.1.1.17/28PC6F0/5172.1.1.35/28172.1.1.33/2…

ROS 2边学边练(16)-- 自定义msg和srv文件

前言 在前面的文章我们在学习主题&#xff08;topic&#xff09;和服务&#xff08;service&#xff09;通信方法时&#xff0c;使用的一直是ROS 2提供好的消息结构文件&#xff08;xxx.msg&#xff09;和服务结构文件&#xff08;xxx.srv&#xff09;&#xff0c;稀里糊涂的就…

编程新手必看,学习python中列表数据类型内容(9)

Python中的列表是一种可变的、有序的数据结构&#xff0c;它允许存储不同类型的数据项&#xff0c;并支持多种操作。具体如下&#xff1a; 创建列表&#xff1a;可以通过一对方括号[]来创建一个空列表&#xff0c;或者在方括号内放入初始元素来创建一个非空列表。例如&#xff…

链表实验.

#include<stdio.h> #include<stdlib.h>// 定义单链表节点结构体 struct Node {int data;struct Node* next; };struct Node* initList() {struct Node* list (struct Node*)malloc(sizeof(struct Node));list->data 0;list->next NULL;return list; }void…

特征提取算法

特征提取算法 0. 写在前边1. Harris算法1.1 写在前面1.2 Harris算法的本质1.3 Harris算法的简化 2. Harris3D2.1 Harris3D算法问题定义2.2 Harris3D with intensity2.3 Harris3D without intensity 3. ISS特征点的应用 0. 写在前边 本篇将介绍几种特征提取算法&#xff0c;特征…

笔记 | 软件工程:需求分析

1 需求分析 #需求分析 1.1 需求分析概述 初步软件需求存在的问题&#xff1a;不具体&#xff0c;不清晰&#xff0c;关系不明朗&#xff0c;存在潜在缺陷&#xff0c;没有区分不同软件需求&#xff08;有必要鉴别不同软件需求项的重要性差别&#xff0c;区分不同软件需求的开…

【C语言】——指针八:指针运算笔试题解析

【C语言】——指针八&#xff1a;指针运算笔试题解析 一、题一二、题二三、题三四、题四五、题五六、题六七、题七 一、题一 //程序输出结果是什么 int main() {int a[5] { 1,2,3,4,5 };int* ptr (int*)(&a 1);printf("%d, %d", *(a 1), *(ptr - 1));return…

揭开“栈和队列”的神秘面纱

前言 在线性表中不止有顺序表和链表&#xff0c;今天的主角就如标题所说--->认识栈和队列。把他们俩放一起总结是有原因的&#xff0c;还请看官听我娓娓道来~ 什么是栈&#xff1f; 栈&#xff08;stack&#xff09;是限定仅在表尾进行插入和删除操作的线性表 咱可以把栈理…

Pro版 v3.0首页DIY热区神器,让图片更全能!

Pro版v3.0新增了很多新功能&#xff0c;为了方便大家能快速了解使用&#xff0c;今天&#xff0c;我们就开始来逐步了解这些新功能。 在此次更新中&#xff0c;Pro版的首页DIY功能实现了全新重构升级&#xff0c;新增了DIY热区、视频、排行榜、积分商城、预售、签到组件&#…

【Hadoop技术框架-MapReduce和Yarn的详细描述和部署】

前言&#xff1a; &#x1f49e;&#x1f49e;大家好&#xff0c;我是书生♡&#xff0c;今天的内容主要是Hadoop的后两个组件&#xff1a;MapReduce和yarn的相关内容。同时还有Hadoop的完整流程。希望对大家有所帮助。感谢大家关注点赞。 &#x1f49e;&#x1f49e;前路漫漫&…

深度学习实战73-基于多模态CLIP模型的实战项目,CLIP模型的架构介绍与代码实现

大家好,我是微学AI,今天给大家介绍一下深度学习实战73-基于多模态CLIP模型的实战项目,CLIP模型的架构介绍与代码实现。多模态CLIP(Contrastive Language-Image Pre-training)模型是一种深度学习模型,其核心设计理念是通过大规模的对比学习训练,实现图像与文本之间的跨模…