Kafka存取原理与实现分析,打破面试难关

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
Kafka是什么,以及如何使用SpringBoot对接Kafka
架构必备能力——kafka的选型对比及应用场景



在这里插入图片描述
在前面的几篇内容中,我们依次讲了Kafka的安装、与Spring Boot的结合,还有选型与应用场景。但是笔者也知道,对于很多小伙伴来说,原理及实现才算重头戏,而且也是面试热点,那么本次我们先来进行存取原理的分析,当然抱着疑问去学习才是最快的,因此在开始之前,我也先抛出一些Kafka的重点与热点问题,希望大家在学习过程中能总结印证

  • Kafka为什么吞吐量这么高?
  • Kafka的数据存与取有什么特点?

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、主题与分区

1. 模型

我们其实在《架构必备能力——kafka的选型对比及应用场景》 一文中其实讲到了Kafka的模型,我们这里再把老图拿出来用一遍

在这里插入图片描述
不难看出,逻辑上的源头就是主题,也即Topic,而主题又划分为多个分区。我们先来谈谈主题与分区的实现,在Kafka中,可以使用以下命令来声明一个主题并指定分区:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic my-topic

其中:

–create: 声明一个新的主题。
–zookeeper: 指定 ZooKeeper 的地址和端口号。
–replication-factor: 指定副本因子,即每个分区在集群中的副本数量。这里指定为1,表示每个分区只有一个副本。
–partitions: 指定分区数。这里指定为4,表示该主题有4个分区。
–topic: 指定主题名称,这里为 my-topic。
注意:如果要指定分区数量,必须在创建主题的时候指定,之后无法更改。因此,在创建主题时应该仔细考虑分区数量,以满足业务需求。

当然,如果有同学还记得前面的内容,应该知道我们在对接Spring Boot时,并没有提前建立主题而是直接使用了。其中的原因是,我们在Spring Boot中使用Kafka,如果在发送消息时指定的主题不存在,Kafka会自动创建该主题。在创建时,Kafka将使用默认的分区数量(通常为1),以及默认的副本因子(通常为1)来创建分区。

2. 消息与分发

然后当我们发布者向某个主题发送消息时,其就会被“分发”到某一个分区里

在这里插入图片描述
那么有小伙伴肯定会问:

Kafka的主题消息会进哪个分区是我们可以决定的吗?默认是进入哪个分区?

答案是Kafka的主题消息可以由生产者自己决定要发送到哪个分区,也可以使用Kafka提供的默认分区分配算法来自动决定消息要进入哪个分区。

  • 指定分区:如果生产者自己决定要发送到哪个分区,可以在发送消息时指定消息要发送到的分区编号。此时,如果指定的分区编号存在,则消息会被发送到该分区;如果指定的分区编号不存在,则会抛出异常。

  • 自动分区:如果使用默认的分区分配算法,Kafka提供了多种分配算法,例如轮询(Round-Robin)、随机(Random)、哈希(Hash)等。默认情况下,Kafka使用哈希算法将消息均匀地分配到所有可用的分区中

当然在此之前,我们可以看下KafkaTemplate前面提供的API
在这里插入图片描述
不难知道Kafka消息除了指明主题以外,还由以下要素组成:

  1. 消息的key:是一个可选项,用于标识消息的唯一性和分区。如果不指定key,则会随机分配一个key,并将消息发送到随机的分区。
  2. 消息的value:是消息的实际内容,也是必填项。
  3. 消息的时间戳:是可选项,用于标识消息的时间戳。Kafka可以根据时间戳来处理消息的顺序、分配和延迟。
  4. 消息的分区:指定消息应该发送到哪个分区。如果不指定分区,则使用默认的分区器来决定分区。

二、分区内数据的存储

从逻辑上来说,kafka的分区是一个消息队列,当我们发送的消息经由分区器进行分发后,就会进入某个分区并被顺序的保存下来。在实现上,Kafka的分区更像一个日志记录系统,把消息当作日志,顺序的写入磁盘

1. 消息的存储

我们需要知道,Kafka中,每个分区被组织为一组日志段(Log Segment),其中每个日志段都包含了一个连续的消息序列。当一个日志段被写满后,它将被关闭并分配一个更高的编号,新的消息将被追加到一个新的日志段中。而日志段的核心又由两个部分组成:索引文件(index file)和数据文件(data file)

在这里插入图片描述

  • 数据文件: 也叫日志文件,数据文件是消息分区的核心部分,它是以追加方式写入的文件。当有新的消息写入分区时,Kafka会根据协议、消息头、消息体等信息将消息封装成字节流,然后追加写入数据文件

  • 索引文件: 索引文件是一个不可变的有序映射,它将消息偏移量映射到数据文件中的位置。当一个消费者读取一个分区的消息时,它会使用偏移量读取索引文件中的位置,并从该位置读取数据文件中的消息。

如下图,就是我们上期发送了一条消息,而建立的目录test_topic-0,代表该目录是test_topic主题下的 0 号分区,可以看到里面的 index文件 和 log 文件

在这里插入图片描述

① 偏移量与日志文件

要想更深入的了解,我们必须先解释一下kafka中消息偏移量(offset) 的概念:当一条记录需要写入分区的时候,它会被追加到 log 文件的末尾,同时会被分配一个唯一的序号,称为 Offset(偏移量)。Offset 是一个递增的、不可变的数字,由 Kafka 自动维护需要注意的是,在后续内容中,我们还会提到各种不同的偏移量,请注意区分,不要混淆了

由于Offset 初始值为 0,所以当第一条消息达到分区后,就会建立起 00000000000000000000.log 这样的文件来进行消息的存储,后续消息将会在这个文件内追加写入,直到文件大小超出限制(其默认值为1GB)

举个例子,当第170411个消息(Offset = 170410)来到时,发现 00000000000000000000.log 已经超过了 1 G,此时其就会新创建一个日志段,同时以本offset为名,新建一个日志文件,命名为 0000000000000170410.log,此时本分区就形成了两个日志段,情况如下:

在这里插入图片描述

② 索引的构成

我们上面讲了 .log 文件,也即数据文件的创建机制。但是还没讲段的另一个组成部分,也即索引文件。索引其实就像字典的目录,是帮助大家快速找到某条消息的工具,索引文件存储的内容主要就是 消息偏移量(offset)消息存储地址(position) 的映射关系。

Kafka的索引文件由多个索引条目(index entry)组成,每个索引条目包含两个核心字段:

  • offset消息的偏移量(这里是相对偏移量,每个索引文件都以0起始,其对应的真实偏移量为段初始偏移 + 本offset);
  • position消息在日志文件中的磁盘位置(相对偏移量,偏移量仅适用于对应的日志文件)

在这里插入图片描述

需要注意的是,不是每一条消息都会有索引。这里有参数 index.interval.bytes 的控制,其默认值为 4 KB,即表示当前分区 log 文件写入了 4 KB 数据后才会在索引文件中增加一个索引条目

2. 消息的读取

现在我们已经存储了一些数据,下面就要开始读取了,我们目前掌握了这些文件,那么怎么才能找到并读取消息呢?

① 消费偏移量的存储

我们不难理解,每个消费者负责需要消费分配给它的分区上的消息,并记录自己在每个分区上消费的最新偏移量。对于消费者而言,怎么知道自己应该要消费哪个offset的消息?消费者可以通过以下两种方式记录消费的偏移量:

  1. 手动提交偏移量:消费者在消费消息时,可以手动调用 consumer.commitSync() 或 consumer.commitAsync() 方法将消费的偏移量提交到 Kafka 中。该方法接收的参数表示要提交的偏移量的值,提交后,Kafka 会将该偏移量记录到内部的偏移量管理器中。

  2. 自动同步提交偏移量:消费者可以将 enable.auto.commit 参数设置为 true,开启自动提交偏移量的功能。启用该功能后,Kafka 会自动记录消费者消费过的最新偏移量,并定期将其定期提交到 Kafka 中。

但不管怎么样,这个消费的偏移量最终都是由kafka来进行保存的,那么其具体的存储是怎么实现的呢?Kafka其实提供了将给定消费者组的所有偏移存储在一个叫做组协调器(group coordinator)的组件。

在这里插入图片描述

通过官方文档不难看出,当组协调器收到偏移量变动的请求时,会将对应数据存储在内置的主题 __consumer_offsets 中(在旧版本中偏移量是存在ZK中的),我们可以在ZK中看到这个主题的情况:

在这里插入图片描述

在我们的本地目录中也能看到这个 __consumer_offsets 主题一共建了50个分区(默认):

在这里插入图片描述

当然它分区的个数,可以在Kafka服务器配置文件中通过参数offsets.topic.num.partitions 进行配置。

当我们以某个消费者组消费掉某条消息并提交偏移量后,偏移量会被提交到 __consumer_offsets Topic的一个特定分区,该分区由所消费的主题和消费者组的哈希值决定。在我的例子里,是被提交到了 __consumer_offsets-45,如下:

在这里插入图片描述

②Compaction策略

相信你会对这种存储消费位置的方式有所困惑,因为按照我们前面的说法,Kafka的内容都是以日志形式存储的,在使用的过程中,日志岂不是会越来越大?到最后找一次偏移量都很麻烦?这就不得不提到Kafka中的Compaction策略

compaction是一种保留最后N个版本的消息的消息清理策略,它保留特定键的最新值,同时删除无用的键值,从而减少存储空间。具体来说,Compaction会保留每个消息主题中最新的一组键值对,并删除所有键相同但值较旧的消息。

使用Compaction策略需要满足以下条件:

  • 消息的键必须是唯一的
  • 消息的键必须是可序列化的
  • 消息必须按照键进行划分
  • 消息的存储时间必须足够长,以便新消息可以替换旧消息

而这些消费偏移量的数据,存储的内容如下

key = group.id+topic+分区号
value= offset 的值

这样就导致某个消费组在某个分区的消费数据只会有一条,所以找起来并没有那么复杂

③查找并读取消息

上面我们讲了消费偏移量的存储,其实查找偏移量的过程也是一样的,同一个消费组会先从特定的 __consumer_offsets 拿取偏移量,拿到偏移量以后,比如偏移量是 170417,我们仍以上面的文件情况为例,那么它找到消息的逻辑如下:

  1. 首先用二分查找确定它是在哪个Segment文件中,其中0000000000000000000.index为最开始的文件,第二个文件为0000000000000170410.index(起始偏移为170410+1 = 170411),而第三个文件为0000000000000239430.index(起始偏移为239430+1 = 239431)。所以这个offset = 170417就落在第二个文件中。其他后续文件可以依此类推,以起始偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。

  2. 用该offset减去索引文件的编号,即170417 - 170410 = 7,也用二分查找法找到索引文件中等于或者小于7的最大的那个编号。可以看出我们能够找到[4,476]这组数据,476即offset=170410 + 4 = 170414的消息在log文件中的偏移量。

  3. 打开数据文件(0000000000000170410.log),从位置为476的那个地方开始顺序扫描直到找到offset为170417的那条Message。

总结来说:就是通过二分法先找到index文件,然后再在index文件中通过二分法找到某一条索引条目,然后根据该索引条目给出的地址去log文件中快速定位,最后从这个定位开始,顺序扫描下去直到找到我们指定的偏移量数据

3. 快速存取实现

我们上面讲了Kafka的一大堆的奇特设计,不知道小伙伴们是否产生过疑问,比如为什么一个主题要分成多个分区一个分区为什么要划成多个段?以及为什么把数据存储成日志格式 ? 其实这些都是在优化性能,我们从快速存取的角度讲一下Kafka都做了哪些努力【面试重点】:

  1. 多分区负载均衡:Kafka支持将一个主题的数据分散至多个分区,不同分区位于多个broker节点上,实现了集群负载均衡,从而提高了写入和读取的性能。

  2. 分段存储:Kafka会将数据分段存储,每个段的大小和时间可以根据需求进行配置,这样可以提高读取性能并减少删除操作对IO的影响。

  3. 批量写入:Kafka允许客户端一次性写入多条消息到broker,减少了网络传输的时间。

  4. 零拷贝:Kafka使用mmap映射磁盘上的文件到虚拟内存空间,然后通过直接内存访问(Direct Memory Access)的方式将数据从磁盘读取到内存中,还使用sendfile系统调用来实现网络发送时的零拷贝,这样网络数据也可以直接从内核空间中发送,避免了数据拷贝到用户空间的过程。

  5. 异步刷盘:Kafka支持异步刷盘,即将消息写入日志后,不会立即将数据从内存刷入磁盘,而是会缓存一段时间再批量写入磁盘,减少了磁盘I/O的次数,提高了写入性能。

  6. 稀疏索引:Kafka会为每个段维护一个索引,以便在读取数据时快速定位到所需数据的位置。这样可以避免全盘扫描,提高数据读取性能。但如果每个消息都写进索引,会导致索引文件臃肿,且降低存储速度,所以采用了稀疏索引的方式

如果你按照《Kafka是什么,以及如何使用SpringBoot对接Kafka》中的动手操作过,我们可以继续来做个实现,我们先看一下log文件,如下

在这里插入图片描述

然后我们把发送的代码改成如下,这样一次发送1000条消息,注意,我们在这里还加上了 kafkaTemplate.flush(),因为当使用Kafka Template发送消息时,消息并不会立即发送到Kafka Broker,而是会被缓存在Kafka Template中,以减少通信次数,如果我们需要立即发送,这时候就可以使用kafkaTemplate.flush()方法来实现立即发送。

@Service
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {for (int i = 0; i < 1000; i++) {kafkaTemplate.send("another_topic2", 0,"key",message + i);}kafkaTemplate.flush();System.out.println("we have send message");}
}

但当我们发送消息,成功输出 we have send message ,并又成功接收到消息后,如图

在这里插入图片描述

在这里插入图片描述

我们却会看到 log 文件的大小没有发生变化,即便是不停的刷新目录也无济于事

在这里插入图片描述

然而如果我们单击并右键选中该文件,就会看到该文件被更新,且大小发生变动

在这里插入图片描述

这就说明了其写入硬盘的过程是异步且有延迟的,使用了操作系统的延迟写入(delayed write)机制。但其传输数据却可以脱离硬盘,使用内存缓存作为收发介质,直接实现传达


总结

今天我们详细讲解了消息在kafak中的存与取,也介绍了不少细节点,知道了Kafka采用批量传输设计减少网络访问次数,然后用分区、分段、追加日志等方案来提高吞吐量,并且利用了操作系统的零拷贝、异步刷盘等方式来减少磁盘写入的瓶颈,最终成为了一款性能优异、吞吐量极大的中间件。希望通过今天的学习,能对大家有所帮助,我们将在后面继续讲解kafka的其他实现细节。如果你对此有兴趣,可以直接订阅本 kafka 专栏

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/110970.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot汽车租赁系统

功能如下图所示 摘要 Spring Boot汽车租赁系统的设计旨在满足不断增长的租车市场需求&#xff0c;并通过简化开发和部署流程来提供方便的租车解决方案。系统采用了现代化的架构&#xff0c;主要基于以下技术栈&#xff1a; Spring Boot&#xff1a;作为后端的核心框架&#xff…

nginx平滑升级添加echo模块、localtion配置、rewrite配置

nginx平滑升级添加echo模块、location配置、rewrite配置 文章目录 nginx平滑升级添加echo模块、location配置、rewrite配置1.环境说明&#xff1a;2.nginx平滑升级原理&#xff1a;3.平滑升级nginx&#xff0c;并添加echo模块3.1.查看当前nginx版本以及老版本编译参数信息3.2.下…

Django REST Framework完整教程-RESTful规范-序列化和反序列数据-数据视图

文章目录 1.简介及安装2.案例模型2.1.创建模型2.2.安装mysql必要组件2.3.管理后台转中文2.4.启动后台 3.数据序列化4.RESTful规范4.1.协议、域名和版本4.2.uri(统一资源标识符)4.3.查增删改4.4.过滤信息&#xff08;Filtering&#xff09;4.5.状态码&#xff08;Status Codes&a…

Systemverilog断言介绍(二)

3.2 IMMEDIATE ASSERTIONS 即时断言是最简单的断言语句类型。它们通常被认为是SystemVerilog过程代码的一部分&#xff0c;并在代码评估期间访问时进行评估。它们没有时钟或复位的概念&#xff08;除非有时钟/复位控制其封闭的过程块&#xff09;&#xff0c;因此无法验证跨越时…

零基础如何自学C#?

前言 本文来源于知乎的一个提问&#xff0c;提问的是一个大一软件工程专业的学生&#xff0c;他想要自学C#但是不知道该怎么去学&#xff0c;这让他感到很迷茫&#xff0c;希望有人能给他一些建议和提供一些学习方向。 个人建议 确认目标&#xff1a;自学C#首先你需要大概了解…

spring boot MongoDB实战

项目搭建 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 …

谈谈 Redis 如何来实现分布式锁

谈谈 Redis 如何来实现分布式锁 基于 setnx 可以实现&#xff0c;但是不是可重入的。 基于 Hash 数据类型 Lua脚本 可以实现可重入的分布式锁。 获取锁的 Lua 脚本&#xff1a; 释放锁的 Lua 脚本&#xff1a; 但是还是存在分布式问题&#xff0c;比如说&#xff0c;一个客…

李宏毅机器学习笔记-半监督学习

半监督学习&#xff0c;一般应用于少量带标签的数据&#xff08;数量R&#xff09;和大量未带标签数据的场景&#xff08;数量U&#xff09;&#xff0c;一般来说&#xff0c;U>>R。 半监督学习一般可以分为2种情况&#xff0c;一种是transductive learning&#xff0c;…

GO 语言的方法??

GO 中的方法是什么&#xff1f; 前面我们有分享到 GO 语言的函数&#xff0c;他是一等公民&#xff0c;那么 GO 语言中的方法和函数有什么区别呢&#xff1f; GO 语言中的方法实际上和函数是类似的&#xff0c;只不过在函数的基础上多了一个参数&#xff0c;这个参数在 GO 语…

O2O优惠券预测

O2O优惠券预测 赛题理解赛题类型解题思路 数据探索理论知识数据可视化分布 特征工程赛题特征工程思路 模型训练与验证 赛题理解 赛题类型 本赛题要求提交的结果是预测15 天内用券的概率&#xff0c;这是一个连续值&#xff0c;但是因为用券只有用与不用两种情况&#xff0c;而…

Redis入门到实战(四、原理篇)RESP协议

目录 2、Redis内存回收-过期key处理3、Redis内存回收-内存淘汰策略 Redis是一个CS架构的软件&#xff0c;通信一般分两步&#xff08;不包括pipeline和PubSub&#xff09;&#xff1a; 客户端&#xff08;client&#xff09;向服务端&#xff08;server&#xff09;发送一条命令…

数字秒表VHDL实验箱精度毫秒可回看,视频/代码

名称&#xff1a;数字秒表VHDL精度毫秒可回看 软件&#xff1a;Quartus 语言&#xff1a;VHDL 代码功能&#xff1a; 数字秒表的VHDL设计&#xff0c;可以显示秒和毫秒。可以启动、停止、复位。要求可以存储6组时间&#xff0c;可以回看存储的时间 本资源内含2个工程文件&am…

2023 年程序员必读的 27 本软件开发书籍

不断发展的软件开发领域需要不断学习和改进。现代开发实践要求软件工程师具备全面的知识&#xff0c;包括各个领域的理论见解和实践技术&#xff0c;从编程语言和数据库管理到质量保证、网页设计和 DevOps 实践。 这就是编程书籍可以提供帮助的地方。通过及时了解此类书籍并应…

模拟退火算法求解TSP问题(python)

模拟退火算法求解TSP的步骤参考书籍《Matlab智能算法30个案例分析》。 问题描述 TSP问题描述在该书籍的第4章 算法流程 部分实现代码片段 坐标轴转换成两点之间直线距离长度的代码 coordinates np.array([(16.47, 96.10),(16.47, 94.44),(20.09, 92.54),(22.39, 93.37),(2…

如何给Github上的开源项目提交PR?

前言 对于一个热爱开源的程序员而言&#xff0c;学会给GitHub上的开源项目提交PR这是迈出开源的第一步。今天我们就来说说如何向GitHub的开源项目提交PR&#xff0c;当然你提交的PR可以是一个项目的需求迭代、也可以是一个Bug修复、再或者是一些内容文本翻译等等&#xff0c;并…

单片机点亮led管(01)

如何开始学习单片机 1&#xff1a;实践第一 2&#xff1a;补充必要的理论知识&#xff0c;缺什么补什么 3&#xff1a;做工程积累经验&#xff08;可以在网络上收集题目&#xff0c;也可以有自己的想法大胆的实验&#xff09; 单片机是什么&#xff1f; 单片机&#xff08…

5分钟内在Linux上安装.NET Core应用程序

作为开源的忠实粉丝&#xff0c;我喜欢 .NET Core 的跨平台特性。它开启了无限的可能性&#xff0c;从业余爱好项目、实验和概念验证&#xff0c;到在具有高安全性和可扩展性的经济高效基础设施上运行的大规模高负载生产应用程序。我通常从任何云平台提供商那里获得最简单、最便…

<C++> 模拟实现string

目录 前言 一、模拟实现string 1. 成员变量 2. 构造函数 2.1 构造函数 2.2 重载默认构造 2.3 合并 3. 析构函数 4. 拷贝构造函数 5. c_str 6. size 7. operator[ ] 7.1 普通版 7.2 const版本 8. 迭代器—iterator 8.1 普通版iterator 8.2 const版本iterator 9. 尾插 10. …

【微服务 SpringCloud】实用篇 · 服务拆分和远程调用

微服务&#xff08;2&#xff09; 文章目录 微服务&#xff08;2&#xff09;1. 服务拆分原则2. 服务拆分示例1.2.1 导入demo工程1.2.2 导入Sql语句 3. 实现远程调用案例1.3.1 案例需求&#xff1a;1.3.2 注册RestTemplate1.3.3 实现远程调用1.3.4 查看效果 4. 提供者与消费者 …

美创科技入选“内蒙古自治区第一届网络安全应急技术支撑单位”

近日&#xff0c;内蒙古自治区党委网信办、国家网络应急技术处理协调中心内蒙古分中心评选“内蒙古自治区网络安全应急技术支撑单位”结果公布。 经自治区各地区、各部门和单位推荐各单位自主申报&#xff0c;资料审查和专家评审等环节&#xff0c;美创科技成功入选“内蒙古自治…