FlinkSQL之temporary join开发

2705c43b7f4f8013c53321f32a9a2fed.gif

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在此文中,供各位同学参考与交流。

ffcd1b37f760b4430542917b5e7f8f77.png

背景介绍

关于实时flinkSQL的双流join的背景知识可以先阅读以下文章:

https://www.51cto.com/article/713922.html

目前我们有一条流量日志明细的TT流A,以及一条商品标签的TT流B,在flink中对A流和B流进行双流join类似于将A流关联一个hbase维表。temporary join有以下特点:

1. 单流驱动:虽然是双流join,但数据下发只由一条流驱动。

2. 需要定义versioned table,versioned table记录了每个时刻的属性信息,双流join时被动查询。类似于银行汇率表,在货币兑换的时候需要参考兑换时刻的汇率。

3. 查询携带时间版本信息:temporary join携带由两条流的watermark触发,因此查询到的属性是对应时间内的属性。

48804d99d95cf0aa49670aa4181b12e5.png

图片来源:孙金城, 《Blink 漫谈系列 - Temporal Table JOIN》

应用场景&实例分享

当需要根据实时汇率*货币金额计算总金额,实时商品价格*成交件数计算总成交金额时,经常会使用temporary join获取实时的汇率和价格信息。在笔者的流量升级业务迭代中,我们需要获取实时的商品标签,因此需要定义商品标签的versioned table,写法如下:

CREATE TEMPORARY TABLE `tag_ri` (`id` VARCHAR,`tag` VARCHAR,`time` VARCHAR,`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定义watermark
) WITH ('connector' = 'tt','router' = '******','topic' = 'tag_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8'
);--定义version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`, `tag`, `time`, `ts`
FROM (    SELECT `id`, `tag`, `time`, `ts`, ROW_NUMBER() OVER (PARTITION BY `id` --关联主键ORDER BY `time` DESC) AS `rownum`FROM `tag_ri`)
WHERE `rownum` = 1;

同上我们也需要定义流量日志明细流的watermark,并进行双流join

CREATE TEMPORARY TABLE `log_ri` (`id` VARCHAR,`time` VARCHAR,......`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH ('connector' = 'tt','router' = '******','topic' = 'log_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8',
);select `a`.`id`,......,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`

结果如下:

--商品标签信息
12:00> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================  t1                 A12:30> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================t1                 B--流量明细日志查询 t1商品共三条明细
SELECT * FROM log_ri;id              time
=======          ========t1              12:00       t1              12:15       t1               12:30       --执行temporary join
select `a`.`id`,`a`.`time`,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`id               time              tag(商品标签)
=======          ========        =======================t1              12:00                   At1              12:15                   At1              12:30                   B
开发经验
  稀疏数据处理

由于temporary join是由两条流的watermark触发,如果versioned table是一条稀疏的流(在一段时间内无数据流入),那么join可能存在等待不下发数据的现象,可以通过设置参数 set table.exec.source.idle-timeout = 10s ,可以让A流数据不进行等待,具体参数介绍可以参考:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

  数据延迟下发
  • 问题

在实际开发中,我们发现temporay join后数据一直等待不下发,整点才会进行下发的现象。

8f9f7e4845d6e74bf7cb6e93c76e09a0.jpeg

  • 原因分析

我们结合SQL语法,对TT日志进行回流分析:代码逻辑是四路source union后, join 定义的versioned table

select a.*,b.tag
from
(
select * from source_1 
union all 
select * from source_2
union all 
select * from source_3
union all 
select * from source_4
) a
temporay join 
b流

source_4会在整点流入少部分当前小时59分钟的数据,而temporay join 是由两边的watermark所触发,所以会有a流等待b流的时间到达当前小时59分钟后再触发的现象。

ccf9a4c7f6f2b304cbb642b493d40e34.jpeg

  • 解法

对source_4中log_time>当前时间的部分,做temporary join时将log_time置为当前时间,该问题就解决了。

7b3d2f9b26327c4027ac08d723febb8d.png

总结

1. 在单流驱动的双流join场景中,temporary join是一种常见的处理方式。

2. temporary join由两条流的watermark触发,需要对两条流的watermark进行预处理,防止数据稀疏和数据抢跑等现象影响数据下发。

5f2cad15473f0438ff03df1f6df8861d.png

参考资料

  • https://www.51cto.com/article/713922.html

  • https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

af16dc18390c18a051935ea9c0e343b9.png

团队介绍

我们是淘天集团-业务技术-商家数据团队,专注于开发和维护生意参谋这一全渠道、全链路、一站式的数据平台,同时也负责品牌数据银行和策略中心两大产品。旨在为商家提供全面的数据服务,包括但不限于经营分析、市场洞察、客群洞察等,以帮助商家提高商业决策效率。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58063.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【鼠鼠学AI代码合集#8和9】线性神经网络

深度学习中的矢量化加速 在深度学习模型的训练过程中,处理大量数据的效率对模型性能至关重要。为此,我们通常希望能够在一次操作中处理整个小批量的样本,而不是逐一处理每个样本。这种优化方式称为矢量化。通过矢量化,我们可以充…

【开源免费】基于SpringBoot+Vue.JS网上超市系统(JAVA毕业设计)

本文项目编号 T 037 ,文末自助获取源码 \color{red}{T037,文末自助获取源码} T037,文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…

C++学习笔记----9、发现继承的技巧(六)---- 有趣且令人迷惑的继承问题(2)

2、给继承类添加虚基类成员函数的重载 给继承类添加新的虚基类成员函数的重载是可能的。也就是说,可以用新的原型给继承类中的虚成员函数添加重载,但是继续继承基类版本。这个技术使用了using声明来显式包含了继承类中的成员函数的基类定义。下面为示例&…

研发运营一体化(DevOps)能力成熟度模型

目录 应用设计 安全风险管理 技术运 持续交付 敏捷开发管理 基于微服务的端到端持续交付流水线案例 应用设计 安全风险管理 技术运 持续交付

Spring-SpringMVC-SpringBoot理解

Spring是一个完整的框架,能够解决全栈的问题,核心是IOC控制反转和AOP面向切面。 SpringMVC只是Spring的一个模块module,属于Spring框架中WEB层的一部分。 即SpringMVC归属于Spring,不是一个独立的框架 由于SpringMVC配置繁杂使…

关于非中文或者url文本不换行的问题

我在一个写一个简单的url展示的时候,发现url一直溢出不换行,查了各种方法不管用,我请教了我大哥,他直接甩给我两个css放进去就好了 word-break:break-all; 按字符截断换行 /* 支持IE和chrome,FF不支持*/ word-w…

Android 判断手机放置的方向

#1024程序员节|征文# 文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据总结 需求 老板:我有个手持终端,不能让他倒了,当他倒或者倾斜的时候要发出报警; 程序猿:我这..... 老板…

机器学习4

第3章 线性模型 3.1 线性模型的基本形式 3.1.1 线性模型的核心公式 线性模型通过属性的线性组合进行预测,其核心公式为: [ f(x) \omega_1 X_1 \omega_2 X_2 … \omega_d X_d b ] 其中: ω 1 , ω 2 , . . . , ω d \omega_1, \omega_…

医疗实施-项目管理06-估算成本

估算成本 估算成本介绍作用注意事项 常见的估算方法自上而下估算数据分析-储备分析三点估算PERT 常见的估算分类根据产品进行估算根据实施计划估算 估算成本介绍 作用 估算成本是对完成项目所需资源成本进行类似估算的过程。 作用:确定项目所需资金 注意事项 注…

2024-09-28 地址空间与进程控制

一、进程地址空间 Pt.2 同一个变量,地址相同,其实是虚拟地址相同,内容不同其实是被映射到了不同的物理地址 1. 页表 内存保护与页表标志位 在操作系统中,页表用于管理内存的访问权限。每个页表项通常包含一组标志位&…

SZTU数科导论实验_pandas知识点总结

Pandas nunique示例 value_counts主要功能参数示例输出示例 groupby主要功能常用操作基本语法示例输出示例常用聚合方法 这个函数参数中的as_index选择True或者False有什么影响呢参数 as_indexTrue(默认)参数 as_indexFalse适用场景 info示例代码输出解释…

Spring Boot技术在厨艺交流平台中的创新应用

摘 要 使用旧方法对厨艺交流信息进行系统化管理已经不再让人们信赖了,把现在的网络信息技术运用在厨艺交流信息的管理上面可以解决许多信息管理上面的难题,比如处理数据时间很长,数据存在错误不能及时纠正等问题。 这次开发的厨艺交流平台功能…

二:Python学习笔记--基础知识(1) 变量,关键字,数据类型,赋值运算符,比较运算符

目录 1. 变量 2. python关键字 3. python数据类型 3.1 数字类型 整型 int 浮点型 float 内置函数-type 3.2 字符串类型 3.3 布尔类型 3.4 空类型 3.5 列表类型 3.6 元组类型 3.7 字典类型 4. python赋值运算 5. python比较运算符 1. 变量 组成:必须是数…

基于SSM的BBS社区论坛系统源码

运行环境:ideamysql5.7jdk8maven 使用技术:ssmmysqlshirolayui 功能模块:用户管理、模板管理、帖子管理、公告管理、权限管理等

yolov9目标检测/分割预测报错AttributeError: ‘list‘ object has no attribute ‘device‘常见汇总

这篇文章主要是对yolov9目标检测和目标分割预测测试时的报错,进行解决方案。 在说明解决方案前,严重投诉、吐槽一些博主发的一些文章,压根没用的解决方法,也不知道他们从哪里抄的,误人子弟、浪费时间。 我在解决前&…

Lampiao靶机入侵实战

07-Lampiao靶机入侵实战 一、扫描采集信息 1、获取IP地址 nmap -sn 192.168.81.0/24获得IP地址为:192.168.81.1282、获取端口信息 由于nmap默认情况下只扫描常用的1000个端口,覆盖面并不全,所以建议全端口扫描 nmap -p 1-65535 192.168.…

DiffusionDet: Diffusion Model for Object Detection—扩散模型检测论文解析

DiffusionDet: Diffusion Model for Object Detection—扩散模型检测论文解析 这是一篇发表在CVPR 2023的一篇论文,因为自己本身的研究方向是目标跟踪,之前看了一点使用扩散模型进行多跟踪的论文,里面提到了DiffusionDet因此学习一下。 论文…

读数据工程之道:设计和构建健壮的数据系统21数据获取

1. 数据获取 1.1. 数据获取是将数据从一个地方移动到另一个地方的过程 1.1.1. 数据获取与系统内部获取是不同的 1.2. 数据获取是数据工程生命周期中将数据从源系统移入存储的一个中间步骤 1.3. 数据集成则是将来自不同来源系统的数据组合到一个新的数据集 1.4. 数据获取的…

Java中如何实现对象的序列化与反序列化过程?

1、Java中如何实现对象的序列化与反序列化过程? 在Java中,对象序列化是一个过程,该过程可以将对象的状态(属性)转化为可以传输或者存储的形式,然后再将对象状态(属性)反序列化回对象…

数字后端零基础入门系列 | Innovus零基础LAB学习Day6

今天没有具体的数字IC后端lab实验。今天的重点是熟悉掌握静态时序分析STA中的几类timing path以及setup和hold检查机制(包含setup和hold计算公式)。 芯片流片失败的那些故事 数字后端零基础入门系列 | Innovus零基础LAB学习Day5 等大家把今天内容学习…