FlinkSql维表join之Temporal table join

什么是维表join? 

 对于每条流式数据,可以关联一个外部维表数据源,为FlinkSql实时计算提供数据关联查询。

说明: 维表是一张不断变化的表,在维表JOIN时,需指明该条记录关联维表快照的时刻。维表JOIN仅支持对当前时刻维表快照的关联,未来会支持关联左表proctime或rowtime所对应的维表快照。

维表join语法:

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

测试语句:

kafka事实表:

        // 2. 定义输入表,从Kafka消费数据tableEnv.executeSql("CREATE TABLE sourceTable (\n" +"  `user_id` STRING,\n" +"  `item_id` INTEGER,\n" +"  `behavior` STRING,\n" +"  `ts`    STRING,\n" +"  `body` ROW<id STRING,name STRING,code STRING> ,\n" +"`proctime` as PROCTIME()"+") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'test-data',\n" +"  'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +"  'properties.group.id' = 'test-data',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json', \n" +//字段丢失任务不失败"   'json.fail-on-missing-field' = 'true',\n"+//-- 解析失败跳过"   'json.ignore-parse-errors' = 'false' \n" + ")");

mysql维表:

        tableEnv.executeSql("CREATE TABLE dim_province (\n" +"                        province_id BIGINT,\n" +"                        province_name  VARCHAR,\n" +"                        region_name VARCHAR \n" +"                ) WITH (\n" +"                        'connector.type' = 'jdbc',\n" +"                        'connector.url' = 'jdbc:mysql://localhost:3306/sms?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',\n" +"                        'connector.table' = 'dim_province',\n" +"                        'connector.username' = 'root',\n" +"                        'connector.password' = 'root'\n" +
//                        "                        'connector.lookup.cache.max-rows' = '1',\n" +
//                        "                        'connector.lookup.cache.ttl' = '5s'\n" +"                )");

执行查询:

        tableEnv.executeSql("select sourceTable.item_id,sourceTable.ts,dim_province.province_name,sourceTable.proctime" +"" +" from sourceTable  join dim_province " +"   FOR SYSTEM_TIME AS OF sourceTable.proctime   \n"+"ON sourceTable.item_id = dim_province.province_id").print();

结果如示:

+----+-------------+--------------------------------+--------------------------------+-------------------------+
| op |     item_id |                             ts |                  province_name |                proctime |
+----+-------------+--------------------------------+--------------------------------+-------------------------+
| +I |           2 |                  1690786451861 |                            222 | 2023-07-31 15:07:49.673 |
| +I |           2 |                  1690786451861 |                            222 | 2023-07-31 15:08:33.763 |
| +I |           3 |                  1690786451861 |                            333 | 2023-07-31 15:09:04.121 |
| +I |           2 |                  1690786451861 |                         222222 | 2023-07-31 15:09:45.225 |

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/16262.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

native webrtc支持切换音频采集设备和获取裸流

https://www.yuque.com/caokunchao/rtendq/oq8w3qgs3g59whru 前言 版本webrtc m96 1、修改webrtc m96代码&#xff0c;向外提供一个adm指针的接口出来 2、外部来获取指针进行设备的选择 3、外部获取音频裸流&#xff0c;麦克风或者扬声器的数据 修改webrtc代码 1、修改H:\w…

form-data 提交文件请求远程调用

文件请求方法 /*** 上传图文消息内的图片 获取url* 富文本内的图片** param file*/public static String uploadMediaGetUrl(File file) throws IOException {if (!file.exists()) {return null;}String responseData null;try {String url "http://localhost:8503/fil…

Linux NUMA架构(非统一内存访问)

NUMA架构 NUMA Architecture| Non Uniform Memory Access Policy/Model | Numa Node Configuration (CPU Affinity) NUMA架构产生的原因 cpu的高速处理功能和内存存储直接的速度会严重影响cpu的性能。传统的计算机单核架构,cpu通过内存总线(内存访问控制器)直接连接到一…

# Unity 如何获取Texture 的内存大小

Unity 如何获取Texture 的内存大小 在Unity中&#xff0c;要获取Texture的内存文件大小&#xff0c;可以使用UnityEditor.TextureUtil类中的一些函数。这些函数提供了获取存储内存大小和运行时内存大小的方法。由于UnityEditor.TextureUtil是一个内部类&#xff0c;我们需要使…

一些面试(笔试)题

1、请解释如下 cron6&#xff1a; * * * * * test -f /etc/dAppCluster/gethits.py && /etc/dAppCluster/gethitspy > /dev/null 2>&1 20 4 * * * test -d /data1/www/logs && /usr/bin/find /data1/www/logs -name *-error_log"-atime 7 -type…

分段@Transactional 坑及失效问题

Transactional 背景&#xff1a;在某些情况下&#xff0c;我们需要分段transaction&#xff0c;在最外面没有transaction&#xff0c;里面分成几个transaction&#xff0c;保证分段是成功的。 问题代码&#xff1a; Service public Order getOrder1(String id) {Optional<Or…

【笔记】流沙河讲庄子:心斋与坐忘

《庄子》这部书有三十三篇&#xff0c;这三十三篇&#xff0c;从魏晋南北朝起&#xff0c;经过唐代以后&#xff0c;历来研究者都把它分为三个部分&#xff1a;《内篇》《外篇》《杂篇》。所谓《内篇》&#xff0c;是指体现庄周的哲学思想和文化思想的核心部分&#xff1b;所谓…

MySQL~事务

二、事务 1、基本介绍 概念&#xff1a;如果一个包含多个步骤的业务操作&#xff0c;被事务管理&#xff0c;那么这些操作要么同时成功&#xff0c;要么同时失败。 2、操作&#xff1a; 开启事务&#xff1a; start transaction; 回滚&#xff1a;rollback; 提交&#xff…

全网最全讲的最详细的多线程原理

在我们开始讲多线程之前&#xff0c;我们先来了解一下什么是进程&#xff0c;什么是线程。进程和线程是操作系统中两个容易混淆的概念。 进程 在Windows操作系统中打开任务管理器&#xff0c;可以查看进程和线程的详细信息。也可以使用专业的进程查看小软件——Process Explo…

WebSocket

WebSocket详解 WebSocket是一种在单个 TCP 连接上进行全双工通信的协议&#xff0c;它允许客户端和服务器之间进行实时数据交换。与传统的HTTP请求相比&#xff0c;WebSocket具有更低的延迟和更高的并发性&#xff0c;适用于实时通信场景&#xff0c;如即时聊天、实时游戏、实…

javascript实现几何粒子星空连线背景效果

javascript实现几何粒子星空连线背景效果 <html><head><meta charset"UTF-8"><title>几何星空连线背景</title><script src"./ParticleBackground.js"></script> </head><body><canvas id"…

vue2入门学习路线指引

1.插值表达式 2.指令v-bind 3.指令v-for 4.指令v-text和v-html 5.指令v-if和v-show 6.指令v-if, v-else-if和v-else 7.指令v-on和methods 8.指令v-on事件对象,事件修饰符和按键修饰符 9.指令v-model双向绑定和v-model修饰符 10.动态修改标签的class样式 11.动态修改标签的style…

MySql 知识大汇总

数据库索引 数据库索引是一种数据结构&#xff0c;用于提高数据库查询的速度和效率。索引可以看作是表中一列或多列的值的快速查找方式&#xff0c;类似于书籍的目录。通过创建索引&#xff0c;可以减少数据库的扫描量&#xff0c;加快数据的检索速度。 常见的索引类型 常见…

Linux进程调度

初探Linux进程调度 已知&#xff1a;父进程创建子进程后&#xff0c;父子进程同时运行。 问题&#xff1a;如果计算机只有一个处理器&#xff0c;父子进程以什么方式同时执行&#xff1f; 基本概念 运行&#xff1a;一个可执行程序从文件&#xff0c;变成进程的过程。 执行…

MySQL碎片清理

为什么产生&#xff1f; 经过大量增删改的表&#xff0c;都可能存在碎片 MySQL数据结构是B树&#xff0c; 删除某一记录&#xff0c;只会标记为删除&#xff0c;后续插入一条该区间的记录&#xff0c;就会复用这个位置。 删除整个数据页的记录&#xff0c;则整个页标记为“可…

微软对Visual Studio 17.7 Preview 4进行版本更新,新插件管理器亮相

近期微软发布了Visual Studio 17.7 Preview 4版本&#xff0c;而在这个版本当中&#xff0c;全新设计的扩展插件管理器将亮相&#xff0c;并且可以让用户可更简单地安装和管理扩展插件。 据了解&#xff0c;目前用户可以从 Visual Studio Marketplace 下载各式各样的 VS 扩展插…

常用的CSS渐变样式

边框渐变 方案1&#xff1a; 边框渐变( 支持圆角) width: 726px;height: 144px;border-radius: 24px;border: 5px solid transparent;background-clip: padding-box, border-box; background-origin: padding-box, border-box; background-image: linear-gradient(to right, #f…

linux/drivers/leds/leds-gpio.c学习

linux/drivers/leds/leds-gpio.c学习 linux/drivers/leds/leds-gpio.c 是 Linux 内核中的一个驱动程序文件&#xff0c;用于控制 GPIO 引脚上的 LED 灯。下面是对该文件的更详细解读&#xff1a; 1. 头文件引入&#xff1a;该文件引入了一些必要的头文件&#xff0c;包括 <…

Kotlin Multiplatform 使用 CocoaPods 创建多平台分发库

Kotlin Multiplatform 支持直接创建Framework 方式和使用CocoaPods 方式创建Framework。 1、不同之处在于创建的时候需要选择不同的方式。 2、使用CocoaPods 方式还需要在 build.gradle(.kts) 文件中添加内容 在build.gradle(.kts) 文件中添加完成后,执行一下文件。剩下的集成…

Java和Python一些处理sql方式总结

将查询结果导入csv文件中 public static int executeUpdate(String sql, Object[] param) {//创建一个PreparedStatement对象用来操作数据库PreparedStatement pstmt null;//getConnection()方法为我自己定义的获取数据库连接的方法pstmt getConnection().prepareStatement(s…