Flink双流Join

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join

  • coGroup

  • intervalJoin

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Joining | Apache Flink

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

:::color3 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

代码演示:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.join(orangeStream).where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}

总结非常重要:

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

:::

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11
​
橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即 ]

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/62639.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python生日祝福烟花

1. 实现效果 2. 素材加载 2个图片和3个音频 shoot_image pygame.image.load(shoot(已去底).jpg) # 加载拼接的发射图像 flower_image pygame.image.load(flower.jpg) # 加载拼接的烟花图 烟花不好去底 # 调整图像的像素为原图的1/2 因为图像相对于界面来说有些大 shoo…

26页PDF | 数据中台能力框架及评估体系解读(限免下载)

一、前言 这份报告详细解读了数据中台的发展历程、核心概念、能力框架及成熟度评估体系。它从阿里巴巴的“大中台&#xff0c;小前台”战略出发&#xff0c;探讨了数据中台如何通过整合企业内部的数据资源和能力&#xff0c;加速业务迭代、降低成本&#xff0c;并推动业务增长…

如何分段存储Redis键值对

说明&#xff1a;本文介绍针对一个value过长的键值对&#xff0c;如何分段存储&#xff1b; 场景 当我们需要存入一个String类型的键值对到Redis中&#xff0c;如下&#xff1a; &#xff08;缓存接口&#xff09; public interface CacheService {/*** 添加一个字符串键值…

C-操作符

操作符种类 在C语言中&#xff0c;操作符有以下几种&#xff1a; 算术操作符 移位操作符 位操作符 逻辑操作符 条件操作符 逗号表达式 下标引用&#xff0c;函数调用 拓展&#xff1a;整型提升 我们介绍常用的几个 算术操作符 &#xff08;加&#xff09;&#xff…

RabbitMQ 客户端 连接、发送、接收处理消息

RabbitMQ 客户端 连接、发送、接收处理消息 一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样 RabbitMQ 服务&#xff0c;不是像其他服务器一样&#xff0c;负责逻辑处理&#xff0c;然后转发给客户端 而是所有客户端想要向 RabbitMQ服务发送消息&#xff0c; 第一步&a…

题海拾贝——生成元(Digit Generator,ACM/ICPC SEOUL 2005,UVa1583)

Hello大家好&#xff01;很高兴我们又见面啦&#xff01;给生活添点passion&#xff0c;开始今天的编程之路&#xff01; 我的博客&#xff1a;<但凡. 欢迎点赞关注&#xff01; 1、题目描述 如果x加上x的各个数字之和得到y&#xff0c;就说x是y的生成元。给出(1<n<10…

欧科云链研究院:比特币还能“燃”多久?

出品&#xff5c; OKG Research 作者&#xff5c;Hedy Bi 本周二&#xff0c;隔夜“特朗普交易” 的逆转趋势波及到比特币市场。比特币价格一度冲高至约99,000美元后迅速回落至93,000美元以下&#xff0c;最大跌幅超6%。这是由于有关以色列和黎巴嫩有望达成停火协议的传闻引发…

hint: Updates were rejected because the tip of your current branch is behind!

问题 本地仓库往远段仓库推代码时候提示&#xff1a; error: failed to push some refs to 192.168.2.1:java-base/java-cloud.git hint: Updates were rejected because the tip of your current branch is behind! refs/heads/master:refs/heads/master [rejected] (…

设计模式面试大全:说一下单例模式,及其应用场景?

定义 单例模式&#xff08;Singleton Pattern&#xff09;是 Java 中最简单的设计模式之一&#xff0c;此模式保证某个类在运行期间&#xff0c;只有一个实例对外提供服务&#xff0c;而这个类被称为单例类。 单例模式也比较好理解&#xff0c;比如一个人一生当中只能有一个真…

go-zero使用自定义模板实现统一格式的 body 响应

前提 go环境的配置、goctl的安装、go-zero的基本使用默认都会 需求 go-zero框架中&#xff0c;默认使用goctl命令生成的代码并没有统一响应格式&#xff0c;现在使用自定义模板实现统一响应格式&#xff1a; {"code": 0,"msg": "OK","d…

【Python网络爬虫笔记】5-(Request 带参数的get请求) 爬取豆瓣电影排行信息

目录 1.抓包工具查看网站信息2.代码实现3.运行结果 1.抓包工具查看网站信息 请求路径 url:https://movie.douban.com/typerank请求参数 页面往下拉&#xff0c;出现新的请求结果&#xff0c;参数start更新&#xff0c;每次刷新出20条新的电影数据 2.代码实现 # 使用网络爬…

玻璃效果和窗户室内效果模拟

一、玻璃效果 首先来讲如何模拟玻璃效果。玻璃的渲染包括三部分&#xff0c;普通场景物体的渲染、反射和折射模拟、毛玻璃模拟。作为场景物体&#xff0c;那么类似其它场景物体Shader一样&#xff0c;可以使用PBR、BlingPhong或者Matcap&#xff0c;甚至三阶色卡通渲染都可以。…

某东图标点选验证码

注意&#xff0c;本文只提供学习的思路&#xff0c;严禁违反法律以及破坏信息系统等行为&#xff0c;本文只提供思路 如有侵犯&#xff0c;请联系作者下架 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。…

九、Ubuntu Linux操作系统

一、Ubuntu简介 Ubuntu Linux是由南非人马克沙特尔沃思(Mark Shutteworth)创办的基于Debian Linux的操作系统&#xff0c;于2004年10月公布Ubuntu是一个以桌面应用为主的Linux发行版操作系统Ubuntu拥有庞大的社区力量&#xff0c;用户可以方便地从社区获得帮助其官方网站:http…

fastdds:编译、安装并运行helloworld

fastdds安装可以参考官方文档&#xff1a; 3. Linux installation from sources — Fast DDS 3.1.0 documentation 从INSTALLATION MANUAL这一节可以看出来&#xff0c;fastdds支持的操作系统包括linux、windows、qnx、MAC OS。本文记录通过源码和cmake的方式来安装fastdds的…

7. 现代卷积神经网络

文章目录 7.1. 深度卷积神经网络&#xff08;AlexNet&#xff09;7.2. 使用块的网络&#xff08;VGG&#xff09;7.3. 网络中的网络&#xff08;NiN&#xff09;7.4. 含并行连结的网络&#xff08;GoogLeNet&#xff09;7.5. 批量规范化7.5.1. 训练深层网络7.5.2. 批量规范化层…

监控视频汇聚平台:Liveweb视频监控管理平台方案详细介绍

Liveweb国标视频综合管理平台是一款以视频为核心的智慧物联应用平台。它基于分布式、负载均衡等流媒体技术进行开发&#xff0c;提供广泛兼容、安全可靠、开放共享的视频综合服务。该平台具备多种功能&#xff0c;包括视频直播、录像、回放、检索、云存储、告警上报、语音对讲、…

常用函数的使用错题汇总

目录 new/delete malloc/free1. 语言和类型2. 内存分配3. 内存释放4. 安全性和类型安全5. 其他特性总结 线程停止文件流 new/delete malloc/free malloc/free 和 new/delete 是 C/C 中用于动态内存管理的两种方式&#xff0c;它们有一些重要的区别。以下是这两种方式的比较&…

Istio笔记01--快速体验Istio

Istio笔记01--快速体验Istio 介绍部署与测试部署k8s安装istio测试istio 注意事项说明 介绍 Istio是当前最热门的服务网格产品&#xff0c;已经被广泛应用于各个云厂商和IT互联网公司。企业可以基于Istio轻松构建服务网格&#xff0c;在接入过程中应用代码无需更改&#xff0c;…

爬取boss直聘上海市人工智能招聘信息+LDA主题建模

爬取boss直聘上海市人工智能招聘信息 import time import tqdm import random import requests import json import pandas as pd import os from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriv…