FlinkSQL开发经验分享

e96cce5a08d68f7e2e9353401bb23fff.gif

最近做了几个实时数据开发需求,也不可避免地在使用Flink的过程中遇到了一些问题,比如数据倾斜导致的反压、interval join、开窗导致的水位线失效等问题,通过思考并解决这些问题,加深了我对Flink原理与机制的理解,因此将这些开发经验分享出来,希望可以帮助到有需要的同学。

下文会介绍3个case案例,每个case都会划分为背景、原因分析和解决方法三部分来进行介绍。

31d0bf6f4abe77c033483c23c526c7b6.png

Case1: 数据倾斜

数据倾斜无论是在离线还是实时中都会遇到,其定义是:在并行进行数据处理的时候,按照某些key划分的数据显著多余其他部分,分布不均匀,导致大量数据集中分布到一台或者某几台计算节点上,使得该部分的处理速度远低于平均计算速度,成为整个数据集处理的瓶颈,从而影响整体计算性能。造成数据倾斜的原因有很多种,如group by时的key分布不均匀,空值过多、count distinct等,本文将只介绍group by + count distinct这种情况。

  背景

对实时曝光流,实时统计近24小时创意的曝光UV和PV。且每分钟更新一次数据。通用的方法就是使用hop滑动窗口来进行统计,代码如下:

selectHOP_START(ts,interval '1' minute,interval '24' hour) as window_start,HOP_END(ts,interval '1' minute,interval '24' hour) as window_end,creative_id,count(distinct uid) as exp_uv  -- 计算曝光UV,count(uid) as exp_pv   --计算曝光PV
from dwd_expos_detail
group byhop(ts,interval '1' minute,interval '24' hour)  -- 滑动窗口开窗,窗口范围:近24小时,滑动间隔:每1分钟,creative_id
  问题及原因
  • 问题发现

在上述flink程序运行的时候,该窗口聚合算子GlobalWindowAggregate出现长时间busy的情况,导致上游的算子出现反压,整个flink任务长时间延迟。

7391c76e5daaec52c01668f74e088f50.png

  • 原因分析

一般面对反压的现象,首先要定位到出现拥堵的算子,在该case中,使用窗口聚合计算每个创意id对应的UV和PV时,出现了计算繁忙拥堵的情况。

针对这种情况,最常想到的就是以下两点原因:

  • 数据量较大,但是设置的并发度过小(此任务中该算子的并发度设置为3)

  • 单个slot的CPU和内存等计算资源不足

点击拥堵算子,并查看BackPressure,可以看到虽然并发度设置为3,但是出现拥堵的只有subtask0这一个并发子任务,因此基本上可以排出上述两种猜想,如果还是不放心,可以设置增加并行度至6,同时提高该算子上的slot的内存和CPU,结果如下:

4597e3720910172e8107239e1c70e35e.png

可以看到依然只有subtask0处于计算拥堵的状态,现在可以完全确认是由于group by时的key上的数据分布不均匀导致的数据倾斜问题。

  解决方法
  • 开启PartialFinal解决count distinct中的热点问题

    • 实现:flink中提供了针对count distinct的自动打散和两阶段聚合,即PartialFinal优化。实现方法:在作业运维中增加如下参数设置:

table.optimizer.distinct-agg.split.enabled: true
  • 限制:这个参数适用于普通的GroupAggregate算子,对于WindowAggregate算子目前只适用于新的Window TVF(窗口表值函数),老的一套Tumble/Hop/Cumulate window是不支持的。

由于我们的代码中并没有使用到窗口表值函数,而是直接在group中使用了hop窗口,因此该方法不适用。

人工对不均匀的key进行打散并实现两阶段聚合

  • 思路:增加按Distinct Key取模的打散层

  • 实现:

    • 第一阶段:对distinct的字段uid取hash值,并除以1024取模作为group by的key。此时的group by分组由于引入了user_id,因此分组变得均匀。

selectHOP_START(ts,interval '1' minute,interval '24' hour) as window_start,HOP_END(ts,interval '1' minute,interval '24' hour) as window_end,creative_id,count(distinct uid) as exp_uv,count(uid) as exp_pvfrom dwd_expos_detailgroup byhop(ts,interval '1' minute,interval '24' hour),creative_id,MOD(HASH_CODE(uid), 1024)
    • 第二阶段:对上述结果,再根据creative_id字段进行分组,并将UV和PV的值求和

selectwindow_start,window_end,creative_id,sum(exp_uv) as exp_uv,sum(exp_pv) as exp_pv
from (selectHOP_START(ts,interval '1' minute,interval '24' hour) as window_start,HOP_END(ts,interval '1' minute,interval '24' hour) as window_end,creative_id,count(distinct uid) as exp_uv,count(uid) as exp_pvfrom dwd_expos_detailgroup byhop(ts,interval '1' minute,interval '24' hour),creative_id,MOD(HASH_CODE(uid), 1024)
)
group bywindow_start,window_end,creative_id
;
  • 效果:在拓扑图中可以看到原窗口聚合算子被分为两个独立的聚合算子,同时每个subtask的繁忙程度也都接近,不再出现不均匀的情况。

f8720411bd30f4e0b2944414730777fd.png

Case2: 水位线失效
  背景

需要先对两条实时流进行双流join,然后再对join后的结果使用hop滑动窗口,计算每个创意的汇总指标。

  问题及原因
  • 问题发现

开窗后长时间无数据产生。

  • 原因分析

水位线对于窗口函数的实现起到了决定性的作用,它决定了窗口的触发时机,Window聚合目前支持Event Time和Processing Time两种时间属性定义窗口。最常用的就是在源表的event_time字段上定义水位线,系统会根据数据的Event Time生成的Watermark来进行关窗。只有当Watermark大于关窗时间,才会触发窗口的结束,窗口结束才会输出结果。如果一直没有触发窗口结束的数据流入Flink,则该窗口就无法输出数据。

  • 限制:数据经过GroupBy、双流JOIN或OVER窗口节点后,会导致Watermark属性丢失,无法再使用Event Time进行开窗。

由于我们在代码中首先使用了interval join来处理点击流和交易流,然后在对生成的数据进行开窗,导致水位线丢失,窗口函数无法被触发。

  解决方法

思路1: 既然双流join之后的时间字段丢失了水位线属性,可以考虑再给join之后的结果再加上一个processing time的时间字段,然后使用该字段进行开窗。

  • 缺点:该字段无法真正体现数据的时间属性,只是机器处理该条数据的时间戳,因此会导致窗口聚合时的结果不准确,不推荐使用。

思路2: 新建tt流

  • 要开窗就必须有水位线,而水位线往往会在上述提及的聚合或者双流join加工中丢失,因此考虑新建一个flink任务专门用来进行双流join,过滤出符合条件的用户交易明细流,并写入到tt,然后再消费该tt,并对tt流中的event_time字段定义watermark水位线,并直接将数据用于hop滑动窗口。

  • 实现:

    • 步骤1:新建flink任务,通过interval join筛选出近六个小时内有过点击记录的用户交易明细,并sink到tt

insert into sink_dwd_pop_pay_detail_ri
selectp1.uid,p1.order_id,p1.order_amount,p1.ts,p2.creative_id
from (selectuid,order_amount ,order_id,tsfrom dwd_trade_detail
) p1join dwd_clk_uv_detail p2on p2.ts between p1.ts - interval '6' hour and p1.tsand p1.uid = p2.uid
;
    • 步骤2: 消费该加工后的交易流,并直接进行滑动窗口聚合

selectHOP_START(ts,INTERVAL '1' minute,INTERVAL '24' hour) as window_start,HOP_END(ts,INTERVAL '1' minute,INTERVAL '24' hour) as window_end,creative_id,sum(order_amount) as total_gmv,count(distinct uid) as cnt_order_uv,round(sum(order_amount) / count(distinct uid) / 1.0,2) as gmv_per_uv
from source_dwd_pop_pay_detail_ri
GROUP BYHOP(ts,INTERVAL '1' minute,INTERVAL '24' hour),creative_id
;
Case3: group by失效
  背景

目的:对于实时流,需要给素材打上是否通过的标签。

打标逻辑:如果素材id同时出现在lastValidPlanInfo和validPlanInfo的两个数组字段中,则认为该素材通过(is_filtered=0),如果素材id只出现在lastValidPlanInfo数组字段中,则认为该素材未通过(is_filtered= 1)。

sink表类型:odps/sls,不支持回撤和主键更新机制。

上述逻辑的实现sql如下:

SELECT`user_id`,trace_id,`timestamp`,material_id ,min(is_filtered)) as is_filtered   -- 最后group by聚合,每个素材得到唯一的标签FROM (SELECT`user_id`,trace_id,`timestamp`,material_id,1 as is_filtered   -- lastValidPlanInfo字段中出现的素材都打上1的被过滤标签FROM dwd_log_parsing,lateral table(string_split(lastValidPlanInfo, ';')) as t1(material_id)WHERE lastValidPlanInfo IS NOT NULLUNION ALLSELECT`user_id`,trace_id,`timestamp`,material_id,0 as is_filtered     -- validPlanInfo字段中出现的素材都打上0的被过滤标签FROM dwd_log_parsing,lateral table(string_split(validPlanInfo, ';')) as t2(material_id)WHERE validPlanInfo IS NOT NULL)GROUP BY`user_id`,trace_id,`timestamp`,material_id
  问题及原因
  • 问题发现

原始数据样例:根据下图可以发现1905和1906两个素材id出现在lastValidPlanInfo中,只有1906这个id出现在validPlanInfo字段中,说明1905被过滤掉了,1906通过了。

e53a0760d6e1e00b0c8402ca3e24a46c.png

期望的计算结果应该是:

material_id

is_filtered

1905

1

1906

0

但是最终写入到odps的结果如下图,可以发现material_id为1906出现了两条结果,且不一致,所以我们不禁产生了一个疑问:是fink中的group by失效了吗?

53d409d7d127521aea36c1e28f975cc2.png

  • 原因分析

由于odps sink表不支持回撤和upsert主键更新机制,因此对于每一条源表的流数据,只要进入到operator算子并产生结果,就会直接将该条结果写入到odps。

union all和lateral table的使用都会把一条流数据拆分为多条流数据。上述代码中首先使用到了lateral table将lastValidPlanInfo和validPlanInfo数组字段中的material_id数字拆分为多条material_id,然后再使用union all+group by实现过滤打标功能,这些操作早已经将原tt流中的一条流数据拆分成了多条。

综合上述两点,

  • 针对1906的素材id,由于lateral table的使用,使得其和1905成为了两条独立的流数据;

  • 由于union all的使用,又将其拆分为is_filtered =1的一条流数据(union all的前半部分),和is_filtered=0的一条流数据(union all的后半部分);

  • 由于flink一次只能处理一条流数据,因此如果先处理了素材1906的is_filtered=1的流数据,经过group by和min(is_filtered)操作,将is_filtered= 1的结果先写入到odps,然后再处理is_filtered=1的流数据,经过group by和min(is_filtered)操作,状态更新is_filtered的最小值变更为0,又将该条结果写入到odps。

  • 由于odps不支持回撤和主键更新,因此会存在两条素材1906的数据,且结果不一致。

  解决方法
  • 思路:既然lateral table和union all的使用,会把一条流数据变为多条,并引发了后续的多次写入的问题。因此我们考虑让这些衍生出的多条流数据可以一次性进入到group by中参与聚合计算,最终只输出1条结果。

  • 实现:mini-batch微批处理

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 1s
  • 概念:mini-batch是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐并减少数据的输出量。微批处理通过增加延迟换取高吞吐,如果您有超低延迟的要求,不建议开启微批处理。通常对于聚合场景,微批处理可以显著地提升系统性能,建议开启。

  • 效果:上述问题得到解决,odps表只输出每个用户的每次请求的每个素材id只有1条数据输出。

e63d0327c675a7f678973577c47b4771.png

总结

FlinkSQL的开发是最方便高效的实时数据需求的实现途径,但是它和离线的ODPS SQL开发在底层的机制和原理上还是有很大的区别,根本的区别就在于流和批的处理。如果按照我们已经习惯的离线思维来写FlinkSQL,就可能会出现一些“离奇”的结果,但是遇到问题并不可怕,要始终相信根本不存在任何“离奇”,所有的问题都是可以追溯到原因的,而在这个探索的过程中,也可以学习到许多知识,所以让我们遇到更多的问题,积累更多的经验,熟练地应用Flink。

49804261c6155a0bab227ae0b3420bcc.png

参考资料

  • 窗口:

    https://help.aliyun.com/zh/flink/developer-reference/overview-4?spm=a2c4g.11186623.0.i33

  • 高性能优化:

    https://help.aliyun.com/zh/flink/user-guide/optimize-flink-sql

5bd37562fdcd3058092e813801ea3f6b.png

团队介绍

淘天业务技术用户运营平台技术团队是一支懂用户,技术驱动的年轻队伍,以用户为中心,通过技术创新提升用户全生命周期体验,持续为用户创造价值。
团队立足体系化打造业界领先的用户增长基础设施,以媒体外投平台、ABTest平台、用户运营平台为代表的基础设施赋能阿里集团用户增长,日均处理数据量千亿规模、调用QPS千万级。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/32300.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DC-DC 高压降压、非隔离AC-DC、提供强大的动力,选择优质电源芯片-(昱灿)

畅享长续航,尽在我们的充电芯片! 无论是手机、平板还是智能设备,长时间使用后电量不足总是令人头疼。然而,我们的充电芯片将为您带来全新的充电体验!采用先进的技术,我们的充电芯片能够提供快速而稳定的充电…

威纶通触摸屏软件出现显示异常问题(显示黑色)处理方法

异常现象 电脑端显示异常,显示黑色 解决方法 Step1:软件根目录查找DisplaySetting.exe Step2:勾选第1或第2项,重启软件即可 分享创作不易,请多多支持,点赞、收藏、关注! Ending~

《计算机英语》 Unit 3 Software Engineering 软件工程

Section A Software Engineering Methodologies 软件工程方法论 Software development is an engineering process. 软件开发是一个工程过程。 The goal of researchers in software engineering is to find principles that guide the software development process and lea…

冲击2024年CSDN博客之星TOP1:CSDN文章质量分查询在哪里?

文章目录 一,2023年博客之星规则1,不高的入围门槛2,[CSDN博文质量分测评地址](https://www.csdn.net/qc) 二,高分秘籍1,要有目录2,文章长度要足够,我的经验是汉字加代码至少1000字。3&#xff0…

6G时代,即将来临!

日前,由未来移动通信论坛、紫金山实验室主办的2024全球6G技术大会在南京召开。本次大会以“创新预见6G未来”为主题,在大会开幕式上发布了协力推进全球6G统一标准行动的倡议和紫金山科技城加速培育以6G技术引领未来产业行动计划。 在我国已开展第五代移动…

会自动清除的文件——tempfile

原文链接:http://www.juzicode.com/python-tutorial-tempfile/ 在某些不需要持久保存文件的场景下,可以用tempfile模块生成临时文件或者文件夹,这些临时文件或者文件夹在使用完之后就会自动删除。 NamedTemporaryFile用来创建临时文件&…

6月27日云技术研讨会 | 中央集中架构新车型功能和网络测试解决方案

会议摘要 “软件定义汽车”新时代下,整车电气电气架构向中央-区域集中式发展已成为行业共识,车型架构的变革带来更复杂的整车功能定义、更多的新技术的应用(如SOA服务化、TSN等)和更短的车型研发周期,对整车和新产品研…

C语言| 数组的折半查找

数组的折半查找 折半查找:在已经排好序的一组数据中快速查找数据。 先排序,再使用折半查找。 【折半查找的运行过程】 1 存储数组下标 low最小的下标,mid中间的下标, high最大的下标 2 key存放查找的值,每一次对比后…

Acrobat Pro DC 2021:Mac/Win平台上全面高效的PDF编辑器

Acrobat Pro DC 2021是一款在Mac和Windows平台上广受欢迎的PDF编辑器,它凭借其全面的功能和高效的性能,为用户提供了卓越的PDF处理体验。 一、编辑功能全面强大 Acrobat Pro DC 2021允许用户轻松创建、编辑、合并、转换、签署和分享PDF文件。无论是对P…

一个电商创业者眼中的618:平台大变局

战役结束了,战斗还在继续。 一位朋友去年5月创业,网上卖咖啡,这个赛道很拥挤,时机也不好,今年是他参加第一个618。朋友说,今年的目标是锤炼团队,总结方法,以及最重要的——活下去。…

水系统阻力计算

所谓水泵的选取计算其实就是估算(很多计算公式本身就是估算的),估算分的细致些考虑的内容全面些就是精确的计算。 特别补充:当设计流量在设备的额定流量附近时,上面所提到的阻力可以套用,更多的是往往都大…

【C++题解】1713 - 输出满足条件的整数3

问题:1713 - 输出满足条件的整数3 类型:简单循环 题目描述: 有一个数列,该数列的前 4 个数是: 1 4 7 10 ; 请从键盘读入一个正整数 n ,请通过观察前 4 项的规律,输出 1∼n 之间所有…

AudioSep:从音频中分离出特定声音(人声、笑声、噪音、乐器等)本地一键整合包下载

AudioSep是一种 AI 模型,可以使用自然语言查询进行声音分离。这一创新性的模型由Audio-AGI开发,使用户能够通过简单的语言描述来分离各种声音源。 比如在嘈杂的人流车流中说话的录音中,可以分别提取干净的人声说话声音和嘈杂的人流车流噪声。…

咖啡事故,上海Manner咖啡店,1天两起店员和顾客发生冲突

上海咖啡店Manner,一天的时间竟然发生两起店员和员工发生肢体冲突: 事情详情: Manner威海路716店事件: 店员泼顾客咖啡粉,随后被辞退品牌方回应媒体,表示将严肃处理Manner梅花路门店事件:顾客因等待时间长抱怨&…

解锁PDF处理新境界:轻松调整字体,让你的文档焕然一新!

数字化时代,PDF文件已经成为我们日常办公和学习中不可或缺的一部分。它们为我们提供了方便的阅读体验,同时也保证了文档内容的完整性和格式的统一性。然而,有时候我们可能会遇到一个问题:如何轻松调整PDF文件中的字体,…

Linux内核学习——linux内核体系结构(1)

1 Linux内核模式 学习的是Linux 0.11内核,采用的是单内核模式。单内核模式的主要优点是内核代码结构紧凑、执行速度快,但是层次结构性不强。 操作系统如何提供的服务流程? 应用主程序使用指定的参数值执行系统调用指令(int x80)&#xff0…

如何恢复 Mac 数据?适用于 Mac 的免费磁盘恢复软件

对于大多数 Mac 电脑用户来说,丢失数据是他们最不想遇到的噩梦之一。然而,无论我们多么小心地使用 Mac,多么有条理地存储重要文件,我们仍然有可能丢失 Mac 上的数据。某些硬件故障更有可能导致您意外丢失文件。除此之外&#xff0…

Linux htop命令使用

文章目录 简介界面介绍第一行第二行第三行第四行 如何使用 简介 htop 是一个类似于 top 的命令,但具有更丰富的功能和更友好的界面。它可以实时显示系统中各个进程的资源占用情况,如 CPU 使用率、内存使用率等。以下是对 htop 命令的完全解析&#xff1…

echarts Y轴展示时间片段,series data数据 也是时间片段,鼠标放上去 提示框显示对应的时间片段

功能要求 1、折线图,展示每天对应的一个时间片段 2、echarts Y轴展示时间片段,如:[00:00,03:00,05:15] 3、X轴展示日期,如:[xx年xx月xx日] 后端返回的数据结构,如 [{xAdate:"2024-06-15",data:…

异步开发的终极答案—协程

我们在之前的文章中讲过,在并发场景下,传统的基于多线程的命令式开发模型虽然比较简单,但并发数高了之后资源占用较高,大量线程会阻塞;而响应式编程模式我们可以通过异步化处理提升系统资源的利用效率,但异步开发有违人的直觉,门槛比较高。作为成年人,我们肯定希望全都…