Kafka topic消息清理几种方式

Kafka清理Topic消息

参考链接:https://cloud.tencent.com/developer/article/1590094

快速配置删除法

  1. kafka启动之前,在server.properties配置delete.topic.enable=true

  2. 执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manage集群管理工具删除。如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记为marked for deletion,并且在Zookeeper中的/admin/delete_topics下创建对应的子节点,加上配置,重启kafka,之前的topic就真正删除了

  3. 优点由Kafka来完成Topic的相关删除,只需要修改server.properties配置文件的delete.topic.enable为true就可以了

  4. 缺点:需要重启Kafka来完成配置文件的生效

# 默认是false,注意等号前后一定不能有空格,否则配置会不生效
delete.topic.enable=true# Bitnami Chart环境变量设置(涉及重启了)
KAFKA_CFG_DELETE_TOPIC_ENABLE=true# 创建新的Topic logstash_test(拥有3个副本)
kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test --partitions 1 --replication-factor 3# 查看Topic logstash_test的状态,发现Leader是1(broker.id=0),有三个备份分别是0,1,2
I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Topic: logstash_test    TopicId: 1j9d-WGVTzKTpGdTtO0YFQ PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824Topic: logstash_test    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1# 查看Zookeeper上的Topic
$ zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]
[zk: localhost:2181(CONNECTED) 1] ls /config/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]# 查看Kafka的server.properties配置文件中log.dirs 的目录
I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint  partition.metadata# 删除Topic logstash_test
I have no name!@ape-kafka-0:/$ kafka-topics.sh --delete --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test# 再次查看Topic logstash_test的状态,说明Topic已经被删除了
I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Error while executing topic command : Topic 'logstash_test' does not exist as expected
[2024-06-26 03:13:45,323] ERROR java.lang.IllegalArgumentException: Topic 'logstash_test' does not exist as expectedat kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:399)at kafka.admin.TopicCommand$TopicService.describeTopic(TopicCommand.scala:311)at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)at kafka.admin.TopicCommand.main(TopicCommand.scala)(kafka.admin.TopicCommand$)# 再次查看Zookeeper上的Topic,logstash_test也已经被删除了
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog]
[zk: localhost:2181(CONNECTED) 3] ls /config/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog]# 再次查看/log.dirs 目录,logstash_test相关日志也被删除了
I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test*
ls: cannot access '/bitnami/kafka/data/logstash_test*': No such file or directory

手动删除数据

  1. 优点:不需要重启Kafka服务,直接删除Topic对应的系统日志,然后在Zookeeper中删除对应的目录
  2. 缺点:需要人为手动删除,删除之后重新创建同名的Topic会有问题(使用方式一不会有此问题)
  3. 不对推荐使用这个方法:简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over
# 创建新的Topic logstash_test(拥有3个副本)
I have no name!@ape-kafka-0:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --replication-factor 3 --partitions 1 --topic logstash_test
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic logstash_test.# 查看Topic logstash_test的状态,发现Leader是1(broker.id=1),有三个备份分别是0,1,2
I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Topic: logstash_test    TopicId: S7bPYklqRXy6GB8Qwq67_A PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824Topic: logstash_test    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2# 查看Zookeeper上的Topic
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]
[zk: localhost:2181(CONNECTED) 1] ls /config/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]# 查看Kafka的server.properties配置文件中log.dirs的目录
I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint  partition.metadata# 删除Zookeeper上的Topic
[zk: localhost:2181(CONNECTED) 5] deleteall /brokers/topics/logstash_test
[zk: localhost:2181(CONNECTED) 6] deleteall /config/topics/logstash_test# 删除Topic logstash_test的log文件(这里Kafka集群的所有节点都要删除)
rm -rf /bitnami/kafka/data/logstash_test*# 查询还有哪些topic
I have no name!@ape-kafka-0:/$ kafka-topics.sh --list --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092
__consumer_offsets
frontend_invoke_queue
frontend_invoke_result_log
lake_add_namelist
lake_entrylog
logstash_test# 再次查看Topic logstash_test的状态,可以发现topic还是存在的,这个时候需要手动删除一下topic(数据已清理)
I have no name!@ape-kafka-1:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Topic: logstash_test    TopicId: S7bPYklqRXy6GB8Qwq67_A PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824Topic: logstash_test    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2# 删除之后创建同名的Topic会有问题
I have no name!@ape-kafka-1:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --replication-factor 3 --partitions 1 --topic logstash_test
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Error while executing topic command : Topic 'logstash_test' already exists.
[2024-06-26 03:38:34,038] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'logstash_test' already exists.(kafka.admin.TopicCommand$)# 删除topic,删除失败(重启kafka后恢复)
I have no name!@ape-kafka-1:/$ kafka-topics.sh --delete --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Error while executing topic command : This server does not host this topic-partition.
[2024-06-26 03:40:30,871] ERROR org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.(kafka.admin.TopicCommand$)

设置删除策略

  1. 简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over
  2. 相关参数如下,kafka启动之前,在server.properties配置
#日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete# 注意:下面有两种配置,一种是基于时间的策略,另种是基于日志文件大小的策略,两种策略同是配置的话,只要满足其中种策略,则触发Log删除的操作。删除操作总是先删除最旧的日志
# 消息在Kafka中保存的时间,168小时之前的1og, 可以被删除掉,根据policy处理数据。
log.retention.hours=4# 当剩余空间低于log.retention.bytes字节,则开始删除1og
log.retention.bytes=37580963840# 每隔300000ms, logcleaner线程将检查一次,看是否符合上述保留策略的消息可以被删除
log.retention.check.interval.ms=1000

offset删除数据

# 生成数据
# 1. 创建一个新的topic test, 3个分区,1个副本
I have no name!@ape-kafka-0:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --partitions 3 --replication-factor 1
Created topic test.# 2. 生成随机消息100条kafka-verifiable-producer.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --max-messages 100# 3. 查看topic消息有多少
I have no name!@ape-kafka-0:/$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --time -1
test:0:0
test:1:100
test:2:0# 4. 将配置文件编辑如下,将会将partition 重0删除到49,50并不会删除
cat <<EOF> offset.json
{"partitions":[{"topic":"test", "partition":1, "offset": 50}], "version":1}
EOF# 5. 执行删除kafka-delete-records.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --offset-json-file offset.json# 6. 取出消息,看是否符合预期,实际测试0-49被删除了kafka-console-consumer.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --from-beginning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/861057.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

床上用品跨境电商:拥有沃尔玛1P特权的商家享有哪些显著优势?

在全球化的背景下&#xff0c;跨境电商对床上用品行业至关重要。沃尔玛因其品牌影响力、客户资源及物流体系在跨境电商平台中脱颖而出。拥有沃尔玛1P特权的商家享有更多曝光机会和独家优惠。那么&#xff0c;这些特权商家具体有哪些优势呢? 沃尔玛1P特权商家在曝光率上具有显著…

Ubuntu 20.04安装中文输入法出错:gnome-user-docs-zh-hans安装失败

问题&#xff1a;Ubuntu20.04安装中文输入法出错&#xff1a;gnome-user-docs-zh-hans安装失败 现象&#xff1a; 打开language Support页面的时候&#xff0c;提示install依赖的文件 这个过程中会弹窗提示: The following packages have unmet dependencies:gnome-user-doc…

怎么从零到一建立一个海外仓:分步指南,7个关键步骤归纳

无论你是想从零到一建立一个海外仓&#xff0c;还是想升级现有的海外仓&#xff0c;下面的7个步骤对你都会有一些参考价值。从海外仓选址到安装必要系统&#xff0c;再到人员配置&#xff0c;在创建海外仓的时候这些细节都非常重要。 1、确定海外仓所需的空间容量 确定海外仓…

微信小程序版threejs的使用

首先是使用环境:我是使用的uniapp制作的微信小程序,当然原生的也是可以的,但是测试过很多,发现微信官方的threejs移植版本只能够导入gltf格式的模型,无法导入obj,这就有些尴尬了,为此我找了很多版本的threejs,首先是threejs-miniprogram,也就是官方的,可以直接在unia…

网络安全自学入门:(超详细)从入门到精通学习路线规划,学完即可就业

很多人上来就说想学习黑客&#xff0c;但是连方向都没搞清楚就开始学习&#xff0c;最终也只是会无疾而终&#xff01;黑客是一个大的概念&#xff0c;里面包含了许多方向&#xff0c;不同的方向需要学习的内容也不一样。 算上从学校开始学习&#xff0c;已经在网安这条路上走…

多商户万能DIY商城小程序源码系统 支持自营+独立部署 带完整的安装代码包以及搭建教程

系统概述 多商户万能 DIY 商城小程序源码系统是一个综合性的电商平台解决方案&#xff0c;旨在满足不同用户的多样化需求。它不仅支持自营模式&#xff0c;还为多商户入驻提供了广阔的空间&#xff0c;使平台能够汇聚各类商品和商家&#xff0c;形成一个丰富多样的商业生态。 …

基于YOLO的目标检测系统(PyQT页面+模型+数据集)

亲爱的读者&#xff0c;欢迎来到我们的 YOLO 检测交互式应用系统专栏。在这里&#xff0c;我们为您准备了几个高质量的、基于 YOLO&#xff08;You Only Look Once&#xff09;算法的交互式应用系统&#xff0c;每一个系统都包含了直观易用的 PYQT 页面、经过精心训练的模型和相…

神经网络——数据预处理

基于方差缩放的参数初始化 方差缩放方法能够根据神经元的链接数量来自适应地调整初始化分布地方差&#xff0c;尽可能的保证每个神经元的输入和输出方差一致。 那么&#xff0c;为什么要保证前后方差的一致性呢&#xff1f; 这是因为如果输入空间和输出空间的方差差别较大&a…

功率回路布线

目录 一、布线布局和散热问题 二、设计PCB线宽、过孔与电压、电流关系 一、布线布局和散热问题 功率电路通常包括控制电路、驱动电路和功率输出三部分。其中功率输出部分通常采用开光工作方式&#xff0c;这种工作方式会发生大电压和大电流的突变&#xff0c;其可通过电源和信…

优化Java中XML和JSON序列化

优化Java中XML和JSON序列化 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在Java应用程序中&#xff0c;对于XML和JSON的序列化操作是非常常见的需求。本文将…

GroundingDINO1.5突破开放式物体检测界限:介绍与应用

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

【面试题】Spring面试题

目录 Spring Framework 中有多少个模块&#xff0c;它们分别是什么&#xff1f;Spring框架的设计目标、设计理念&#xff1f;核心是什么&#xff1f;Spring框架中都用到了哪些设计模式&#xff1f;Spring的核心机制是什么&#xff1f;什么是Spring IOC容器&#xff1f;什么是依…

【杂记-浅谈堆叠技术】

堆叠技术 一、堆叠技术概述二、堆叠技术的优势三、堆叠技术的实现四、堆叠技术的应用场景五、堆叠技术与M-LAG的对比 一、堆叠技术概述 堆叠技术是一种网络设备虚拟化技术&#xff0c;它允许将多台物理交换机通过特定的堆叠线缆连接在一起&#xff0c;在逻辑上构成一个单一的交…

ArcGIS数据处理与制图

在数字化和智能化的浪潮中&#xff0c;GIS&#xff08;地理信息系统&#xff09;和GPT&#xff08;生成式预训练模型&#xff09;的结合正日益成为推动科研、城市规划、环境监测等领域发展的关键技术。GIS以其强大的空间数据处理、先进的空间分析工具、灵活的地图制作与可视化能…

图像亮度和对比度的调整

在网上找了很多图像亮度的调整算法&#xff0c;下面是其中一种&#xff0c;可以通过条形框进行调整&#xff0c;并实时的查看对应参数值后的效果。 图像亮度处理公式: y [x - 127.5 * (1 - B)] * k 127.5 * (1 B); x 是输入像素值 y 是输出像素值 B 是亮度值&#xff0c; …

使用vue + canvas绘制仪表盘

使用vue canvas绘制仪表盘 效果图&#xff1a; 父容器 <template><div class"panelBoard-page"><h1>panelBoard</h1><Demo1 :rate"rate" /></div> </template> <script setup> import { ref } from …

HTTP详解:TCP三次握手和四次挥手

一、TCP协议概述 TCP协议是互联网协议栈中传输层的核心协议之一&#xff0c;它提供了一种可靠的数据传输方式&#xff0c;确保数据包按顺序到达&#xff0c;并且没有丢失或重复。TCP的主要特点包括&#xff1a; 面向连接&#xff1a;TCP在传输数据之前需要建立连接。可靠传输&…

友思特分享 | 完美聚光:用于光刻曝光的UV-LED光引擎

导读 LED替代汞灯在紫外光源中的使用已成为大势所趋。友思特先进的 UV-LED-EXP 系统可作为OEM集成、汞灯光刻设备改造或直接定制光路设计和曝光设备&#xff0c;为紫外光源的半导体光刻曝光过程提供近乎完美的光照质量。 汞弧灯与UV LED 汞弧灯是高强度气体放电灯。简单地解释…

一起学Transformer(1) - Transformer 基础概念

文章目录 一、 Hugging Face 简介1. 公司背景和发展历程2. Transformers 库的功能和应用场景1&#xff09;功能2&#xff09; 应用场景 3. Transformer 模型概述 二、 Transformer 结构简介1. 常见的 Transformer 模型1&#xff09; BERT (Bidirectional Encoder Representatio…

网上购物商城

摘 要 本论文基于Java语言设计与实现了一个网上购物商城系统。首先&#xff0c;通过对国内外网上购物商城的发展现状进行分析&#xff0c;确定了本系统的研究目的与意义。然后&#xff0c;进行了系统需求分析&#xff0c;包括可行性分析和业务需求描述&#xff0c;以及软硬件需…