Flink的基于两阶段提交协议的事务数据汇实现

背景

在flink中可以通过使用事务性数据汇实现精准一次的保证,本文基于Kakfa的事务处理来看一下在Flink 内部如何实现基于两阶段提交协议的事务性数据汇.

flink kafka事务性数据汇的实现

1。首先在开始进行快照的时候也就是收到checkpoint通知的时候,在snapshot方法中会开启一个新的事务,代码如下:

   public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);preCommit(currentTransactionHolder.handle);// 调用kafkaProducer.flush();清理上一个事务的状态(注意不是提交),只是确保前一个事务的所有资源清理完毕pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
// 调用producer.beginTransaction();方法开启一个新的kafka事务currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}

2.其次在JobManager通知检查点完成的通知方法,也就是notifyCheckpointComplete方法中提交事务

Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {//调用producer.commitTransaction()方法提交事务commit(pendingTransaction.handle);} catch (Throwable t) {if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);pendingTransactionIterator.remove();}if (firstError != null) {throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}

至此,一个两阶段提交的flink事务性数据汇完成了,这个事务性数据汇可以构成端到端一致性的一部分

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/106700.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

15.项目讲解之前端页面的实现

项目讲解之前端页面的实现 本项目前端使用HBuilerX软件编写HBuilderX下载安装配置一键直达&#xff0c; uniapp框架uniapp官网&#xff0c; 使用Element-ui组件Element-ui组件网址进行前端页面的完成。 前端项目下载地址 前端项目 前端项目展示 首页 首页展示 echarts实现…

简单的数学运算如何改变算法

简单的数学运算如何影响事物 当你坐在无人驾驶汽车上行驶时&#xff0c;突然发现前面有一个问题。一个亚马逊快递司机将他们的货车开到了一辆双停的UPS卡车旁边&#xff0c;然后才意识到无法通过。现在他们卡住了&#xff0c;你也卡住了。 街道太窄&#xff0c;无法实现U型转弯…

CCF CSP认证 历年题目自练Day32

题目一 试题编号&#xff1a; 202209-1 试题名称&#xff1a; 如此编码 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 512.0MB 问题描述&#xff1a; 题目背景 某次测验后&#xff0c;顿顿老师在黑板上留下了一串数字 23333 便飘然而去。凝望着这个神秘数字&#xff0c;小…

C# Convert和BitConverter类学习

前言&#xff1a; C# Convert是一个比较好用的强制转换&#xff0c;相比我们之前用的(int)或者是类型.Parse()&#xff0c;Convert给我们提供了很多的选项&#xff0c;特别是对于有字节要求的变量&#xff0c;Convert简直就是C#编程的福音&#xff0c;BitConvert对于byte数组转…

linux下文件存储系统(inode/目录项/硬链接)

概念&#xff1a; 关键点&#xff1a; &#xff08;1&#xff09;inode 也叫做文件属性管理结构体 &#xff08;2&#xff09;目录项里面存两个东西 文件名和 inode号。通过inode号可以找到磁盘上的文件。 &#xff08;3&#xff09;给文件创建硬链接的时候&#xff0c;两个…

中国矿业大学-JAVA期末备考

JAVA里面&#xff0c;“”和“equals"的区别是什么呢&#xff1f; 1.""操作符用于比较两个对象的引用是否相等。也就是说&#xff0c;它会检查两个对象是否指向内存中的同一个地址。如果两个对象的引用完全相同&#xff0c;则""返回true&#xff1b;否…

uniapp 小程序实现图片宽度100%、高度自适应的效果

因为image组件默认是有宽度跟高度的&#xff0c;所以这个高度不怎么好写 通过load事件来控制图片的高度 话不多说&#xff0c;直接上代码&#xff0c; <image class"img" src"/static/image.png" :style"{ height: imgHeight px }"mode&q…

CentOS 7 服务器上创建新用户及设置用户密码有效期

一、创建用户 1、以 root 用户身份登录到 CentOS 服务器 2、运行以下命令以创建新用户&#xff1a; useradd -m -s /bin/bash username其中&#xff0c;username 是您要创建的新用户的用户名。该命令将创建一个新用户并为其分配一个主目录。3、运行以下命令以设置新用户的密码…

Frame Buffer设备驱动 (ili9488 3.5寸tft屏)

Frame Buffer设备驱动 Frame Buffer设备ili9488介绍驱动编写代码编写ili9488.c设备树修改测试ili9488代码分析 LCD资料下载 Frame Buffer设备 在早期的输出显示设备中&#xff0c;大部分为CRT显示器&#xff0c;随着技术的不断发展&#xff0c;现在大部分使用的是液晶显示器。这…

MySQL视图、用户管理和C语言链接

文章目录 1. 视图1.1 基本使用 2. 用户管理2.1 用户信息2.2 创建用户2.3 修改用户密码2.4 删除用户 3. 数据库的权限3.1 给用户授权3.2 回收权限 4. mysql connect4.1 Connector/C 使用4.2 mysql接口介绍 1. 视图 视图是一个虚拟表&#xff0c;其内容由查询定义。同真实的表一…

百度SEO优化的特点(方式及排名诀窍详解)

百度SEO优化的特点介绍&#xff1a; 百度SEO优化是指对网站进行优化&#xff0c;使其在百度搜索引擎中获得更好的排名&#xff0c;进而获取更多的流量和用户。百度SEO优化的特点是综合性强、效果持久、成本低廉、投资回报高。百度的搜索算法不断更新&#xff0c;所以长期稳定的…

开源任务调度框架

本文主要介绍一下任务调度框架Flowjob的整体结构&#xff0c;以及整体的心路历程。 功能介绍 flowjob主要用于搭建统一的任务调度平台&#xff0c;方便各个业务方进行接入使用。 项目在设计的时候&#xff0c;考虑了扩展性、稳定性、伸缩性等相关问题&#xff0c;可以作为公司…

YOLOv5网络结构图

网络结构图&#xff08;简易版和详细版&#xff09; 网络框架介绍 前言&#xff1a; YOLOv5是一种基于轻量级卷积神经网络&#xff08;CNN&#xff09;的目标检测算法&#xff0c;整体可以分为三个部分&#xff0c; backbone&#xff0c;neck&#xff0c;head。 如上图所示…

C++——多态

多态是有继承关系的类对象调用相同的函数&#xff0c;会有不同的结果。例如&#xff0c;普通人买高铁票不打折&#xff0c;学生打75折&#xff0c;儿童免费&#xff0c;这种情况就适合使用多态 虚函数 被virtual修饰的类成员函数 class A{ public:virtual void fun() {} }; …

【深度学习】深度学习实验四——循环神经网络(RNN)、dataloader、长短期记忆网络(LSTM)、门控循环单元(GRU)、超参数对比

一、实验内容 实验内容包含要进行什么实验,实验的目的是什么,实验用到的算法及其原理的简单介绍。 1.1 循环神经网络 (1)理解序列数据处理方法,补全面向对象编程中的缺失代码,并使用torch自带数据工具将数据封装为dataloader。 (2)分别采用手动方式以及调用接口方式…

Redis--List、Set、Zset、Hash、Bitmaps、HyperLogLog、Geospatial

List LPUSH key value1 [value2] 将一个或多个值插入到列表头部 127.0.0.1:6379> LPUSH myls1 1 (integer) 1 127.0.0.1:6379> LPUSH myls1 2 (integer) 2 127.0.0.1:6379> LRANGE myls1 0 -1 1) "2" 2) "1" LPOP key 移出并获取列表的第一个元素…

【排序算法】详解直接插入排序和希尔排序原理及其性能分析

文章目录 插入排序算法原理细节分析代码实现复杂度分析:稳定性分析:与冒泡排序的对比 希尔排序算法原理细节分析代码实现复杂度分析稳定性分析 总结对比 插入排序 算法原理 插入排序又或者说直接插入排序,是一种和冒泡排序类似的并且比较简单的排序方法&#xff0c; 基本思想…

3、Linux下安装

以下操作仅限于rh系列:支持rpm/yum安装方式&#xff0c;不支持deb/apt安装方式。 以下操作仅限于rh系列&#xff1a;支持rpm/yum安装方式&#xff0c;不支持 deb/apt安装方式。 1、在线下载安装包&#xff1a; wget https://downloads.mysql.com/archives/get/p/23/file/ m…

打造个人专属形象!工业级人物写真生成工具FaceChain开源

简介 FaceChain 是一个可以用来打造个人数字形象的深度学习模型工具。用户仅需要提供最低一张照片即可获得独属于自己的个人形象数字替身。FaceChain 支持在 gradio 的界面中使用模型训练和推理能力&#xff0c;也支持资深开发者使用 python 脚本进行训练推理。 Github链接&…

长短期记忆网络(LSTM)

一. 什么是LSTM Long Short Term Memory&#xff08;LSTM&#xff0c;长短期记忆&#xff09;是一种特殊的递归神经网络。这种网络与一般的前馈神经网络不同&#xff0c;LSTM可以利用时间序列对输入进行分析。 简而言之&#xff0c;当使用前馈神经网络时&#xff0c;神经网络会…