kafka学习笔记--如何保证生产者数据可靠、不重复、有序

本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)


PS:本节内容尚硅谷的视频讲的不太友好,又查了很多资料才搞明白

文章目录

  • 数据可靠性
  • 数据不重复
  • 数据有序性

数据可靠性

首先数据的可靠性指的是:

  • 消息不会意外丢失
  • 消息不会重复传递

那回顾我们的数据发送流程,在确认数据发送成功的这一步,也就是ack应答这里,不同的参数对应着不同的策略,如果选择了0和1,则存在丢数的问题,如图:
0: 如果数据发送到某个主题的leader时,leader所在节点挂了,那么这条消息就丢失了
在这里插入图片描述
1: 同理,leader收到了,还没应答时挂了,也会丢数据
在这里插入图片描述
-1(all): 使用-1能保证数据落配盘后才回答,保证数据不丢失
在这里插入图片描述
但是,如果Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

这就引出了ISR队列的概念了

ISR,是一个机制,也代表着一个同步合集,是由Leader维护的一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的


那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?


主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。

换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。

我们将ISR与ACK应答结合起来使用,就形成了数据可靠条件

  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据不重复

上面讲解的,只能保证数据可靠,但是这又引出了一个新的问题
如果,leader在同步完成之后,向生产者回答时,挂掉了,这时候剩下的备份分区会自动选举出一个新leader出来,但是生产者并不知道它挂掉了,只会以为是消息发送失败了,触发重试,又将数据发送了一遍,然后新的leader就又接受了一遍消息,然后在备份分区上再存一遍。这就导致了这条消息存在两份,产生数据重复问题。
在这里插入图片描述
那么kafka是怎么保证数据不重复的呢?
其实这就是数据的幂等性问题了,幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

kafka默认启用数据幂等性,即设置 enable.idempotence = true

在生产者发消息时,这条消息是有它自己的属性的,其中有三个数据被拿来作为数据的主键,kafka会以此来判断这条消息是否重复,若重复,则只保留一条

PID:又叫生产者编号(producerid), Producer在初始化的时候(只有初始化的时候会随机生成PID,也就是重启就会再次生成)会被分配一个PID

Partition:又叫分区编号,即这条消息要发往的分区的paritionid

SeqNumber:又叫序列号,发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息)

这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条,这样就保证了数据的唯一,不重复。

但是幂等性只能保证的是在单分区单会话内不重复,如果发消息时生产者挂掉了,重启后它不知道是否发送成功了,又将这个消息再发送一遍,此时它的PID发生变化,那么这条消息就被认为是一条新的消息,导致重复存储,这种情况怎么解决呢?

这就要引入kafka的事务机制了,事务这个东西大家都知道啥意思,不再重复解释

我们通过事务,让客户端挂掉后继续处理,而不是重新从头来过,保证消息的仅一次发送

注意:开启事务,必须开启幂等性。

请添加图片描述

kafka使用事务,有5个API

// 初始化事务
void initializeTransactions ();// 开启事务
void beginTransaction () throws ProducerFencedException;// 在事务中提交已消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;// 提交事务
void commitTransaction () throws ProducerFencedException;// 放弃事务(类似于回滚事务的操作)
void abortTransaction () throws ProducerFencedException;

举个例子:

package com.atguigu.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Test {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put("bootstrap.servers", "hadoop102:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());properties.put("transactional.id", "transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息// 发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));}// int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}

数据有序性

如果某主题TOPIC只有一个分区,那么它天生有序,因为分区其实就是一个有序队列

如果是多分区的,kafka是通过滑动窗口的思想解决这个问题的

我们知道kafka发送请求时,最多缓存5个,其实在发送时,每个请求都有自己的单调递增编号,kafka broker在接收数据时,会自动按照编号将数据排序,并且如果其中一个编号的请求失败时,后续再次成功,数据过来后,会自动的根据编号插入到应该在的位置上
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/211848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

掌握PyTorch数据预处理(一):让模型表现更上一层楼!!!

引言 在PyTorch中&#xff0c;数据预处理是模型训练过程中不可或缺的一环。通过精心优化数据&#xff0c;我们能够确保模型在训练时能够更高效地学习&#xff0c;从而在实际应用中达到更好的性能。今天&#xff0c;我们将深入探讨一些常用的PyTorch数据预处理技巧&#xff0c;…

Linux系统编程:进程间通信总结

管道 在Linux中&#xff0c;管道是一种进程间通信方式&#xff0c;它允许一个进程&#xff08;写入端&#xff09;将其输出直接连接到另一个进程&#xff08;读取端&#xff09;的输入。从本质上说&#xff0c;管道也是一种文件&#xff0c;但它又和一般的文件有所不同。 具体…

Docker部署开源分布式任务调度平台DolphinScheduler并实现远程访问办公

文章目录 前言1. 安装部署DolphinScheduler1.1 启动服务 2. 登录DolphinScheduler界面3. 安装内网穿透工具4. 配置Dolphin Scheduler公网地址5. 固定DolphinScheduler公网地址 前言 本篇教程和大家分享一下DolphinScheduler的安装部署及如何实现公网远程访问&#xff0c;结合内…

12.9_黑马数据结构与算法笔记Java

目录 057 多路递归 e03 杨辉三角2 thinking&#xff1a;二维数组的动态初始化&#xff1f; 057 多路递归 e03 杨辉三角3 058 链表 e01 反转单向链表1 058 链表 e01 反转单向链表2 058 链表 e01 反转单向链表3 递归 058 链表 e01 反转单向链表4 为什么是returnn1呢&…

【Cisco Packet Tracer】路由器 NAT实验

NAT的实现方式有三种&#xff0c;即静态转换Static Nat、动态转换Dynamic Nat和端口多路复用OverLoad。 静态转换是指内部本地地址一对一转换成内部全局地址&#xff0c;相当内部本地的每一台PC都绑定了一个全局地址。一般用于在内网中对外提供服务的服务器。 [3] 动态转换是指…

改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制

🗝️改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制 代码ACmixBiFormerBAMBlock加入方法各种yaml加入结构本文提供了改进 YOLOv8注意力系列包含不同的注意力机制以及多种加入方式,在本文中具有完整的代码和包含多种更有效加入YOLOv8中的yaml结构,读者可以获…

Mysql索引一篇就够了

索引 定义 索引是对数据库表中一列或者多列的值进行排序的结构。 目的 数据库索引好比一本书的目录&#xff0c;提高查询效率。但是为表设置索引要付出相应的代价&#xff1a; 增加了数据库的存储空间 在插入和修改时需花费更多的时间&#xff08;因为索引也要随之变动&#…

C++相关闲碎记录(5)

1、容器提供的类型 2、Array Array大小固定&#xff0c;只允许替换元素的值&#xff0c;不能增加或者移除元素改变大小。Array是一种有序集合&#xff0c;支持随机访问。 std::array<int, 4> x; //elements of x have undefined value std::array<int, 5> x {…

渗透测试——七、网站漏洞——命令注入和跨站请求伪造(CSRF)

渗透测试 一、命令注入二、跨站请求伪造(CSRF)三、命令注入页面之注人测试四、CSRF页面之请求伪造测试 一、命令注入 命令注入(命令执行) 漏洞是指在网页代码中有时需要调用一些执行系统命令的函数例如 system()、exec()、shell_exec()、eval()、passthru()&#xff0c;代码未…

基于ssm在线云音乐系统的设计与实现论文

摘 要 随着移动互联网时代的发展&#xff0c;网络的使用越来越普及&#xff0c;用户在获取和存储信息方面也会有激动人心的时刻。音乐也将慢慢融入人们的生活中。影响和改变我们的生活。随着当今各种流行音乐的流行&#xff0c;人们在日常生活中经常会用到的就是在线云音乐系统…

走迷宫(详细分析)

目录 一、课题描述 输入样例&#xff1a; 输出样例&#xff1a; 二、需求分析 输入的形式和输入值的范围&#xff1a; 输出的形式&#xff1a; 程序所能达到的功能&#xff1a; 三、概要设计 四、流程图 五 、代码详细注释 六、测试数据和结果 一、课题描述 以一个…

freeswitch webrtc video_demo客户端进行MCU的视频会议

系统环境 一、编译服务器和加载模块 二、下载编译指定版本video_demo 三、配置verto.conf.xml 1.修改配置文件 2.重新启动 四、MCU通话测试 1.如何使用video_demo 2.测试结果 五、MCU的通话原理及音频/视频/布局/管理员等参数配置 附录 freeswitch微信交流群 系统环境 lsb_rel…

lv11 嵌入式开发 IIC(下) 20

目录 1 Exynos4412下IIC控制器介绍 1.1 总览 1.2 特征 1.3 工作框图 1.4 其他内容介绍 1.5 四种工作模式寄存器流程 2 IIC寄存器详解 2.1 概述 2.2 控制寄存器 2.3 状态寄存器 2.4 地址寄存器 2.5 数据寄存器 2.6 其他寄存器 3 MPU06050 3.1 简介 3.2 MPU6050主…

HJ103 Redraiment的走法

题目&#xff1a; HJ103 Redraiment的走法 题解&#xff1a; dfs 暴力搜索 枚举数组元素&#xff0c;作为起点如果后续节点大于当前节点&#xff0c;继续向后搜索记录每个起点的结果&#xff0c;求出最大值 public int getLongestSub(int[] arr) {int max 0;for (int i 0…

data_loader返回的每个batch的数据大小是怎么计算得到的?

data_loader是一个通用的术语&#xff0c;用于表示数据加载器或数据批次生成器。它是在机器学习和深度学习中常用的一个概念。 一、data loader 数据加载器&#xff08;data loader&#xff09;是一个用于加载和处理数据集的工具&#xff0c;它可以将数据集划分为小批次&#…

内存学习——堆(heap)

目录 一、概念二、自定义malloc函数三、Debug运行四、heap_4简单分析4.1 heap管理链表结构体4.2 堆初始化4.3 malloc使用4.4 free使用 一、概念 内存分为堆和栈两部分&#xff1a; 栈&#xff08;Stack&#xff09;是一种后进先出&#xff08;LIFO&#xff09;的数据结构&…

AVFormatContext封装层:理论与实战

文章目录 前言一、封装格式简介1、FFmpeg 中的封装格式2、查看 FFmpeg 支持的封装格式 二、API 介绍三、 实战 1&#xff1a;解封装1、原理讲解2、示例源码 13、运行结果 14、示例源码 25、运行结果 2 四、 实战 2&#xff1a;转封装1、原理讲解2、示例源码3、运行结果 前言 A…

文章解读与仿真程序复现思路——电力系统自动化EI\CSCD\北大核心《考虑电力-交通交互的配电网故障下电动汽车充电演化特性》

这个标题涉及到电力系统、交通系统和电动汽车充电的复杂主题。让我们逐步解读&#xff1a; 考虑电力-交通交互的配电网故障&#xff1a; 电力-交通交互&#xff1a; 指的是电力系统和交通系统之间相互影响、相互关联的关系。这可能涉及到电力需求对交通流量的影响&#xff0c;反…

回溯算法之N皇后

一 什么是回溯算法 回溯算法&#xff08;Backtracking Algorithm&#xff09;是一种用于解决组合优化问题的算法&#xff0c;它通过逐步构建候选解并进行验证&#xff0c;以寻找所有满足特定条件的解。回溯算法通常应用于在给定约束条件下枚举所有可能解的问题&#xff0c;如…

Git—文件添加查看删除修改

目录 1.添加文件—场景一 2.查看.git文件 3.添加文件—场景三 4.修改文件 5.版本回退 6.撤销修改 7.删除文件 1.添加文件—场景一 在包含.git的目录下新建⼀个ReadMe文件&#xff0c;我们可以使用 git add 命令可以将文件添加到暂存 区&#xff1a; ●添加一个或多个文…