05、Kafka 操作命令

05、Kafka 操作命令

1、主题命令

(1)创建主题

kafka-topics.sh --create --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 4 --replication-factor 3

–bootstrap-server:设置kafka执行节点

–topic:主题名称

–partitions:设置分区数,可以用于并发消费。

–replication-factor:设置副本因子,数量不能大于kafka节点数。

(2)查看主题

kafka-topics.sh --list --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

image-20240509122153192

(3)在我们配置的文件夹下,

/usr/local/kafka_2.12-3.7.0/data

就可以看到topic对应的文件夹,其中 0,1,2,3是因为我们指定的partitions为4,创建了4个分区。

image-20240509160712720

在其他两台机器上,也同样有这三个文件夹,是因为我们的replication-factor 为3,表示会有三个副本,刚好对应三台机器。

(3)查询主题描述信息

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

image-20240509161337423

ReplicationFactor 表示副本数为3

Partition:表示当前的分区号

Replicas:表示副本存放的机器

Leader:表示三个副本中的leader

Isr:表示当前可用的机器id

(4)修改主题

分区数partitions可以修改,只能比原来的大。

replication-factor 一旦确定就不能修改了。

kafka-topics.sh --alter --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 6

image-20240509162034581

(5)删除主题

kafka-topics.sh --delete --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test

image-20240509165305344

2、生产消息

给test1主题,发送消息

[root@localhost bin]# kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

如果给一个不存在的主题,发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2
>message1

image-20240509171549191

第一个消息发送后会有警告,后续发送消息没有警告,因为会自动创建topic,并且指定

partitions 和 replication-factor 都为1。

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2

image-20240509171756305

但是,建议 topic 还是手动创建,应该 partitions 是可以修改的,但是 replication-factor

是不允许修改的。

3、消费消息

(1)消费最新数据

消费 test1 主题下的消息:

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

我们再启动一个生产者来发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

image-20240509172354876

此时查看我们的消费者,就会打印接受到的消息:

image-20240509172430840

上面消费者,只能消费最新的数据,无法消费历史数据。

(2)消费历史数据

消费 test1 主题下的历史消息:

  • –from-beginning 表示从头开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --from-beginning

image-20240509173256632

  • –offset earliest --partition 1 表示从1号分区的头部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset earliest --partition 1

image-20240509173418365

  • –offset latest --partition 1 表示从1号分区的尾部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset latest --partition 1

image-20240509173610825

  • –offset 2 --partition 1 从1号分区的offset 为2的位置开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset 2 --partition 1

image-20240509173747666

4、消费者组

消费者组其实就是一个容器,可以容纳若干个消费者,每一个消费者必须被包含在一个消费者组里面。

(1)通过 --group 来指定消费者组。

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --group my-group1

如果分组存在,则会将消费者添加到对应分组下,如果不存在,则会创建消费者组。

(2)查看已有的消费者组

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --list

image-20240509174505505

(3)删除消费者组

删除消费者组,必须要先保证消费者组下没有消费者:

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --delete --group my-group1

image-20240509174700845

(4)查看消费的位置

kafka-consumer-groups.sh --describe  --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --group my-group1

image-20240509175128763

PARTITION:表示分区号

CURRENT-OFFSET:当前消费到的offset下标

LOG-END-OFFSET:当前分区最新的offset下标

LAG:是偏移量,未消费的数量。

消费详情:

kafka消费者在消费数据的时候,都是分组消费的,不同的消费者组之间没有影响,都会去消费。在同一个组内消费,一条消息只能被一个消费者消费,同时一个分区数据只能被一个消费者消费,如果消费者数量大于分区数,则多余出来的消费者永远不会消费消息。如果分区数大于消费者,则会均匀的将分区分配给多个消费者。

image-20240510105221038

因此,kafka 的 topic 的 partition 个数代表是 kafka 的 topic 的并行度,同一时间最多可以有多个线程来消费 topic 的数据,所以如果要提高 kafka 的 topic 的消费能力,应该增大 partition 的个数。

5、手动平衡 leader:

# 使用的脚本是:kafka-leader-election.sh
# 必要的参数:
# --bootstrap-server:指定服务器列表
# --election-type:选举的类型,默认选择preferred即可
# 下面的三个参数三选一:
# --all-topic-partitions:平衡所有的主题、所有的分区
# --topic:平衡指定的主题,如果选择这个参数,则必须使用--partition指定分区
# --path-to-json-file:将需要平衡的主题、分区信息写入一个json文件,指定这个文件
#     json的格式: {"partitions": [{"topic": "test", "partition": 1}, {"topic": "test", "partition": 2}]}[root@qianfeng01 data]$ kafka-leader-election.sh \
--bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 \
--election-type preferred \
--all-topic-partitions

6、kafka 自带的压力测试工具

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。

一般都是网络IO达到瓶颈。

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

(1)生产者Producer压测

kafka-producer-perf-test.sh \
--topic test \
--record-size 100 \
--num-records 100000 \
--throughput -1 \
--producer-props bootstrap.servers=192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

record-size:一条信息有多大,单位字节

num-records:总共发送多少条信息

throughput:每秒多少条信息,设置成-1,表示不限流,可测生产者最大吞吐量

producer-props:发送端端消息配置

结果:

100000 records sent, 27495.188342 records/sec (2.62 MB/sec), 1461.75 ms avg latency, 2183.00 ms max latency, 1696 ms 50th, 2103 ms 95th, 2177 ms 99th, 2181 ms 99.9th.

解析:

一共写入10万条消息

吞吐量为2.62 MB/sec

每次写入的平均延迟为1461.75ms

最大延迟2183.00 ms

(2)消费者 Consumer 压力测试

consumer测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能

kafka-consumer-perf-test.sh \
--broker-list hadoop100:9092 \
--topic test \
--fetch-size 10000 \
--messages 10000000 \
--threads 1

broker-list:节点地址

topic:指定topic名称

fetch-size:指定每个fetch的数据大小

messages:总共要消费的消息个数

结果:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec2020-06-27 13:17:57:490, 2020-06-27 13:18:11:751, 20.0272, 1.4043, 210000, 14725.4751, 1593235077858, -1593235063597, -0.0000, -0.0001

解释:

开始时间

结束时间

共消费数据:20.0272M

吞吐量:1.4043MB/s

共消费数据:210000条

平均每秒消费:14725.4751条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9953.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Gradient发布支持100万token的Lllama3,上下文长度从8K扩展到1048K

前言 近日Gradient公司在Crusoe Energy公司的算力支持下,开发了一款基于Llama-3的大型语言模型。这款新模型在原Llama-3 8B的基础上,将上下文长度从8000 token大幅扩展到超过104万token。 这一创新性突破,展现了当前SOTA大语言模型在长上下…

类和对象(上篇)

面向对象和面向过程 C语言是面向过程的,关注的是过程,分析出求解问题的步骤,通过函数调用逐步解决问题。 C是基于面向对象的,关注的是对象,将一件事情拆分成不同的对象,靠对象之间的交互完成。 类的引入 在…

【17-Ⅰ】Head First Java 学习笔记

HeadFirst Java 本人有C语言基础,通过阅读Java廖雪峰网站,简单速成了java,但对其中一些入门概念有所疏漏,阅读本书以弥补。 第一章 Java入门 第二章 面向对象 第三章 变量 第四章 方法操作实例变量 第五章 程序实战 第六章 Java…

024.反转链表

给定单链表的头节点 head ,请反转链表,并返回反转后的链表的头节点。 示例 1: 输入:head [1,2,3,4,5] 输出:[5,4,3,2,1]示例 2: 输入:head [1,2] 输出:[2,1]示例 3: 输…

《解锁高效合同管理系统:优化业务流程,提升管理效率》

随着企业规模的扩大和业务复杂性的增加,合同管理变得愈发重要。合同是企业与客户、供应商、合作伙伴之间的法律约束和商业承诺,而有效的合同管理系统则成为企业提高运营效率、降低风险的关键工具。本文将探讨合同管理系统的重要性以及如何利用合同管理系…

【YashanDB知识库】ycm托管数据库时报错OM host ip:127.0.0.1 is not support join to YCM

问题现象 问题的风险及影响 导致数据库无法托管监控 问题影响的版本 问题发生原因 安装数据库时修改了OM的监听ip为127.0.0.1 解决方法及规避方式 后台修改OM的ip为本机的ip或者0.0.0.0 问题分析和处理过程 1、修改env文件中的om IP地址,修改为0.0.0.0或本机…

milvus元数据在etcd的存储解析

milvus元数据在etcd的存储解析 数据以key-value形式存在。 大致包含如下一些种类: databasecollectionfieldpartitionindexsegment-indexresource_groupsession database 创建一个数据库会产生2个key,但value是相同的。 key规则: 前缀/root-coord/database/db…

【深度学习】Diffusion扩散模型原理解析1

1、前言 diffusion,这几年一直很火的模型,比如这段时间在网上的文生图大模型——Stable diffusion。就是以diffusion作为基底模型,但由于该模型与VAE那边,都涉及了较多了概率论知识,实在让人望而却步。所以&#xff0…

线路和绕组中的波过程(二)

本篇为本科课程《高电压工程基础》的笔记。 本篇为这一单元的第二篇笔记。上一篇传送门。 行波通过串联电感与旁路并联电容 由于并联电容或串联电感的存在,线路上传播的行波会发生幅值和波形的变化。 直角波通过串联电感 有一个无限长的直角波 U 1 f U_{1f} U1…

C语言 | Leetcode C语言题解之第82题删除排序链表中的重复元素II

题目: 题解: struct ListNode* deleteDuplicates(struct ListNode* head) {if (!head) {return head;}struct ListNode* dummy malloc(sizeof(struct ListNode));dummy->next head;struct ListNode* cur dummy;while (cur->next && cu…

vue----- watch监听$attrs 的注意事项

目录 前言 原因分析 解决方案 总结 前言 在 Vue 开发过程中,如遇到祖先组件需要传值到孙子组件时,需要在儿子组件接收 props ,然后再传递给孙子组件,通过使用 v-bind"$attrs" 则会带来极大的便利,但同时…

设计模式 六大原则之单一职责原则

文章目录 概述代码例子小结 概述 先看下定义吧,如下: 单一职责原则的定义描述非常简单,也不难理解。一个类只负责完成一个职责或者功能。也就是说在类的设计中, 我们不要设计大而全的类,而是要设计粒度小、功能单一的类。 代码例…

灯珠CCD或CMOS成像RGB数据 光谱重建

1. 源由 本文主要为了通过摄像头CCD或者CMOS传感器对灯珠成像数据分析、重建灯珠可见光范围光谱数据的研究,从原理和方法上论证可行性。 随着照明技术迅猛发展,LED技术日渐成熟。LED产品由于具备经久耐用、节能且价格低等优势,已成为照明行…

传输层之 TCP 协议

TCP协议段格式 源/目的端口号:表示数据是从哪个进程来,到哪个进程去。 序号:发送数据的序号。 确认序号:应答报文的序号,用来回复发送方的。 4 位首部长度:一个 TCP 报头,长度是可变的&#xff…

2024年汉字小达人活动还有4个多月开赛:来做18道历年选择题备考吧

不出特殊情况的话,距离2024年第11届汉字小达人比赛还有4个多月的时间,如何利用这段时间有条不紊地备考呢?我的建议是两手准备:①把小学1-5年级的语文课本上的知识点熟悉,重点是字、词、成语、古诗。②把历年真题刷刷熟…

脆皮之“指针和数组的关系”

文章目录 1. 数组名的理解2. 使用指针访问数组3. 一维数组传参的本质4. 冒泡排序5. 二级指针6. 指针数组7. 指针数组模拟二维数组 hello,大家好呀,窝是脆皮炸鸡。这一期是关于数组和指针的,我觉得并不是很难,但是我觉着下一期可能…

自定义el-select下拉菜单的内容以及数据回显的内容

最终的效果 下拉选项的自定义内容好实现&#xff0c;因为他有默认插槽&#xff0c;所以直接在el-option标签里面写自定义内容就可以实现 <el-selectref"seriesBorderTypeRef"class"series-border-type"change"changeSeriesBorderType"v-model…

ESLint: Unexpected ‘debugger‘ statement.(no-debugger)(debugger报红)

ESLint: Unexpected debugger statement.(no-debugger) 解决办法&#xff1a; 找到.eslintrc.js文件中rules的no-debugger更改为0即可

gpustat 不能使用问题

突然间就不能用了&#xff0c;可能是环境出了问题&#xff0c;如果GPU没问题的话&#xff0c;那么换个环境重新安装试一下&#xff08;pip install gpustat&#xff09;&#xff0c;目前是换个环境就可以了&#xff08;做个笔记&#xff09;

信息系统项目管理师0101:项目建议与立项申请(7项目立项管理—7.1项目建议与立项申请)

点击查看专栏目录 文章目录 第七章 项目立项管理7.1项目建议与立项申请1.立项申请概念2.项目建议书内容记忆要点总结第七章 项目立项管理 项目立项管理是对拟规划和实施的项目技术上的先进性、适用性,经济上的合理性、效益性,实施上的可能性、风险性以及社会价值的有效性、可…