大数据-玩转数据-双流JOIN

一、双流JOIN

在Flink中, 支持两种方式的流的Join: Window Join和Interval Join

二、Window Join

窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
注意:
1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
2.join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳。
Window join 仍然可分为 滚动窗口、滑动窗口Join、会话窗口Join

滚动窗口Join代码段示例
在这里插入图片描述

package com.lyh.flink12;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/24 22:09*/
public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return "first: " + first + ", second: " + second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

三、Interval Join

间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.
如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

在这里插入图片描述
Interval Join只支持event-time
必须是keyBy之后的流才可以interval join

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;

import java.time.Duration;public class  Sql_Join_Windows_Interval{public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId)).between(Time.seconds(-2),Time.seconds(3)).process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor left,WaterSensor right,Context ctx,Collector<String> out) throws Exception {out.collect(left + "," + right);}}).print();try{env.execute();} catch (Exception e){e.printStackTrace();}}}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/95033.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

棉花叶病害数据集

Bacterial Blight&#xff08;细菌性枯萎病&#xff09;&#xff1a;细菌性枯萎病是由细菌引起的棉花疾病&#xff0c;主要受害部位是棉花的叶子和茎。这种病害可以导致叶片枯萎、变色和腐烂&#xff0c;对棉花产量产生不利影响。 Curl Virus&#xff08;卷叶病毒&#xff09;…

仿真调试说明——摘抄龙芯杯官方文件

1.仿真调试说明 你需要具备以下知识&#xff1a; 仿真工具的使用&#xff0c;比如Vivado的XsimVerilog的基本语法 通过本文的学习&#xff0c;你将获得&#xff1a;各类仿真错误排查的方法CPU逻辑出错的调试指导Verilog 运算符的优先级 1.1 调试指导思想概述 全局上的调试原…

多卡片效果悬停效果

效果展示 页面结构 从页面的结构上看&#xff0c;在默认状态下毛玻璃卡片是有层次感的效果叠加在一起&#xff0c;并且鼠标悬停在卡片区域后&#xff0c;卡片整齐排列。 CSS3 知识点 transform 属性的 rotate 值运用content 属性的 attr 值运用 实现页面整体布局 <div …

案例题--Web应用考点

案例题--Web应用考点 负载均衡技术微服务XML和JSON无状态和有状态真题 在选择题中没有考察过web的相关知识&#xff0c;主要就是在案例分析题中考察 负载均衡技术 应用层负载均衡技术 传输层负载均衡技术 就近的找到距离最近的服务器&#xff0c;并进行分发 使用户就近获取…

S32K144 GPIO编程

前面的文章介绍了如何在MDK-Keil下面进行S32K144的开发&#xff0c;下面就使用该工程模板进行GPIO LED的编程试验。 1. 开发环境 S32K144EVB-Q100开发板MDK-Keil Jlink 2. 硬件连接 S32K144EVB-Q100开发板关于LED的原理图如下&#xff1a; 也就是具体连接关系如下&#xf…

【C++】vector相关OJ

文章目录 1. 只出现一次的数字2. 杨辉三角3. 电话号码字母组合 ヾ(๑╹◡╹)&#xff89;" 人总要为过去的懒惰而付出代价ヾ(๑╹◡╹)&#xff89;" 1. 只出现一次的数字 力扣链接 代码展示&#xff1a; class Solution { public:int singleNumber(vector<i…

6-1 选择排序

#include <stdio.h>#define N 1000 int arr[N];/* 对长度为n的数组arr执行选择排序 */ void selectionSort(int arr[], int n);/* 打印长度为n的数组arr */ void printArray(int arr[], int n);void swap(int *xp, int *yp) {int temp *xp;*xp *yp;*yp temp; }int mai…

uniapp iOS离线打包——如何创建App并提交版本审核?

uniapp 如何创建App&#xff0c;并提交版本审核&#xff1f; 文章目录 uniapp 如何创建App&#xff0c;并提交版本审核&#xff1f;登录 appstoreconnect创建AppiOS 预览和截屏应用功能描述技术支持App 审核信息 App 信息内容版权年龄分级 价格与销售范围App 隐私提交审核 登录…

华为云云耀云服务器L实例评测|安装搭建学生成绩管理系统

1.前言概述 华为云耀云服务器L实例是新一代开箱即用、面向中小企业和开发者打造的全新轻量应用云服务器。多种产品规格&#xff0c;满足您对成本、性能及技术创新的诉求。云耀云服务器L实例提供丰富严选的应用镜像&#xff0c;实现应用一键部署&#xff0c;助力客户便捷高效的在…

mybatis-plus控制台打印sql(mybatis-Log)

配置了mybatis-plus.configuration.log-implorg.apache.ibatis.logging.stdout.StdOutImpl&#xff1b;但是mybatis执行的sql没有输出 需要检查点&#xff1a; 1、日志级别设置&#xff1a;请确保你的日志级别配置正确。如果日志级别设置得太低&#xff0c;可能导致SQL语句不…

计算机竞赛 题目:基于LSTM的预测算法 - 股票预测 天气预测 房价预测

文章目录 0 简介1 基于 Keras 用 LSTM 网络做时间序列预测2 长短记忆网络3 LSTM 网络结构和原理3.1 LSTM核心思想3.2 遗忘门3.3 输入门3.4 输出门 4 基于LSTM的天气预测4.1 数据集4.2 预测示例 5 基于LSTM的股票价格预测5.1 数据集5.2 实现代码 6 lstm 预测航空旅客数目数据集预…

【kubernetes】CRI OCI

1 OCI OCI(Open Container Initiative)&#xff1a;由Linux基金会主导&#xff0c;主要包含容器镜像规范和容器运行时规范&#xff1a; Image Specification(image-spec)Runtime Specification(runtime-spec)runC image-spec定义了镜像的格式&#xff0c;镜像的格式有以下几…

【多级缓存】

文章目录 1. JVM进程缓存2. Lua语法3. 实现多级缓存3.1 反向代理流程3.2 OpenResty快速入门 4. 查询Tomcat4.1 发送http请求的API4.2 封装http工具4.3 基于ID负载均衡4.4 流程小结 5. Redis缓存查询5.1 实现Redis查询 6. Nginx本地缓存6.1 本地缓存API6.2 实现本地缓存查询 7. …

Django的模版使用(Django-03)

一 模版的使用 模板引擎是一种可以让开发者把服务端数据填充到html网页中完成渲染效果的技术。它实现了 把前端代码和服务端代码分离 的作用&#xff0c;让项目中的业务逻辑代码和数据表现代码分离&#xff0c;让前端开发者和服务端开发者可以更好的完成协同开发。 静态网页&…

深度学习笔记_4、CNN卷积神经网络+全连接神经网络解决MNIST数据

1、首先&#xff0c;导入所需的库和模块&#xff0c;包括NumPy、PyTorch、MNIST数据集、数据处理工具、模型层、优化器、损失函数、混淆矩阵、绘图工具以及数据处理工具。 import numpy as np import torch from torchvision.datasets import mnist import torchvision.transf…

用向量数据库Milvus Cloud 搭建AI聊天机器人

加入大语言模型(LLM) 接着,需要在聊天机器人中加入 LLM。这样,用户就可以和聊天机器人开展对话了。本示例中,我们将使用 OpenAI ChatGPT 背后的模型服务:GPT-3.5。 聊天记录 为了使 LLM 回答更准确,我们需要存储用户和机器人的聊天记录,并在查询时调用这些记录,可以用…

redis的持久化消息队列

Redis Stream Redis Stream 是 Redis 5.0 版本新增加的数据结构。 Redis Stream 主要用于消息队列&#xff08;MQ&#xff0c;Message Queue&#xff09;&#xff0c;Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能&#xff0c;但它有个缺点就是消息无法…

一键AI高清换脸——基于InsightFace、CodeFormer实现高清换脸与验证换脸后效果能否通过人脸比对、人脸识别算法

前言 1、项目简介 AI换脸是指利用基于深度学习和计算机视觉来替换或合成图像或视频中的人脸。可以将一个人的脸替换为另一个人的脸,或者将一个人的表情合成到另一个人的照片或视频中。算法常常被用在娱乐目上,例如在社交媒体上创建有趣的照片或视频,也有用于电影制作、特效…

全屋灯具选购指南,如何选择合适的灯具。福州中宅装饰,福州装修

灯具装修指南 灯具就像我们家里的星星&#xff0c;在黑暗中带给我们明亮&#xff0c;可是灯具如果选择的不好&#xff0c;这个效果不仅体现不出来&#xff0c;还会让人觉得烦躁。 灯具到底该怎么选呢&#xff1f;装修灯具有哪些注意事项呢&#xff1f;给大家做了一个总结&#…

C++设计模式-抽象工厂(Abstract Factory)

目录 C设计模式-抽象工厂&#xff08;Abstract Factory&#xff09; 一、意图 二、适用性 三、结构 四、参与者 五、代码 C设计模式-抽象工厂&#xff08;Abstract Factory&#xff09; 一、意图 提供一个创建一系列相关或相互依赖对象的接口&#xff0c;而无需指定它们…