kafka学习笔记--如何保证生产者数据可靠、不重复、有序

本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)


PS:本节内容尚硅谷的视频讲的不太友好,又查了很多资料才搞明白

文章目录

  • 数据可靠性
  • 数据不重复
  • 数据有序性

数据可靠性

首先数据的可靠性指的是:

  • 消息不会意外丢失
  • 消息不会重复传递

那回顾我们的数据发送流程,在确认数据发送成功的这一步,也就是ack应答这里,不同的参数对应着不同的策略,如果选择了0和1,则存在丢数的问题,如图:
0: 如果数据发送到某个主题的leader时,leader所在节点挂了,那么这条消息就丢失了
在这里插入图片描述
1: 同理,leader收到了,还没应答时挂了,也会丢数据
在这里插入图片描述
-1(all): 使用-1能保证数据落配盘后才回答,保证数据不丢失
在这里插入图片描述
但是,如果Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

这就引出了ISR队列的概念了

ISR,是一个机制,也代表着一个同步合集,是由Leader维护的一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的


那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?


主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。

换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。

我们将ISR与ACK应答结合起来使用,就形成了数据可靠条件

  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据不重复

上面讲解的,只能保证数据可靠,但是这又引出了一个新的问题
如果,leader在同步完成之后,向生产者回答时,挂掉了,这时候剩下的备份分区会自动选举出一个新leader出来,但是生产者并不知道它挂掉了,只会以为是消息发送失败了,触发重试,又将数据发送了一遍,然后新的leader就又接受了一遍消息,然后在备份分区上再存一遍。这就导致了这条消息存在两份,产生数据重复问题。
在这里插入图片描述
那么kafka是怎么保证数据不重复的呢?
其实这就是数据的幂等性问题了,幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

kafka默认启用数据幂等性,即设置 enable.idempotence = true

在生产者发消息时,这条消息是有它自己的属性的,其中有三个数据被拿来作为数据的主键,kafka会以此来判断这条消息是否重复,若重复,则只保留一条

PID:又叫生产者编号(producerid), Producer在初始化的时候(只有初始化的时候会随机生成PID,也就是重启就会再次生成)会被分配一个PID

Partition:又叫分区编号,即这条消息要发往的分区的paritionid

SeqNumber:又叫序列号,发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息)

这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条,这样就保证了数据的唯一,不重复。

但是幂等性只能保证的是在单分区单会话内不重复,如果发消息时生产者挂掉了,重启后它不知道是否发送成功了,又将这个消息再发送一遍,此时它的PID发生变化,那么这条消息就被认为是一条新的消息,导致重复存储,这种情况怎么解决呢?

这就要引入kafka的事务机制了,事务这个东西大家都知道啥意思,不再重复解释

我们通过事务,让客户端挂掉后继续处理,而不是重新从头来过,保证消息的仅一次发送

注意:开启事务,必须开启幂等性。

请添加图片描述

kafka使用事务,有5个API

// 初始化事务
void initializeTransactions ();// 开启事务
void beginTransaction () throws ProducerFencedException;// 在事务中提交已消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;// 提交事务
void commitTransaction () throws ProducerFencedException;// 放弃事务(类似于回滚事务的操作)
void abortTransaction () throws ProducerFencedException;

举个例子:

package com.atguigu.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Test {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put("bootstrap.servers", "hadoop102:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());properties.put("transactional.id", "transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息// 发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));}// int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}

数据有序性

如果某主题TOPIC只有一个分区,那么它天生有序,因为分区其实就是一个有序队列

如果是多分区的,kafka是通过滑动窗口的思想解决这个问题的

我们知道kafka发送请求时,最多缓存5个,其实在发送时,每个请求都有自己的单调递增编号,kafka broker在接收数据时,会自动按照编号将数据排序,并且如果其中一个编号的请求失败时,后续再次成功,数据过来后,会自动的根据编号插入到应该在的位置上
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/211848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

名字的漂亮度

给出一个字符串&#xff0c;该字符串仅由小写字母组成&#xff0c;定义这个字符串的“漂亮度”是其所有字母“漂亮度”的总和。 每个字母都有一个“漂亮度”&#xff0c;范围在1到26之间。没有任何两个不同字母拥有相同的“漂亮度”。字母忽略大小写。给出多个字符串&#xff0…

从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD

文章目录 依赖初始化客户端发起请求请求参数请求头设置超时时间设置线程数设置用户名密码结果解析节点选择器配置嗅探器整体示例问题参考 OpenSearch开发环境安装Docker和Docker-Compose两种方式 依赖 <dependency><groupId>org.elasticsearch.client</groupId…

【脚本】图片-音视频-压缩文件处理

音视频处理 一&#xff0c;图片操作1&#xff0c;转换图片格式2&#xff0c;多张图片合成视频 二&#xff0c;音频操作1&#xff0c;转换音频格式2&#xff0c;分割音频为多段3&#xff0c;合成多段音频 三&#xff0c;视频操作1&#xff0c;转换视频格式2&#xff0c;提取视频…

【Go自学版】01-基础

// 变量 var a, b, c 8, 2.3, "hello" var d float64; e : 6var A []int; var B [10]int; C : [10]int{1, 2, 3, 4} for i : 0; i < len(B); i {} for _, value : range C {} D make([]int, 3) // len 4, cap 10, 扩容方式 cap*2 E : make([]int, 4, 10) E …

掌握PyTorch数据预处理(一):让模型表现更上一层楼!!!

引言 在PyTorch中&#xff0c;数据预处理是模型训练过程中不可或缺的一环。通过精心优化数据&#xff0c;我们能够确保模型在训练时能够更高效地学习&#xff0c;从而在实际应用中达到更好的性能。今天&#xff0c;我们将深入探讨一些常用的PyTorch数据预处理技巧&#xff0c;…

C++如何通过调用ffmpeg接口对H264文件进行编码和解码

C可以通过调用FFmpeg的API来对H264文件进行编码和解码。下面是一个简单的例子。 首先需要在代码中包含FFmpeg的头文件&#xff1a; extern "C" { #include <libavcodec/avcodec.h> #include <libavformat/avformat.h> #include <libswscale/swscale…

Linux系统编程:进程间通信总结

管道 在Linux中&#xff0c;管道是一种进程间通信方式&#xff0c;它允许一个进程&#xff08;写入端&#xff09;将其输出直接连接到另一个进程&#xff08;读取端&#xff09;的输入。从本质上说&#xff0c;管道也是一种文件&#xff0c;但它又和一般的文件有所不同。 具体…

Docker部署开源分布式任务调度平台DolphinScheduler并实现远程访问办公

文章目录 前言1. 安装部署DolphinScheduler1.1 启动服务 2. 登录DolphinScheduler界面3. 安装内网穿透工具4. 配置Dolphin Scheduler公网地址5. 固定DolphinScheduler公网地址 前言 本篇教程和大家分享一下DolphinScheduler的安装部署及如何实现公网远程访问&#xff0c;结合内…

前端知识笔记(二十七)———CSS核心功能手册:从熟悉到精通

参考HTML代码 <!DOCTYPE html> <html lang"zh-CN"><head><meta charset"utf-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wi…

12.9_黑马数据结构与算法笔记Java

目录 057 多路递归 e03 杨辉三角2 thinking&#xff1a;二维数组的动态初始化&#xff1f; 057 多路递归 e03 杨辉三角3 058 链表 e01 反转单向链表1 058 链表 e01 反转单向链表2 058 链表 e01 反转单向链表3 递归 058 链表 e01 反转单向链表4 为什么是returnn1呢&…

【Cisco Packet Tracer】路由器 NAT实验

NAT的实现方式有三种&#xff0c;即静态转换Static Nat、动态转换Dynamic Nat和端口多路复用OverLoad。 静态转换是指内部本地地址一对一转换成内部全局地址&#xff0c;相当内部本地的每一台PC都绑定了一个全局地址。一般用于在内网中对外提供服务的服务器。 [3] 动态转换是指…

C++ 迭代器

迭代器 迭代器类似于指针类型&#xff0c;也提供了对对象的间接访问。 就迭代器而言&#xff0c;其对象是容器中的元素或 string 对象中的字符。 获取迭代器 容器的迭代器类型 使用作用域运算符来说明我们希望使用的类型成员&#xff1b;例&#xff1a;string::iterator it…

探秘MSSQL存储过程:功能、用法及实战案例

在现代软件开发中&#xff0c;高效地操作数据库是至关重要的。而MSSQL&#xff08;Microsoft SQL Server&#xff09;作为一款强大的关系型数据库管理系统&#xff0c;为我们提供了丰富的功能和工具来处理数据。其中&#xff0c;MSSQL存储过程是一项强大而又常用的功能&#xf…

改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制

🗝️改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制 代码ACmixBiFormerBAMBlock加入方法各种yaml加入结构本文提供了改进 YOLOv8注意力系列包含不同的注意力机制以及多种加入方式,在本文中具有完整的代码和包含多种更有效加入YOLOv8中的yaml结构,读者可以获…

C++ 的关键字(保留字)介绍

一.C中部分关键字的用法 1. auto 关键字auto是C11引入的&#xff0c;它可以用于变量声明和函数返回类型的推导。当你不关心变量的具体类型时&#xff0c;可以使用auto来让编译器根据初始化表达式推导出变量的类型。这样可以简化代码&#xff0c;提高可读性。 1.在for循环中遍…

Mysql索引一篇就够了

索引 定义 索引是对数据库表中一列或者多列的值进行排序的结构。 目的 数据库索引好比一本书的目录&#xff0c;提高查询效率。但是为表设置索引要付出相应的代价&#xff1a; 增加了数据库的存储空间 在插入和修改时需花费更多的时间&#xff08;因为索引也要随之变动&#…

一、C#笔记

1.注释 /*多行注释*/class HelloWorld{ void Hello(){Console.WriteLine("Hello!");//单行注释}} 2.理解语句 2.1方法、语法、语义 2.2使用标识符 标识符语法规则&#xff1a; 只能使用字母&#xff08;大写和小写&#xff09;、数字和下划…

C++相关闲碎记录(5)

1、容器提供的类型 2、Array Array大小固定&#xff0c;只允许替换元素的值&#xff0c;不能增加或者移除元素改变大小。Array是一种有序集合&#xff0c;支持随机访问。 std::array<int, 4> x; //elements of x have undefined value std::array<int, 5> x {…

渗透测试——七、网站漏洞——命令注入和跨站请求伪造(CSRF)

渗透测试 一、命令注入二、跨站请求伪造(CSRF)三、命令注入页面之注人测试四、CSRF页面之请求伪造测试 一、命令注入 命令注入(命令执行) 漏洞是指在网页代码中有时需要调用一些执行系统命令的函数例如 system()、exec()、shell_exec()、eval()、passthru()&#xff0c;代码未…

基于ssm在线云音乐系统的设计与实现论文

摘 要 随着移动互联网时代的发展&#xff0c;网络的使用越来越普及&#xff0c;用户在获取和存储信息方面也会有激动人心的时刻。音乐也将慢慢融入人们的生活中。影响和改变我们的生活。随着当今各种流行音乐的流行&#xff0c;人们在日常生活中经常会用到的就是在线云音乐系统…