Flink和Kafka连接时的精确一次保证

Flink写入Kafka两阶段提交

端到端的 exactly-once(精准一次)

kafka -> Flink -> kafka

1)输入端

输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)

2)Flink内部

Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义

3)输出端

两阶段提交(2PC)

写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”

如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

必须的配置

1)必须启用检查点

2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE

3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交

4)事务超时配置

【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

代码实战

kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】

public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.写出到 Kafka精准一次 写入 Kafka,需要满足以下条件,【缺一不可】1、开启 checkpoint2、sink 设置保证级别为 精准一次3、sink 设置事务前缀4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}

后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”

public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}

处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/144721.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s-集群升级 2

在每个集群节点都安装部署cir-docker 配置cri-docker 升级master节点 导入镜像到本地并将其上传到仓库 修改节点套接字 升级kubelet 注&#xff1a;先腾空后进行升级&#xff0c;顺序不能搞反&#xff0c;否则会导致严重问题 配置kubelet使用cri-docker 解除节点保护 升级wor…

水库大坝安全监测预警系统的重要作用

水库大坝建造在地质构造复杂、岩土特性不均匀的地基上&#xff0c;在各种荷载的作用和自然因素的影响下&#xff0c;其工作性态和安全状况随时都在变化。如果出现异常&#xff0c;又不被及时发现&#xff0c;其后果不堪设想。全天候实时监测&#xff0c;实时掌握水库水位、雨情…

postman连接数据库

参考&#xff1a;https://blog.csdn.net/qq_45572452/article/details/126620210 1、安装node.js 2、配置环境变量 3、安装xmysql连接数据库cmd窗口输入"npm install -g xmysql"后回车cmd窗口输入"xmysql"后回车,验证xmysql是否安装成功(下图代表安装成功)…

【ATTCK】ATTCK视角下的水坑钓鱼攻防战法

在网络安全领域&#xff0c;ATT&CK已经成为了研究和理解恶意攻击者行为的重要工具。站在攻击者的视角&#xff0c;ATT&CK为我们描绘了他们在攻击过程中所使用的各种战术、技术和常见知识。本文将结合ATT&CK框架&#xff0c;对水坑钓鱼攻击进行深入分析&#xff0c;…

【C++面向对象】13. 接口 / 抽象类*

文章目录 【 1. 抽象类 】1.1 抽象类的定义1.2 抽象类的应用条件1.3 实例 【 2. 设计策略 】 接口描述了类的行为和功能&#xff0c;而不需要完成类的特定实现。C 接口是使用 抽象类&#xff08;abstract base class&#xff0c;也称为ABC&#xff09; 来实现的。 【 1. 抽象类…

phalcon 访问IndexController 中只能访问indexAction方法,访问不了testAction等其它问题的解决办法

phalcon 访问IndexController 中只能访问indexAction方法&#xff0c;访问不了testAction&#xff0c;也访问不了indexAction方法&#xff0c;但是可以访问ArticleController里面的任意方法。访问其它方法出现这个错误“php - phalcon IndexController handler class cannot be…

配置开启Docker2375远程连接与解决Docker未授权访问漏洞

一、配置开启Docker远程连接 首先需要安装docker,参考我这篇文章&#xff1a;基于CentOS7安装配置docker与docker-compose 配置开启Docker远程连接的步骤&#xff1a; //1-编辑/usr/lib/systemd/system/docker.service 文件 vim /usr/lib/systemd/system/docker.service //2…

2023鸿蒙预定未来,环境搭建学习

鸿蒙开发基础知识 鸿蒙的基本概念和特点 鸿蒙&#xff08;HarmonyOS&#xff09;是华为公司开发的一款全场景分布式操作系统。它的设计目标是为各种设备提供统一的、无缝的用户体验。鸿蒙的核心特点包括以下几个方面&#xff1a; 分布式架构&#xff1a;鸿蒙采用分布式架构&…

LCD1602指定位置显示字符串-详细版

本文为博主 日月同辉&#xff0c;与我共生&#xff0c;csdn原创首发。希望看完后能对你有所帮助&#xff0c;不足之处请指正&#xff01;一起交流学习&#xff0c;共同进步&#xff01; > 发布人&#xff1a;日月同辉,与我共生_单片机-CSDN博客 > 欢迎你为独创博主日月同…

Django模板层

模板之变量 所有的数据类型都可以在模板中使用 render(request, index.html, context{}) render(request, index.html, contextlocals()) """在模板中使用变量的时候&#xff0c;用的是字典的key值&#xff0c;key值value值一般保持一致"""详细…

DigitalVirt 洛杉矶 CMIN2 VPS 测评

发布于 2023-07-16 在 https://chenhaotian.top/vps/digitalvirt-us-cmin2/ 官网链接&#xff08;含AFF&#xff09;&#xff1a;https://digitalvirt.com/aff.php?aff459 美国西海岸 四网回程 CMIN2 移动新线路。 晚高峰延迟 165ms 左右&#xff0c;不丢包&#xff0c;非常…

Linux安装RabbitMQ详细教程

一、下载安装包 下载erlang-21.3-1.el7.x86_64.rpm、rabbitmq-server-3.8.8-1.el7.noarch.rpm 二、安装过程 1、解压erlang-21.3-1.el7.x86_64.rpm rpm -ivh erlang-21.3-1.el7.x86_64.rpm2、安装erlang yum install -y erlang3、查看erlang版本号 erl -v4、安装socat …

【1567.乘积为正数的最长子数组长度】

目录 一、题目描述二、算法原理三、代码实现 一、题目描述 二、算法原理 三、代码实现 class Solution { public:int getMaxLen(vector<int>& nums) {int nnums.size();vector<int> f(n);vector<int> g(n);f[0]nums[0]>0?1:0;g[0]nums[0]<0?1:0…

vue 使用 this.$router.push 传参数,接参数的 query或params 两种方法示例

背景&#xff1a;vue项目 使用this.$router.push进行路由跳转时&#xff0c;可以通过query或params参数传递和接收参数。 通过query参数传递参数&#xff1a; // 传递参数 this.$router.push({path: /target,query: {id: 1,name: John} }); // 接收参数 this.$route.query.id …

初学者向导:Sketch设计软件自学教程大全

Sketch软件是Mac平台上流行的矢量图形编辑软件&#xff0c;旨在帮助用户创建各种设计原型&#xff0c;如网站、移动应用程序、图标等。Sketch软件的设计风格简单明了&#xff0c;界面操作简单易用&#xff0c;非常适合UI/UX设计师、平面设计师等数字创意人员。本文作为软件自学…

技巧篇:在Pycharm中配置集成Git

一、在Pycharm中配置集成Git 我们使用git需要先安装git工具&#xff0c;这里给出下载地址&#xff0c;下载后一路直接安装即可&#xff1a; https://git-for-windows.github.io/ 0. git中的一些常用词释义 Repository name&#xff1a; 仓库名称 Description(可选)&#xff1a;…

基于 React 的 HT for Web ,由厦门图扑团队开发和维护 - 用于 2D/3D 图形渲染和交互

本心、输入输出、结果 文章目录 基于 React 的 HT for Web &#xff0c;由厦门图扑团队开发和维护 - 用于 2D/3D 图形渲染和交互前言什么是 HT for WebHT for Web 的特点如何使用 HT for Web相关链接弘扬爱国精神 基于 React 的 HT for Web &#xff0c;由厦门图扑团队开发和维…

这款开源神器,让聚类算法从此变得简单易用

Scikit-Learn 以其提供的多个经过验证的聚类算法而著称。尽管如此&#xff0c;其中大多数都是参数化的&#xff0c;并需要设置集群的数量&#xff0c;这是聚类中最大的挑战之一。 通常&#xff0c;使用迭代方法来决定数据的最佳聚类数量&#xff0c;这意味着你需要多次进行聚类…

Netty入门指南之NIO Selector监管

作者简介&#xff1a;☕️大家好&#xff0c;我是Aomsir&#xff0c;一个爱折腾的开发者&#xff01; 个人主页&#xff1a;Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客 当前专栏&#xff1a;Netty应用专栏_Aomsir的博客-CSDN博客 文章目录 参考文献前言问题解…

深入解析Nacos:服务发现、配置管理与更多特性解析

&#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 深入解析Nacos&#xff1a;服务发现、配置管理与更多特性解析 前言第一&#xff1a;Nacos初识配置中心对比 第二&#xff1a;Nacos关键特性服务注册与发现1. 自动注册2. 动态…