增加并行度后,发现Flink窗口不会计算的问题。

文章目录

  • 前言
  • 一、现象
  • 二、结论
  • 三、解决


前言

窗口没有关闭计算的问题,一直困扰了很久,经过多次验证,确定了问题的根源。


一、现象

Flink使用了window,同时使用了watermark ,并且还设置了较高的并行度。生产是设置了300的并行度,并且接入了几十个topic ,这个地方划重点,后面会提到。结果就是,窗口没有关闭进行计算。于是我查阅的相关文档,得到的答案是因为配置的源并行度大于topic的分区数而导致。这个答案只能说很接近,而且我最开始也觉得很有道理。
解释一下watermark + window的原理

在这里插入图片描述
可以看到前面三个窗口里面都有数据,窗口触发计算的其中一个必要条件是最新的数据没过最低的水位线,就进行计算,认为不会再有乱序的数据进来了。但是从图中我们可以看到其中一个窗口一个数据都没有,就会导致拿不到所有窗口的最低水位线。因此也就无法触发计算。
为了验证这一法则
我在测试环境配置了一个并行度为10的程序,topic只有一个分区,启动任务的时候,我信誓旦旦地保证这不可能关闭窗口进行计算,然而,现实狠狠打了我一巴掌,窗口结果算出来了。虽然只是三言两语,实际上我做了很多尝试,只是其他的实验不重要,都是证明我是错的

于是通过比较的方法,想到和生产的情况不同就在于,生产消费了几十个topic,而我的测试只有一个topic,于是我再次坚信,问题一定就在这了。

我直接在idea进行测试
在这里插入图片描述

我配置了两个topic,并且在一开始只往第一个topic中写数据,而第二个topic不写数据

很好,跑了一整个中午,一次窗口聚合计算都没有。

此时进行最后一步验证,就是往第二个topic写数据。

我在这个时间往第二个topic发了数据

collectTime":1697693856606

在这里插入图片描述
为了让大家看清楚现象,我把日志和截图都给出来

2023-10-19 13:37:32.699 [Legacy Source Thread - Source: Custom Source -> Flat Map -> (Flat Map -> Flat Map -> Sink: Unnamed, Timestamps/Watermarks -> (Flat Map, Flat Map, Flat Map)) (10/16)#0] INFO  c.a.c.d.risk.domain.function.IndicatrixMapFunction - 【通过】滑动窗口前置数据处理
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:8
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:27
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:28
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:17
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:20

在这里插入图片描述

证明就是在这个时间节点上,窗口计算处理结果

二、结论

因此我就可以大胆地推断,是因为多个topic进行了数据消费,其中有个topic数据会进入窗口进行计算,但有的窗口又永远不会有数据进入计算,这就造成对应的窗口永远没有最低的watermark以致于窗口无法关闭并计算。

三、解决

既然问题找到了,那解决办法就随之而生

  • 1、如果可以不适用水印,直接关闭水印即可,只要消费的数据不会积压,并且要求没那么高的话,这个方法最简单
  • 2、减小并行度到能够使得每个窗口都有数据,减小并行度会让不同topic用同一个窗口,至于这个数量,那还得研究研究了
  • 3、把需要到窗口和不到窗口计算的数据进行分流
  • 4、也可以把源与后面算子之间采用rebalance的方式传递,这样就能够轮询的方式往下传递,使得每个window都会有数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/110565.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微服务负载均衡实践

概述 本文介绍微服务的服务调用和负载均衡,使用spring cloud的loadbalancer及openfeign两种技术来实现。 本文的操作是在微服务的初步使用的基础上进行。 环境说明 jdk1.8 maven3.6.3 mysql8 spring cloud2021.0.8 spring boot2.7.12 idea2022 步骤 改造Eu…

Windows下定时下载Linux服务器的数据库备份文件(pscp+bat脚本+定时任务)

下载传输软件pscp Download PuTTY: latest release (0.79) 创建bat执行脚本 echo 删除旧的备份文件 del D:\db_bk\*.dbecho 下载新的备份文件 D:\Programs\pscp -P 22 -pw youPassword youName192.168.1.1:/home/backup/test.db D:\db_bk\ 设置定时任务 1.使用任务计划程…

Halcon 中查看算子和函数的执行时间

1、在Halcol主窗口的底栏中的第一个图标显示算子或函数的执行时间,如下图: 2、在Halcon的菜单栏中选择【窗口】,在下拉框中选择【打开输出控制台】,进行查看算子或函数的执行时间,如下图:

音视频技术开发周刊 | 315

每周一期,纵览音视频技术领域的干货。 新闻投稿:contributelivevideostack.com。 OpenAI科学家最新演讲:GPT-4即将超越拐点,1000倍性能必定涌现! GPT-4参数规模扩大1000倍,如何实现?OpenAI科学家…

Spring核心扩展点BeanDefinitionRegistryPostProcessor源码分析

我们知道,只要在一个Java类上加上Component、Service、Controller等注解,就可以被加载到Spring容器中,除了以上方式,加了Bean和Import好像也可以将对象添加到Spring容器中,究竟Spring是如何实现这些功能的呢&#xff1…

Redis 主从复制,哨兵,集群——(1)主从复制篇

目录 1. Redis 主从复制是什么? 2. Redis 主动复制能干嘛? 2.1 读写分离 2.2 容灾恢复 2.3 数据备份 2.4 水平扩展支撑高并发 3. Redis 主从复制配置项 3.1 配从库不配主库 3.2 权限密码配置 3.3 基本操作命令 4. 案例演示 4.1 案例说明 4.…

tika解压遇到压缩炸弹如何继续解压

1.问题 项目中要对10层压缩的zip、7z等文件用tika解压遇到错误:tika zip bomb detected 也就是说tika认为这是个压缩炸弹。 “压缩炸弹”是一个压缩包文件的木马程序,通常只有几百KB,解压后会变成上百MB或者上GB庞然大物。把你本地磁盘占满…

迁移conda环境后,非root用户执行pip命令和jupyter命令报错/bad interpreter: Permission denied

移动conda环境,在移动的环境执行pip和jupyter 报错-bash: /data/home/用户名/anaconda3/envs/llm/bin/pip: /root/anaconda3/envs/llm/bin/python: bad interpreter: Permission denied 报错信息 一、原因 原因是当前的这个data/home/用户名/anaconda3/envs/环境名…

2022年亚太杯APMCM数学建模大赛A题结晶器熔剂熔融结晶过程序列图像特征提取及建模分析求解全过程文档及程序

2022年亚太杯APMCM数学建模大赛 A题 结晶器熔剂熔融结晶过程序列图像特征提取及建模分析 原题再现: 连铸过程中的保护渣使钢水弯液面隔热,防止钢水在连铸过程中再次氧化,控制传热,为铸坯提供润滑,并吸收非金属夹杂物…

37 WEB漏洞-反序列化之PHPJAVA全解(上)

目录 PHP反序列化演示案例:先搞一把PHP反序列化热身题稳住-无类问题-本地在撸一把CTF反序列化小真题压压惊-无类执行-实例最后顶一把网鼎杯2020青龙大真题舒服下-有类魔术方法触发-实例 https://www.cnblogs.com/zhengna/p/15661109.html 代码在线测试平台&#xff…

k8s-20 hpa控制器

hpa可通过metrics-server所提供pod的cpu 或者内存的负载情况,从而动态拉伸控制器的副本数,从而达到后端的自动弹缩 官网:https://kubernetes.io/zh-cn/docs/tasks/run-application/horizontal-pod-autoscale-walkthrough/ 上传镜像 压测 po…

IPV6 ND协议--源码解析【根源分析】

ND协议介绍 ND介绍请阅读上一篇文章:IPv6知识 - ND协议【一文通透】11.NDP协议分析与实践_router solicitation报文中不携带source link-layer address-CSDN博客 ND协议定义了5种ICMPv6报文类型,如下表所示: NS/NA报文主要用于地址解析RS/…

【数之道 08】走进“卷积神经网络“,了解图像识别背后的原理

卷积神经网络 CNN模型的架构Cnn 的流程第一步 提取图片特征提取特征的计算规则 第二步 最大池化第三步 扁平化处理第四步 数据条录入全连接隐藏层 b站视频 CNN模型的架构 图片由像素点组成,最终成像效果由背后像素的颜色数值所决定的 有这样的一个66的区域&#x…

Datawhale-新能源时间序列赛事学习笔记(1)

1.赛题描述 在电动汽车充电站运营管理中,准确预测充电站的电量需求对于提高充电站运营服务水平和优化区域电网供给能力非常关键。本次赛题旨在建立站点充电量预测模型,根据充电站的相关信息和历史电量数据,准确预测未来某段时间内充电站的充电…

Java基础20问(6-10)

6.Java接口和抽象类的区别? 不同点 1.接口在Java8之前不能写方法实现逻辑,Java8及以后的版本,可以用default关键字写方法的实现。 2.接口中方法都是public的,public可以省略,而抽象类没有这个限制。 3.接口用inter…

【MATLAB第79期】基于MATLAB的数据抽样合集(sobol、LHS、Halton、正交、随机函数)更新中

【MATLAB第79期】基于MATLAB的数据抽样合集(sobol、LHS、Halton、正交、随机函数)更新中 一、随机函数 1.指定区间随机生成数据(小数) [a b]区间随机数生成: Aa(b-a)rand(m,n) m:待生成矩阵A的行数 n: 待生成矩阵A…

物联网AI MicroPython传感器学习 之 AS608指纹识别模块

学物联网,来万物简单IoT物联网!! 一、产品简介 AS608指纹识别模块是一款高性能的光学指纹识别模块。它采用的是指纹识别芯片公司杭州晟元芯片技术有限公司生产的AS608指纹识别芯片。该芯片内置DSP运算单元,集成了指纹识别算法&am…

23.项目开发之量化交易抓取数据QuantTradeData(二)

后端业务:定时更新“A股日线行情”数据 需求说明 为了获取前一天的最新数据,我们需要每天晚上10点定时刷新daily股票列表基础信息,并将最新数据插入或更新到数据库中。 如果该内容是在当天交易日信息未更新前查询(15~16点之前&a…

《数据结构、算法与应用C++语言描述》使用C++语言实现数组循环队列

《数据结构、算法与应用C语言描述》使用C语言实现数组循环队列 定义 队列的定义 队列(queue)是一个线性表,其插入和删除操作分别在表的不同端进行。插入元素的那一端称为队尾(back或rear),删除元素的那一…

rabbitmq发送json格式 utf8编码数据

参考文章:Spring-Cloud RabbitMQ 用法 - 发送json对象 - 简书 生产者: 消费者: