Kafka 命令详解及使用示例

文章目录

  • Kafka 命令详解及使用示例
  • Kafka 命令详解
    • `kafka-topics.sh`:主题管理
      • 创建主题
      • 创建带副本的主题
      • 修改主题分区数
      • 了解分区分布
      • 列出主题
      • 查看主题详情
      • 删除主题
    • `kafka-console-producer.sh`:消息生产者
      • 发送消息到主题
      • 带键值对的消息
      • 消息生产性能优化
      • 带分区键的消息发送
    • `kafka-console-consumer.sh`:消息消费者
      • 消费主题中的消息
      • 只读取键值对消息
      • 实时消费消息
      • 只消费特定分区的消息
      • 以 JSON 格式输出消息
    • `kafka-consumer-groups.sh`:消费者组管理
      • 查看消费者组信息
      • 查看消费者组的偏移量信息
      • 重置消费者组的偏移量
    • `kafka-configs.sh`:配置管理
      • 查看主题配置
      • 修改主题配置
    • `kafka-acls.sh`:访问控制列表管理
      • 为用户创建权限
      • 删除用户权限
  • 示例总结


Kafka 命令详解及使用示例

Kafka 是一个分布式流处理平台,提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输,广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式,帮助开发者更好地管理和使用 Kafka。


Kafka 命令详解

kafka-topics.sh:主题管理

主题(Topic)是 Kafka 中消息的逻辑分类,所有消息都发送到指定的主题中。kafka-topics.sh 用于管理主题,包括创建、删除、列出主题等操作。

创建主题

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  • --topic:主题名称。
  • --partitions:分区数,消息将分布在多个分区中。
  • --replication-factor:副本因子,用于消息的高可用性。

创建带副本的主题

在分布式环境中,副本对于 Kafka 来说至关重要,它能确保在 Broker 故障时,消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。

bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092
  • --partitions 设置为 5,意味着主题的数据会被分散到 5 个分区中,提升并发处理能力。
  • --replication-factor 设置为 3,确保每个分区有 3 个副本(在不同的 Broker 上),提高容错性。

注意:副本数不能超过集群中的 Broker 数量,生产环境中一般设置副本数为 3,保证高可用性。

修改主题分区数

Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数,但要注意增加分区会影响数据的顺序性,因为 Kafka 不会自动对已存在的数据进行重分配。

bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092

此命令将 important-topic 的分区数从 5 扩展到 10 个。

了解分区分布

通过 --describe 命令,可以查看每个分区在哪些 Broker 上存储,并了解它们的副本状态。

bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092

输出的结果会显示每个分区的副本和首领(Leader)在哪个 Broker 上。Leader 是处理读写请求的副本,其他副本是跟随者,用于容错。

列出主题

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

查看主题详情

bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

删除主题

bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

kafka-console-producer.sh:消息生产者

Kafka 提供了一个控制台生产者工具,允许我们从命令行发送消息到指定主题。

发送消息到主题

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

输入消息后按 Enter 发送到 Kafka 主题。

带键值对的消息

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

在这里,消息的键和值通过冒号分隔,例如:

key1:value1
key2:value2

消息生产性能优化

在高吞吐量场景下,可以通过调整生产者配置来提高性能。例如,批量发送消息和异步生产可以显著提高效率。

bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size=16384 --producer-property linger.ms=5
  • batch.size:控制批量消息的大小(以字节为单位),Kafka 会尝试将消息累积到这个大小后一起发送。
  • linger.ms:在批量消息发送前的等待时间,可以通过稍微延迟发送消息来增加批量的大小。

此外,生产者可以配置为异步发送,这样可以减少网络等待时间:

--producer-property acks=1
  • acks=1 表示只等待 Leader 确认即可继续发送消息,这种方式可以提高性能,但有可能在 Leader 故障时丢失部分消息。

带分区键的消息发送

指定消息发送到特定的分区时,可以使用 key 参数,这在有状态的消息处理(如事务处理)场景中非常重要。

bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

这样每个消息都会根据键(key)被分配到相同的分区。例如,key1:message1key1:message2 会发送到相同的分区。


kafka-console-consumer.sh:消息消费者

消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh 是 Kafka 提供的命令行消费者工具。

消费主题中的消息

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
  • --from-beginning:从主题的起始位置读取所有消息。

只读取键值对消息

bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=,

这样读取的消息会显示为键和值的格式,例如:

key1,value1

实时消费消息

使用 kafka-console-consumer.sh 来实时消费主题中的消息:

bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092

如果要从主题的起始位置读取消息,可以添加 --from-beginning 参数。

只消费特定分区的消息

Kafka 支持直接从某个分区中读取消息。在某些场景下(如故障恢复或日志分析),我们可能只需要处理某个分区的数据:

bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10

此命令从分区 0 开始读取第 10 条消息。

以 JSON 格式输出消息

Kafka 消费者可以输出 JSON 格式的消息,方便后续处理和分析:

bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.separator=, --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka-consumer-groups.sh:消费者组管理

Kafka 的消费者组允许多个消费者一起协同消费消息,每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh 可以用于管理消费者组。

查看消费者组信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费者组的偏移量信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

输出信息包括每个分区的已消费消息偏移量以及消费者的状态。

重置消费者组的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic
  • --to-earliest:将偏移量重置为最早的消息。

kafka-configs.sh:配置管理

Kafka 主题和代理的配置可以通过 kafka-configs.sh 进行管理。

查看主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

修改主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=172800000

此命令将主题 my-topic 的消息保留时间修改为 2 天(单位为毫秒)。


kafka-acls.sh:访问控制列表管理

Kafka 提供了基于 ACL(访问控制列表)的权限管理。kafka-acls.sh 用于管理权限。

为用户创建权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic

此命令允许用户 Alice 对主题 my-topic 执行所有操作。

删除用户权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic

示例总结

我们通过几个简单的示例介绍了 Kafka 的基本操作:

  1. 创建主题 my-topic,并通过控制台生产者发送消息。
  2. 使用控制台消费者从该主题中读取消息。
  3. 管理消费者组的偏移量,重置到最早的消息。
  4. 修改主题的保留时间,以及管理用户的权限。

Kafka 提供了丰富的命令行工具,用于主题、消费者组、配置、权限等的管理。灵活使用这些命令,可以帮助我们高效地维护 Kafka 集群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/879435.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Knife4j:打造优雅的SpringBoot API文档

1. 为什么需要API文档? 在现代软件开发中,API文档的重要性不言而喻。一份清晰、准确、易于理解的API文档不仅能够提高开发效率,还能降低前后端沟通成本。今天,我们要介绍的Knife4j正是这样一款强大的API文档生成工具,它专为Spring Boot项目量身打造,让API文档的生成…

【Python基础】Python错误和异常处理(详细实例)

本文收录于 《Python编程入门》专栏,从零基础开始,分享一些Python编程基础知识,欢迎关注,谢谢! 文章目录 一、前言二、Python中的错误类型三、Python异常处理机制3.1 try-except语句3.2 try-except-else语句3.3 try-fi…

TiDB 扩容过程中 PD 生成调度的原理及常见问题丨TiDB 扩缩容指南(一)

导读 作为一个分布式数据库,扩缩容是 TiDB 集群最常见的运维操作之一。本系列文章,我们将基于 v7.5.0 具体介绍扩缩容操作的具体原理、相关配置及常见问题的排查。 通常,我们根据当前资源状态来决定是否需要调整 TiKV 节点的规模&#xff0…

探索螺钉设计:部分螺纹与全螺纹,哪种更适合你的项目?

为什么有些螺钉有部分螺纹? 螺钉由头部、柄部和尖端组成,是世界上zui常用的紧固件之一。与螺栓一样,它们旨在将多个对象或表面连接在一起。但是,在比较不同类型的螺钉时,您可能会注意到其中一些都具有部分螺纹杆。 什么是螺柄&a…

Python | Leetcode Python题解之第397题整数替换

题目: 题解: class Solution:def integerReplacement(self, n: int) -> int:ans 0while n ! 1:if n % 2 0:ans 1n // 2elif n % 4 1:ans 2n // 2else:if n 3:ans 2n 1else:ans 2n n // 2 1return ans

滚雪球学SpringCloud[2.3]:服务发现与负载均衡详解

全文目录: 前言1. Ribbon的使用与配置1.1 Ribbon 概述Ribbon 的核心功能: 1.2 Ribbon 的基本使用1.2.1 引入 Ribbon 依赖1.2.2 配置 RestTemplate 与 Ribbon1.2.3 示例:通过 Ribbon 调用服务 1.3 Ribbon 的配置选项 2. Ribbon的负载均衡策略2…

Python_两个jpg图片文件名称互换

项目场景 处理Adobe Photoshop导出的两个切片的顺序错误问题 小编在进行图片切片处理的时候,发现用PS导出的切片顺序错误,例如用PS导出的切片分别为test_01.jpg,test_02.jpg,但实际的使用需求是将两个图片的顺序调换&#xff0c…

self-play RL学习笔记

让AI用随机的路径尝试新的任务,如果效果超预期,那就更新神经网络的权重,使得AI记住多使用这个成功的事件,再开始下一次的尝试。——llya Sutskever 这两天炸裂朋友圈的OpenAI草莓大模型o1和此前代码能力大幅升级的Claude 3.5&…

three.js KeyframeTrack

KeyframeTrack 关键帧轨道(KeyframeTrack)是关键帧(keyframes)的定时序列, 它由时间和相关值的列表组成, 用来让一个对象的某个特定属性动起来。 在使用手册的“下一步”章节中,“动画系统”一文对three.js动画系统中的不同元素作出了概述 和JSON model format的…

基于less和scss 循环生成css

效果 一、less代码 复制代码 item-count: 12; // 生成多少个 .item 类.item-loop(n) when (n > 0) {.icon{n} {background: url(../../assets/images/menu/icon{n}.png) no-repeat;background-size: 100% 100%;}.item-loop(n - 1);}.item-loop(item-count);二、scss代码 f…

c++临时对象导致的生命周期问题

对象的生命周期是c中非常重要的概念,它直接决定了你的程序是否正确以及是否存在安全问题。 今天要说的临时变量导致的生命周期问题是非常常见的,很多时候没有一定经验甚至没法识别出来。光是我自己写、review、回答别人的问题就犯了或者看到了许许多多这…

TCL一面(HR)

1. 假设你是正在面试前端开发工程师的候选人,面试官让你详细讲一讲你作为队长参加支付宝小程序开发者大赛,你的作品是“甲骨文猜谜与探索”,请你讲一讲反思、收获和亮点。 在我作为队长参加支付宝小程序开发者大赛的过程中,我们的…

【人工智能】Transformers之Pipeline(十七):文本分类(text-classification)

目录 一、引言 二、文本分类(text-classification) 2.1 概述 2.2 DistilBERT—BERT 的精简版:更小、更快、更便宜、更轻便 2.3 应用场景​​​​​​​ 2.4 pipeline参数 2.4.1 pipeline对象实例化参数 2.4.2 pipeline对象使用参数 …

通过shell脚本一键修改Linux主机名和IP地址脚本

目录 1.前言 2.shell脚本的具体实现以及解析 1.1脚本功能概述 1.2脚本结构分析 3.致谢 1.前言 在复杂的 Linux 系统管理中,高效准确地进行配置调整是至关重要的任务。当面临需要同时修改主机名和 IP 地址的情况时,手动操作不仅繁琐易错&#xf…

【Hot100】LeetCode—287. 寻找重复数

目录 1- 思路题目识别快慢指针-类比链表判环 2- 实现⭐31. 下一个排列——题解思路 3- ACM 实现 原题链接:287. 寻找重复数 1- 思路 题目识别 识别1 :给定一个数组,寻找数组中的重复数。必须用 O(1) 的空间复杂度,且不能修改数组…

VMware Fusion Pro 13 Mac版虚拟机 安装Win11系统教程

Mac分享吧 文章目录 Win11安装完成,软件打开效果一、VMware安装Windows11虚拟机1️⃣:准备镜像2️⃣:创建虚拟机3️⃣:虚拟机设置4️⃣:安装虚拟机5️⃣:解决连不上网问题 安装完成!&#xff0…

fuxa搭建与使用(web组态)

1. 安装Node.js -> npm安装 参考网址:https://blog.csdn.net/WHF__/article/details/129362462 一、安装运行 C:\WINDOWS\system32>node -v v20.17.0 C:\WINDOWS\system32>npm -v 10.8.2 二、环境配置 在安装路径(D:\Program_Files\nodejs&a…

[数据集][目标检测]车油口挡板开关闭合检测数据集VOC+YOLO格式138张2类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):138 标注数量(xml文件个数):138 标注数量(txt文件个数):138 标注类别…

【2024.08】图模互补:知识图谱与大模型融合综述-笔记

阅读目的:假设已有一个知识图谱,如何利用图谱增强模型的问答,如何检索知识图谱、知识图谱与模型的文本如何相互交互、如何利用知识图谱增强模型回答的可解释性。 从综述中抽取感兴趣的论文进一步阅读。 来源:图模互补&#xff1…

TakePhotoX

Demo下载 TakePhotoXDemo Android 版本 APK 下载 - PGYER.COM 安装密码:123456 GitHub - yijiebuyi/TakePhotoX: 基于CameraX 实现拍照,二维码扫描,录像 功能 支持前后摄像头切换支持4:3 16:9 1:1 图片拍摄支持二维码扫描识别支持灯光控制…