Kafka 命令详解及使用示例

文章目录

  • Kafka 命令详解及使用示例
  • Kafka 命令详解
    • `kafka-topics.sh`:主题管理
      • 创建主题
      • 创建带副本的主题
      • 修改主题分区数
      • 了解分区分布
      • 列出主题
      • 查看主题详情
      • 删除主题
    • `kafka-console-producer.sh`:消息生产者
      • 发送消息到主题
      • 带键值对的消息
      • 消息生产性能优化
      • 带分区键的消息发送
    • `kafka-console-consumer.sh`:消息消费者
      • 消费主题中的消息
      • 只读取键值对消息
      • 实时消费消息
      • 只消费特定分区的消息
      • 以 JSON 格式输出消息
    • `kafka-consumer-groups.sh`:消费者组管理
      • 查看消费者组信息
      • 查看消费者组的偏移量信息
      • 重置消费者组的偏移量
    • `kafka-configs.sh`:配置管理
      • 查看主题配置
      • 修改主题配置
    • `kafka-acls.sh`:访问控制列表管理
      • 为用户创建权限
      • 删除用户权限
  • 示例总结


Kafka 命令详解及使用示例

Kafka 是一个分布式流处理平台,提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输,广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式,帮助开发者更好地管理和使用 Kafka。


Kafka 命令详解

kafka-topics.sh:主题管理

主题(Topic)是 Kafka 中消息的逻辑分类,所有消息都发送到指定的主题中。kafka-topics.sh 用于管理主题,包括创建、删除、列出主题等操作。

创建主题

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  • --topic:主题名称。
  • --partitions:分区数,消息将分布在多个分区中。
  • --replication-factor:副本因子,用于消息的高可用性。

创建带副本的主题

在分布式环境中,副本对于 Kafka 来说至关重要,它能确保在 Broker 故障时,消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。

bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092
  • --partitions 设置为 5,意味着主题的数据会被分散到 5 个分区中,提升并发处理能力。
  • --replication-factor 设置为 3,确保每个分区有 3 个副本(在不同的 Broker 上),提高容错性。

注意:副本数不能超过集群中的 Broker 数量,生产环境中一般设置副本数为 3,保证高可用性。

修改主题分区数

Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数,但要注意增加分区会影响数据的顺序性,因为 Kafka 不会自动对已存在的数据进行重分配。

bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092

此命令将 important-topic 的分区数从 5 扩展到 10 个。

了解分区分布

通过 --describe 命令,可以查看每个分区在哪些 Broker 上存储,并了解它们的副本状态。

bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092

输出的结果会显示每个分区的副本和首领(Leader)在哪个 Broker 上。Leader 是处理读写请求的副本,其他副本是跟随者,用于容错。

列出主题

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

查看主题详情

bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

删除主题

bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

kafka-console-producer.sh:消息生产者

Kafka 提供了一个控制台生产者工具,允许我们从命令行发送消息到指定主题。

发送消息到主题

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

输入消息后按 Enter 发送到 Kafka 主题。

带键值对的消息

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

在这里,消息的键和值通过冒号分隔,例如:

key1:value1
key2:value2

消息生产性能优化

在高吞吐量场景下,可以通过调整生产者配置来提高性能。例如,批量发送消息和异步生产可以显著提高效率。

bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size=16384 --producer-property linger.ms=5
  • batch.size:控制批量消息的大小(以字节为单位),Kafka 会尝试将消息累积到这个大小后一起发送。
  • linger.ms:在批量消息发送前的等待时间,可以通过稍微延迟发送消息来增加批量的大小。

此外,生产者可以配置为异步发送,这样可以减少网络等待时间:

--producer-property acks=1
  • acks=1 表示只等待 Leader 确认即可继续发送消息,这种方式可以提高性能,但有可能在 Leader 故障时丢失部分消息。

带分区键的消息发送

指定消息发送到特定的分区时,可以使用 key 参数,这在有状态的消息处理(如事务处理)场景中非常重要。

bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

这样每个消息都会根据键(key)被分配到相同的分区。例如,key1:message1key1:message2 会发送到相同的分区。


kafka-console-consumer.sh:消息消费者

消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh 是 Kafka 提供的命令行消费者工具。

消费主题中的消息

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
  • --from-beginning:从主题的起始位置读取所有消息。

只读取键值对消息

bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=,

这样读取的消息会显示为键和值的格式,例如:

key1,value1

实时消费消息

使用 kafka-console-consumer.sh 来实时消费主题中的消息:

bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092

如果要从主题的起始位置读取消息,可以添加 --from-beginning 参数。

只消费特定分区的消息

Kafka 支持直接从某个分区中读取消息。在某些场景下(如故障恢复或日志分析),我们可能只需要处理某个分区的数据:

bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10

此命令从分区 0 开始读取第 10 条消息。

以 JSON 格式输出消息

Kafka 消费者可以输出 JSON 格式的消息,方便后续处理和分析:

bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.separator=, --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka-consumer-groups.sh:消费者组管理

Kafka 的消费者组允许多个消费者一起协同消费消息,每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh 可以用于管理消费者组。

查看消费者组信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费者组的偏移量信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

输出信息包括每个分区的已消费消息偏移量以及消费者的状态。

重置消费者组的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic
  • --to-earliest:将偏移量重置为最早的消息。

kafka-configs.sh:配置管理

Kafka 主题和代理的配置可以通过 kafka-configs.sh 进行管理。

查看主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

修改主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=172800000

此命令将主题 my-topic 的消息保留时间修改为 2 天(单位为毫秒)。


kafka-acls.sh:访问控制列表管理

Kafka 提供了基于 ACL(访问控制列表)的权限管理。kafka-acls.sh 用于管理权限。

为用户创建权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic

此命令允许用户 Alice 对主题 my-topic 执行所有操作。

删除用户权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic

示例总结

我们通过几个简单的示例介绍了 Kafka 的基本操作:

  1. 创建主题 my-topic,并通过控制台生产者发送消息。
  2. 使用控制台消费者从该主题中读取消息。
  3. 管理消费者组的偏移量,重置到最早的消息。
  4. 修改主题的保留时间,以及管理用户的权限。

Kafka 提供了丰富的命令行工具,用于主题、消费者组、配置、权限等的管理。灵活使用这些命令,可以帮助我们高效地维护 Kafka 集群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/879435.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python基础】Python错误和异常处理(详细实例)

本文收录于 《Python编程入门》专栏,从零基础开始,分享一些Python编程基础知识,欢迎关注,谢谢! 文章目录 一、前言二、Python中的错误类型三、Python异常处理机制3.1 try-except语句3.2 try-except-else语句3.3 try-fi…

TiDB 扩容过程中 PD 生成调度的原理及常见问题丨TiDB 扩缩容指南(一)

导读 作为一个分布式数据库,扩缩容是 TiDB 集群最常见的运维操作之一。本系列文章,我们将基于 v7.5.0 具体介绍扩缩容操作的具体原理、相关配置及常见问题的排查。 通常,我们根据当前资源状态来决定是否需要调整 TiKV 节点的规模&#xff0…

探索螺钉设计:部分螺纹与全螺纹,哪种更适合你的项目?

为什么有些螺钉有部分螺纹? 螺钉由头部、柄部和尖端组成,是世界上zui常用的紧固件之一。与螺栓一样,它们旨在将多个对象或表面连接在一起。但是,在比较不同类型的螺钉时,您可能会注意到其中一些都具有部分螺纹杆。 什么是螺柄&a…

Python | Leetcode Python题解之第397题整数替换

题目: 题解: class Solution:def integerReplacement(self, n: int) -> int:ans 0while n ! 1:if n % 2 0:ans 1n // 2elif n % 4 1:ans 2n // 2else:if n 3:ans 2n 1else:ans 2n n // 2 1return ans

Python_两个jpg图片文件名称互换

项目场景 处理Adobe Photoshop导出的两个切片的顺序错误问题 小编在进行图片切片处理的时候,发现用PS导出的切片顺序错误,例如用PS导出的切片分别为test_01.jpg,test_02.jpg,但实际的使用需求是将两个图片的顺序调换&#xff0c…

self-play RL学习笔记

让AI用随机的路径尝试新的任务,如果效果超预期,那就更新神经网络的权重,使得AI记住多使用这个成功的事件,再开始下一次的尝试。——llya Sutskever 这两天炸裂朋友圈的OpenAI草莓大模型o1和此前代码能力大幅升级的Claude 3.5&…

基于less和scss 循环生成css

效果 一、less代码 复制代码 item-count: 12; // 生成多少个 .item 类.item-loop(n) when (n > 0) {.icon{n} {background: url(../../assets/images/menu/icon{n}.png) no-repeat;background-size: 100% 100%;}.item-loop(n - 1);}.item-loop(item-count);二、scss代码 f…

【人工智能】Transformers之Pipeline(十七):文本分类(text-classification)

目录 一、引言 二、文本分类(text-classification) 2.1 概述 2.2 DistilBERT—BERT 的精简版:更小、更快、更便宜、更轻便 2.3 应用场景​​​​​​​ 2.4 pipeline参数 2.4.1 pipeline对象实例化参数 2.4.2 pipeline对象使用参数 …

【Hot100】LeetCode—287. 寻找重复数

目录 1- 思路题目识别快慢指针-类比链表判环 2- 实现⭐31. 下一个排列——题解思路 3- ACM 实现 原题链接:287. 寻找重复数 1- 思路 题目识别 识别1 :给定一个数组,寻找数组中的重复数。必须用 O(1) 的空间复杂度,且不能修改数组…

VMware Fusion Pro 13 Mac版虚拟机 安装Win11系统教程

Mac分享吧 文章目录 Win11安装完成,软件打开效果一、VMware安装Windows11虚拟机1️⃣:准备镜像2️⃣:创建虚拟机3️⃣:虚拟机设置4️⃣:安装虚拟机5️⃣:解决连不上网问题 安装完成!&#xff0…

fuxa搭建与使用(web组态)

1. 安装Node.js -> npm安装 参考网址:https://blog.csdn.net/WHF__/article/details/129362462 一、安装运行 C:\WINDOWS\system32>node -v v20.17.0 C:\WINDOWS\system32>npm -v 10.8.2 二、环境配置 在安装路径(D:\Program_Files\nodejs&a…

[数据集][目标检测]车油口挡板开关闭合检测数据集VOC+YOLO格式138张2类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):138 标注数量(xml文件个数):138 标注数量(txt文件个数):138 标注类别…

【2024.08】图模互补:知识图谱与大模型融合综述-笔记

阅读目的:假设已有一个知识图谱,如何利用图谱增强模型的问答,如何检索知识图谱、知识图谱与模型的文本如何相互交互、如何利用知识图谱增强模型回答的可解释性。 从综述中抽取感兴趣的论文进一步阅读。 来源:图模互补&#xff1…

Docker零基础入门

参考课程https://www.bilibili.com/video/BV1VC4y177re/?vd_source=b15169a302bee35f484245aecc69d4dd 参考书籍Docker 实践 - 面向 AI 开发人员的 Docker 实践 (dockerpractice.readthedocs.io) 1. 什么是Docker 1.1. Docker起源 随着计算机的发展,计算机上已经可以运行多…

CAN通讯常见错误纠正

CAN通讯常见错误 1.在使用CAN设备进行数据通讯时,有时候参数配置不当可能就会导致通讯的失败,如下图1所示,出现通信错误的原因是两个设备的波特率配置不一致导致。 图1 2.有时候在配置参数的时候,不能只关注波特率速度配置一致就…

Script-server: 一款开源的脚本管理工具,为你的Python脚本提供一个直观的 Web UI

在日常工作中,我们经常会使用各种脚本来自动化任务,提升效率。但传统的脚本管理方式往往伴随着一些困扰:复杂的命令行操作、难以理解的脚本参数、缺乏直观的反馈等等。这些问题,让原本应该便捷的脚本管理变得繁琐。 Script-server…

太速科技-基于XC7Z100+AD9361的双收双发无线电射频板卡

基于XC7Z100AD9361的双收双发无线电射频板卡 一、板卡概述 基于XC7Z100AD9361的双收双发无线电射频板卡是基于Xilinx ZYNQ FPGA和ADI的无线收发芯片AD9361开发的专用功能板卡,用于4G小基站,无线图传,数据收发等领域。 二、板卡…

QQ频道机器人零基础开发详解(基于QQ官方机器人文档)[第三期]

QQ频道机器人零基础开发详解(基于QQ官方机器人文档)[第三期] 第三期介绍:频道模块之频道成员 目录 QQ频道机器人零基础开发详解(基于QQ官方机器人文档)[第三期]第三期介绍:频道模块之频道成员获取子频道在线成员数获取频道成员列表获取频道身份组成员列…

Java项目: 基于SpringBoot+mybatis+maven课程答疑系统(含源码+数据库+毕业论文)

一、项目简介 本项目是一套基于SpringBootmybatismaven课程答疑系统 包含:项目源码、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过严格调试,eclipse或者idea 确保可以运行! 该系统功能完善、界面美观、操作简单、…

102.WEB渗透测试-信息收集-FOFA语法(2)

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 内容参考于: 易锦网校会员专享课 上一个内容:101.WEB渗透测试-信息收集-FOFA语法(1) FOFA使用实例 • title&q…