kafka精准一次、事务、幂等性

Kafka事务

消息中间件的消息保障的3个级别

  1. At most once 至多一次。数据丢失。
  2. At last once 至少一次。数据冗余
  3. Exactly one 精准一次。好!!!

如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。

:先消费消息、再提交位移。

如果提交位移这一步挂了,就会再消费一遍消息。重复消费====》〉》至少一次

当:先提交位移、再消费消息。

提议位移成功、消费消息失败,那么数据就丢失了====》〉》至多一次

如何精准一次呢?

幂等和事务!

幂等

对接口的多次调用所产生的结果和一次调用的结果是一样的。

即:(第一次调用,中途挂了,再次调用==一次调用) 为true

如何实现?

在v2版本的消息存储格式用有两个字段。produce_id(简称pid) 、first sequence

在这里插入图片描述

每个新的生产者实例在初始化的时候都会被分配一个pid,每个pid,消息发送到每一个分区都有序列号 sequence,序列号会从0开始递增,每发送一条消息,<PID,分区> 对应的序列号的值会➕1。这个序列号值(SN)在broker的内存中维护。只有当SN_new=SN_old+1.

broker才会接收这个消息。

如SN_new < SN_old+1 说明消息重复了,这个消息可以直接丢掉。

如SN_new>SN_old+1 说明消息丢失了,有数据还没有卸写入。抛乱序异常OutOforderSequenceException。

即用序列号来保证消息的顺序消费。

注意 所记录的这个序列号是针对 每一对<PID,分区> 所以这个幂等实现的是单会话、单分区的。

如何保证多个分区之间的幂等性呢?

事务

保证对多个分区写入操作的原子性,要么全部成功、要么全部失败。将应用程序的生产消息、消费消息、提交消费位移当作原子操作来处理。

用户显示指定一个事务id: transactionalId。这个事务id是唯一的

从生产者角度来考虑,事务保证了生产者会话消息的幂等发送跨生产者会话的事务恢复.

  • 生产者会话消息的幂等发送:如有有两个相同事务id的生产者,新的创建了 旧的就会被kill
  • 某个生产者实例宕机了,新的生产者实例可以保证未完成的旧事务要么被提交 要没被中断

实现过程,以consume-transform-produce为例。

package com.hzbank.yjj.transaction;import com.hzbank.yjj.producer.CustomerPartitioner;
import com.hzbank.yjj.producer.ProducerlnterceptorPrefix;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;public class TransactionConsumeTransformProduce {public static final String brokerList = "localhost:9092";public static Properties getConsumerProps(){Properties props =new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"groupId");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);return props;}public static Properties getProducerProps(){Properties props =new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId");return props;}public static void main(String[] args) {//初始化生产者和消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());consumer.subscribe(Collections.singletonList("topic-source"));KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());//初始化事务producer.initTransactions();while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));if(!records.isEmpty()){HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();//开启事务producer.beginTransaction();try {for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println("获取到了topic-source发送过来的数据"+record.value());System.out.println("do some ");ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());producer.send(producerRecord);}// 获取最近一次的消费位移long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));}//提交消费位移producer.sendOffsetsToTransaction(offsets,"groupId");//提交事务producer.commitTransaction();} catch (ProducerFencedException e) {System.out.println("异常了");producer.abortTransaction();}}}}}

1. 找到TransactionCoordinator。

TransactionCoordinator负责分配和管理事务。
FindCoordinatorRequest 发送请求找到TransactionCoordinator所在的broker节点。返回其对应的node_id、 host、 port 信息

transactionalId 的哈希值计算主题_transaction_state 中的分区编号

根据分区leader副本找到所在的broker节点,极为Transaction Coordinator节点

2. 获取pid

通过InitProducerIdRequest向TransactionCoordinator 获取pid 为当前生产者分配一个pid。

String transactionalId; 事务id
int transactionTimeoutMs; 事务状态更新超时时间

3. 保存pid

TransactionCoordinator 第一次收到事务id会和对应pid保存下来,以消息(事务日志消息)的形式保存到主题_transaction_state中,实现持久化

InitProducerIdRequest还会出发一下任务:

- 增加pid对应的producer_epoch.具有相同 PID 但 producer_epoch 小 于该 producer_叩och 的其他生产者新开启的事务将被拒绝 。
- 恢复( Commit)或中止( Ab。此)之前的生 产 者未完成的 事务

4. 开启事务

通过 KafkaProduc町的 beginTransaction()方法。调用该方法后,生产者本 地会标记己经开启了 一个新的事务 ,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。

5. Consume-Transform-Produce

整个事务处理数据。

  • AddPartitionsToTxnRequest:让 TransactionCoordinator 将<transactionld, TopicPartition>的对应关系存储在主题

    transaction state 中

  • ProduceRequest:生产者通过 ProduceRequest 请求发送消息( ProducerBatch)到用户 自定义主题中

  • AddOffsetsToTxnRequest:TransactionCoordinator 收到这个AddOffsetsToTxnRequest请求,通过 groupId 来推导出在一consumer_offsets 中的分区

  • TxnOffsetCommitRequest:发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中 包含的消费位移信息 offsets 存储到主题 consumer offsets 中

6. 提交或者终止事务

KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法。

写不下去了,暂时就先理解这么多了,后面再多结合源码去看看。

参考:书籍《深入理解 Kafka:核心设计与实践原理》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/169799.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java基准测试工具JMH的简介与使用

JMH是一套Java基准测试工具&#xff0c;用于对Java执行进行基准测试以及生成测试报告。平时应用于Java一些基础Api或者一些工具类这种离开网络因素的纯系统测试。 使用方式 maven引入&#xff1a; <dependency><groupId>org.openjdk.jmh</groupId><art…

2024北京理工大学计算机考研分析

24计算机考研|上岸指南 北京理工大学 计算机学院始建于1958年&#xff0c;是全国最早设立计算机专业的高校之一。2018年4月&#xff0c;计算机学院、软件学院、网络科学与技术研究院合并成立新的计算机学院。学院累计为国家培养各类人才15000余名。计算机科学学科ESI排名进入全…

Lombok新版超全面使用教程

一、Lombok介绍 Lombok是一个Java库&#xff0c;可以通过注解来简化Java类的编写&#xff0c;减少冗余的样板代码。它提供了一系列的注解&#xff0c;用于自动生成常见的代码&#xff0c;如getter和setter方法、构造函数、equals和hashCode方法、toString方法等。通过使用Lomb…

论文阅读——Prophet(cvpr2023)

一、Framework 这个模型分为两阶段&#xff1a;一是答案启发生成阶段&#xff08;answer heuristics generation stage&#xff09;&#xff0c;即在一个基于知识的VQA数据集上训练一个普通的VQA模型&#xff0c;产生两种类型的答案启发&#xff0c;答案候选列表和答案例子&am…

使用SpringBoot集成FastDFS

使用SpringBoot集成FastDFS 这篇文章我们介绍如何使用 Spring Boot 将文件上传到分布式文件系统 FastDFS 中。 1、FastDFS FastDFS是一个开源的轻量级分布式文件系统&#xff0c;它对文件进行管理&#xff0c;功能包括&#xff1a;文件存储、文件同步、文件访问 &#xff0…

vue2-006——使用脚手架搭建vue2项目+项目结构分析

一、创建项目&#xff1a;vue create 项目名 D:\EnyiWang\Documents\myStudy\vue>vue create vue_testVue CLI v5.0.8 ? Please pick a preset: Default ([Vue 2] babel, eslint)Vue CLI v5.0.8 ✨ Creating project in D:\EnyiWang\Documents\myStudy\vue\vue_test. &am…

设计测试用例的具体方法总结

&#x1f4d1;打牌 &#xff1a; da pai ge的个人主页 &#x1f324;️个人专栏 &#xff1a; da pai ge的博客专栏 ☁️白马沉河共歃誓&#xff0c;怒涛没城亦不悔 ☁️基于需求进行测试用例的设计 基…

[环境配置]vscode免密ssh的设置流程

测试环境&#xff1a; windows 11 ubuntu16.04 vmware 第一步&#xff1a;生成密钥 cmd打开输入&#xff1a;ssh-keygen -t rsa 一路回车后可以在C:\Users\用户名\.ssh路径看到id_rsa.pub&#xff0c;我们打开这个文件&#xff0c;用记事本打开即可&#xff0c;然后复制里…

“不得了·放飞杯” 2023年四川省健身健美锦标赛启动在成都隆重召开

“不得了放飞杯” 2023年四川省健身健美锦标赛启动在成都隆重召开 为了更好地推动四川省健身健美运动的普及和发展&#xff0c;结合《四川全民健身实施计划》的现状&#xff0c;适应新时代健身私教服务产业的发展需求&#xff0c;由中国健美协会指导&#xff0c;四川省健美健美…

BUUCTF [MRCTF2020]Ez_bypass 1

题目环境&#xff1a;F12查看源代码 I put something in F12 for you include flag.php; $flagMRCTF{xxxxxxxxxxxxxxxxxxxxxxxxx}; if(isset($_GET[gg])&&isset($_GET[id])) { $id$_GET[id]; $gg$_GET[gg]; if (md5($id) md5($gg) && $id ! $gg) { …

鸿蒙 ark ui 网络请求 我不允许你不会

前言&#xff1a; 最近有在学习这个鸿蒙的ark ui开发 因为鸿蒙不是发布了一个鸿蒙next的测试版本 明年会启动纯血鸿蒙应用 所以我就想提前给大家写一些博客文章 效果图 11-24 16:26:22.005 25156-25156/com.example.httpsrequest E A0ff00/HTTPS: 请求状态 --> 200, %{pub…

串口虚拟化工具

串口虚拟工具(Configure Virtual Serial Port Driver v7.2) 可以虚拟化串口 串口成对添加&#xff0c;添加之后可以在设备管理器中查看 链接&#xff1a;https://pan.baidu.com/s/1WE9c28MEoSEY7fGhy4kjag 提取码&#xff1a;yahn DebugTool-v.16 作用&#xff1a;可以检验…

区块链技术将如何影响未来的数字营销?

你是否听腻了区块链和数字营销等流行语&#xff0c;却不明白它们对未来意味着什么&#xff1f;那么&#xff0c;准备好系好安全带吧&#xff0c;因为区块链技术将彻底改变我们对数字营销的看法。从建立消费者信任到提高透明度和效率&#xff0c;其可能性是无限的。 让我们来探…

加速 Selenium 测试执行最佳实践

Selenium测试自动化的主要目的是加快测试过程。在大多数情况下&#xff0c;使用 Selenium 的自动化测试比手动测试执行得特别好。在实际自动化测试实践中&#xff0c;我们有很多方式可以加速Selenium用例的执行。 我们可以选择使用不同类型的等待、不同类型的 Web 定位器、不同…

[Docker]十一.Docker Swarm集群raft算法,Docker Swarm Web管理工具

一.Docker Swarm集群raft算法讲解 Raft &#xff1a;一致性算法&#xff0c;在保证大多数管理节点存活的情况下&#xff0c;集群才能使用&#xff0c; 所以就要求如果集群的话&#xff0c; manager 节点必须 >3 台 &#xff0c;如果是两个台&#xff0c;其中一台宕机&#…

彩纸屋在线少儿编程源码/scratch在线编程系统/培训管理系统源码/在线培训系统源码PHP

源码简介&#xff1a; 彩纸屋在线少儿编程源码&#xff0c;它是scratch在线编程系统&#xff0c;作为培训管理系统源码/在线培训系统源码&#xff0c;采用PHP源码。 彩纸屋是全国首家提供scratch开源定制和少儿编程培训管理系统源代码的服务商&#xff0c;彩纸屋提供的scratc…

一条Update语句的执行过程是怎样的?

先看第一个问题&#xff0c;这里做个简单描述 &#xff0c;因为我们着重还是看Update MySQL执行一条Select语句是怎么运行的&#xff1f; 这个问题大家在面试的时候大家都背过类似的题&#xff0c;而且网上也有很多答案&#xff0c;这里分享一个大致流程介绍&#xff0c;关于…

工业级 S25HS01GTDPBHV030 NOR闪存,L9305EP汽车级驱动器IC,LMK03318RHSR时钟发生器,PLL(中文资料)

一、工业级 S25HS01GTDPBHV030 Semper™ NOR闪存 S25HS01GT SEMPER™ NOR Flash闪存系列是英飞凌高性能、安全而可靠的 NOR Flash解决方案。 它集成了适用于汽车、工业、通信等广泛应用的关键安全功能。 凭借 SEMPER™ NOR Flash闪存&#xff0c;英飞凌推出了业界首款符合 ASI…

多线程04 线程安全问题以及一些简单的解决策略

前言 首先我们引入多线程是为了解决多次创建进程和销毁进程带来的巨大开销,线程可以共享内存和硬盘资源等等,这里我们就会想,他们共享这些东西会不会涉及到一些安全问题呢?他们没有独立分配自己的资源是一定会有安全问题的,但是就目前在这个快节奏的社会来说,效率的提升是必然…

Python编程之魂之运算符的优先级教程

文章目录 前言优先级概述相同优先级结合性运算符优先级一览表运算符优先级重点说明结语关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、面试资…