Kafka-服务端-日志存储

基本概念

首先需要了解的是,Kafka使用日志文件的方式保存生产者发送的消息。每条消息都有一个offset值来表示它在分区中的偏移量,这个offset值是逻辑值,并不是消息实际存放的物理地址。

offset值类似于数据库表中的主键,主键唯一确定了数据库表中的一条记录,offset唯一确定了分区中的一条消息。Kafka存储机制在逻辑上如图所示。

在这里插入图片描述为了提高写入的性能,同一个分区中的消息是顺序写入的,这就避免了随机写入带来的性能问题。

一个Topic可以划分成多个f分区,而每个分区又有多个副本。当一个分区的副本(无论是Leader副本还是Follower副本)被划分到某个Broker上时,Kafka就要在此Broker上为此分区建立相应的Log,而生产者发送的消息会存储在Log中,供消费者拉取后消费。

Kafka中存储的一般都是海量消息数据,为了避免日志文件太大,Log并不是直接对应于磁盘上的一个日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_≤partition_id>,Log与分区之间的关系是一一对应的,对应分区中的全部消息都存储在此目录下的日志文件中。

Kafka通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,其中日志文件用于记录消息,索引文件中保存了消息的索引。

随着消息的不断写入,日志文件的大小到达一个阈值时,就创建新的日志文件和索引文件继续写入后续的消息和索引信息。

日志文件的文件名的命名规则是[baseOffset].log,baseOffset是日志文件中第一条消息的offset。

在这里插入图片描述
为了提高查询消息的效率,每个日志文件都对应一个索引文件,这个索引文件并没有为每条消息都建立索引项,而是使用稀疏索引方式为日志文件中的部分消息建立了索引。

图展示了索引文件与日志文件之间的对应关系。

在这里插入图片描述

FileMessageSet

在Kafka中使用FileMessageSet管理上文介绍的日志文件,它对应磁盘上的一个真正的日志文件。

FileMessageSet继承了MessageSet抽象类,如图所示。

在这里插入图片描述

MessageSet中保存的数据格式分为三部分,如图所示:8字节的offset值,4字节的size表示message data大小,这两部分组合成为LogOverhead,message data部分保存了消息的数据,逻辑上对应一个Message对象。

Kafka使用Message类表示消息,Message使用ByteBuffer保存数据,其格式及各个部分的含义如图所示。

在这里插入图片描述

  • CRC32:4个字节,消息的校验码。
  • magic:1字节,魔数标识,与消息格式有关,取值为0或1。当magic为0时,消息的offset使用绝对offset且消息格式中没有timestamp部分;当magic为1时,消息的offset使用相对offset且消息格式中存在timestamp部分。所以,magic值不同,消息的长度是不同的。
  • attributes:1字节,消息的属性。其中第0~2位的组合表示消息使用的压缩类型,0表示无压缩,1表示gzip压缩,2表示snappy压缩,3表示lz4压缩。第3位表示时间戳类型,0表示创建时间,1表示追加时间。
  • timestamp:时间戳,其含义由attribute的第3位确定。
  • key length:消息key的长度。
  • key:消息的key。
  • value length:消息的value长度。
  • value:消息的value。

MessageSet抽象类中定义了两个比较关键的方法;

//将当前MessageSet中的消息写入到Channe1中
def writeTo(channel:GatheringByteChannel,offset:Long,maxsize:Int):Int
//提供迭代器,顺序读取MessageSet中的消息
def iterator:Iterator[MessageAndoffset]

这两个方法说明MessageSet具有顺序写入消息和顺序读取的特性。

在后面对FileMessageSet和ByteBufferMessageSet的介绍过程中会介绍这两个方法的实现。

了解了MessageSet抽象类以及其中保存消息的格式,我们开始分析FileMessageSet实现类。FileMessageSet的核心字段如下所述。

  • file:java.io.File类型,指向磁盘上对应的日志文件。
  • channel:FileChannel类型,用于读写对应的日志文件。
  • start和end:FileMessageSet对象除了表示一个完整的日志文件,还可以表示日志文件分片(Slice),start和end表示分片的起始位置和结束位置。
  • isSlice:Boolean类型,表示当前FileMessageSet是否为日志文件的分片。
  • _size:FileMessageSet大小,单位是字节。如果FileMessageSet是日志文件的分片,则表示分片大小(即end-start的值);如果不是分片,则表示整个日志文件的大小。注意,因为可能有多个Handler线程并发向同一个分区写入消息,所有_size是AtomicInteger类型。

在FileMessageSet中有多个重载的构造方法,这里选择一个比较重要的方法进行介绍。

此构造方法会创建一个非分片的FileMessageSet对象。在Window NTFS文件系统以及老版本的Linux文件系统上,进行文件的预分配会提高后续写操作的性能,为此FileMessageSet提供了preallocate的选项,决定是否开启预分配的功能。

我们也可以通过FileMessageSet构造函数的mutable参数决定是否创建只读的FileMessageSet。

ByteBufferMessageSet

MessageSet的另一个子类是ByteBufferMessageSet,FileMessageSet.append方法的参数就是此类的对象。

为什么必须append方法的参数是ByteBufferMessageSet,而不是直接使用ByteBuffer呢?

向MemoryRecords写入消息时,可以使用Compressor对消息批量进行压缩,然后将压缩后的消息发送给服务端。

在有些设计中,将每个请求的负载单独压缩后再进行传输,这种设计虽然可以减小传输的数据量,但是存在一个小问题,我们常见压缩算法是数据量越大压缩率越高,一般情况下每个请求的负载不会特别大,这就导致压缩率比较低。

Kafka实现的压缩方式是将多个消息一起进行压缩,这样就可以保证较高的压缩率。

而且在一般情况下,生产者发送的压缩数据在服务端也是以保持压缩状态进行存储的,消费者从服务端获取的也是压缩消息,消费者在处理消息之前才会解压消息,这也就实现了“端到端的压缩”。

压缩后的消息格式与非压缩的消息格式类似,但是分为两层,如图所示。
在这里插入图片描述

迭代压缩消息

上文提到,消费者获取的压缩消息格式与生产者发送出去的格式应该一模一样。

Fetcher.parseFetchedData()方法解析CompleteFetch时,对MemoryRecords进行迭代。

下面来看看MemoryRecords的迭代器的相关实现,其中就涉及与压缩消息相关的实现。

MemoryRecords的迭代器是其静态内部类RecordsIterator,它继承了Abstractlterator抽象类。AbstractIterator实现了Iterator接口,它对Iterator进行了实现和简化。

AbstractIterator只暴露了一个makeNext抽象方法供子类实现,此方法主要负责用于创建下一个迭代项。

这样,开发人员就不用了解Iterator接口的实现细节了,只关注如何创建下一迭代项即可。

Abstractlterator中使用next字段指向迭代的下一项,使用state字段标识当前迭代器的状态,state字段是State枚举类型,其取值和含义如下所述。

  • READY:迭代器已经准备好迭代下一项。
  • NOT_READY:迭代器未准备好迭代下一项,需要调用maybeComputeNext)。
  • DONE:当前迭代已经结束。
  • FAILED:当前迭代器在迭代过程中出现异常。

为了同时能够处理压缩消息和非压缩消息,MemoryRecords.RecordsIterator分为两层迭代,使用shallow参数标区分。

当shallow为true时,认为消息是非压缩消息,只迭代当前这一层消息,为方便描述,我们称之为“浅层迭代”(shallow iteration);当shallow为false时,不只迭代当前层消息,还会创建Inner Iterator(也是MemoryRecords.RecordsIterator对象)迭代嵌套的压缩消息,我们称之为“深层迭代”(deep iteration)。

OffsetIndex

为了提高查找消息的性能,从Kafka 0.8版本开始,为每个日志文件添加了对应的索引文件。

Offsetlndex对象对应管理磁盘上的一个索引文件,与FileMessageSet共同构成一个LogSegment对象。

首先来介绍索引文件中索引项的格式:每个索引项为8字节,分为两部分,第一部分是相对offset,占4个字节;

第二部分是物理地址,也就是其索引消息在日志文件中对应的position位置,占4个字节。这样就实现了offset与物理地址之间的映射。

相对offset表示的是消息相对于baseOffset的偏移量。例如,分段后的一个日志文件的baseOffset是20,当然,它的文件名就是20.log,那么offset为23的Message在索引文件中的相对offset就是23-20=3。消息的offset是Long类型,4个字节可能无法直接存储消息的offset,所以使用相对offset,这样可以减小索引文件占用的空间。

Kafka使用稀疏索引的方式构造消息的索引,它不保证每个消息在索引文件中都有对应的索引项,这算是磁盘空间、内存空间、查找时间等多方面的折中。

不断减小索引文件大小的目的是为了将索引文件映射到内存,在OffsetIndex中会使用MappedByteBuffer将索引文件映射到内存中。

LogSegment

为了防止Log文件过大,将Log切分成多个日志文件,每个日志文件对应一个LogSegment。

在LogSegment中封装了一个FileMessageSet和一个OffsetIndex对象,提供日志文件和索引文件的读写功能以及其他辅助功能。

下面先来看LogSegment的核心字段。

  • log:用于操作对应日志文件的FileMessageSet对象。
  • index:用于操作对应索引文件的OffsetIndex对象。
  • baseOffset:LogSegment中第一条消息的offset值。
  • indexIntervalBytes:索引项之间间隔的最小字节数。
  • bytesSinceLastIndexEntry:记录自从上次添加索引项之后,在日志文件中累计加入的Message集合的字节数,用于判断下次索引项添加的时机。
  • created:标识LogSegment对象创建时间,当调用truncateTo()方法将整个日志文件清空时,会将此字段重置为当前时间。

LogSegment中还有一个值得注意的方法是recover()方法,其主要功能是根据日志文件重建索引文件,同时验证日志文件中消息的合法性。

在重建索引文件过程中,如果遇到了压缩消息需要进行解压,主要原因是因为索引项中保存的相对offset是第一条消息的offset,而外层消息的offset是压缩消息集合中的最后一条消息的offset。

Log

Log是对多个LogSegment对象的顺序组合,形成一个逻辑的日志。

为了实现快速定位LogSegment,Log使用跳表(SkipList)对LogSegment进行管理。

跳表是一种随机化的数据结构,它的查找效率和红黑树差不多,但是插入/删除操作却比红黑树简单很多。目前在Redis等开源软件中都能看到它的身影,在JDK中也提供了跳表的实现——ConcurrentSkipListMap,而且ConcurrentSkipListMap还是一个线程安全的实现。

在Log中,将每个LogSegment的baseOffset作为key,LogSegment对象作为value,放入segments这个跳表中管理,如图所示。

在这里插入图片描述
们现在要查找offset大于6570的消息,可以首先通过segments快速定位到消息所在的LogSegment对象,定位过程如图中的虚线所示。之后使用前面介绍的LogSegment.read方法,先按照OffsetIndex进行索引,然后从日志文件中进行读取。

向Log中追加消息时是顺序写入的,那么只有最后一个LogSegment能够进行写入操作,在其之前的所有LogSegment都不能写入数据。

最后一个LogSegment使用Log.activeSegment方法获取,即segments集合中最后一个元素,为了描述方便,我们将此Segment对象称为“activeSegment”。

随着数据的不断写入,当activeSegment的日志文件大小到达一定阈值时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。

介绍完了Log的基本原理后,来看一下Log类中的关键字段。

  • dir:Log对应的磁盘目录,此目录下存放了每个LogSegment对应的日志文件和索引文件。
  • lock:可能存在多个Handler线程并发向同一个Log追加消息,所以对Log的修改操作需要进行同步。
  • segments:用于管理LogSegment集合的跳表。
  • config:Log相关的配置信息。
  • recoveryPoint:指定恢复操作的起始offset,recoveryPoint之前的Message已经刷新到磁盘上持久存储,而其后的消息则不一定,出现宕机时可能会丢失。所以只需要恢复recoveryPoint之后的消息即可。
  • nextOffsetMetadata:LogOffsetMetadata对象。主要用于产生分配给消息的offset,同时也是当前副本的LEO(LogEndOffset)。

最后来看flush方法的原理,如图所示,fush方法会将recoverPoint~LEO之间的消息数据刷新到磁盘上,并修改recoverPoint值。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/639961.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

亚马逊KYC审核的重要性,所需提交的文件有哪些?—站斧浏览器

亚马逊KYC审核的重要性有哪些&#xff1f; KYC审核是亚马逊对卖家身份的一种验证&#xff0c;确保卖家遵守相关法规。只有通过审核的卖家才能在欧洲平台进行销售。因此&#xff0c;正确理解和应对KYC审核对于卖家来说至关重要。 注册完成后立即触发&#xff1a;新注册的卖家可…

华为欧拉操作系统结合内网穿透实现固定公网地址SSH远程连接

文章目录 1. 本地SSH连接测试2. openEuler安装Cpolar3. 配置 SSH公网地址4. 公网远程SSH连接5. 固定连接SSH公网地址6. SSH固定地址连接测试 欧拉操作系统(openEuler, 简称“欧拉”)是面向数字基础设施的操作系统,支持服务器、云计算、边缘openEuler是面向数字基础设施的操作系…

Linux与windows互相传输文件之rzsz命令

文章目录 关于rzsz安装软件使用命令方法一&#xff1a;直接拖拽方法二&#xff1a;直接在终端输入rz 关于rzsz 这个工具用于 windows 机器和远端的 Linux 机器通过 XShell 传输文件 安装完毕之后可以通过拖拽的方式将文件上传过去 首先看一下我们的机器可以使用网络吗&#xff…

c语言-实现动态内存管理的库函数

文章目录 前言一、什么是动态内存分配&#xff1f;二、malloc()和free()2.1 malloc()介绍2.2 malloc()的使用2.3 free()介绍 三、calloc()3.1 calloc()介绍3.2 calloc()使用 四、realloc()4.1 realloc()介绍4.2 realloc()使用 总结 前言 本篇文章介绍c语言中实现动态内存管理的…

U-Mamba: Enhancing Long-range Dependency for Biomedical Image Segmentation

Abstract 卷积神经网络(Convolutional Neural Networks, cnn)和transformer是生物医学图像分割中最流行的架构&#xff0c;但由于固有的局部性或计算复杂性&#xff0c;它们处理远程依赖关系的能力有限。为了解决这一挑战&#xff0c;我们引入了U-Mamba&#xff0c;一个通用的…

二.Winform使用Webview2在Demo1中实现地址简单校验

Winform使用Webview2在Demo1中实现地址简单校验 往期目录回顾添加对于的简单url验证提示通过上节和本节涉及到的函数有 往期目录 往期相关文章目录 专栏目录 回顾 通过一.Winform使用Webview2(Edge浏览器核心) 创建demo(Demo1)实现回车导航到指定地址 我们已经知道了解决资源…

HCIA-HarmonyOS设备开发认证-HarmonyOS简介

目录 前言目标一、HarmonyOS简介1.1、初识HarmonyOS1.2、HarmonyOS典型应用场景 二、HarmonyOS架构与安全2.1、HarmonyOS架构 前言 本章主要介绍HarmonyOS分布式操作系统的概念、关键技术与能力以及HarmonyOS典型的应用场景。 目标 学习完成本课程后&#xff0c;您将能够&…

MySql必知必会

1.什么是BufferPool&#xff1f; Buffer Pool基本概念 Buffer Pool&#xff1a;缓冲池&#xff0c;简称BP。其作用是用来缓存表数据与索引数据&#xff0c;减少磁盘IO操作&#xff0c;提升效率。 Buffer Pool由缓存数据页(Page) 和 对缓存数据页进行描述的控制块 组成, 控制…

树莓派 Linux - 使用ngrok实现内网穿透

官网 &#xff1a;ngrok | Unified Application Delivery Platform for Developers 简单的注册一下即可 我这里的操作系统是kali Linux 选择linux 启动服务最好使用静态域名 把需要穿透的端口号配置为本地的端口号即可 参考视频 没有服务器&#xff0c;就不能上线网站了&am…

HYBBS 表白墙网站PHP程序源码 可封装成APP

源码介绍 PHP表白墙网站源码&#xff0c;可以做校园内的&#xff0c;也可以做校区间的&#xff0c;可封装成APP。告别QQ空间的表白墙吧。 安装PHP5.6以上随意 上传程序安装&#xff0c;然后设置账号密码&#xff0c;登陆后台切换模板手机PC都要换开启插件访问前台。 安装完…

Unity 工厂方法模式(实例详解)

文章目录 在Unity中&#xff0c;工厂方法模式是一种创建对象的常用设计模式&#xff0c;它提供了一个接口用于创建对象&#xff0c;而具体的产品类是由子类决定的。这样可以将对象的创建过程与使用过程解耦&#xff0c;使得代码更加灵活和可扩展。 工厂模式的主要优点如下&…

深度学习笔记(九)——tf模型导出保存、模型加载、常用模型导出tflite、权重量化、模型部署

文中程序以Tensorflow-2.6.0为例 部分概念包含笔者个人理解&#xff0c;如有遗漏或错误&#xff0c;欢迎评论或私信指正。 本篇博客主要是工具性介绍&#xff0c;可能由于软件版本问题导致的部分内容无法使用。 首先介绍tflite: TensorFlow Lite 是一组工具&#xff0c;可帮助开…

[足式机器人]Part2 Dr. CAN学习笔记- 最优控制Optimal Control Ch07-1最优控制问题与性能指标

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记 - 最优控制Optimal Control Ch07-1最优控制问题与性能指标

Spring+SprinMVC+MyBatis注解方式简易模板

SpringSprinMVCMyBatis注解方式简易模板代码Demo GitHub访问 ssm-tpl-anno 一、数据准备 创建数据库test&#xff0c;执行下方SQL创建表ssm-tpl-cfg /*Navicat Premium Data TransferSource Server : 127.0.0.1Source Server Type : MySQLSource Server Version :…

【优化技术专题】「性能优化系列」针对Java对象压缩及序列化技术的探索之路

针对Java对象压缩及序列化技术的探索之路 序列化和反序列化为何需要有序列化呢&#xff1f;Java实现序列化的方式二进制格式 指定语言层级二进制格式 跨语言层级JSON 格式化类JSON格式化&#xff1a;XML文件格式化 序列化的分类在速度的对比上一般有如下规律&#xff1a;Java…

选择排序(二)——堆排序(性能)与直接选择排序

目录 一.前言 二.选择排序 2.1 堆排序 2.2选择排序 2.2.1 基本思想 2.2.2直接选择排序 三.结语 一.前言 本文给大家带来的是选择排序&#xff0c;其中选择排序中的堆排序在之前我们已经有过详解所以本次主要是对比排序性能&#xff0c;感兴趣的友友可移步观看堆排&#…

世微AP5179 60V高端电流采样降压恒流驱动器 LED车灯备用灯信号灯

产品描述 AP5179是一款连续电感电流导通模式的降压恒流源&#xff0c;用于驱动一颗或多颗串联LED输入电压范围从 5 V 到 60V&#xff0c;输出电流 可达 2.0A 。根据不同的输入电压和 外部器件&#xff0c; 可以驱动高达数十瓦的 LED。 内置功率开关&#xff0c;采用高端电流采样…

python实现带刷新的文本进度条

进度条已执行的部分使用“**”&#xff0c;未执行的部分使用“--”&#xff0c;用print&#xff08;&#xff09;来完成 使用到的函数&#xff1a; time.sleep(),作用是在程序执行过程中暂停一段时间&#xff0c;即会使程序暂停指定的秒数&#xff0c;然后再继续执行后面的代…

【Unity学习笔记】Unity TestRunner使用

转载请注明出处&#xff1a;&#x1f517;https://blog.csdn.net/weixin_44013533/article/details/135733479 作者&#xff1a;CSDN|Ringleader| 参考&#xff1a; Input testingGetting started with Unity Test FrameworkHowToRunUnityUnitTest如果对Unity的newInputSystem感…

HTML JavaScript 数字变化特效

效果 案例一&#xff1a;上下滚动 案例二&#xff1a;本身变化 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><met…