源码解析flink的GenericWriteAheadSink为什么做不到精确一次输出

背景

GenericWriteAheadSink是可以用于几乎是精准一次输出的场景,为什么说是几乎精准一次呢?我们从源码的角度分析一下

GenericWriteAheadSink做不到精准一次输出的原因

首先我们看一下flink检查点完成后通知GenericWriteAheadSink开始进行分段的记录输出并提交事务的代码

pubblic void notifyCheckpointComplete(long checkpointId) throws Exception {super.notifyCheckpointComplete(checkpointId);synchronized (pendingCheckpoints) {Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();while (pendingCheckpointIt.hasNext()) {PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();long pastCheckpointId = pendingCheckpoint.checkpointId;int subtaskId = pendingCheckpoint.subtaskId;long timestamp = pendingCheckpoint.timestamp;StreamStateHandle streamHandle = pendingCheckpoint.stateHandle;if (pastCheckpointId <= checkpointId) {try {if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {try (FSDataInputStream in = streamHandle.openInputStream()) {//开始把分段记录列表的记录进行输出boolean success =sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(new DataInputViewStreamWrapper(in),serializer),serializer),pastCheckpointId,timestamp);if (success) {//把分段记录列表输出成功后提交事务committer.commitCheckpoint(subtaskId, pastCheckpointId);streamHandle.discardState();pendingCheckpointIt.remove();}}} else {streamHandle.discardState();pendingCheckpointIt.remove();}} catch (Exception e) {// we have to break here to prevent a new (later) checkpoint// from being committed before this oneLOG.error("Could not commit checkpoint.", e);break;}}}}}

从上面的源码可以看出,sendValue方法和提交事务commitCheckpoint方法并不能保证原子性,这就意味着如果sendValue执行了一部分或者全部,而提交事务方法commitCheckpoint失败,那么此时这个检查点对应的事务相当于就没有完成,在下一个检查点的通知消息中,会把历史检查点的事务重新sendValue然后进行commit一次,这就意味着相同的记录会执行两次sendValue操作,这就是GenericWriteAheadSink不能保证精准一次的原因

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/113814.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式:组合模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)

简介&#xff1a; 组合模式&#xff0c;它是一种用于处理树形结构、表示“部分-整体”层次结构的设计模式。它允许你将对象组合成树形结构&#xff0c;以表示部分和整体的关系。这种模式的主要目的是简化客户端代码&#xff0c;并使客户端以一致的方式处理单个对象和组合对象。…

CSS基本讲解与使用(详解)

什么是CSS: CSS&#xff08;Cascading Style Sheets&#xff0c;层叠样式表&#xff09;是一种用于定义网页元素外观和样式的标记语言。它是一种用于将结构化文档&#xff08;通常是HTML和XML&#xff09;的外观和排版从内容的标记中分离出来的技术。CSS的主要目标是将网页的呈…

iOS Flutter Engine源码调试和修改

iOS Flutter Engine源码调试和修改 1. 前提:2. 步骤&#xff1a;3. 参考资料 1. 前提: 已将成功安装deop_tools工具已经通过gclient命令同步好flutter engine源码 2. 步骤&#xff1a; 进入engine/src目录 创建flutter engine构建文件 真机文件debug模式&#xff1a; ./flu…

网络知识基础一

1.HTTP相应的结构是怎么样的? HTTP响应由三个部分组成: 1:状态码(Status Code):描述了响应的状态。可以用来检查是否成功的完成了请求。请求失败的情况下,状态码可用来找出失败的原因。如果Servlet没有返回状态码,默认会返回成功的状态码HttpServletResponse.SC_OK。 2:…

Flyway Desktop updated

Flyway Desktop updated 为比较工件序列化和反序列化添加了额外的调试日志记录。 Flyway Desktop现在将记住以前用于创建项目和匹配克隆的位置。 新的脱机许可工作流现在已在Microsoft Windows上启用。 现在&#xff0c;在配置目标数据库列表时&#xff0c;环境ID是可见的。 现…

【虹科干货】Redis Enterprise vs ElastiCache——如何选择缓存解决方案?

使用Redis 或 Amazon ElastiCache 来作为缓存加速已经是业界主流的解决方案&#xff0c;二者各有什么优势&#xff1f;又有哪些区别呢&#xff1f; 文况速览&#xff1a; - Redis 是什么&#xff1f; - Redis Enterprise 是什么&#xff1f; - Amazon ElastiCache 是什么&…

git如何将master分支合并到自己创建的分支

现在有一个master分支&#xff0c;还有一个自己创建的doc分支&#xff0c;现在想要把master分支合并到dac分支 首先&#xff0c;确保你已切换到doc分支。如果尚未切换&#xff0c;可以使用以下命令切换到doc分支&#xff1a; git checkout doc确保你的doc分支是最新的&#x…

postgres查看是否锁表并释放

postgres查看是否锁表 select oid from pg_database where datname ‘库名’ select oid from pg_class where relname‘表名’    #注意pg_class不是全局&#xff0c;需要切换到相应的库里去查询 记住2个oid的值 select pid from pg_locks where relation oid的值 s…

tomcat动静分离

1.七层代理动静分离 nginx代理服务器&#xff1a;192.168.233.61 代理又是静态 tomcat1:192.168.233.71 tomcat2:192.168.233.72 全部关闭防火墙 在http模块里面 tomcat1&#xff0c;2 删除上面的hostname 148 配置 直接访问 http://192.168.66.17/index.jsp 2.四层七层动…

常见面试题-Redis专栏(二)

theme: cyanosis typora-copy-images-to: imgsRedisson 分布式锁&#xff1f;在项目中哪里使用&#xff1f;多久会进行释放&#xff1f;如何加强一个分布式锁&#xff1f; 答&#xff1a; 首先入门级别的分布式锁是通过 setnx 进行实现&#xff0c;使用 setnx 实现有四个注意…

docker基本概念

docker概念 一、什么是docker&#xff1f;二、docker组件&#xff1a;三、我们能用docker做什么四、Docker与配置管理&#xff1a;五、Docker的技术组件六、Docker资源 引用《第一本docker书》内容&#xff0c;以下均为读书笔记 一、什么是docker&#xff1f; Docker是一个能把…

中文编程开发语言工具应用案例:ps5体验馆计时收费管理系统软件

中文编程开发语言工具应用案例&#xff1a;ps5体验馆计时收费管理系统软件 软件部分功能&#xff1a; 1、计时计费功能&#xff1a;只需点开始计时即可&#xff0c;时间直观显示 2、商品管理功能&#xff1a;可以管理饮料等商品 3、会员管理功能&#xff1a;支持只用手机号作…

Arcgis 数据操作

在进行数据操作的时候&#xff0c;需要注意坐标系要一致&#xff0c;这是前提。 数据类型 文件地理数据库&#xff1a;gbd 个人地理数据库&#xff1a;mdb &#xff08;Mircosoft Access&#xff09; 矢量数据&#xff1a;shp 推荐使用gbd数据&#xff0c;效率会更高。 采…

【912.排序数组】

目录 一、题目描述二、算法原理2.1快速排序2.2归并排序 三、代码实现3.1快排代码实现3.2归并代码实现 一、题目描述 二、算法原理 2.1快速排序 2.2归并排序 三、代码实现 3.1快排代码实现 class Solution { public:int getRandom(int left,int right,vector<int>&…

[翻译]理解Postgres的IOPS:为什么数据即使都在内存,IOPS也非常重要

理解Postgres的IOPS&#xff1a;为什么数据即使都在内存&#xff0c;IOPS也非常重要 磁盘IOPS&#xff08;每秒输入/输出操作数&#xff09;是衡量磁盘系统性能的关键指标。代表每秒可以执行的读写操作数量。对于严重依赖于磁盘访问的PG来说&#xff0c;了解和优化磁盘IOPS对实…

es : java 查询

1. POM 配置 <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.6.2</version></dependency> 2. 建立ES集群连接 RestHighLevelClient cli…

Ubuntu系统下使用docker容器配置nginx并部署前端项目

1.下载 Nginx 镜像 命令 描述 docker pull nginx 下载最新版 Nginx 镜像 :2. 创建要挂载的宿主机目录 启动前需要先创建 Nginx 外部挂载的配置文件&#xff08; /home/nginx/conf/nginx.conf&#xff09; 之所以要先创建 , 是因为 Nginx 本身容器只存在 / etc/nginx 目录 ,…

Hive知识梳理(好文)

Hive是建立在 Hadoop 上的数据仓库基础构架。可以将SQL查询转换为MapReduce的job在Hadoop集群上执行。 元数据 Hive元数据信息存储在Hive MetaStore中&#xff0c;或者mysql中。 分隔符 Hive默认的分格符有三种&#xff0c;分别是&#xff08;Ctrl/A&#xff09;、&#xff0…

Spring 域对象共享数据

1、使用ServletAPI向request域对象共享数据 首页 <a th:href"{/testServletAPI}">测testServletAPI</a><br>success.html <!DOCTYPE html> <html lang"en" xmlns:th"http://www.thymeleaf.org"> <head>&…

iOS 13以下系统,使用iOS QQ 登录 SDK 崩溃问题

最近用iPhone 6p 系统&#xff1a;12.5.4 调用QQ三方登录&#xff0c;出现崩溃到初始化QQ SDK的位置 在询问了QQ官方客服后&#xff0c;得到了答复&#xff0c;可以放弃治疗了