kafka基本操作以及kafka-topics.sh 使用方式

文章目录

  • 1 kafka的基本操作
    • 1.1 创建topic
    • 1.2 查看topic
    • 1.3 查看topic属性
    • 1.4 发送消息
    • 1.5 消费消息
  • 2 kafka-topics.sh 使用方式
    • 2.1 查看帮助
    • 2.2 副本数量规则
    • 2.3 创建主题
    • 2.4 查看broker上所有的主题
    • 2.5 查看指定主题 topic 的详细信息
    • 2.6 修改主题信息之增加主题分区数量
    • 2.7 删除主题

1 kafka的基本操作

1.1 创建topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

参数说明:

  1. –create 是创建主题的的动作指令。
  2. –zookeeper 指定kafka所连接的zookeeper服务地址。
  3. –replicator-factor 指定了副本因子(即副本数量)。
  4. –partitions 指定分区个数;多个通道,类似车道。
  5. –topic 指定所要创建主题的名称,比如test。

成功则显示:

Created topic "test".

1.2 查看topic

sh kafka-topics.sh --list --zookeeper localhost:2181

显示:

test

1.3 查看topic属性

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

显示:

Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

1.4 发送消息

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

发送端输入:

>hello
>where are you
>let’s go

1.5 消费消息

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

消费端显示:

hello
where are you
let’s go
^CProcessed a total of 3 messages

2 kafka-topics.sh 使用方式

创建、修改、删除以及查看等功能。

2.1 查看帮助

/bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使用 --help 参数来打印列出其所需的参数信息。

$ sh kafka-topics.sh --help
Command must include exactly one action: --list, --describe, --create, --alter or --delete
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        replica assignment, and/or           configuration for the topic.         
--config <String: name=value>            A topic configuration override for the topic being created or altered.The   following is a list of valid         configurations:                      cleanup.policy                        compression.type                      delete.retention.ms                   file.delete.delay.ms                  flush.messages                        flush.ms                              follower.replication.throttled.       replicas                             index.interval.bytes                  leader.replication.throttled.replicas max.message.bytes                     message.downconversion.enable         message.format.version                message.timestamp.difference.max.ms   message.timestamp.type                min.cleanable.dirty.ratio             min.compaction.lag.ms                 min.insync.replicas                   preallocate                           retention.bytes                       retention.ms                          segment.bytes                         segment.index.bytes                   segment.jitter.ms                     segment.ms                            unclean.leader.election.enable        See the Kafka documentation for full   details on the topic configs.        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   removed for an existing topic (see   the list of configurations under the --config option).                    
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting       topics, the action will only execute if the topic exists                  
--if-not-exists                          if set when creating topics, the       action will only execute if the      topic does not already exist         
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING:   If partitions are increased for a    topic that has a key, the partition  logic or ordering of the messages    will be affected                     
--replica-assignment <String:            A list of manual partition-to-broker   broker_id_for_part1_replica1 :           assignments for the topic being      broker_id_for_part1_replica2 ,           created or altered.                  broker_id_for_part2_replica1 :                                                broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or       describe. Can also accept a regular  expression except for --create option
--topics-with-overrides                  if set when describing topics, only    show topics that have overridden     configs                              
--unavailable-partitions                 if set when describing topics, only    show partitions whose leader is not  available                            
--under-replicated-partitions            if set when describing topics, only    show under replicated partitions     
--zookeeper <String: hosts>              REQUIRED: The connection string for    the zookeeper connection in the form host:port. Multiple hosts can be     given to allow fail-over.            

2.2 副本数量规则

副本数量不能大于broker的数量。
kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1

报错:

Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: 
Replication factor: 2 larger than available brokers: 1.(kafka.admin.TopicCommand$)

注意:副本数量和分区数量的区别。

2.3 创建主题

创建主题时候,有3个参数是必填的:

  1. –partitions(分区数量);
  2. –topic(主题名) ;
  3. –replication-factor(复制系数)。

同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

返回显示:

Created topic "test1".

另外在创建主题的时候,还可以附加以下两个选项:–if-not-exists 和 --if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。

2.4 查看broker上所有的主题

–list。

 sh kafka-topics.sh --list --zookeeper localhost:2181

结果显示:

test
test1

2.5 查看指定主题 topic 的详细信息

–describe。

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1 

结果显示:

Topic:test1	PartitionCount:1	ReplicationFactor:1	Configs:Topic: test1	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

2.6 修改主题信息之增加主题分区数量

–alter。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2

结果显示:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages
will be affected
Adding partitions succeeded!

查看主题信息:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

可以看到已经成功的将主题的分区数量从1修改为了2。

Topic:test1	PartitionCount:2	ReplicationFactor:1	Configs:Topic: test1	Partition: 0	Leader: 0	Replicas: 0	Isr: 0Topic: test1	Partition: 1	Leader: 0	Replicas: 0	Isr: 0

当去修改一个不存在的topic信息时(比如修改主题 test2,当前这主题是不存在的)。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2

会报错:

Error while executing topic command : Topic test2 does not exist on ZK path localhost:2181
[2022-11-24 14:21:33,564] ERROR java.lang.IllegalArgumentException: Topic test2 does not exist on ZK 
path localhost:2181at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)at kafka.admin.TopicCommand.main(TopicCommand.scala)(kafka.admin.TopicCommand$)

注意:
不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题 topic, 然后重新创建。

2.7 删除主题

–delete。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将不会有任何作用。

Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

可以测试一下:

# 一个终端启动生产者:
sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1# 另一个终端启动消费者:
sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning

发现此时还是可以发送消息和接收消息。如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。

然后就重启kafka:

# 停止:
sh kafka-server-stop.sh -daemon ../config/server.properties
# 启动:
sh kafka-server-start.sh -daemon ../config/server.properties

再次删除就可以了。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/171602.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker操作手册

写在前面的几个重要命令 docker与本地件的文件拷贝 # 查看容器ID docker ps -a# 本地文件拷本到容器 docker cp {local_path} {CONTAINER ID}:{path}# 容器拷本到本地 docker cp {CONTAINER ID}:{path} {local_path} # eg docker cp /Users/helloworld/Downloads/R-3.5.0 0a1…

【人工智能】Chatgpt的训练原理

前言 前不久&#xff0c;在学习C语言的我写了一段三子棋的代码&#xff0c;但是与我对抗的电脑是没有任何思考的&#xff0c;你看了这段代码就理解为什么了&#xff1a; void computerMove(char Board[ROW][COL], int row, int col) {while (1){unsigned int i rand() % ROW, …

设计模式之十二:复合模式

模式通常被一起使用&#xff0c;并被组合在同一个解决方案中。 复合模式在一个解决方案中结合两个或多个模式&#xff0c;以解决一般或重复发生的问题。 首先重新构建鸭子模拟器&#xff1a; package headfirst.designpatterns.combining.ducks;public interface Quackable …

【HarmonyOS】ArkUI状态管理:组件内状态、装饰器、高级用法与最佳实战

文章目录 ArkUI状态管理机制详解1. 概述2. 基本概念2.1 状态变量2.2 数据传递和同步2.3 初始化方法3. 装饰器总览3.1 管理组件拥有的状态3.2 管理应用拥有的状态3.3 其他状态管理功能4. @State装饰器详解4.1 使用规则说明4.2 传递/访问规则说明4.3 观察变化和行为表现5. 使用场…

2.一维数组——输入10个成绩,求平均成绩,将低于平均成绩的分数输出

文章目录 前言一、题目描述 二、题目分析 三、解题 程序运行代码 前言 本系列为一维数组编程题&#xff0c;点滴成长&#xff0c;一起逆袭。 一、题目描述 输入10个成绩&#xff0c;求平均成绩&#xff0c;将低于平均成绩的分数输出 二、题目分析 averagesum/输入个数; 三、…

计算机网络入门

计算机网络 一、计算机网络基础 定义计算机网络计算机网络的发展历程计算机网络的分类&#xff08;局域网、广域网、互联网等&#xff09; 1. 计算机网络的定义&#xff1a; 计算机网络是指通过通信链路将多台计算机连接在一起&#xff0c;以便它们之间能够相互通信和共享资…

【nlp】4.2 nlp中标准数据集(GLUE数据集合中的dev.tsv 、test.tsv 、train.tsv)

nlp中标准数据集 1 GLUE数据集合介绍1.1 数据集合介绍1.2 数据集合路径2 GLUE子数据集的样式及其任务类型2.1 CoLA数据集文件样式2.2 SST-2数据集文件样式2.3 MRPC数据集文件样式2.4 STS-B数据集文件样式2.5 QQP数据集文件样式2.6 (MNLI/SNLI)数据集文件样式2.7 (QNLI/RTE/WNLI…

【网络】传输层 --- 详解TCP协议

目录 一、协议段格式及其策略确认应答(ACK)机制6个标志位超时重传流量控制滑动窗口1、先谈滑动窗口一般情况2、再谈特殊窗口 拥塞控制拥塞窗口 延迟应答&&捎带应答面向字节流粘包问题 二、三次握手和四次挥手三次握手为什么是3次&#xff1f;不是2、4、5、6次呢 四次挥…

[datastore@cyberfear.com].Elbie、[thekeyishere@cock.li].Elbie勒索病毒数据怎么处理|数据解密恢复

引言&#xff1a; 随着科技的进步&#xff0c;勒索病毒变得越来越复杂&#xff0c;而[datastorecyberfear.com].Elbie、[thekeyisherecock.li].Elbie勒索病毒是其中的一种令人头疼的威胁。本文将深入介绍[datastorecyberfear.com].Elbie、[thekeyisherecock.li].Elbie勒索病毒…

邻接表存储实现有向网构建

7-2 邻接表存储实现有向网构建 编程实现&#xff1a;以邻接表的存储方式&#xff0c;创建一个有向网&#xff0c;顶点为字符型。 输入格式: 第一行输入顶点个数和边的个数&#xff0c;中间用空格分开。下一行开始依次输入顶点&#xff0c;空格或回车分开。接着依次输入边依附…

NoSQL基础知识小结

NoSQL 基础知识 什么是 NoSQL? NoSQL&#xff08;Not Only SQL 的缩写&#xff09;泛指非关系型的数据库&#xff0c;主要针对的是键值、文档以及图形类型数据存储。 NoSQL 数据库天生支持分布式&#xff0c;数据冗余和数据分片等特性&#xff0c;旨在提供可扩展的高可用高…

【代码】基于VMD(变分模态分解)-SSA(麻雀搜索算法优化)-LSTM的光伏功率预测模型(完美复现)matlab代码

程序名称&#xff1a;基于VMD&#xff08;变分模态分解&#xff09;-SSA&#xff08;麻雀搜索算法优化&#xff09;-LSTM的光伏功率预测模型 实现平台&#xff1a;matlab 代码简介&#xff1a;提出了变分模态分解(VMD)和麻雀搜索算法(SSA)与长短期记忆神经网络 (LSTM)相耦合,…

C语言数组

数组 一维数组 定义一维数组 定义一维数组的一般形式&#xff1a; 类型符 数组名 [常量表达式];其中&#xff1a; 数组名的命名规则和变量名相同&#xff0c;遵循标识符命名规则。常量表达式的值表示数组中元素的个数&#xff0c;也称为数组的长度。常量表达式可以包含常量…

Linux云服务器打包部署前端Vue项目

1. 打包 在项目包的终端使用命令打包成dist文件。 npm run build2. Linux云服务器上创建文件夹 mkdir /home/www/dist注&#xff1a;dist文件夹不用创建&#xff0c;将打包好的dist.zip放进去&#xff0c;然后解压就行。 3. 安装nginx yum install -y nginx4. 修改配置文件…

Spark-06:Spark 共享变量

目录 1.广播变量&#xff08;broadcast variables&#xff09; 2.累加器&#xff08;accumulators&#xff09; 在分布式计算中&#xff0c;当在集群的多个节点上并行运行函数时&#xff0c;默认情况下&#xff0c;每个任务都会获得函数中使用到的变量的一个副本。如果变量很…

Android 相机库CameraView源码解析 (一) : 预览

1. 前言 这段时间&#xff0c;在使用 natario1/CameraView 来实现带滤镜的预览、拍照、录像功能。 由于CameraView封装的比较到位&#xff0c;在项目前期&#xff0c;的确为我们节省了不少时间。 但随着项目持续深入&#xff0c;对于CameraView的使用进入深水区&#xff0c;逐…

【LeetCode】挑战100天 Day17(热题+面试经典150题)

【LeetCode】挑战100天 Day17&#xff08;热题面试经典150题&#xff09; 一、LeetCode介绍二、LeetCode 热题 HOT 100-192.1 题目2.2 题解 三、面试经典 150 题-193.1 题目3.2 题解 一、LeetCode介绍 LeetCode是一个在线编程网站&#xff0c;提供各种算法和数据结构的题目&…

java stream流常用方法

filter(Predicate predicate)&#xff1a;根据指定条件过滤元素。 List<Integer> numbers Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> evenNumbers numbers.stream().filter(n -> n % 2 0).collect(Collectors.toList()); System.out.pr…

VS2010 VS2015环境编译boost库

VS2010下安装boost库 去www.boost.org下载最新的boost&#xff0c;我下载了boost_1_46_1.7z&#xff08;我放在D:/cpp目录下&#xff09;解压到当前文件夹打开VS2010->VS TOOLS->VS命令提示CD D:/cpp/boost_1_46_1输入bootstrap&#xff0c;便生成bjam.exe文件输入bjam …

[建议收藏] 一个网站集合所有最新最全的AI工具

今天给大家推荐一个宝藏的AI工具合集网站&#xff0c;有了这个网站&#xff0c;你们再也不用去其他地方找AI工具了。 名称&#xff1a;AI-BOT工具集 这个网站精选1000AI工具&#xff0c;并持续每天更新添加&#xff0c;包括AI写作、AI绘画、AI音视频处理、AI平面设计、AI自动编…