flink left join消费kafka数据

left join会产生回车流数据

在控制台数据


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author: YSKSolution* @Date: 2022/11/8/19:16* @Package_name: PACKAGE_NAME*/
public class LeftJoin extends BaseSQLAPP {public static void main(String[] args) {new LeftJoin().init(2003,2,"BaseSQLAPP");}@Overrideprotected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {//join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));tEnv.executeSql("create table t1 (" +" id int, "+" name string "+")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv"));tEnv.executeSql("create table t2 (" +" id int, "+" age int "+")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv"));Table table = tEnv.sqlQuery(" select " +"t1.id," +"t1.name," +"t2.age" +" from t1 " +" left join t2 " +" on t1.id = t2.id ");//        tEnv.createTemporaryView("result",table);table.execute().print();}
}

先输入t1数据
在这里插入图片描述
控制台数据 ,左表数据输出,右表数据为null
在这里插入图片描述
再输入右表数据
在这里插入图片描述
控制台产生两条数据,一条是回撤流,一条是join得到的数据
在这里插入图片描述
2.写入upsertkakfa消费


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author: YSKSolution* @Date: 2022/11/8/19:16* @Package_name: PACKAGE_NAME*/
public class LeftJoin extends BaseSQLAPP {public static void main(String[] args) {new LeftJoin().init(2003,2,"BaseSQLAPP");}@Overrideprotected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {//join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));tEnv.executeSql("create table t1 (" +" id int, "+" name string "+")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv"));tEnv.executeSql("create table t2 (" +" id int, "+" age int "+")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv"));Table table = tEnv.sqlQuery(" select " +"t1.id," +"t1.name," +"t2.age" +" from t1 " +" left join t2 " +" on t1.id = t2.id ");//        tEnv.createTemporaryView("result",table);tEnv.executeSql("create table t3(" +"id int," +"name string," +"age int," +"primary key (id) not enforced"+")"+SQLUtil.getUpsertKafkaDDL("t3","json"));table.executeInsert("t3");}
}

先写左表,消费到的数据如下,右表数据为null
在这里插入图片描述
再写右表,产生两条数据,第一条是null,表示删除上面那条数据,第二条是left join得到的结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/20259.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电脑设置密码怎么设置?让你的电脑更安全!

在如今信息化的社会中,保护个人电脑的安全至关重要。设置密码是最基本的电脑安全措施之一,它可以有效防止未经授权的访问和保护个人隐私,可是电脑设置密码怎么设置?本文将介绍三种设置电脑密码的方法,帮助您加强电脑的…

法线方程实现最小二乘拟合(Matlab)

一、问题描述 利用法线方程实现最小二乘拟合。 二、实验目的 掌握法线方程方法的原理,能够利用法线方程完成去一组离散数据点的拟合。 三、实验内容及要求 对于下面的不一致系统,构造法线方程,计算最小二乘以及2-范数误差。 [ 3 − 1 2 …

Nginx实战:LUA脚本_环境配置安装

目录 一、什么是LUA脚本 二、Nginx中的LUA脚本 1、主要特点 2、用途 三、如何在nginx中使用LUA脚本 1、原生nginx 2、OpenResty 3、nginx lua配置验证 一、什么是LUA脚本 Nginx Lua 脚本是 Nginx 与 Lua 语言集成的结果,它允许你使用 Lua 语言编写Nginx 模块…

(深度学习记录)第TR3周:Transformer 算法详解

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 | 接辅导、项目定制 文本的输入处理中,transformer会将输入文本序列的每个词转化为一个词向量,我们通常会选择一个合适的长度作为输入…

若依分页问题排查

无限分页数据返回 一、问题排查1.1 代码排查1.2 sql排查1.3 原因分析 二、问题修复 项目使用了 若依的框架,前端反馈了一个问题,总记录条数只有 48条的情况下,传入的 页数时从6~~无穷大,每页大小为10, 此时还能返回数据&#xff0…

JSON源码类学习

json源码学习 parse把json转换成Object parseObject转换为jsonObject parseArray从字符串数组解析成真正的数组 tojsonString把真正的json解析json 数组的方法 作用:类型转换 为什么要做类型转换 开发一个方法验证 这个方法先封装方法,是否为json …

seaborn和matplotlib显示两条曲线图例

总结,添加label和plt.legend,以下由chatgpt生成 在使用 Seaborn 的 kdeplot(核密度估计图)时,显示图例也是一个常见需求,尤其是当你想比较多个不同分布的数据时。下面我将提供一个示例,说明如何…

Spring-Cloud-CircuitBreaker-Resilience4j (3.1.1)

介绍 Resilience4j 是一个专为函数式编程而设计的轻量级容错库。Resilience4j 提供高阶函数(装饰器),以增强任何功能接口、lambda 表达式或方法引用,包括断路器、速率限制器、重试或隔板。您可以在任何函数接口、lambda 表达式或…

【Python系列】Python 元组(Tuple)详解

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【STL源码剖析-空间配置器】stack、queue简单实现

举头天外望 无我这般人 目录 stack 的概述 stack 的实现 queue 的概述 queue 的实现 契子✨ 我们之前学过了 vector、list 这些 STL 的(容器) 而我们今天将要学习空间配置器 -- stack、queue,那什么是空间配置器呢? 简单来讲就是…

AI自动化办公:批量将Excel表格英文内容翻译为中文

有一个50列的表格,里面都是英文,要翻译成中文: 在ChatGPT中输入提示词: 你是一个开发AI大模型应用的Python编程专家,要完成以下任务的Python脚本: 打开Excel文件:"F:\AI自媒体内容\AI行业…

HTML静态网页成品作业(HTML+CSS)——我的班级介绍网页(2个页面)

🎉不定期分享源码,关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 🏷️本套采用HTMLCSS,未使用Javacsript代码,共有1个页面。 二、作品演示 三、代…

金属切削机床5G智能工厂工业物联数字孪生,推进制造业数字化转型

金属切削机床5G智能工厂工业物联数字孪生,推进制造业数字化转型。随着工业4.0时代的到来,制造业正面临着前所未有的变革与挑战。在这场变革中,金属切削机床智能工厂工业物联数字孪生平台正成为推动制造业数字化转型的重要力量。 数字孪生是指…

香港云服务器好还是国内的好?

香港云服务器与国内云服务器各有其优点和缺点,选择哪种类型的云服务器主要取决于业务需求、用户群体、网络需求以及成本考虑。以下是对两者进行详细比较的内容。 首先,从网络速度和稳定性来看,香港云服务器具有独特的优势。由于香港是全球数据…

企业文件加密:保障知识产权与客户隐私

在数字化时代,企业文件的安全成为了保护知识产权和客户隐私的关键。随着网络攻击和数据泄露事件的日益增多,企业必须采取强有力的措施来确保其敏感信息的安全。文件加密技术作为一项重要的数据保护手段,对于维护企业的竞争力和客户信任至关重…

图解支付系统的渠道路由设计

大家好,我是隐墨星辰,今天和大家聊聊渠道路由设计。 这篇文章主要讲清楚:渠道路由是什么,为什么需要渠道路由,渠道路由的几种形态,一个简洁而实用的基于规则的渠道路由设计。 注:有些公司称渠…

企业微信H5授权登录

在企业中如果需要在打开的网页里面携带用户的身份信息,第一步需要获取code参数 如何实现企业微信H5获取当前用户信息即accessToken? 1.在应用管理--》创建应用 2.创建好应用,点击应用主页-》设置-》网页-》将授权链接填上去 官方文档可以看…

wampserver的使用

wampserver的使用 文章目录 wampserver的使用1.启动2.目录3.基本操作 1.启动 WampServler有三种状态 服务器关闭状态,颜色为红色服务器开启,但是为离线状态,颜色为橙色,只有本机可以访问服务器开启,在线状态&#xf…

Educational Codeforces Round 166 (Rated for Div. 2)题解(A,B,D)

今天真的巨抽象,第三题没做出来,但是第四题过了,也是准备上小分了,因为nnd不按那个分数,而是按照做题数,直接废了 A. Verify Password 题解:小丑水题一个人,按照ASCII码比较一遍直接…

SDK开发

为什么需要Starter? 理想情况:开发者只需关心调用哪些接口,传递哪些参数就跟调用自己写的代码一样简单。 开发starter的好处:开发者引入之后,可以直接在application.yml中写配置,自动创建客户端。 starter开发流程 …