43、Flink 的 Window Join 详解

1.Window Join
a)概述

Window join 作用在两个流中有相同 key 且处于相同窗口的元素上,窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。

两个流中的元素在组合之后,会被传递给用户定义的 JoinFunctionFlatJoinFunction,可以用它们输出符合 join 要求的结果。

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)

注意

  • 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
  • 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为 [5, 10) 窗口中的元素在 join 之后的 timestamp 为 9。
b)滚动 Window Join

使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunctionFlatJoinFunction

行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!

在这里插入图片描述

如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ... 的窗口,将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction

注意:滚动窗口 [6,7] 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
c)滑动 Window Join

当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunctionFlatJoinFunction,当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!

注意:在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。

在这里插入图片描述

本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction 的元素;图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3] 中 join,但没有与窗口 [1,2] 中任何元素 join。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
d)会话 Window Join

使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunctionFlatJoinFunction,这个操作同样是 inner join,如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!

在这里插入图片描述

定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction,而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/17778.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

外汇天眼:野村证券和Laser Digital与GMO互联网集团合作发行日元和美元稳定币

野村控股和Laser Digital将与GMO互联网集团合作&#xff0c;在日本探索发行日元和美元稳定币。GMO互联网集团的美国子公司GMO-Z.com Trust Company, Inc. 在纽约州金融服务部的监管框架下&#xff0c;在以太坊、恒星币和Solana等主要区块链上发行稳定币。GMO-Z.com Trust Compa…

MySQL增删查改进阶

数据库约束表的关系增删查改 目录 一.数据库约束类型 NOT NULL约束类型 UNIQUE 唯一约束 DEFAULT 默认值约束 PRIMARY KEY&#xff1a;主键约束 FOREIGN KEY :W外键约束 二&#xff0c;查询 count&#xff08;&#xff09;两种用法 sum&#xff0c;avg&#xff0c;max…

Vue3_创建项目

目录 一、创建vue项目 1.下载vue 2.进入刚才创建的项目 3.安装依赖 4.运行项目 ​5.打包项目放入生产环境 二、vue项目组成 1.项目文件结构 2.项目重要文件 Vue (发音为 /vjuː/&#xff0c;类似 view) 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、C…

Go语言中实现RSA加解密、签名验证算法

随着互联网的高速发展&#xff0c;人们对安全的要求也越来越高。密码学中两大经典算法&#xff0c;一个是对称加解密&#xff0c;另一个是非对称加解密&#xff0c;这里就来分享一下非对称加密算法的代表&#xff1a;RSA加解密。 在Go语言中实现RSA加解密还是比较简单的&#…

【安全产品】基于HFish的MySQL蜜罐溯源实验记录

MySQL蜜罐对攻击者机器任意文件读取 用HFish在3306端口部署MySQL蜜罐 配置读取文件路径 攻击者的mysql客户端版本为5.7(要求低于8.0) 之后用命令行直连 mysql -h 124.222.136.33 -P 3306 -u root -p 可以看到成功连上蜜罐的3306服务&#xff0c;但进行查询后会直接lost con…

ai机器人电销资源有哪些?真的能帮到我们提高效率吗ai智能语音机器人部署

随着互联网科技的发展&#xff0c;各种各样的科技产物都应用于电销企业&#xff0c;ai机器人电销就是其中一个。那么ai机器人电销可靠吗&#xff1f;ai机器人电销资源有哪些&#xff1f;我们一起来看看。 AI机器人在电销资源方面有以下一些用途和功能&#xff1a; 自动识别潜在…

for循环绑定id,更新html页面的文字内容

需求&#xff1a;将方法中内容对齐 实现方式 给for循环中每个方法添加一个动态的id在DOM结果渲染完后&#xff0c;更新页面数据&#xff0c;否则会报错&#xff0c;找不到对应节点或对应节点为空 <view v-for"(item, index) in itemList" :key"index"…

OWASP十大API漏洞解析:如何抵御Bot攻击?

新型数字经济中&#xff0c;API是物联网设备、Web和移动应用以及业务合作伙伴流程的入口点。然而&#xff0c;API也是犯罪分子的前门&#xff0c;许多人依靠Bot来发动攻击。对于安全团队来说&#xff0c;保护API并缓解Bot攻击至关重要。那么Bot在API攻击中处于怎样的地位&#…

香橙派Kunpeng Pro测评

目录 香橙派Kunpeng Pro开发板试用体验 观察理解与适用场景 体验步骤 试用感受

【ARM+Codesys案例】T3/RK3568/树莓派+Codesys枕式包装机运动控制器

枕式包装机是一种包装能力非常强&#xff0c;且能适合多种规格用于食品和非食品包装的连续式包装机。它不但能用于无商标包装材料的包装&#xff0c;而且能够使用预先印有商标图案的卷筒材料进行高速包装。同时&#xff0c;具有稳定性高、生产效率高&#xff0c;适合连续包装、…

C语言 数组—— 一维数组下标越界问题分析

目录 数组元素的访问 一维数组元素的越界访问 二维数组元素的越界访问 小结 数组元素的访问 访问数组元素时&#xff0c; 下标越界 是大忌&#xff01;  编译器通常不检查下标越界&#xff0c;导致程序运行时错误  下标越界&#xff0c;将访问数组以外的空间  …

pyqt窗体水印

pyqt窗体水印 介绍效果代码 介绍 给窗体加上水印 效果 代码 import sys from PyQt5.QtWidgets import QApplication, QMainWindow from PyQt5.QtGui import QPainter, QColor, QFont,QPen from PyQt5.QtCore import Qtclass WatermarkedWindow(QMainWindow):def __init__(se…

鸿蒙4.2小版本推出,鸿蒙5.0已经不远了

上个月&#xff0c;市场上迎来了华为鸿蒙系统4字开头的小升级&#xff0c;版本来到了4.2版本。 我们先来看看4.2版本都给用户带来哪些特色&#xff1a; 界面切换更流畅&#xff1a;无论是响应速度还是操作手感&#xff0c;用户都将感受到更加迅速和顺滑的体验 搜星速度的显著…

工具:Visual Studio Code

一、VSCode生成exe 二、在vs中断点调试 如果没效果需要安装如下与unity相连接的插件 三、注释 1、代码注释 注释和取消都是都是同一个命令&#xff1a;选中代码&#xff0c;然后按住CtrlShift/ 2、方法或类注释 /// 四、导航 五、将变量注释展示到解释面板 1、直接显示 [Too…

Git提交时出现Merge branch ‘master‘ of ...之解决方法

最近遇到了一个问题 我是用git提交代码的时候 分支上 显示的是merge 意思是 合并代码了 每次都会 创建一个分支 因为我和另一个小伙伴共同 开发一个项目 所以 小伙伴告诉我 总是创建新的分支 我细细看了一下 测试了一下 我们两个人 修改不同的文件 同时修改 他提交了 我再提…

pip安装软件包提示“没有那个文件或目录”问题的处理

文章目录 一、Python.h&#xff1a;没有那个文件或目录二、lber.h&#xff1a;没有那个文件或目录 一、Python.h&#xff1a;没有那个文件或目录 pip install -I python-ldap3.0.0b1 #异常提示In file included from Modules/LDAPObject.c:3:0:Modules/common.h:9:20: 致命错…

React hooks - useLayoutEffect

useLayoutEffect 用法区别 用法 useLayoutEffect 和 useEffect 的使用方式很相似&#xff1a; useLayoutEffect 接收一个函数和一个依赖项数组作为参数只有在数组中的依赖项发生改变时才会再次执行副作用函数useLayoutEffect 也可以返回一个清理函数 useEffect(()>{retur…

【NVM】持久内存的架构

1 内存数据持久化 1.1 数据持久化 持久内存系统包含如下关键组件&#xff1a;微处理器、连接微处理器内存总线上的持久内存模组&#xff08;Persistent MemoryModule&#xff0c;PMM&#xff09;及持久内存上的非易失性存储介质。 使用持久内存来实现数据的持久化&#xff0c…

SpringCloud系列(22)--Ribbon默认负载轮询算法原理及源码解析

前言&#xff1a;在上一篇文章中我们介绍了如何去切换Ribbon的负载均衡模式&#xff0c;而本章节内容则是介绍Ribbon默认负载轮询算法的原理。 1、负载轮询算法公式 rest接口第N次请求数 % 服务器集群总数 实际调用服务器下标&#xff08;每次服务器重启后rest接口计数从1开始…

爬虫在金融领域的应用:股票数据收集

介绍 在金融领域&#xff0c;准确及时的数据收集对于市场分析和投资决策至关重要。股票价格作为金融市场的重要指标之一&#xff0c;通过网络爬虫技术可以高效地从多个网站获取实时股票价格信息。本文将介绍网络爬虫在金融领域中的应用&#xff0c;重点讨论如何利用Scrapy框架…