10、神秘的“位移主题”

神秘的“位移主题”

  • 1、什么是位移主题
  • 2、位移主题的消息格式
  • 3、位移主题是怎么被创建的
  • 4、什么地方会用到位移主题
  • 5、位移主题的删除机制

本章主题是:Kafka 中的内部主题(Internal Topic)__consumer_offsets。

__consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。这里将统一使用位移主题来指代__consumer_offsets。需要注意的是,它有两个下划线哦。

1、什么是位移主题

Consumer 的位移管理机制其实就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。

位移主题和普通的 Kafka 主题类似,我们可以手动地创建它、修改它,甚至是删除它。只不过,它同时也是一个内部主题,大部分情况下,其实并不需要 “搭理” 它,也不用花心思去管理它,把它丢给 Kafka 就完事了。

虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息,千万不要自己写个 Producer 随意向该主题发送消息。

2、位移主题的消息格式

那这个主题的消息格式是怎么的呢?里面的消息格式,可以简单地理解为是一个 KV 对。
key 的内容
1、key 保存了 Group ID。因为必须要有字段来标识这个位移数据是哪个 Consumer 的
2、Key 还保存了 Consumer 要提交位移的分区。因为 Consumer 提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移
结论就是,位移主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号>
value 的内容
1、保存一个位移值
2、保存位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。
可以简单地认为消息体就是保存了位移值。

3、位移主题是怎么被创建的

当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。

其实位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。但如果是 Kafka 自动创建的,分区数是怎么设置的呢?

这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50,因此 Kafka 会自动创建一个 50 分区的位移主题。

存储在 Kafka 日志路径下如 __consumer_offsets-xxx 这样的目录,这就是 Kafka 自动创建的位移主题。

那除分区数外,副本数或备份因子是怎么控制的呢?答案就是 Broker 端另一个参数 offsets.topic.replication.factor。它的默认值是 3。

总结一下,如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。

4、什么地方会用到位移主题

创建位移主题当然是为了用的,那么什么地方会用到位移主题呢?

Kafka Consumer 提交位移时会写入该主题,那 Consumer 是怎么提交位移的呢?

目前 Kafka Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。

Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false,作为 Consumer 应用开发的你就要承担起位移提交的责任。Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false,作为 Consumer 应用开发的你就要承担起位移提交的责任。Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。

如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。

我们来举个极端一点的例子。假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 = 100 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。

5、位移主题的删除机制

Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。可以理解为压实,或干脆采用 JVM 垃圾回收中的术语:整理。

不管怎么翻译,Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明 Compact 过程。
在这里插入图片描述

图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225593.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[GESP样题 三级] 逛商场

题目名字 逛商场 题目链接 题意 给一定的零花钱&#xff0c;每个物品和剩余的零钱依次比较&#xff0c;只要能买就一定买&#xff0c;如果不能买就跳过&#xff0c;计算能买多少个物品&#xff1b; 思路 设置if循环来判断所剩零钱是否大于等于这个物品设置一个sum&#xff0c…

PHPRunner 10.91 Crack

PHPRunner是一款非常好用的网页制作工具&#xff0c;界面简洁美观&#xff0c;支持处理多个数据库连接并添加设计页面&#xff0c;页面中可以显示不同的不相关对象&#xff0c;如网格&#xff0c;单个记录&#xff0c;图表&#xff0c;报告等。PHPRunner支持多个操作系统&#…

【一起学Rust | 框架篇 | Tauri2.0框架】Tauri App开启远程调试功能

文章目录 前言一、搭建PageSpy环境二、接入SDK三、进行远程调试调试控制台网络抓包审查元素 四、延伸 前言 Tauri在Rust圈内成名已久&#xff0c;凭借Rust的可靠性&#xff0c;使用系统原生的Webview构建更小的App 以及开发人员可以灵活的使用各种前端框架而一战成名。 然而&…

批量识别名片并转换为Excel:提高工作效率的实用技巧

随着数字化的快速发展&#xff0c;很多传统的工作也开始向电子化转型。而名片管理就是其中之一。许多人会遇到与题目相似的问题&#xff1a;拥有大量名片&#xff0c;但却不方便携带和管理。 批量识别名片并将其转换为Excel格式是一个很好的想法&#xff0c;这不仅可以提高你的…

用Bat文件调用小牛翻译api快速翻译

为了帮助大家更加轻松地调用机器翻译api&#xff0c;本人探索实现了一种可以通过BAT文件来调用机器翻译api&#xff0c;对粘贴板中的文本进行翻译&#xff0c;并将翻译结果保存为txt文件。下面把实现步骤简要说明如下&#xff1a; 第一步&#xff1a;获取小牛机器翻译api 进入…

k8s-1.23版本安装

一、主机初始化 1、修改主机名 hostnamectl set-hostname master hostnamectl set-hostname node1 hostnamectl set-hostname node2 hostnamectl set-hostname node32、主机名解析 echo 192.168.1.200 master >> /etc/hosts echo 192.168.1.201 node1 >>…

用Gemini Pro 来做开发?API出来了

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

【中国海洋大学】操作系统随堂测试8整理

1. RAID系统使用多块磁盘改进性能或可靠性&#xff0c;其中构建RAID0至少需要&#xff08;&#xff09;个磁盘&#xff1b;RAID5阵列至少需要&#xff08;&#xff09;个磁盘。 答&#xff1a;2 3 2. 请描述一下磁盘存储空间管理方法&#xff1a;成组链接法的数据结构、盘块回…

上班摸鱼不被老板发现:设计模式--观察者模式

观察者模式 观察者模式&#xff0c;又叫做发布–订阅模式(Publish/Subscribe)模式 观察者模式定义了一种一对多的依赖关系&#xff0c;让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生改变时&#xff0c;会通知所有的观察者对象&#xff0c;使他们能够自动更…

【网络安全】网络防护之旅 - 非对称密钥体制的解密挑战

&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《网络安全之道 | 数字征程》⏰墨香寄清辞&#xff1a;千里传信如电光&#xff0c;密码奥妙似仙方。 挑战黑暗剑拔弩张&#xff0c;网络战场誓守长。 目录 &#x1f608;1. 初识网络安…

vue-实现高德地图-省级行政区地块显示+悬浮显示+标签显示

<template><div><div id"container" /><div click"showFn">显示</div><div click"removeFn">移除</div></div> </template><script> import AMapLoader from amap/amap-jsapi-load…

Spring Boot业务代码中使用@Transactional事务失效总结

1、概述 我们知道 Spring 声明式事务功能提供了极其方便的事务配置方式&#xff0c;配合 Spring Boot 的自动配置&#xff0c;大多数 Spring Boot 项目只需要在方法上标记 Transactional注解&#xff0c;即可一键开启方法的事务性配置。当然后端开发人员对数据库事务这个概念并…

模型评估指标

1.回归模型 回归模型常常使用MSE均方误差&#xff0c;预测值与真实值之间的平均差距 2.分类模型 2.1 Accuracy正确率 分类正确的数目的占比 但在类别不平衡的情况下&#xff0c;模型可能倾向于预测占多数的类别&#xff0c;导致Acc高但对少数类别的预测效果其实比较差的。…

Vue脚手架 Vue CLI安装

目录 0.为什么要安装Vue CLI脚手架 1.配置方法 1.全局安装 (一次) 2.查看Vue版本&#xff08;一次&#xff09; 报错&#xff1a;出现禁止运行脚本 3.创建项目架子&#xff08;可多次&#xff09; 报错npm err! 问题&#xff1a;已知npm换过国内源&#xff0c;且进度条…

go-zero目录结构和说明

. ├── code-of-conduct.md 行为准则 ├── CONTRIBUTING.md 贡献指南 ├── core 框架的核心组件 │ ├── bloom 布隆过滤器&#xff0c;用于检测一个元素是否在一个集合中 │ ├── breaker 熔断器&am…

MicroPython相关教程

WebRepl MicroPython-ESP32之WebRepl-1Z实验室 - 简书 https://www.jianshu.com/p/c2ddd4fd05be ESP32上面webrepl的开启与连接 - 简书 https://www.jianshu.com/p/f4163eae4a05 Esp32安装micropython和配置webrepl记录备忘 - 哔哩哔哩 https://www.bilibili.com/read/cv121…

工资计算_分支结构 C语言xdoj63

问题描述 小明的公司每个月给小明发工资&#xff0c;而小明拿到的工资为交完个人所得税之后的工资。假设他一个月的税前工资为S元&#xff0c;则他应交的个人所得税按如下公式计算&#xff1a; 1&#xff09; 个人所得税起征点为3500元&#xff0c;若S不超过3500&#xff0c;则…

数据挖掘目标(客户价值分析)

import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as snsIn [2]: datapd.read_csv(r../教师文件/air_data.csv)In [3]: data.head()Out[3]: Start_timeEnd_timeFareCityAgeFlight_countAvg_discountFlight_mileage02011/08/182014/0…

android 13.0 app应用安装黑名单

1.概述 在13.0系统rom定制化开发中,客户需求要实现应用安装黑名单功能,在白名单之中的应用可以安装,其他的app不准安装,实现一个 控制app安装的功能,这需要从app安装流程入手就可以实现功能 PMS就是负责管理app安装的,功能就添加在这里就可以了,接下来看具体实现这个功能…

uniapp播放 m3u8格式视频 兼容pc和移动端

支持全自动播放、设置参数 自己摸索出来的,花了一天时间,给点订阅支持下,订阅后,不懂的地方可以私聊我。 代码实现 代码实现 1.安装dplayer组件 npm i dplayer2. static/index.html下引入 hls 引入hls.min.js 可以存放在static项目hls下面<script src="/static…