【入门Flink】- 10基于时间的双流联合(join)

统计固定时间内两条流数据的匹配情况,需要自定义来实现——可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

窗口联结(Window Join)

一段时间的双流合并

定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

stream1.join(stream2).where(<KeySelector>) // stream1 的 keyBy.equalTo(<KeySelector>) // stream2 的 keyBy.window(<WindowAssigner>).apply(<JoinFunction>)
public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String,Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));DataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0) // ds1 的keyby.equalTo(r2 -> r2.f0) // ds2 的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用 join 方法* @param first ds1 的数据* @param second ds2 的数据*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}

输出:

image-20231112153403293

window join:

  1. 两条流落在同一个时间窗口范围内才能匹配
  2. 根据 keyBy 的 key,来进行匹配关联
  3. 只能拿到匹配上的数据,类似有固定时间范围的inner join

间隔联结(Interval Join)

存在如下场景:两条流匹配的两个数据有可能刚好“卡在”窗口边缘两侧,窗口内就都没有匹配了,可以使用“间隔联结”(interval join)来解决。

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)“下界”(lowerBound);可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp +upperBound], 即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:这段时间作为可以匹配另一条流数据的“窗口”范围。

匹配的条件为:

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

image-20231112154002415

stream1
.keyBy(<KeySelector>)// KeyedStream 调用   
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right,Context ctx, Collector<String> out){out.collect(left + "," + right);}
});

处理迟到数据,可以使用左右侧输出流

完整代码:

public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.socketTextStream("hadoop102", 7777).map((MapFunction<String, Tuple2<String, Integer>>) value -> {String[] datas = value.split(",");return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.socketTextStream("hadoop102", 8888).map((MapFunction<String, Tuple3<String, Integer, Integer>>) value -> {String[] datas = value.split(",");return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));/*** 【Interval join】* 1、只支持事件时间* 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后* 3、process 中,只能处理 join 上的数据* 4、两条流关联后的 watermark,以两条流中最小的为准* 5、如果 当前数据的事件时间 < 当前的 watermark,就是迟到数据,主流的 process 不处理* => between 后,可以指定将 左流 或 右流的迟到数据放入侧输出流* *///1. 分别做 keyby,key 其实就是关联条件KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);//2. 调用 interval join// 左右测输出流迟到标签OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)) // 指定上下界.sideOutputLeftLateData(ks1LateTag) // 将ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1 的数据* @param right ks2 的数据* @param ctx 上下文* @param out 采集器*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/139871.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面向对象基础(以python语言为例)

1、定义一个类&#xff1b;实例化类的对象&#xff1b;调用类中的方法 #定义一个类 class Student:#类方法&#xff08;即函数&#xff09;def study(self,course_name):print(f学生正在学习{course_name})def play(self):print("xx学生正在玩游戏")#实例化&#xf…

Linux c/c++服务器开发实践

在Linux C开发环境中&#xff0c;通常有两种方式来开发多线程程序&#xff0c;一种是利用POSIX多线程 API函数来开发多线程程序&#xff0c;另外一种是利用C自带线程类来开发程序。 常见的与线程相关的基本API函数&#xff1a; API函数含义pthread_create创建线程pthread_exi…

从0到1实现一个前端监控系统(附源码)

目录 一、从0开始 二、上报数据方法 三、上报时机 四、性能数据收集上报 收集上报FP 收集上报FCP 收集上报LCP 收集上报DOMContentLoaded 收集上报onload数据 收集上报资源加载时间 收集上报接口请求时间 五、错误数据收集上报 收集上报资源加载错误 收集上报js错…

详解JS中的对象

1、语法 对象的声明有两种形式&#xff0c;声明&#xff08;文字&#xff09;形式和构造形式 // 文字形式 var mtObj {key:value... } // 构造形式 var myObj new Object() myObj.key value构造形式与文字形式生成的对象是一样的&#xff0c;唯一的区别在于文字形式可以添…

Linux下C++调用python脚本实现LDAP协议通过TNLM认证连接到AD服务器

1.前言 首先要实现这个功能&#xff0c;必须先搞懂如何通过C调用python脚本文件最为关键&#xff0c;因为两者的环境不同。本质上是在 c 中启动了一个 python 解释器&#xff0c;由解释器对 python 相关的代码进行执行&#xff0c;执行完毕后释放资源。 2 模块功能 2.1python…

Windows server 2008 R2 IIS搭建ASP网站教程

一、安装应用程序服务器 提示安装成功 二、添加角色服务asp 三、asp网站配置 放入源码 设置网站首页为index.asp: 设置应用程序池 四、设置网站目录属性 五、access数据库连接配置 Cd c:\Windows\System32\inetsrv appcmd list apppool /xml | appcmd set apppool /…

K9203 996920302 面向DNP3的网络安全解决方案

K9203 996920302 面向DNP3的网络安全解决方案 2014年ISA卓越技术创新奖获得者&#xff0c;超电子&#xff0c;3eTI的CyberFence工业防火墙解决方案提供强大加密和应用程序级深度数据包检测(DPI)功能。最近&#xff0c;3eTI为其CyberFence产品线增加了DNP3(分布式网络协议)支持…

替代huggingface下载大模型的站点

0. 引入 huggingface是个好网站&#xff0c;大模型的下载都得靠他&#xff0c;但是网络问题一直是一言难尽&#xff0c;所以只好找找其他替代品。目前发现的代替网站如下&#xff0c;后续如有新发现&#xff0c;还会再来更新到这里。 1. aliendao https://aliendao.cn/ 是一…

SpringBoot Web开发

SpringBoot3-Web开发 SpringBoot的Web开发能力&#xff0c;由SpringMVC提供。 Web开发的三种方式 方式处理过程注意事项实现效果全自动直接编写控制逻辑全部使用自动给配置默认效果手自一体Configuration、 配置WebMvcConfigurer、 配置WebMvcRegistrations不要标注 EnableWeb…

【Linux】:静动态库

静动态库 一.静态库1.设计静态库2.生成静态库3.发布静态库4.使用静态库 二.动态库1.设计动态库2.生成和发布动态库3.使用 一.静态库 程序在编译链接的时候把库的代码链接到可执行文件中。程序运行的时候将不再需要静态库。 静态库链接格式&#xff1a;libxxx.a(前缀是lib,后缀是…

2023-11-12

今日比较摆烂, 但是把自写管道的原理搞懂了, 主要是把 exp 完完全全看懂了, 还不错. 然后就没干啥了. 明日计划: 学校的作业. AFL 源码. 我真是服了我自己了, AFL 源码搁多久了, 操操操 然后把 seccomp 重新学习下

基于安卓android微信小程序的四六级助手系统

项目介绍 随着我国教育需求不断增加&#xff0c;高校教育资源有限&#xff0c;教育经费相对不足的情况下&#xff0c;利用现代信息技术发展高等教育&#xff0c;不仅充分利用了优秀的教育资源&#xff0c;而且为更多的人提供接受高等教育的机会&#xff0c;同时这也是极大促进…

【Unity每日一记】“调皮的协程”,协程和多线程的区别在哪里

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

网络原理-UDP/TCP详解

一. UDP协议 UDP协议端格式 由上图可以看出&#xff0c;一个UDP报文最大长度就是65535. • 16位长度&#xff0c;表示整个数据报&#xff08;UDP首部UDP数据&#xff09;的最大长度&#xff08;注意&#xff0c;这里的16位UDP长度只是一个标识这个数据报长度的字段&#xff0…

机器视觉目标检测 - opencv 深度学习 计算机竞赛

文章目录 0 前言2 目标检测概念3 目标分类、定位、检测示例4 传统目标检测5 两类目标检测算法5.1 相关研究5.1.1 选择性搜索5.1.2 OverFeat 5.2 基于区域提名的方法5.2.1 R-CNN5.2.2 SPP-net5.2.3 Fast R-CNN 5.3 端到端的方法YOLOSSD 6 人体检测结果7 最后 0 前言 &#x1f5…

单链表按位序与指定结点 删除

按位序删除(带头结点) #define NULL 0 #include<stdlib.h>typedef struct LNode {int data;struct LNode* next; }LNode, * LinkList;//按位序删除&#xff08;带头结点&#xff09; bool ListInsert(LinkList& L, int i, int& e) {if (i < 1)return false;L…

iPhone或在2024开放第三方应用商店。

iPhone或开放第三方应用商店&#xff0c;可以说这是一个老生常谈的话题。对于像是iOS这样封闭的系统来说&#xff0c;此前传出苹果可能开放侧载消息的时候&#xff0c;又有谁能信&#xff0c;谁会信&#xff1f; 如果是按照苹果自身的意愿&#xff0c;这种事情自然是不可能发生…

RestTemplate.postForEntity 方法进行 HTTP POST 请求

RestTemplate 是 Spring Framework 提供的一个用于处理 HTTP 请求的客户端工具。其中&#xff0c;postForEntity 是 RestTemplate 提供的用于发送 HTTP POST 请求并返回 ResponseEntity 对象的方法。 public <T> ResponseEntity<T> postForEntity(String url, Obj…

Antd React Form.Item内部是自定义组件怎么自定义返回值

在线演示https://stackblitz.com/edit/stackblitz-starters-xwtwyz?filesrc%2FSelfTreeSelect.tsx 需求 当我们点击提交,需要返回用户名和选中树的id信息,但是,我不关要返回树的id信息,还需要返回选中树的名称 //默认返回的 {userName:梦洁,treeInfo:leaf1-value } //但是需…

SpringBoot项目调用openCV报错:nested exception is java.lang.UnsatisfiedLinkError

今天在通过web项目调用openCV的时候提示如下错误&#xff1a; nested exception is java.lang.UnsatisfiedLinkError:org.opencv.imgcodecs.Imgcodecs.imread_0(Ljava/la如下图所示&#xff1a; 但是通过直接启动java main函数确正常&#xff0c;初步诊断和SpringBoot热加载…