Flink多流转换(2)—— 双流连结

双流连结(Join):根据某个字段的值将数据联结起来,“配对”去做处理

窗口联结(Window Join)

可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理

代码逻辑

首先需要调用 DataStream 的.join()方法来合并两条流,得到一个 JoinedStreams;接着通过.where().equalTo()方法指定两条流中联结的 key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算

stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)

对于JoinFunction

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {OUT join(IN1 first, IN2 second) throws Exception;
}

使用时需要实现内部的join方法,有两个参数,分别表示两条流中成对匹配的数据

JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑

实现流程

两条流的数据到来之后,首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数(first,second)传入 JoinFunction 的.join()方法进行计算处理。所以窗口中每有一对数据成功联结匹配,JoinFunction 的.join()方法就会被调用一次,并输出一个结果

实例分析

// 基于窗口的join
public class WindowJoinTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<Tuple2<String, Long>> stream1 = env.fromElements(Tuple2.of("a", 1000L),Tuple2.of("b", 1000L),Tuple2.of("a", 2000L),Tuple2.of("b", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));DataStream<Tuple2<String, Long>> stream2 = env.fromElements(Tuple2.of("a", 3000L),Tuple2.of("b", 3000L),Tuple2.of("a", 4000L),Tuple2.of("b", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.join(stream2).where(r -> r.f0).equalTo(r -> r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {@Overridepublic String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {return left + "=>" + right;}}).print();env.execute();}
}

运行结果如下:

间隔联结(Interval Join)

统计一段时间内的数据匹配情况,不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧

因此需要采用”间隔联结“的操作,针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);

于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围

所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果

匹配条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

举例分析如下:

下方的流 A 去间隔联结上方的流 B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒,上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素,它的可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为 3 的元素,可匹配区间为[1, 4],B 中只有时间戳为 1 的一个数据可以匹配,于是得到匹配数据对(3, 1)

代码逻辑

基于KeyedStream进行联结操作:


①DataStream 通过 keyBy 得到KeyedStream

②调用.intervalJoin()来合并两条流,得到的是一个 IntervalJoin 类型

③通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作

④调用.process()需要传入一个处理函数:ProcessJoinFunction

stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});

实例分析

// 基于间隔的join
public class IntervalJoinTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(Tuple3.of("Mary", "order-1", 5000L),Tuple3.of("Alice", "order-2", 5000L),Tuple3.of("Bob", "order-3", 20000L),Tuple3.of("Alice", "order-4", 20000L),Tuple3.of("Cary", "order-5", 51000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {return element.f2;}}));SingleOutputStreamOperator<Event> clickStream = env.fromElements(new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Alice", "./prod?id=200", 3500L),new Event("Bob", "./prod?id=2", 2500L),new Event("Alice", "./prod?id=300", 36000L),new Event("Bob", "./home", 30000L),new Event("Bob", "./prod?id=1", 23000L),new Event("Bob", "./prod?id=3", 33000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));orderStream.keyBy(data -> data.f0).intervalJoin(clickStream.keyBy(data -> data.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {@Overridepublic void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {out.collect(right + " => " + left);}}).print();env.execute();}}

运行结果如下:

窗口同组联结(Window CoGroup)

用法跟 window join非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了

代码逻辑

stream1.coGroup(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(<CoGroupFunction>)

与 window join 的区别在于,调用.apply()方法定义具体操作时,传入的是一个CoGroupFunction

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}

coGroup方法与FlatJoinFunction.join()方法类似,其中的三个参数依旧是两条流中的数据以及用于输出的收集器;但前两个参数不再是单独的配对数据,而是可遍历的数据集合;

因此该方法中直接把收集到的所有数据一次性传入,然后自定义配对方式,不需要再计算窗口中两条流数据集的笛卡尔积;

实例分析

// 基于窗口的join
public class CoGroupTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<Tuple2<String, Long>> stream1 = env.fromElements(Tuple2.of("a", 1000L),Tuple2.of("b", 1000L),Tuple2.of("a", 2000L),Tuple2.of("b", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));DataStream<Tuple2<String, Long>> stream2 = env.fromElements(Tuple2.of("a", 3000L),Tuple2.of("b", 3000L),Tuple2.of("a", 4000L),Tuple2.of("b", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.coGroup(stream2).where(r -> r.f0).equalTo(r -> r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {@Overridepublic void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {collector.collect(iter1 + "=>" + iter2);}}).print();env.execute();}
}

运行结果如下:

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/646841.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

五、垃圾回收

1. 垃圾回收基础 1.1 什么是垃圾 简单说就是&#xff1a;内存中已经不再被使用到的内存空间就是垃圾。 1.2 如何判定是垃圾 1.2.1 引用计数法 引用计数法&#xff1a;给对象添加一个引用计数器&#xff0c;有访问就 1&#xff0c;引用失效就 -1 引用计数法的优缺点&#…

抖音信息流广告引流,这种方法你要知道-数灵通

随着抖音的普及&#xff0c;我们经常会在刷视频的过程中遇到各种广告。这些广告不仅种类繁多&#xff0c;而且形式各异。除了常见的开屏广告和达人合作广告&#xff0c;信息流广告也是抖音广告的一种重要形式。那么&#xff0c;什么是信息流广告呢&#xff1f; 信息流广告是一种…

03. 静态路由

文章目录 一. 静态路由概述1.1. 概述1.2. 路由信息获取方式1.3. 路由表的参数1.4. 路由协议的优先级1.5. 最优路由条目优先1.6. 最长前缀匹配原则 二. 实验实操2.1. 实验1&#xff1a;静态路由2.1.1. 实验目的2.1.2. 实验拓扑图2.1.3. 实验步骤&#xff08;1&#xff09;配置网…

数据结构——双链表

双链表中节点类型的描述&#xff1a; 双链表的初始化&#xff08;带头结点&#xff09; 、 双链表的插入操作 后插操作 InsertNextDNode(p, s): 在p结点后插入s结点 按位序插入操作&#xff1a; 思路&#xff1a;从头结点开始&#xff0c;找到某个位序的前驱结点&#xff…

周鸿祎回应坚定支持华为:因为 360 也被制裁了

在昨天的华为鸿蒙生态千帆启航仪式上&#xff0c;360集团创始人兼CEO周鸿祎发表演讲表示&#xff0c;360坚定地支持华为的决定源于双方都曾遭到制裁。周鸿祎在演讲中提到&#xff1a;“在华为最早被制裁的时候&#xff0c;我们是少数几个公开站出来坚定支持华为的公司。其实也很…

如何进行H.265视频播放器EasyPlayer.js的中性化设置?

H5无插件流媒体播放器EasyPlayer属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#xff0c;可支持H.264与H.265编码格式&#xff0c;性能稳定、播放流畅&#xff0c;能支持WebSocket-FLV、HTTP-FLV&#xff0c;HLS&#xff08;m3u8&#…

【域名解析】如何将域名指向对应服务器IP

目录 &#x1f337;一、域名解析基本概念 &#x1f33c;1. 定义 &#x1f33c;2. 域名解析类型 &#x1f337;二、域名解析服务器IP地址 &#x1f33c;1. 操作步骤 &#x1f33c;2. 验证 &#x1f337;一、域名解析基础知识 &#x1f33c;1. 基本概念 定义&#xff1a; …

luceda ipkiss教程 58:输出器件的版图和三维模型

在ipkiss中&#xff0c;通过visualize_3d_povray可以输出包含器件的三维模型参数的.pov文件&#xff0c;再通过POV-Ray&#xff08;免费软件&#xff0c;下载地址&#xff1a;https://www.povray.org/download/&#xff09;就可以查看器件的三维模型。 如&#xff1a; 代码如…

如何安装MeterSphere并实现无公网ip远程访问服务管理界面

文章目录 前言1. 安装MeterSphere2. 本地访问MeterSphere3. 安装 cpolar内网穿透软件4. 配置MeterSphere公网访问地址5. 公网远程访问MeterSphere6. 固定MeterSphere公网地址 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通…

vivado 接口、端口映射

接口 重要&#xff01;接口只能在“fpga”类型的&#xff1c;component&#xff1e;中定义。接口部分提供了<component>上所有可用物理接口的列表。<interfaces>部分包含嵌套在其中的一个或多个<interface>标记。一个接口是通过使用<port_map>标记由多…

Spring 声明式事务讲解,和 @Transactional注解的用法

目录 一、Spring框架介绍 二、什么是声明式事务 三、如何解决并发性事务问题 四、Transactional注解的用法 一、Spring框架介绍 Spring框架是一个开源的Java应用程序开发框架&#xff0c;旨在简化企业级Java应用程序的开发。它提供了一种轻量级的、全面的编程和配置模型&a…

进阶C语言-自定义类型

为了便于描述复杂的对象,C语言就支持了自定义类型&#xff0c;其中包括了结构体、枚举和联合体&#xff0c;下面将为大家一一介绍。 自定义类型 &#x1f388;1.结构体&#x1f50e;1.1结构的基础知识&#x1f50e;1.2结构的声明&#x1f50e;1.3特殊的声明&#x1f50e;1.4结构…

基于springboot在线学习平台源码和论文

在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括学习平台的网络应用&#xff0c;在外国学习平台已经是很普遍的方式&#xff0c;不过国内的管理平台可能还处于起步阶段。学习平台具有学习信息管理功能的选择。学习平台采用ja…

每天掌握一个软测高级技巧:接口自动化神器apin进阶操作

之前写了一篇关于接口自动化框架 apin 入门使用是文章&#xff0c;主要介绍了 apin 的安装以及用例编写的方法。 今天这篇文章来给大家聊聊&#xff0c;apin 中的一些高级使用技巧。比如依赖接口的变量提取和引用&#xff0c;用例断言&#xff0c;以及函数工具的使用。 01 变…

web安全学习笔记【09】——算法2

基础[1] 入门-算法逆向&散列对称非对称&JS源码逆向&AES&DES&RSA&SHA #知识点&#xff1a; 1、Web常规-系统&中间件&数据库&源码等 2、Web其他-前后端&软件&Docker&分配站等 3、Web拓展-CDN&WAF&OSS&反向&负载…

Web09--jQuery基础

1、jQuery概述 1.1 什么是jQuery jQuery是一款优秀的JavaScript的轻量级框架之一&#xff0c;封装了DOM操作、事件绑定、ajax等功能。特别值得一提的是基于jQuery平台的插件非常丰富&#xff0c;大多数前端业务场景都有其封装好的工具可直接使用。 jQuery下载和版本介绍 官…

Make.com的发送邮件功能已经登峰造极

make.com的发送邮件功能已经做到了登峰造极。 我给你个任务&#xff0c;让你发送个新邮件给谁谁&#xff0c;你一定想到SMTP服务器不就行了。 我给你第二个任务&#xff0c;我让你自动回复一个邮件&#xff0c;注意是回复。 做不到了吧&#xff5e;&#xff5e;&#xff01;…

TS基础知识点快速回顾(上)

基础介绍 什么是 TypeScript&#xff1f; TypeScript&#xff0c;简称 ts&#xff0c;是微软开发的一种静态的编程语言&#xff0c;它是 JavaScript 的超集。 那么它有什么特别之处呢? js 有的 ts 都有&#xff0c;所有js 代码都可以在 ts 里面运行。ts 支持类型支持&#…

一篇部署frp

利用宝塔第三方插件安装Frp穿透 参考网址&#xff1a;https://blog.csdn.net/qq_17754023/article/details/127438606 宝塔官方第三方插件下载 https://www.bt.cn/bbs/forum.php?modattachment&aidMzQ5MDF8MTBmM2E3YTh8MTYxNDk1MTY4MXwwfDM1OTY3 网盘下载&#xff1a; …

HTTP 基本概念

1. HTTP &#xff08;Hypertext Transfer Protocol&#xff09;超文本传输协议&#xff0c;是互联网上应用最为广泛的协议之一。 小林coding的解析特别通俗易懂 https://xiaolincoding.com/network/2_http/http_interview.html#http-%E6%98%AF%E4%BB%80%E4%B9%88 协议&#…