FlinkSQL之temporary join开发

2705c43b7f4f8013c53321f32a9a2fed.gif

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在此文中,供各位同学参考与交流。

ffcd1b37f760b4430542917b5e7f8f77.png

背景介绍

关于实时flinkSQL的双流join的背景知识可以先阅读以下文章:

https://www.51cto.com/article/713922.html

目前我们有一条流量日志明细的TT流A,以及一条商品标签的TT流B,在flink中对A流和B流进行双流join类似于将A流关联一个hbase维表。temporary join有以下特点:

1. 单流驱动:虽然是双流join,但数据下发只由一条流驱动。

2. 需要定义versioned table,versioned table记录了每个时刻的属性信息,双流join时被动查询。类似于银行汇率表,在货币兑换的时候需要参考兑换时刻的汇率。

3. 查询携带时间版本信息:temporary join携带由两条流的watermark触发,因此查询到的属性是对应时间内的属性。

48804d99d95cf0aa49670aa4181b12e5.png

图片来源:孙金城, 《Blink 漫谈系列 - Temporal Table JOIN》

应用场景&实例分享

当需要根据实时汇率*货币金额计算总金额,实时商品价格*成交件数计算总成交金额时,经常会使用temporary join获取实时的汇率和价格信息。在笔者的流量升级业务迭代中,我们需要获取实时的商品标签,因此需要定义商品标签的versioned table,写法如下:

CREATE TEMPORARY TABLE `tag_ri` (`id` VARCHAR,`tag` VARCHAR,`time` VARCHAR,`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定义watermark
) WITH ('connector' = 'tt','router' = '******','topic' = 'tag_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8'
);--定义version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`, `tag`, `time`, `ts`
FROM (    SELECT `id`, `tag`, `time`, `ts`, ROW_NUMBER() OVER (PARTITION BY `id` --关联主键ORDER BY `time` DESC) AS `rownum`FROM `tag_ri`)
WHERE `rownum` = 1;

同上我们也需要定义流量日志明细流的watermark,并进行双流join

CREATE TEMPORARY TABLE `log_ri` (`id` VARCHAR,`time` VARCHAR,......`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH ('connector' = 'tt','router' = '******','topic' = 'log_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8',
);select `a`.`id`,......,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`

结果如下:

--商品标签信息
12:00> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================  t1                 A12:30> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================t1                 B--流量明细日志查询 t1商品共三条明细
SELECT * FROM log_ri;id              time
=======          ========t1              12:00       t1              12:15       t1               12:30       --执行temporary join
select `a`.`id`,`a`.`time`,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`id               time              tag(商品标签)
=======          ========        =======================t1              12:00                   At1              12:15                   At1              12:30                   B
开发经验
  稀疏数据处理

由于temporary join是由两条流的watermark触发,如果versioned table是一条稀疏的流(在一段时间内无数据流入),那么join可能存在等待不下发数据的现象,可以通过设置参数 set table.exec.source.idle-timeout = 10s ,可以让A流数据不进行等待,具体参数介绍可以参考:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

  数据延迟下发
  • 问题

在实际开发中,我们发现temporay join后数据一直等待不下发,整点才会进行下发的现象。

8f9f7e4845d6e74bf7cb6e93c76e09a0.jpeg

  • 原因分析

我们结合SQL语法,对TT日志进行回流分析:代码逻辑是四路source union后, join 定义的versioned table

select a.*,b.tag
from
(
select * from source_1 
union all 
select * from source_2
union all 
select * from source_3
union all 
select * from source_4
) a
temporay join 
b流

source_4会在整点流入少部分当前小时59分钟的数据,而temporay join 是由两边的watermark所触发,所以会有a流等待b流的时间到达当前小时59分钟后再触发的现象。

ccf9a4c7f6f2b304cbb642b493d40e34.jpeg

  • 解法

对source_4中log_time>当前时间的部分,做temporary join时将log_time置为当前时间,该问题就解决了。

7b3d2f9b26327c4027ac08d723febb8d.png

总结

1. 在单流驱动的双流join场景中,temporary join是一种常见的处理方式。

2. temporary join由两条流的watermark触发,需要对两条流的watermark进行预处理,防止数据稀疏和数据抢跑等现象影响数据下发。

5f2cad15473f0438ff03df1f6df8861d.png

参考资料

  • https://www.51cto.com/article/713922.html

  • https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

af16dc18390c18a051935ea9c0e343b9.png

团队介绍

我们是淘天集团-业务技术-商家数据团队,专注于开发和维护生意参谋这一全渠道、全链路、一站式的数据平台,同时也负责品牌数据银行和策略中心两大产品。旨在为商家提供全面的数据服务,包括但不限于经营分析、市场洞察、客群洞察等,以帮助商家提高商业决策效率。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58063.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【开源免费】基于SpringBoot+Vue.JS网上超市系统(JAVA毕业设计)

本文项目编号 T 037 ,文末自助获取源码 \color{red}{T037,文末自助获取源码} T037,文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…

研发运营一体化(DevOps)能力成熟度模型

目录 应用设计 安全风险管理 技术运 持续交付 敏捷开发管理 基于微服务的端到端持续交付流水线案例 应用设计 安全风险管理 技术运 持续交付

Android 判断手机放置的方向

#1024程序员节|征文# 文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据总结 需求 老板:我有个手持终端,不能让他倒了,当他倒或者倾斜的时候要发出报警; 程序猿:我这..... 老板…

2024-09-28 地址空间与进程控制

一、进程地址空间 Pt.2 同一个变量,地址相同,其实是虚拟地址相同,内容不同其实是被映射到了不同的物理地址 1. 页表 内存保护与页表标志位 在操作系统中,页表用于管理内存的访问权限。每个页表项通常包含一组标志位&…

二:Python学习笔记--基础知识(1) 变量,关键字,数据类型,赋值运算符,比较运算符

目录 1. 变量 2. python关键字 3. python数据类型 3.1 数字类型 整型 int 浮点型 float 内置函数-type 3.2 字符串类型 3.3 布尔类型 3.4 空类型 3.5 列表类型 3.6 元组类型 3.7 字典类型 4. python赋值运算 5. python比较运算符 1. 变量 组成:必须是数…

基于SSM的BBS社区论坛系统源码

运行环境:ideamysql5.7jdk8maven 使用技术:ssmmysqlshirolayui 功能模块:用户管理、模板管理、帖子管理、公告管理、权限管理等

yolov9目标检测/分割预测报错AttributeError: ‘list‘ object has no attribute ‘device‘常见汇总

这篇文章主要是对yolov9目标检测和目标分割预测测试时的报错,进行解决方案。 在说明解决方案前,严重投诉、吐槽一些博主发的一些文章,压根没用的解决方法,也不知道他们从哪里抄的,误人子弟、浪费时间。 我在解决前&…

Lampiao靶机入侵实战

07-Lampiao靶机入侵实战 一、扫描采集信息 1、获取IP地址 nmap -sn 192.168.81.0/24获得IP地址为:192.168.81.1282、获取端口信息 由于nmap默认情况下只扫描常用的1000个端口,覆盖面并不全,所以建议全端口扫描 nmap -p 1-65535 192.168.…

DiffusionDet: Diffusion Model for Object Detection—扩散模型检测论文解析

DiffusionDet: Diffusion Model for Object Detection—扩散模型检测论文解析 这是一篇发表在CVPR 2023的一篇论文,因为自己本身的研究方向是目标跟踪,之前看了一点使用扩散模型进行多跟踪的论文,里面提到了DiffusionDet因此学习一下。 论文…

读数据工程之道:设计和构建健壮的数据系统21数据获取

1. 数据获取 1.1. 数据获取是将数据从一个地方移动到另一个地方的过程 1.1.1. 数据获取与系统内部获取是不同的 1.2. 数据获取是数据工程生命周期中将数据从源系统移入存储的一个中间步骤 1.3. 数据集成则是将来自不同来源系统的数据组合到一个新的数据集 1.4. 数据获取的…

数字后端零基础入门系列 | Innovus零基础LAB学习Day6

今天没有具体的数字IC后端lab实验。今天的重点是熟悉掌握静态时序分析STA中的几类timing path以及setup和hold检查机制(包含setup和hold计算公式)。 芯片流片失败的那些故事 数字后端零基础入门系列 | Innovus零基础LAB学习Day5 等大家把今天内容学习…

QT获取本机所有IP地址以及修改本机IP(注意区分Windows和Linux环境)

QT 获取本机所有 IP 地址 Chapter1 QT 获取本机所有 IP 地址获取本机所有 IP 地址,包括 IPV6的地址,需要引用 QNetworkInterface1.检索所有网络接口:2.获取接口的详细信息:3.获取接口的 IP 地址:4.用于网络诊断和监控&…

KEYSIGHT E4980A是德E4980AL 精密LCR表

Keysight E4980A 精密 LCR 表为各种元件测量提供了精度、速度和多功能性的最佳组合。 E4980A 在低阻抗和高阻抗范围内提供快速测量速度和出色的性能,是元件和材料的一般研发和制造测试的终极工具。 LAN、USB 和 GPIB PC 连接可提高您的设计和测试效率。 Keysight E4…

大语言模型的Scaling Law【Power Low】

NLP-大语言模型学习系列目录 一、注意力机制基础——RNN,Seq2Seq等基础知识 二、注意力机制【Self-Attention,自注意力模型】 三、Transformer图文详解【Attention is all you need】 四、大语言模型的Scaling Law【Power Low】 文章目录 NLP-大语言模型学习系列目录一、什么是…

练习LabVIEW第十九题

学习目标: 刚学了LabVIEW,在网上找了些题,练习一下LabVIEW,有不对不好不足的地方欢迎指正! 第十九题: 创建一个程序把另外一个VI的前面板显示在Picture控件中 开始编写: 在前面板放置一个二…

iOS AVAudioSession 详解【音乐播放器的配置】

前言 在 iOS 音频开发中,AVAudioSession 是至关重要的工具,它控制着应用的音频行为,包括播放、录音、后台支持和音频中断处理等。对于音乐播放器等音频需求强烈的应用,设计一个合理的 AVAudioSession 管理体系不仅能保证音频播放…

一文详解高光谱数据python处理包spectral(SPy)

一、基本操作 读取高光谱数据文件 import spectral # 读取ENVI格式的高光谱图像 # image的后缀可以是.raw、.spe、.lan等 # 代码里img对象,类似于rasterio库的dataset对象,可以用它来读取高光谱数据 img spectral.envi.read_envi(filemy_data.hdr, im…

【LeetCode】修炼之路-0008- String to Integer (atoi)【python】

题目 基本思路 其实题目已经说了如何实现了,我们按照给定的思路实现即可 1. 问题四大要求详解 1.1 处理空格 (Whitespace) 忽略字符串开头的任何空格字符 (" ")例如: " 123" 应该被处理为 “123”Python实现: 可以使用 strip() 方法或循环处…

Python浪漫之星星与文字构造的错位图

效果图: 完整代码: import tkinter as tk import random import math from tkinter.constants import *width 888 height 500 heartx width / 2 hearty height / 2 side 11class Star:def __init__(self, canvas, x, y, size):self.canvas canvas…

精准医疗沟通新体验:开源语音识别(ASR)如何提升医生与患者对话

需求背景:一家远程医疗公司在为偏远地区提供在线医疗服务的过程中,发现传统手动记录方式效率太低,无法满足需求,影响就诊的效率。 解决方案:使用思通数科的ASR平台,公司可以实现多话者对话转录和自动病历生…