flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证

背景

TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失,连至少一次的语义都不能达到

TwoPhaseCommitSinkFunction注意事项

TwoPhaseCommitSinkFunction是通过在两阶段提交协议实现的事务,大概简化为一下步骤:
1 在收到检查点分隔符的时候,开启事务,并把记录都写到开启的事务中,
2. 开始进行状态的保存时,把检查点id对应的事务结束掉,做好准备提交的准备,并开启下一个事务

public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);//当前检查点对应的事务做好准备,比如进行stream.flush等,准备好提交事务preCommit(currentTransactionHolder.handle);// 把当前检查点id对应的事务添加到状态中pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);// 把当前检查点id对应的事务添加到状态中state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}
  1. 收到检查点完成的通知notify方法,提交第二步中检查点id对应的事务,注意这一步不是每次flink在进行检查点的时候都会通知,这种情况下,某一次的notify方法就需要把前几次的事务一起进行提交了,另外,如果提交某个检查点的事务失败,那么应用会重启,并且在重启后的initSnapshot方法中再次进行事务提交,如果还是失败,这个过程一直持续
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {// 提交事务commit(pendingTransaction.handle);} catch (Throwable t) {//事务失败时记录异常,后面会把异常抛出导致应用重启if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);// 事务成功后移除当前的事务pendingTransactionIterator.remove();}if (firstError != null) {// 事务提交失败会抛出异常,导致job异常中止throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}}

总结:

1。事务不能提交失败,如果失败会导致作业失败然后重新提交,如果最终没有成功提交,那么数据会丢失
2。数据库服务端的事务超时时间不能设置太短,不能仅仅大于检查点的间隔大小,原因是上面说的,flink有可能丢失检查点完成后的通知消息,所以服务端的事务超时时间要设置的足够大.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/115330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LongAdder为什么在高并发下保持良好性能?LongAdder源码详细分析

文章目录 一、LongAdder概述1、为什么用LongAdder2、LongAdder使用3、LongAdder继承关系图4、总述&#xff1a;LongAdder为什么这么快5、基本原理 二、Striped64源码分析1、Striped64重要概念2、Striped64常用变量或方法3、静态代码块初始化UNSAFE4、casBase方法5、casCellsBus…

如何利用验证链技术减少大型语言模型中的幻觉

一、前言 随着大型语言模型在自然语言处理领域取得了惊人的进步。相信深度使用过大模型产品的朋友都会发现一个问题&#xff0c;就是有时候在上下文内容比较多&#xff0c;对话比较长&#xff0c;或者是模型本身知识不了解的情况下与GPT模型对话&#xff0c;模型反馈出来的结果…

阿里云服务器续费流程_一篇文章搞定

阿里云服务器如何续费&#xff1f;续费流程来了&#xff0c;在云服务器ECS管理控制台选择续费实例、续费时长和续费优惠券&#xff0c;然后提交订单&#xff0c;分分钟即可完成阿里云服务器续费流程&#xff0c;阿里云服务器网aliyunfuwuqi.com分享阿里云服务器详细续费方法&am…

ios 实现PDF,Word,Excel等文档类型的读取与预览

文章目录 一、前言二、iCould相关配置三、功能实现3.1 UIDocumentPickerViewController 选取控制器3.2 读取文件3.3 文档预览3.3.1 下载并保存3.3.2 QLPreviewController预览文档四、总结一、前言 最近正在研发的项目有一个需求: 允许用户将iCloud中的文档上传,实现文件的流…

Java后端开发(九)-- idea(2022版)将commit(未push)的 本地仓库 的 多条commit记录 进行撤销

目录 1.多次 修改Test01类后,提交到本地仓库 。 2.多次重复 1 的步骤,多次commit成功后,在Git =》Log中会显示,commit记录

v-model和.sync区别

在vue2中提供了.sync修饰符&#xff0c;但是在vue3中不再支持.sync&#xff0c;取而代之的是v-model。 1.在vue2中v-model和.sync区别&#xff1a; 1.相同点&#xff1a;都是语法糖&#xff0c;都可以实现父子组件中的数据的双向通信。 ​ 区别在于往回传值的时候. sync 的 $…

微信扫一扫抽奖活动怎么做

在当今数字化时代&#xff0c;微信作为中国最大的社交媒体平台之一&#xff0c;拥有着庞大的用户群体和广泛的影响力。微信扫一扫抽奖活动作为一种创新的营销方式&#xff0c;可以利用微信的用户基础和社交属性&#xff0c;吸引更多的目标用户参与&#xff0c;提高品牌知名度和…

Win32 命名管道

命名管道简单封装 CNamedPipe.h #pragma once #include <string> #include <windows.h> #include <tchar.h>#pragma warning(disable:4200)class CNamedPipe { public:CNamedPipe();~CNamedPipe();CNamedPipe(const CNamedPipe& r) delete;CNamedPipe&…

鸿蒙状态栏设置

鸿蒙状态栏设置 基于鸿蒙 ArkTS API9&#xff0c;设置状态栏颜色&#xff0c;隐藏显示状态栏。 API参考文档 参考文档 新建项目打开之后发现状态栏是黑色的&#xff0c;页面颜色设置完了也不能影响状态栏颜色&#xff0c;如果是浅色背景&#xff0c;上边有个黑色的头&#…

众和策略:题材股什么意思?

题材股是股票商场上的一个术语&#xff0c;许多刚接触股票出资的人可能对它不太熟悉。那么&#xff0c;题材股什么意思呢&#xff1f;在本文中&#xff0c;咱们将从多个角度剖析这个问题&#xff0c;帮忙读者更好地了解。 一、什么是题材股 题材股是指某个工作或主题的股票集结…

机器学习笔记 - 深度学习中跳跃连接的直观解释

一、概述 如今人们利用深度学习做无数的应用。然而,为了理解在许多作品中看到的大量设计选择(例如跳过连接),了解一点反向传播机制至关重要。 如果你在 2014 年尝试训练神经网络,你肯定会观察到所谓的梯度消失问题。简单来说:你在屏幕后面检查网络的训练过程,你看到的只…

跨越单线程限制:Thread类的魅力,引领你进入Java并发编程的新纪元

线程的概述 线程是一个程序的多个执行路径&#xff0c;执行调度的单位&#xff0c;依托于进程存在。 线程不仅可以共享进程的内存&#xff0c;而且还拥有一个属于自己的内存空间&#xff0c;这段内存空间也叫做线程栈&#xff0c;是在建立线程时由系统分配的&#xff0c;主要用…

【C++】不是用new生成的对象调用析构函数

2023年10月23日&#xff0c;周一上午 #include <iostream>class Book{ private:int price; public:~Book(){std::cout<<"调用析构函数"<<std::endl; } };int main(){Book b1;b1.~Book(); } 从运行结果可以看出&#xff1a; 手动调用b1.~Book()时&…

Python笔记

Python基础 一、数据类型 类型值文本类型str数值类型int&#xff0c;float&#xff0c;complex序列类型list&#xff0c;tuple&#xff0c;range映射类型dict集合类型set&#xff0c;frozenset布尔类型bool二进制类型bytes&#xff0c;bytearray&#xff0c;memoryview 1.基…

Leetcode 第 364 场周赛题解

Leetcode 第 364 场周赛题解 Leetcode 第 364 场周赛题解题目1&#xff1a;2864. 最大二进制奇数思路代码复杂度分析 题目2&#xff1a;美丽塔 I思路代码复杂度分析 题目3&#xff1a;美丽塔 II思路代码复杂度分析 题目4&#xff1a;统计树中的合法路径数目思路代码复杂度分析 …

机器人系统 ROS 常用命令行工具

1. 启动ros 主节点 roscore roscore运行成功如图&#xff1a; 1.1 rosrun 启动服务节点 例子&#xff1a;启动一个小乌龟节点 rosrun turtlesim turtlesim_node运行结果如图&#xff1a; 1.2 启动键盘控制 打开新的命令窗口&#xff0c;启动turtle_teleop_key 节点 rosr…

单窗口单IP适合炉石传说游戏么?

游戏道具制作在炉石传说中是一个很有挑战的任务&#xff0c;但与此同时&#xff0c;它也是一个充满机遇的领域。在这篇文章中&#xff0c;我们将向您展示如何在炉石传说游戏中使用动态包机、多窗口IP工具和动态IP进行游戏道具制作。 作者与主题的关系&#xff1a;作为一名热爱炉…

JSX看着一篇足以入门

JSX 介绍 学习目标&#xff1a; 能够理解什么是 JSX&#xff0c;JSX 的底层是什么 概念&#xff1a; JSX 是 javaScriptXML(HTML) 的缩写&#xff0c;表示在 JS 代码中书写 HTML 结构 作用&#xff1a; 在 React 中创建 HTML 结构&#xff08;页面 UI 结构&#xff09; 优势&a…

监控易一体化运维:打造机房环境监控的卓越典范

随着信息技术的飞速发展&#xff0c;机房作为企业数据和业务的中心&#xff0c;其运行状态和管理的重要性日益凸显。为确保机房的稳定性和可靠性&#xff0c;越来越多的企业选择使用一体化运维管理软件来进行实时监控。在这方面&#xff0c;监控易品牌提供了一款全面而高效的机…

VM虚拟机 13.5 for Mac

VMware Fusion Pro for Mac是一款强大的虚拟机软件&#xff0c;可以在Mac操作系统中创建、运行和管理多个虚拟机&#xff0c;使用户可以在一台Mac电脑上同时运行多个操作系统和应用程序。 以下是VMware Fusion Pro for Mac的主要特点&#xff1a; 1. 支持多种操作系统&#xff…