java Flink(四十三)Flink Interval Join源码解析以及简单实例

背景

之前我们在一片文章里简单介绍过Flink的多流合并算子

java Flink(三十六)Flink多流合并算子UNION、CONNECT、CoGroup、Join

今天我们通过Flink 1.14的源码对Flink的Interval Join进行深入的理解。

Interval Join不是两个窗口做关联,更适用于处理乱序数据流之间的关联。它的作用更类似于从左流中a元素本身出发,对右流中一段时间内的数据进行关联(Inner Join:只关联相同Key的数据)。

如图所示:

 下边这条流中的2关联到上范围内的0/1

源码解析

Flink版本1.14.4

按住Ctrl+鼠标左键,点击process进入源码

 这里process方法是在KeydStream.java下IntervalJoined类下的方法

 包装返回类型的TypeInfomation(TypeInfo的介绍可以看上一篇)

 返回的outputType

SingleOutputStreamOperator使用给定的用户函数完成联接操作,该函数针对每个联接的元素对执行。这种方法允许传递输出类型的显式类型信息。

 IntervalJoinOperator初始化

左边界<=右边界检查

获取左流还有右流数据对应的序列化(从TypeInfo获取的)

继续看IntervalJoinOperator中的其余关键实现

open方法用来注册定时器

 初始化两个流的map状态

处理左侧流中的数据。每当数据到达左流时,它就会被添加到左缓冲区。将从右侧缓冲区中查找该元素可能的候选联接,如果该对位于用户定义的边界内,则将其传递给 ProcessJoinFunction

​ 同理处理右流

进入数据处理函数

获取数据,取出事件时间

 超过当前watermark的数据进行过滤

 

数据没问题的话,将数据添加到状态

 ​​

遍历另一条流的状态,遍历其中的数据,把满足时间要求的数据进行collect

​注册一个当前事件时间戳+右边界的定时器

定时器触发后,清空map状态中时间戳-左边界的那条数据

简单实例 

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkCode</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><jdk.version>1.8</jdk.version><jar.name>ubs-data-converter</jar.name><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--Flink 版本--><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.8</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.2</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${jdk.version}</source><target>${jdk.version}</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><finalName>${jar.name}</finalName><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.glassfish.jersey.core:jersey-common</exclude></excludes></artifactSet><relocations><relocation><pattern>com.google.common</pattern><shadedPattern>com.shade.google.common</shadedPattern></relocation><relocation><pattern>org.apache.kafka</pattern><shadedPattern>org.shade.apache.kafka</shadedPattern></relocation></relocations><filters><filter><artifact>*</artifact><includes><include>org/apache/htrace/**</include><include>org/apache/avro/**</include><include>org/apache/flink/streaming/**</include><include>org/apache/flink/connector/**</include><include>org/apache/kafka/**</include><include>org/apache/hive/**</include><include>org/apache/hadoop/hive/**</include><include>org/apache/curator/**</include><include>org/apache/zookeeper/**</include><include>org/apache/jute/**</include><include>org/apache/thrift/**</include><include>org/apache/http/**</include><include>org/I0Itec/**</include><include>jline/**</include><include>com/yammer/**</include><include>kafka/**</include><include>org/apache/hadoop/hbase/**</include><include>com/alibaba/fastjson/**</include><include>org/elasticsearch/action/**</include><include>io/confluent/**</include><include>com/fasterxml/**</include><include>org/elasticsearch/**</include><include>hbase-default.xml</include><include>hbase-site.xml</include></includes></filter><filter><artifact>org.apache.hadoop.hive.*:*</artifact><excludes><exclude></exclude><exclude></exclude><exclude></exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

user bean

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class User{Integer id;Long t;}

 order bean 

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Order {Integer id;Long price;Long time;}

main

package ubs.app.intervaljoin;import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import ubs.app.intervaljoin.bean.Order;
import ubs.app.intervaljoin.bean.User;
import ubs.app.intervaljoin.source.OrderSource;
import ubs.app.intervaljoin.source.UserSource;import java.time.Duration;public class IntervalJoinApp  {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置watermarkWatermarkStrategy<User> userWatermarkStrategy = WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<User>() {@Overridepublic long extractTimestamp(User element, long recordTimestamp) {return element.getT();}});DataStream<User> userDataStreamSource = env.addSource(new UserSource()).assignTimestampsAndWatermarks(userWatermarkStrategy);//设置watermarkWatermarkStrategy<Order> orderWatermarkStrategy = WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Order>() {@Overridepublic long extractTimestamp(Order element, long recordTimestamp) {return element.getTime();}});DataStream<Order> orderDataStreamSource = env.addSource(new OrderSource()).assignTimestampsAndWatermarks(orderWatermarkStrategy);env.setParallelism(1);SingleOutputStreamOperator<String> process = userDataStreamSource.keyBy(o -> o.getId()).intervalJoin(orderDataStreamSource.keyBy(o -> o.getId())).between(Time.seconds(-5), Time.seconds(0)).process(new ProcessJoinFunction<User, Order, String>() {@Overridepublic void processElement(User left, Order right, ProcessJoinFunction<User, Order, String>.Context ctx, Collector<String> out) throws Exception {Integer lid = left.getId();Long lt = left.getT();Integer rid = right.getId();long rt = right.getTime();out.collect(String.format("左%s 左时间%s 右%s 右时间%s 关联到了 %s", lid, lt/1000, rid, rt/1000, rt/1000-lt/1000));}});process.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/763130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2.Redis有五种主要的数据类型

Redis有五种主要的数据类型 String&#xff08;字符串&#xff09;&#xff1a;String类型是最简单的数据类型&#xff0c;可以存储任意类型的数据&#xff0c;例如整数、浮点数、字符串等。String类型支持一些基本的操作&#xff0c;如设置值、获取值、增减值等。 Hash&#…

论文笔记:Llama 2: Open Foundation and Fine-Tuned Chat Models

导语 Llama 2 是之前广受欢迎的开源大型语言模型 LLaMA 的新版本&#xff0c;该模型已公开发布&#xff0c;可用于研究和商业用途。本文记录了阅读该论文的一些关键笔记。 链接&#xff1a;https://arxiv.org/abs/2307.09288 1 引言 大型语言模型&#xff08;LLMs&#xff…

cesium Clock JulianDate 日照分析

cesium在初始化的时候会自动把Clock对象挂载到容器上Clock内部以JulianDate维护时间&#xff0c;比北京时间慢8个小时&#xff0c;想显示北京时间需要计算时差JulianDate的日期部分和秒数部分是分开的 julianDayNumber&#xff1a;指整数天&#xff0c;记录从公元前4713年正午以…

【蓝桥杯】第15届蓝桥杯青少组stema选拔赛C++中高级真题答案(20240310)

一、选择题 第 1 题 第 2 题 表达式1000/3的结果是( A )。 A.333 B.333.3 C.334 D.333.0 第 3 题 下列选项中&#xff0c;判断a等于1并且b等于1正确的表达式是( B )。 A.!((a!1)&&(b!1)) B.!((a!1)||(b!1)) C.!(a1)&&(b1) D.(a1)&&(b1) 【解析】 A…

我的春招求职面经

智能指针在面试时经常被问到&#xff0c;最近自己也在写&#xff0c;有一点思考&#xff0c;于是找到了这样一个题目&#xff0c;可以看看&#xff0c;上面这个代码有什么问题&#xff1f;留言区说出你的答案吧&#xff01; 最后分享一下之前的实习->春招->秋招等文章汇总…

huggingface的transformers训练bert

目录 理论 实践 理论 https://arxiv.org/abs/1810.04805 BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;是一种自然语言处理&#xff08;NLP&#xff09;模型&#xff0c;由Google在2018年提出。它是基于Transformer模型的预训练方法…

YOLOv9有效改进|CVPR2023即插即用的到残差注意力机制(轻量化注意力机制)Inverted Residual Mobile Block

专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;助力高效涨点&#xff01;&#xff01;&#xff01; 一、改进点介绍 在YOLOv9中加入CVPR2023即插即用的到残差注意力机制。 二、模块详解 2.1 模块简介 Inverted Residual Mobile Block结合了倒置残差块…

JavaEE企业开发新技术3

目录 2.11 Method的基本操作-1 文字性概念描述 代码&#xff1a; 2.12 Method的基本操作-2 2.13 Method的基本操作-3 2.14 数组的反射操作-1 文字性概念&#xff1a; 代码&#xff1a; 2.15 数组的反射操作-2 学习内容 2.11 Method的基本操作-1 文字性概念描述 Me…

SSM整合Springboot

1.0 概述 1.1 持久层&#xff1a; DAO层&#xff08;mapper&#xff09; DAO层&#xff1a;DAO层主要是做数据持久层的工作&#xff0c;负责与数据库进行联络的一些任务都封装在此 DAO层的设计首先是设计DAO的接口&#xff0c; 然后在spring-mapper.xml的配置文件中定义此接…

“低代码+平台”:驱动企业数字化转型与创新的新引擎

“低代码平台”作为一种新兴的软件开发范式&#xff0c;正逐渐成为企业快速响应市场变化、优化业务流程、提升数字化水平的重要手段。它的价值在于&#xff0c;将传统软件开发的复杂性大大降低&#xff0c;赋予了非技术人员或轻量级开发者快速构建应用的能力&#xff0c;并能灵…

【vue-小知识】var、let 和 const之间的区别

文章目录 结论1、重复定义变量名var&#xff1a;允许重复定义变量名let和const&#xff1a;不可以重复定义变量名 2、修改值var&#xff1a;允许修改值let&#xff1a;允许修改值const&#xff1a;不允许修改值&#xff0c;会报错 3、变量提升var : 支持变量提升let和const&…

吃瓜Grok大模型

段子区 今年当地时间2月29日晚&#xff0c;马斯克闹出来一件大事——正式起诉OpenAI和Sam Altman&#xff0c;并要求OpenAI 恢复开源GPT-4等模型。国际流量大师我只付服马斯克和川宝!&#xff01; 当大家觉得这扯皮的故事就此结束后&#xff0c;马斯克“不负众望”的整了一个大…

【网络取证箱】网络取证在线分析工具箱

【网络取证箱】网络取证在线分析工具箱 在线网站查询工具箱&#xff0c;没什么介绍的&#xff0c;所见即所得&#xff0c;在本文档里补充了其它一些网络安全资源&#xff0c;请忽用于非法活动&#xff0c;仅供学习研究—【蘇小沐】 &#xff08;一&#xff09;Whois查询 主要…

docker 进入容器内部命令

docker容器运行了&#xff0c;怎么进入容器内部查看内部的文件情况呢&#xff1f; 答&#xff1a;可以通过docker exec 的命令查看。 docker exec --help 可以查看命令介绍 &#xff1a; docker exec -it XXX /bin/bash XX为容器ID 进入容器内部 /bin/bash是需要添加的 不…

Java NIO和IO之间的区别

前言 NIO&#xff08;New IO&#xff09;&#xff0c;这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的&#xff0c;但实现方式不同&#xff0c;NIO主要用到的是块&#xff0c;所以NIO的效率要比IO高很多。在Java API中提供了两套NIO&#xff0c;一套是针对标准输入输出…

Vue3中基本数据类型为什么需要.value,,,引用类型不需要.value

1、在v3中使用基本数据类型&#xff08;如数字、字符串、布尔值&#xff09;时&#xff0c;如果你希望响应式地更新数据并触发视图更新,需要使用ref包裹基本数据类型,然后将基本数据类型转化为响应式对象;- - - 因此当你使用ref包裹基本数据类型时,实际上得到的是一个包含.valu…

B002-springcloud alibaba 微服务环境搭建

目录 创建父工程创建基础模块创建用户微服务创建商品微服务创建订单微服务微服务调用 创建父工程 新建项目springcloud-alibaba&#xff0c;本工程不需要写代码&#xff0c;删除src 导包 <parent><groupId>org.springframework.boot</groupId><artifact…

Linux上Mysql安装和部署(图文结合超详细)

1、首先将虚拟机装成功&#xff08;这里不做演示&#xff09; 2、df-h 查看光盘是否挂载&#xff0c;已挂载进行下一步&#xff0c;未挂载手动挂载 2.1、手动挂载 mount -o ro /dev/sr0 /media3、进入etc/yum.repos.d目录查看仓是否配置&#xff0c;若配置进行下一一步&#…

360企业安全浏览器兼容模式显示异常某个内容不显示 偶发现象 本地无法复现情况js

360企业安全浏览器兼容模式显示异常 &#xff0c;现象测试环境频发 &#xff0c;本地连测试无法复现&#xff0c;线上反馈问题。 出现问题的电脑为windows且使用360企业安全浏览器打开兼容模式可复现 复现过程&#xff1a; 不直接点击超链接跳转页面 &#xff0c;登录后直接通…

C++ 侯捷 程序设计(Ⅱ)兼谈对象模型 笔记

Conversion function 转换函数 侯捷老师使用分数 Fraction举例&#xff0c;分数理应可以被看作是小数 提供了Fraction类对象一个转换为double的方法&#xff0c;当碰到需要转换为double的情况下&#xff0c;会调用该方法。 黄色的就是转换函数&#xff0c;没有return type&am…