java Flink(四十三)Flink Interval Join源码解析以及简单实例

背景

之前我们在一片文章里简单介绍过Flink的多流合并算子

java Flink(三十六)Flink多流合并算子UNION、CONNECT、CoGroup、Join

今天我们通过Flink 1.14的源码对Flink的Interval Join进行深入的理解。

Interval Join不是两个窗口做关联,更适用于处理乱序数据流之间的关联。它的作用更类似于从左流中a元素本身出发,对右流中一段时间内的数据进行关联(Inner Join:只关联相同Key的数据)。

如图所示:

 下边这条流中的2关联到上范围内的0/1

源码解析

Flink版本1.14.4

按住Ctrl+鼠标左键,点击process进入源码

 这里process方法是在KeydStream.java下IntervalJoined类下的方法

 包装返回类型的TypeInfomation(TypeInfo的介绍可以看上一篇)

 返回的outputType

SingleOutputStreamOperator使用给定的用户函数完成联接操作,该函数针对每个联接的元素对执行。这种方法允许传递输出类型的显式类型信息。

 IntervalJoinOperator初始化

左边界<=右边界检查

获取左流还有右流数据对应的序列化(从TypeInfo获取的)

继续看IntervalJoinOperator中的其余关键实现

open方法用来注册定时器

 初始化两个流的map状态

处理左侧流中的数据。每当数据到达左流时,它就会被添加到左缓冲区。将从右侧缓冲区中查找该元素可能的候选联接,如果该对位于用户定义的边界内,则将其传递给 ProcessJoinFunction

​ 同理处理右流

进入数据处理函数

获取数据,取出事件时间

 超过当前watermark的数据进行过滤

 

数据没问题的话,将数据添加到状态

 ​​

遍历另一条流的状态,遍历其中的数据,把满足时间要求的数据进行collect

​注册一个当前事件时间戳+右边界的定时器

定时器触发后,清空map状态中时间戳-左边界的那条数据

简单实例 

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkCode</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><jdk.version>1.8</jdk.version><jar.name>ubs-data-converter</jar.name><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--Flink 版本--><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.8</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.2</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${jdk.version}</source><target>${jdk.version}</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><finalName>${jar.name}</finalName><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.glassfish.jersey.core:jersey-common</exclude></excludes></artifactSet><relocations><relocation><pattern>com.google.common</pattern><shadedPattern>com.shade.google.common</shadedPattern></relocation><relocation><pattern>org.apache.kafka</pattern><shadedPattern>org.shade.apache.kafka</shadedPattern></relocation></relocations><filters><filter><artifact>*</artifact><includes><include>org/apache/htrace/**</include><include>org/apache/avro/**</include><include>org/apache/flink/streaming/**</include><include>org/apache/flink/connector/**</include><include>org/apache/kafka/**</include><include>org/apache/hive/**</include><include>org/apache/hadoop/hive/**</include><include>org/apache/curator/**</include><include>org/apache/zookeeper/**</include><include>org/apache/jute/**</include><include>org/apache/thrift/**</include><include>org/apache/http/**</include><include>org/I0Itec/**</include><include>jline/**</include><include>com/yammer/**</include><include>kafka/**</include><include>org/apache/hadoop/hbase/**</include><include>com/alibaba/fastjson/**</include><include>org/elasticsearch/action/**</include><include>io/confluent/**</include><include>com/fasterxml/**</include><include>org/elasticsearch/**</include><include>hbase-default.xml</include><include>hbase-site.xml</include></includes></filter><filter><artifact>org.apache.hadoop.hive.*:*</artifact><excludes><exclude></exclude><exclude></exclude><exclude></exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

user bean

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class User{Integer id;Long t;}

 order bean 

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Order {Integer id;Long price;Long time;}

main

package ubs.app.intervaljoin;import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import ubs.app.intervaljoin.bean.Order;
import ubs.app.intervaljoin.bean.User;
import ubs.app.intervaljoin.source.OrderSource;
import ubs.app.intervaljoin.source.UserSource;import java.time.Duration;public class IntervalJoinApp  {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置watermarkWatermarkStrategy<User> userWatermarkStrategy = WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<User>() {@Overridepublic long extractTimestamp(User element, long recordTimestamp) {return element.getT();}});DataStream<User> userDataStreamSource = env.addSource(new UserSource()).assignTimestampsAndWatermarks(userWatermarkStrategy);//设置watermarkWatermarkStrategy<Order> orderWatermarkStrategy = WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Order>() {@Overridepublic long extractTimestamp(Order element, long recordTimestamp) {return element.getTime();}});DataStream<Order> orderDataStreamSource = env.addSource(new OrderSource()).assignTimestampsAndWatermarks(orderWatermarkStrategy);env.setParallelism(1);SingleOutputStreamOperator<String> process = userDataStreamSource.keyBy(o -> o.getId()).intervalJoin(orderDataStreamSource.keyBy(o -> o.getId())).between(Time.seconds(-5), Time.seconds(0)).process(new ProcessJoinFunction<User, Order, String>() {@Overridepublic void processElement(User left, Order right, ProcessJoinFunction<User, Order, String>.Context ctx, Collector<String> out) throws Exception {Integer lid = left.getId();Long lt = left.getT();Integer rid = right.getId();long rt = right.getTime();out.collect(String.format("左%s 左时间%s 右%s 右时间%s 关联到了 %s", lid, lt/1000, rid, rt/1000, rt/1000-lt/1000));}});process.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/763130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云通过脚本直接修改SSH配置来允许root登录并启用密码认证

看起来你想要通过脚本直接修改SSH配置来允许root登录并启用密码认证。然而&#xff0c;PermitRootLogin和PasswordAuthentication这两个指令是/etc/ssh/sshd_config文件中的配置选项&#xff0c;不能直接通过set命令在bash脚本中设置。set命令在shell脚本中用于设置或取消设置s…

2.Redis有五种主要的数据类型

Redis有五种主要的数据类型 String&#xff08;字符串&#xff09;&#xff1a;String类型是最简单的数据类型&#xff0c;可以存储任意类型的数据&#xff0c;例如整数、浮点数、字符串等。String类型支持一些基本的操作&#xff0c;如设置值、获取值、增减值等。 Hash&#…

论文笔记:Llama 2: Open Foundation and Fine-Tuned Chat Models

导语 Llama 2 是之前广受欢迎的开源大型语言模型 LLaMA 的新版本&#xff0c;该模型已公开发布&#xff0c;可用于研究和商业用途。本文记录了阅读该论文的一些关键笔记。 链接&#xff1a;https://arxiv.org/abs/2307.09288 1 引言 大型语言模型&#xff08;LLMs&#xff…

cesium Clock JulianDate 日照分析

cesium在初始化的时候会自动把Clock对象挂载到容器上Clock内部以JulianDate维护时间&#xff0c;比北京时间慢8个小时&#xff0c;想显示北京时间需要计算时差JulianDate的日期部分和秒数部分是分开的 julianDayNumber&#xff1a;指整数天&#xff0c;记录从公元前4713年正午以…

GO 语言基础学习记录

一&#xff1a;声明变量 在golang语言中声明变量的方式 package main import "fmt" func main() { var a int 3 //关键字 var 变量名 变量指定类型 变量值 var b int //关键字 var 变量名 变量指定类型(注意:当变量没赋值时是按照变量…

【蓝桥杯】第15届蓝桥杯青少组stema选拔赛C++中高级真题答案(20240310)

一、选择题 第 1 题 第 2 题 表达式1000/3的结果是( A )。 A.333 B.333.3 C.334 D.333.0 第 3 题 下列选项中&#xff0c;判断a等于1并且b等于1正确的表达式是( B )。 A.!((a!1)&&(b!1)) B.!((a!1)||(b!1)) C.!(a1)&&(b1) D.(a1)&&(b1) 【解析】 A…

面试(一)

一. 说一下进程和线程的区别&#xff1f; (1)进程是资源分配的最小单位&#xff0c;线程是CPU调度的最小单位。 (2)线程是进程的一部分&#xff0c;一个线程只能属于一个进程&#xff0c;一个进程可以有多个线程&#xff0c;但至少有一个线程。 (3)进程有自己独立地址空间&a…

我的春招求职面经

智能指针在面试时经常被问到&#xff0c;最近自己也在写&#xff0c;有一点思考&#xff0c;于是找到了这样一个题目&#xff0c;可以看看&#xff0c;上面这个代码有什么问题&#xff1f;留言区说出你的答案吧&#xff01; 最后分享一下之前的实习->春招->秋招等文章汇总…

可以完成80%的数据分析工作的20个Pandas函数

Pandas 是数据科学社区中使用最广泛的库之一&#xff0c;它是一个强大的工具&#xff0c;可以进行数据操作、清理和分析。本文将提供最常用的 Pandas 函数以及如何实际使用它们的样例。我们将涵盖从基本数据操作到高级数据分析技术的所有内容&#xff0c;到本文结束时&#xff…

huggingface的transformers训练bert

目录 理论 实践 理论 https://arxiv.org/abs/1810.04805 BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;是一种自然语言处理&#xff08;NLP&#xff09;模型&#xff0c;由Google在2018年提出。它是基于Transformer模型的预训练方法…

使el-table通过操控鼠标滚轮横向滚动

1.创建directive文件夹&#xff0c;里面创建directive.js文件 import Vue from vue;Vue.directive(scroll-x,{inserted:function(el){let domClass el.getAttribute(class)if(domClass.indexOf(el-table)<0){return false}const scrollDiv el;if(scrollDivnull){return fa…

OpenCV基于边缘的分割详解

OpenCV 中基于边缘的分割是一种常见的图像分割技术&#xff0c;它利用图像中的边缘信息来进行分割。边缘通常是图像中灰度值变化较大的区域&#xff0c;因此可以作为物体之间的分界线。以下是基于边缘的分割在 OpenCV 中的详细介绍&#xff1a; Canny 边缘检测&#xff08;Cann…

YOLOv9有效改进|CVPR2023即插即用的到残差注意力机制(轻量化注意力机制)Inverted Residual Mobile Block

专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;助力高效涨点&#xff01;&#xff01;&#xff01; 一、改进点介绍 在YOLOv9中加入CVPR2023即插即用的到残差注意力机制。 二、模块详解 2.1 模块简介 Inverted Residual Mobile Block结合了倒置残差块…

JavaEE企业开发新技术3

目录 2.11 Method的基本操作-1 文字性概念描述 代码&#xff1a; 2.12 Method的基本操作-2 2.13 Method的基本操作-3 2.14 数组的反射操作-1 文字性概念&#xff1a; 代码&#xff1a; 2.15 数组的反射操作-2 学习内容 2.11 Method的基本操作-1 文字性概念描述 Me…

SSM整合Springboot

1.0 概述 1.1 持久层&#xff1a; DAO层&#xff08;mapper&#xff09; DAO层&#xff1a;DAO层主要是做数据持久层的工作&#xff0c;负责与数据库进行联络的一些任务都封装在此 DAO层的设计首先是设计DAO的接口&#xff0c; 然后在spring-mapper.xml的配置文件中定义此接…

“低代码+平台”:驱动企业数字化转型与创新的新引擎

“低代码平台”作为一种新兴的软件开发范式&#xff0c;正逐渐成为企业快速响应市场变化、优化业务流程、提升数字化水平的重要手段。它的价值在于&#xff0c;将传统软件开发的复杂性大大降低&#xff0c;赋予了非技术人员或轻量级开发者快速构建应用的能力&#xff0c;并能灵…

【vue-小知识】var、let 和 const之间的区别

文章目录 结论1、重复定义变量名var&#xff1a;允许重复定义变量名let和const&#xff1a;不可以重复定义变量名 2、修改值var&#xff1a;允许修改值let&#xff1a;允许修改值const&#xff1a;不允许修改值&#xff0c;会报错 3、变量提升var : 支持变量提升let和const&…

【黑马程序员】Python多任务

文章目录 多进程多进程使用流程导入包Process进程类说明 获取进程编号目的常用操作 获取进程名进程注意点进程之间不共享全局变量主进程会等待子进程结束之后再结束设置守护主进程 多线程threading模块线程注意点线程之间执行是无序的主线程会等待所有的子线程执行结束在结束线…

Docker compose()

1.概述 是 Docker 官方提供的一款开源工具&#xff0c;主要用于简化在单个主机上定义和运行多容器 Docker 应用的过程。它的核心作用是容器编排&#xff0c;使得开发者能够在一个统一的环境中以声明式的方式管理多容器应用的服务及其依赖关系。 也就是说Docker Compose是一个用…

吃瓜Grok大模型

段子区 今年当地时间2月29日晚&#xff0c;马斯克闹出来一件大事——正式起诉OpenAI和Sam Altman&#xff0c;并要求OpenAI 恢复开源GPT-4等模型。国际流量大师我只付服马斯克和川宝!&#xff01; 当大家觉得这扯皮的故事就此结束后&#xff0c;马斯克“不负众望”的整了一个大…