源码解析FlinkKafkaConsumer支持周期性水位线发送

背景

当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程

FlinkKafkaConsumer水位线发送

1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动

        // if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}

2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行

        public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}

3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法

    @Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}        

4.对应算子任务组合当前任务消费的所有分区水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/106066.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

进阶JAVA篇- DateTimeFormatter 类与 Period 类、Duration类的常用API(八)

目录 1.0 DateTimeFormatter 类的说明 1.1 如何创建格式化器的对象呢&#xff1f; 1.2 DateTimeFormatter 类中的 format&#xff08;LocalDateTime ldt&#xff09; 实例方法 2.0 Period 类的说明 2.1 Period 类中的 between(localDate1,localDate2) 静态方法来创建对象。 3.…

京东优惠券怎么找?

京东优惠券怎么找&#xff1f; 1、手机安装「草柴」后&#xff0c;打开京东挑选要购买的商品&#xff1b; 2、挑选好京东商品后&#xff0c;点击右上角的「分享」&#xff0c;并点击「复制链接」&#xff1b; 3、将复制的京东商品链接&#xff0c;粘贴到草柴输入框&#xff0c…

antd pro form 数组套数组 form数组动态赋值 shouldUpdate 使用

antd form中数组套数组 form数组动态变化 动态赋值 需求如上&#xff0c;同时添加多个产品&#xff0c;同时每个产品可以增加多台设备&#xff0c;根据设备增加相应编号&#xff0c;所以存在数组套数组&#xff0c;根据数组值动态变化 使用的知识点 form.list form中的数组…

十六、代码校验(5)

本章概要 基准测试 微基准测试JMH 的引入 基准测试 我们应该忘掉微小的效率提升&#xff0c;说的就是这些 97% 的时间做的事&#xff1a;过早的优化是万恶之源。—— Donald Knuth 如果你发现自己正在过早优化的滑坡上&#xff0c;你可能浪费了几个月的时间(如果你雄心勃勃的…

Vitis导入自制IP导致无法构建Platform

怎么还有这种问题&#xff08; 解决Vitis导入自制IP导致无法构建Platform – TaterLi 个人博客 Vitis报错&#xff1a;fatal error: xxx.h: No such file or directory._ly2lj的博客-CSDN博客 在指定位置黏入以上代码即可&#xff1a; INCLUDEFILES$(wildcard *.h) LIBSOUR…

基于springboot养老院管理系统开题报告

一、项目简介 本项目是一款基于Spring Boot的养老院管理系统&#xff0c;旨在为养老院提供一个全方位的解决方案&#xff0c;包括老人信息管理、医疗服务管理、饮食管理、活动管理等模块。通过系统的数字化管理&#xff0c;将老人的生活照料和医学护理更加科学化、规范化&…

【AI视野·今日Robot 机器人论文速览 第五十四期】Fri, 13 Oct 2023

AI视野今日CS.Robotics 机器人学论文速览 Fri, 13 Oct 2023 Totally 45 papers &#x1f449;上期速览✈更多精彩请移步主页 Interesting: &#x1f4da;AI与机器人安全, 从攻击界面、伦理法律和人机交互层面进行了论述。(from 密西西比大学) &#x1f4da;机器人与图机器学…

Centos Docker部署Redis集群三主三从

一、安装Docker yum install docker-engine 二、编辑节点配置文件 创建文件夹 cd /home # 节点一&#xff1a;6370端口 mkdir -p redis-cluster/redis-6370/conf mkdir -p redis-cluster/redis-6370/data # 节点二&#xff1a;6371端口 mkdir -p redis-cluster/redis-6371/co…

华为云云耀云服务器L实例评测|企业项目最佳实践之建议与总结(十二)

华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践系列&#xff1a; 华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践之云服务器介绍(一) 华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践之华为云介绍(二) 华为云云耀云服务器L实例评测&#xff5…

Java基础面试-接口和抽象类的区别

抽象类 详细描述 类和类之间具有共同特征&#xff0c;将这些共同特征抽取出来&#xff0c;形成的就是抽象类。如果一个类没有足够的信息来描述一个具体的对象&#xff0c;这个类就是抽象类。因为类本身就是不存在的&#xff0c;所以抽象类无法创建对象&#xff0c;也就是无法…

2021年12月 Python(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python编程&#xff08;1~6级&#xff09;全部真题・点这里 C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 第1题 执行以下程序 a[33,55,22,77] a.sort() for i in a:print(i)运行…

Nginx:反向代理(示意图+配置)

示意图&#xff1a; 反向代理 反向代理&#xff08;Reverse Proxy&#xff09;是代理服务器的一种&#xff0c;它代表服务器接收客户端的请求&#xff0c;并将这些请求转发到适当的服务器。当请求在后端服务器完成之后&#xff0c;反向代理搜集请求的响应并将其传输给客户端。…

MFC-对话框

目录 1、模态和非模态对话框&#xff1a; &#xff08;1&#xff09;、对话框的创建 &#xff08;2&#xff09;、更改默认的对话框名称 &#xff08;3&#xff09;、创建模态对话框 1&#xff09;、创建按钮跳转的界面 2&#xff09;、在跳转的窗口添加类 3&#xff0…

java_方法引用和构造器引用

文章目录 一、方法引用1.1、方法引用的理解1.2、格式1.3、举例 二、构造器引用2.1、格式2.2、例子2.3、数组引用 一、方法引用 1.1、方法引用的理解 方法引用&#xff0c;可以看做是基于lambda表达式的进一步刻画当需要提供一个函数式接口的实例时&#xff0c;可以使用lambda…

牛客周赛 Round 15_B

题目描述 对于一个小写字母而言&#xff0c;游游可以通过一次操作把这个字母变成相邻的字母。a和b邻&#xff0c;b和c相邻&#xff0c;以此类推。特殊的&#xff0c;a和z也是相邻的。可以认为&#xff0c;小写字母的相邻规则为一个环。 游游拿到了一个仅包含小写字母的字符串…

了解油封对汽车安全的影响?

油封也称为轴封或径向轴封&#xff0c;是车辆发动机、变速箱和其他各种机械系统中的重要部件。它们的主要功能是阻止重要发动机部件的液体(例如油或冷却剂)泄漏&#xff0c;同时防止污染物进入。这些看似简单的任务&#xff0c;但对汽车的安全性和可靠性有着深远的影响。 油封…

docker安装nessus

注册地址:https://zh-tw.tenable.com/products/nessus/nessus-essentials 临时邮箱:http://24mail.chacuo.net/ 帮助文档:https://docs.tenable.com/nessus/Content/DeployNessusDocker.htmdocker pull tenableofficial/nessusdocker run --name "my-nessus" -d -p 8…

vscode用密钥文件连接ssh:如果一直要输密码怎么办

commandshiftP&#xff1a;打开ssh配置文件 加上这么一段&#xff0c;host就是你给主机起的名字 对IdentityFile进行更改&#xff0c;改成相应的密钥文件 然后commandshiftP链接到主机就可以了 但是有时候它会让输入密码 这是由于你给这个IdentityFile的权限太多了&#xf…

npm 常用指令总结

1. 初始化包 一个存放了代码的文件夹,如果里面有 package.json 文件,则可以把这个文件夹称之为包。 npm init -y 注意: 由于包名不能有中文,不能有大写,不能和未来要下载的包重名. 所以我们快速初始化包时,我们的文件夹也不能违反前面说的规则.(因为默认会将文件夹的名称,作…

数据结构 - 2(顺序表10000字详解)

一&#xff1a;List 1.1 什么是List 在集合框架中&#xff0c;List是一个接口&#xff0c;继承自Collection。 Collection也是一个接口&#xff0c;该接口中规范了后序容器中常用的一些方法&#xff0c;具体如下所示&#xff1a; Iterable也是一个接口&#xff0c;Iterabl…