【大数据学习 | kafka】kafka的数据存储结构

以上是kafka的数据的存储方式。

这些数据可以在服务器集群上对应的文件夹中查看到。

[hexuan@hadoop106 __consumer_offsets-0]$ ll
总用量 8
-rw-rw-r--. 1 hexuan hexuan 10485760 10月 28 22:21 00000000000000000000.index
-rw-rw-r--. 1 hexuan hexuan        0 10月 28 22:21 00000000000000000000.log
-rw-rw-r--. 1 hexuan hexuan 10485756 10月 28 22:21 00000000000000000000.timeindex
-rw-rw-r--. 1 hexuan hexuan        8 10月 28 22:21 leader-epoch-checkpoint
-rw-rw-r--. 1 hexuan hexuan       43 10月 28 22:21 partition.metadata

每个文件夹以topic+partition进行命名,更加便于管理和查询检索,因为kafka的数据都是按照条进行处理和流动的一般都是给流式应用做数据供给和缓冲,所以检索速度必须要快,分块管理是最好的方式。

消费者在检索相应数据的时候会非常的简单。

consumer检索数据的过程。

首先文件的存储是分段的,那么文件的名称代表的就是这个文件中存储的数据范围和条数。

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
代表存储的数据是从0条开始的

00000000000000100000.index
00000000000000100000.log
00000000000000100000.timeindex
代表存储的数据是从100000条开始的

所以首先检索数据的时候就可以跳过1G为大小的块,比如检索888这条数据的,就可以直接去00000000000000000000.log中查询数据

那么查询数据还是需要在1G大小的内容中找寻是比较麻烦的,这个时候可以从index索引出发去检索,首先我们可以通过kafka提供的工具类去查看log和index中的内容

# 首先创建一个topic_bkafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_b --partitions 5 --replication-factor 2
# 然后通过代码随机向不同的分区中分发不同的数据1W条
package com.hainiu.kafka.consumer;/*** ClassName : test1* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 22:45* Version 1.0*/
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test1 {public static void main(String[] args) throws InterruptedException {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);pro.put(ProducerConfig.RETRIES_CONFIG, 3);pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);for (int i = 0; i < 10000; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("topic_b", ""+i,"this is hainiu");producer.send(record);}producer.close();}
}

然后去查看log和index中的内容

# kafka查看日志和索引的命令
kafka-run-class.sh kafka.tools.DumpLogSegments --files xxx

查看日志.log

[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log 
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
baseOffset: 606 lastOffset: 1205 count: 600 baseSequence: 606 lastSequence: 1205 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 5149 CreateTime: 1730645208577 size: 4929 magic: 2 compresscodec: snappy crc: 1974998903 isvalid: true
baseOffset: 1206 lastOffset: 1439 count: 234 baseSequence: 1206 lastSequence: 1439 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 10078 CreateTime: 1730645208584 size: 2085 magic: 2 compresscodec: snappy crc: 1665550202 isvalid: true

查看索引.index

内容即:

index索引

offset 第几条position 物理偏移量位置,也就是第几个字
11875275
176710140
202215097

log日志

# 打印日志内容的命令 --print-data-log 打印数据
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
| offset: 0 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 0 headerKeys: [] key: 14 payload: this is hainiu
| offset: 1 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 1 headerKeys: [] key: 19 payload: this is hainiu
| offset: 2 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 2 headerKeys: [] key: 24 payload: this is hainiu
| offset: 3 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 3 headerKeys: [] key: 26 payload: this is hainiu

可以看到刷写的日志

baseOffset: 0 lastOffset: 605 count: 606

从0 到605 条一次性刷写606条

lastSequence: 605 producerId

刷写事务日志编号,生产者的编号

通过名称跳过1G的端,然后找到相应的index的偏移量,然后根据偏移量定位log位置,不断向下找寻数据。

大家可以看到index中的索引数据是轻量稀疏的,这个数据是按照4KB为大小生成的,一旦刷写4KB大小的数据就会写出相应的文件索引。

官网给出的默认值4KB

一个数据段大小是1G

timeIndex

我们看到在数据中还包含一个timeindex的时间索引

# 查询时间索引
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex 
[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex 
Dumping 00000000000000000000.timeindex
timestamp: 1730645208577 offset: 1205
timestamp: 1730645208584 offset: 1439

可以看到和index索引一样,这个也是4Kb写出一部分数据,但是写出的是时间,我们可以根据时间进行断点找寻数据,指定时间重复计算

也就是说,写到磁盘的数据是按照1G分为一个整体部分的,但是这个整体部分需要4KB写一次,并且一次会生成一个索引问题信息,在检索的时候可以通过稀疏索引进行数据的检索,效率更快。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58990.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

做一个干电池的电量检测器03:数值拟合与电路仿真

首先在表格中进行详细的计算&#xff0c;整理出所需的数据。接着&#xff0c;我们运用MATLAB的强大功能对这些数据进行插值处理&#xff0c;生成了一个离散的数值数组。这个数组的每个数值都精确地对应着模数转换器&#xff08;ADC&#xff09;采样到的信号。通过这些数值&…

1、Java概述、HelloWorld案例

文章目录 今日内容介绍1.1 Java语言发展史和平台概述1.1.1 Java语言发展史1.1.2 Java语言版本1.1.3 Java平台概述1.2 JVM, JRE, JDK概述1.2.1 什么是跨平台?1.2.2 JVM, JRE, JDK说明1.3 常用DOS命令1.3.1 打开控制台1.3.2 常用命令1.4 下载安装JDK1.5 HelloWorld案例1.5.1 执行…

MFC工控项目实例二十八模拟量信号每秒采集100次

用两个多媒体定时器&#xff0c;一个定时0.1秒计时&#xff0c;另一个定时0.01秒用来对模拟量信号采集每秒100次。 1、在SEAL_PRESSUREDlg.h中添加代码 class CSEAL_PRESSUREDlg : public CDialo { public:CSEAL_PRESSUREDlg(CWnd* pParent NULL); // standard constructor&a…

【Mac】Screen Recorder by Omi Mac:Omi录屏专家

大家好&#xff0c;今天给大家介绍的这款软件叫Screen Recorder by Omi Mac&#xff1a;Omi录屏专家。 软件介绍 OmniRecorder for Mac 是一款用于录制屏幕的应用程序&#xff0c;专为 macOS 设计。它允许用户录制整个屏幕或特定区域&#xff0c;支持音频录制和实时编辑。这个…

多波束T50P和SES2000 Medium100安装记录(2024年10月)

SES2000 Medium100买了一直没有机会用&#xff0c;本次外业刚好需要。SES2000最大穿透70m。 Medium100安装与SES2000 Standard基本相同。除了钢管和法兰不同以外&#xff0c;它们安装支架都可以通用。有条件的话&#xff0c;用焊接方式将其固定在船侧舷&#xff0c;前方加一道拉…

Nginx安装配置详解

Nginx Nginx官网 Tengine翻译的Nginx中文文档 轻量级的Web服务器&#xff0c;主要有反向代理、负载均衡的功能。 能够支撑5万的并发量&#xff0c;运行时内存和CPU占用低&#xff0c;配置简单&#xff0c;运行稳定。 写在前 uWSGI与Nginx的关系 1. 安装 Windows 官网 Stabl…

保研考研机试攻略:python笔记(2)

&#x1f428;&#x1f428;&#x1f428;宝子们好呀&#xff0c;今天我们继续来学习N诺提供的python笔记&#xff0c;fighting&#xff01;( •̀ ω •́ )✧ 对这个系列感兴趣的宝子欢迎关注保研考研机试攻略专栏哦 ~ 目录 &#x1f428;&#x1f428;&#x1f428;4进制转…

qt QSplitter详解

1、概述 QSplitter是Qt框架中的一个布局管理器类&#xff0c;它允许用户在应用程序窗口中创建可拖动的分隔器&#xff0c;以便动态地调整多个子窗口或控件的大小。QSplitter非常适合用于分割、重新排列和管理用户界面中的多个区域&#xff0c;提供了一种直观且灵活的方式来控制…

Spring Boot观察者模式实战

观察者模式简介 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为设计模式&#xff0c;它定义了对象间的一种一对多的依赖关系&#xff0c;当一个对象状态发生改变时&#xff0c;所有依赖于它的对象都会得到通知并自动更新。这种模式也被称为发布-订阅模式、模型…

Java中消息队列——ActiveMQ、RabbitMQ、RocketMQ、Kafka

1.什么是消息中间件 消息中间件是一种专门的工具&#xff0c;帮助不同的应用程序通过发送和接收消息来进行交流。想象一下&#xff0c;一个公司里有多个部门&#xff08;如销售、财务和物流&#xff09;&#xff0c;它们需要共享信息。消息中间件就像一个信使&#xff0c;负责将…

mysql left join group_concat 主表丢失数据

问题出现的场景&#xff1a; 有一个主表 a&#xff0c;一个子表 b a表有两条数据&#xff0c;a表第一条数据在b表中有一条子数据&#xff0c;a表第二条数据在b表中有两条子数据。 现在想要查询出来a表的所有数据和a表的子表b的id&#xff0c;b的id 使用GROUP_CONCAT拼接 有…

深度学习在复杂系统中的应用

引言 复杂系统由多个相互作用的组成部分构成&#xff0c;这些部分之间的关系往往是非线性的&#xff0c;整体行为难以通过简单的线性组合来预测。这类系统广泛存在于生态学、气象学、经济学和社会科学等多个领域&#xff0c;具有动态演变、自组织、涌现现象以及多尺度与异质性…

Python爬虫的京东大冒险:如何高效获取商品详情的秘籍

在这个由代码编织的电商世界里&#xff0c;京东商品详情就像是被锁在高塔中的公主&#xff0c;等待着勇敢的Python爬虫骑士去解救。今天&#xff0c;我们要讲述的是如何成为一名Python爬虫骑士&#xff0c;携带你的代码长矛&#xff0c;穿梭在API的数据森林中&#xff0c;高效获…

【RESTful】RESTful API:最佳实践指南

目录 引言一、URL 设计1.1 动词 宾语1.2 动词的覆盖1.3 宾语必须是名词1.4 复数 URL1.5 避免多级 URL 二、状态码2.1 状态码必须精确2.2 2xx 状态码2.3 3xx 状态码2.4 4xx 状态码2.5 5xx 状态码 三、其他最佳实践3.1 版本管理3.2 使用合适的媒体类型3.3 详细的错误响应3.4 安全…

一些关于云电脑与虚拟化东西

前言 好久没有更新了&#xff0c;在进行自我校准。 云计算是什么&#xff1f; 云计算是一种模型&#xff0c;它使得用户能够随时随地、方便地、按需访问共享的可配置计算资源池&#xff08;例如&#xff0c;网络、服务器、存储、应用程序和服务&#xff09;&#xff0c;这些资…

服务器数据恢复—RAID5阵列中部分成员盘重组RAID5阵列后如何恢复原raid5阵列数据?

服务器数据恢复环境&#xff1a; 一台服务器挂接一台存储&#xff0c;该存储中有一组由5块硬盘组建的RAID5阵列。 服务器故障&#xff1a; 存储raid5阵列中有一块硬盘掉线。由于RAID5的特性&#xff0c;阵列并没有出现问题。工作一段时间后&#xff0c;服务器出现故障&#xff…

new/delete和malloc()/free()的区别及其使用

C系列----new/delete和malloc()/free()的区别 这篇文章我将深读刨析一下这二者的区别及其在使用过程中应该注意的事项 文章目录 C系列----new/delete和malloc()/free()的区别前言一、new/delete和malloc/free在操作自定义类型时的区别1.1、在属性和使用上的区别1.2、返回类型的…

我主编的电子技术实验手册(22)——RC并联电路

本专栏是笔者主编教材&#xff08;图0所示&#xff09;的电子版&#xff0c;依托简易的元器件和仪表安排了30多个实验&#xff0c;主要面向经费不太充足的中高职院校。每个实验都安排了必不可少的【预习知识】&#xff0c;精心设计的【实验步骤】&#xff0c;全面丰富的【思考习…

word mathml 创建粗体字母快捷键

在 mathml 中达到latex中 \mathbf{A} 的效果 由于word本身不支持这个命令&#xff0c;所以打算用快捷键实现 快捷键的功能是加粗光标前一个字目 1. Alt F8 打开宏&#xff0c;如果打不开可以尝试 Alt Fn F8 2. 输入 BoldPreviousCharacter 新建宏&#xff1a; Sub Bold…

开源 AI 智能名片 2 + 1 链动模式 S2B2C 商城小程序中积分使用价值的拓展策略

摘要&#xff1a;本文围绕开源 AI 智能名片 2 1 链动模式 S2B2C 商城小程序&#xff0c;深入探讨其积分使用价值的丰富策略。详细分析积分兑换礼品、会员升级、积分抵现等方式在该特定商城小程序环境下的应用特点、存在问题及对用户和商城的影响&#xff0c;旨在为商城的优化运…