源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景

FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码

punctuated 水位线发送源码解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是发送算子任务级别的水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/105320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matlab 图像均值滤波

目录 一、算法原理二、代码实现三、结果展示本文由CSDN点云侠翻译,放入付费专栏只为防不要脸的爬虫。专栏值钱的不是本文,切勿因本文而订阅。 一、算法原理 均值滤波是一种常用的线性滤波方法,用于平滑图像并减少噪声。它的实现过程如下: 确定滤波器的大小:选择一个固定的…

flutter dio 请求封装(空安全)

一、添加依赖 dio: ^5.3.2二、请求封装 class HttpHelper {static Dio? mDio;static BaseOptions? options;static HttpHelper? httpHelper;CancelToken cancelToken CancelToken();static const String GET get;static const String POST post;static const String PU…

js冒泡排序的几种写法?

冒泡排序是一种简单的排序算法&#xff0c;其基本思想是重复地遍历要排序的数列&#xff0c;一次比较两个元素&#xff0c;如果他们的顺序错误就把他们交换过来。以下是几种常见的 JavaScript 实现方式&#xff1a; 使用基本的 for 循环实现冒泡排序&#xff1a; function bub…

P1443 马的遍历

#include <iostream> #include <queue> using namespace std; #define M 400 int arr[M 5][M 5]; typedef struct Node {int x, y; } Node; //将马能走的8个方向封装成一个二维数组 int dir[8][2] {{2, 1}, {2, -1}, {-2, 1}, {-2, -1},{1, 2}, {-1, 2}, {1, -2…

nginx的location的优先级和匹配方式

nginx的location的优先级和匹配方式 在http模块中有server&#xff0c;server模块中有location&#xff0c;location匹配的是uri 在一个server中&#xff0c;会有多个location&#xff0c;如何来确定匹配哪个location niginx的正则表达式 ^ 字符串的起始位置 $ 字符串的…

IDEA中查看整个项目代码行数

近期正在手撸Spring源码&#xff0c;想要看下自己写了多少行代码。记录下如何查看项目的代码行数&#xff0c;方便日后查阅

Hadoop3教程(六):HDFS中的DataNode

文章目录 &#xff08;63&#xff09;DataNode工作机制&#xff08;64&#xff09;数据完整性&#xff08;65&#xff09;掉线时限参数设置参考文献 &#xff08;63&#xff09;DataNode工作机制 DataNode内部存储了一个又一个Block&#xff0c;每个block由数据和数据元数据组…

【云计算】相关解决方案介绍

文章目录 1.1 云服务环境 Eucalyptus1.1.1 介绍1.1.2 开源协议及语言1.1.3 官方网站 1.2 开源云计算平台 abiCloud1.2.1 开源协议及语言1.2.2 官方网站 1.3 分布式文件系统 Hadoop1.3.1 开源协议及语言1.3.2 官方网站 1.4 JBoss云计算项目集 StormGrind1.4.1 开源协议及语言1.4…

【数据库——MySQL(实战项目1)】(3)图书借阅系统——存储函数

目录 1. 简述2. 功能代码2.1 创建存储函数&#xff0c;根据图书编号查借阅人姓名&#xff0c;并调用该函数查询‘ **小邓在森林** ’已借未还的图书情况&#xff1b;2.2 创建存储函数&#xff0c;计算某借阅人还能借阅的图书数目&#xff0c;学生限额 5 本&#xff0c;教师限额…

Vue3 + Nodejs 实战 ,文件上传项目--实现文件批量上传(显示实时上传进度)

目录 技术栈 1.后端接口实现 2.前端实现 2.1 实现静态结构 2.2 整合上传文件的数据 2.3 实现一键上传文件 2.4 取消上传 博客主页&#xff1a;専心_前端,javascript,mysql-CSDN博客 系列专栏&#xff1a;vue3nodejs 实战--文件上传 前端代码仓库&#xff1a;jiangjunjie…

树莓派部署.net core网站程序

1、发布你的项目 使用mobaxterm上传程序 回到mobaxterm,f进入目录输入&#xff1a; cd webpublish 运行程序&#xff1a;dotnet WebApplication1.dll 访问地址为&#xff1a;http://localhost:5000,尝访问如下&#xff1a; 已经出现 返回的json&#xff0c;证明是可以访问的…

C#里氏替换

在C#中&#xff0c;里氏替换原则是面向对象编程中的一个重要原则&#xff0c;它是关于继承和多态性的概念。 里氏替换原则的定义是&#xff1a;如果S是T的子类型&#xff08;或者T是S的基类型&#xff09;&#xff0c;那么程序中任意使用T类型的地方都可以替换为S类型而不会产…

MyBatis的缓存,一级缓存,二级缓存

10、MyBatis的缓存 10.1、MyBatis的一级缓存 一级缓存是SqlSession级别的&#xff0c;通过同一个SqlSession对象 查询的结果数据会被缓存&#xff0c;下次执行相同的查询语句&#xff0c;就 会从缓存中&#xff08;缓存在内存里&#xff09;直接获取&#xff0c;不会重新访问…

软件架构设计(业务架构、应用架构、数据架构、技术架构)

一、架构相关概念 1、系统 系统&#xff1a;由一群有关联的个体组成&#xff0c;根据某种规则运作&#xff0c;能完成个别原件不能独立完成的工作的群体。大的系统可以嵌套小系统&#xff0c;被嵌套的小系统往往称为大系统的子系统。 2、模块 模块是从逻辑上将系统分解&#…

Java也能做OCR!SpringBoot 整合 Tess4J 实现图片文字识别

前言 今天给大家分享一个SpringBoot整合Tess4j库实现图片文字识别的小案例&#xff0c;希望xdm喜欢。 文末有案例代码的Git地址&#xff0c;可以自己下载了去玩玩儿或继续扩展也行。 话不多说&#xff0c;开整吧。 什么是Tess4j库 先简单给没听过的xdm解释下&#xff0c;这里要…

风储联合系统的仿真模型研究

摘要 风能是目前国内外应用较为广泛的一种绿色可再生能源&#xff0c;近几年我国风电产业的发展十分迅速。然后&#xff0c;越来越多的风力发电系统建并网&#xff0c;风力发电产生的电能受外界因素影响较大&#xff0c;具有一定的随机性和波动性&#xff0c;给并网后的电力系统…

铜死亡+机器学习+WGCNA+分型生信思路

今天给同学们分享一篇单基因泛癌免疫实验生信文章“IGF2BP3 overexpression predicts poor prognosis and correlates with immune infiltration in bladder cancer”&#xff0c;这篇文章于2023年2月3日发表在BMC Cancer期刊上&#xff0c;影响因子为3.8。 膀胱癌是全球最常见…

【环境】Ubuntu20.04 安装 Anaconda 顺顺利利

ubuntu里面安装的Anaconda也是顺顺利利 别忘了source source一下 参考链接 中间遇到了一个问题&#xff0c;用下面的链接轻松解决了 关于修改anaconda安装路径的问题

Stable Diffusion 动画SD-Animatediff V2

AI不仅可以生成令人惊叹的图片,还能给这些图片注入生命,让它们动起来。 这就是AnimateDiff要做的事情,一个神奇的工具,能将静态的AI生成图像转换成动画。 本次介绍基于SD如何实现这个神奇的方法。 文章目录 插件安装使用方法参数调整文生动图/视频Controlnet方法SD API方…

Java设计模式:Callback

介绍 回调&#xff08;Callback&#xff09;是一种设计模式&#xff0c;在这种模式中&#xff0c;一个可执行的代码被作为参数传递给其他代码&#xff0c;接收方的代码可以在适当的时候调用它。 在真实世界的例子中&#xff0c;当我们需要在任务完成时被通知时&#xff0c;我…