# Kafka_深入探秘者(4):kafka 主题 topic

Kafka_深入探秘者(4):kafka 主题 topic

一、kafka 主题管理

1、kafka 创建主题 topic 命令

1)命令:


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/# 创建一个名为 heima 的 主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --create --topic heima --partitions 2 --replication-factor 1

2)命令 参数说明:

–zookeeper :指定了 kafka 所连接的 zookeeper 服务地址。zookeeper必传参数,多个zookeeper用’,"分开。
–topic : 指定了所要创建主题的名称
–partitions : 指定了分区个数,每个线程处理一个分区数据。
–replication-factor : 指定了副本因子,用于设置主题副本数,每个副本分布在不通节点,不能超过总结点数。如你只有一个节点,但是创建时指定副本数为2,就会报错
–create : 创建主题的动作指令。

2、查看 topic 元数据信细的方法:

topic 元数据信息保存在 Zookeeper 节点中,


# 切换到 zookeeper 安装目录
cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin# 连接 zookeeper 查看元数据
bin/zkCli.sh -server localhost:2181# 查看 节点 信息:
get /brokers/topics/heima

在这里插入图片描述

3、展示出当前所有主题


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/# 展示出当前所有主题
bin/kafka-topics.sh --list --zookeeper localhost:2181# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --list --zookeeper 172.18.30.110:2181

4、 查看主题详情:


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/# 查看主题详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic heima# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --describe --topic heima

5、 修改 主题 topic 配置(增加配置)


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/# 修改 主题 topic 配置(增加配置)
bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --config flush.messages=1# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --config flush.messages=1

6、 删除 主题 topic 配置 flush.messages=1


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/# 删除 主题 topic 配置 flush.messages=1
bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --delete-config flush.messages# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --delete-config flush.messages

7、 删除 topic 主题

若 delete.topic.enable=true 直接彻底删除该 Topic 主题。

若 delete.topic.enable=false
如果当前 Topic 没有使用过,即没有传输过信息:可以彻底删除。
如果当前 Topic 有使用过,即有过传输过信息:并没有真正删除 Topic,只是把这个 Topic 标记为删除(marked fordeletion),重启 Kafka Server 后删除。


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/# 删除 topic 主题名为 heima 的主题
bin/kafka-topics.sh  --delete --zookeeper localhost:2181 --topic heima # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --delete --zookeeper 172.18.30.110:2181 --topic heima

在这里插入图片描述

二、kafka 分区

1、kafka 增加分区


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/# 增加 topic 主题名为 heima 的主题的 分区为 3 个
bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --partitions 3 # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --partitions 3# 查看 heima 主题详情:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --describe --zookeeper 172.18.30.110:2181 --topic heima

在这里插入图片描述

2、其他主题参数配置

见官方文档: http://kafka.apache.org/documentation/#topicconfigs


bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic--alter --add-config max.message.bytes=128000 bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe  bin/kafka-configs.sh --bootstrap-server localhost:9092  --entity-type topics --entity-name my-topic--alter --delete-config max.message.bytes

三、kafka 主题 topic 总结

1、KafkaAdminClient 应用

我们都习惯使用 Kafka 中 bin 目录下的脚本工具来管理査看 Kafka,但是有些时候需要将某些管理查看的功能集成到系统 (比如Kafka Manager)中,那么就需要调用一些 API 来直接操作 Kafka 了。

2、在 kafka_learn 工程中,创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用


/***  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaAdminConfigOperation.java**  2024-6-22 创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用*/
package djh.it.kafka.learn.chapter3;import kafka.controller.NewPartition;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdminConfigOperation {public static void main( String[] args ) throws ExecutionException, InterruptedException {describeTopicConfig();   //获取主题详细信息
//        alterTopicConfig();    //添加主题
//        addTopicPartitions();  //添加分区}private static void addTopicPartitions() throws ExecutionException, InterruptedException {String brokerList = "172.18.30.110:9092";String topic = "heima";Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);AdminClient client = AdminClient.create(props);NewPartitions newPartitions = NewPartitions.increaseTo(4);Map<String, NewPartitions> newPartitionsMap = new HashMap<>();newPartitionsMap.put(topic,newPartitions);CreatePartitionsResult result = client.createPartitions(newPartitionsMap);result.all().get();client.close();}private static void describeTopicConfig() throws ExecutionException, InterruptedException {String brokerList = "172.18.30.110:9092";String topic = "heima";Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);AdminClient client = AdminClient.create(props);ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));Config config = result.all().get().get(resource);System.out.println(config);client.close();}
}

在这里插入图片描述

上一节关联链接请点击
# Kafka_深入探秘者(3):kafka 消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/859724.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

揭秘!速卖通卖家如何靠自养号测评打造爆款?

然而&#xff0c;许多商家对测评的认识存在严重的误区&#xff0c;他们错误地以为仅仅通过几次草率的测评就能快速塑造出爆款产品。实际上&#xff0c;测评远非如此简单&#xff0c;它是一个需要深思熟虑、精心策划和持续投入的过程。测评的真正价值在于帮助平台精准地把握产品…

Java-LinkedList和ArrayList的区别、Get/Add操作性能分析以及常见的遍历方式

LinkedList和ArrayList的区别、Get/Add操作性能分析以及常见的遍历方式 一、LinkedList基本特性主要方法 二、ArrayList初始化及基本操作ArrayList注意点&#xff08;待完善&#xff09;代码示例 三、ArrayList与LinkedList的区别四、Get/Add操作性能分析五、LinkedList遍历方式…

中霖教育:二级建造师报名后缺考有影响吗?

在完成二级建造师的报名程序后&#xff0c;考生无法进行退考。如果是不参加考试&#xff0c;可以选择弃考。弃考对个人并没有负面影响&#xff0c;缺席考试的话也不会被记录在个人诚信档案中。当然&#xff0c;如果弃考的话此次考试的成绩将被记为0&#xff0c;下一年参加考试按…

关于电机的线性思考

当大多数工程师听到电机这个词时&#xff0c;他们自然地想到旋转装置&#xff0c;例如有刷直流、无刷直流、步进电机或变频原动机。但是电机不一定是旋转的&#xff0c;很多时候设计需要直线运动。一种解决方案是添加某种齿轮或皮带装置来转换旋转运动&#xff0c;这种解决方案…

基于SpringBoot的实习管理系统设计与实现

你好呀&#xff0c;我是计算机学姐码农小野&#xff01;如果有相关需求&#xff0c;可以私信联系我。 开发语言&#xff1a; Java 数据库&#xff1a; MySQL 技术&#xff1a; SpringBoot框架&#xff0c;B/S模式 工具&#xff1a; MyEclipse&#xff0c;Tomcat 系统展示 …

预备役二招算法测试题解

这次题目出的都是一些偏向于基础的题目&#xff0c;就是一些简单的模拟&#xff0c;思维&#xff0c;以及基础算法&#xff08;二分&#xff0c;前缀和&#xff09; &#xff08;点击题目标题&#xff0c;进入原题&#xff09; 我是签到题 题解&#xff1a;就是说给你 t 组数据…

每日鲜语携手中国国家高尔夫球队队员殷若宁征战巴黎,打响中国高端鲜奶品牌“鲜”声量

近日&#xff0c;高端鲜奶品牌每日鲜语宣布携手蒙牛品牌代言人、中国国家高尔夫球队队员殷若宁&#xff0c;共赴巴黎奥运赛场&#xff0c;为梦想挥杆而上。邀请众多消费者开启高品质、健康的生活方式。此举不仅彰显了每日鲜语作为高端鲜奶新标杆的品牌定位&#xff0c;也同时延…

vue2.0项目安装依赖 sass 报错

1、报错代码&#xff1a; 2、原因&#xff1a;项目有点老&#xff0c;vue2的版本&#xff0c;所以node-sass在npm安装的时候大概率的会安装出错&#xff0c;或下载时间过长&#xff0c;因此考虑用dart-sass来替换。 npm install node-sassnpm:dart-sass3、然后就可以成功运行了…

8.项目结构——黑马程序员Java最新AI+若依框架项目

目录 前言一、后端1.ruoyi-admin2.ruoyi-common3.ruoyi-framework4. ruoyi-generator(可删除)5.ruoyi-quartz&#xff08;可删除&#xff09;5.ruoyi-system6. ruoyi-ui7.sql8.依赖关系9. 总结 二、前端三、SQL 前言 提示&#xff1a;本篇叙述若依前后端项目结构 一、后端 1…

无线领夹麦克风哪个品牌音质最好,揭秘无线麦克风哪个牌子最好!

​在这个数字化、信息化的时代&#xff0c;短视频和直播已经成为了人们生活中不可或缺的一部分。而无线麦克风&#xff0c;则是这些活动中不可或缺的重要工具。它们能够轻松捕捉声音&#xff0c;让内容更加生动、真实。然而&#xff0c;市场上的无线麦克风种类繁多&#xff0c;…

51-60 CVPR 2024 最佳论文 | Generative Image Dynamics

在2023年11月&#xff0c;谷歌研究院发布了一项令人瞩目的研究成果——Generative Image Dynamics&#xff08;生成图像动力学&#xff09;。这项技术的核心是将静态的图片转化为动态的、无缝循环的视频&#xff0c;而且更令人兴奋的是&#xff0c;这些生成的视频还具有交互性。…

AI赋能天气:微软研究院发布首个大规模大气基础模型Aurora

编者按&#xff1a;气候变化日益加剧&#xff0c;高温、洪水、干旱&#xff0c;频率和强度不断增加的全球极端天气给整个人类社会都带来了难以估计的影响。这给现有的天气预测模型提出了更高的要求——这些模型要更准确地预测极端天气变化&#xff0c;为政府、企业和公众提供更…

【转】5种常见的网络钓鱼攻击以及防护手段

2022 年 Verizon 数据泄露调查报告指出&#xff0c;去年 75% 的社会工程攻击涉及网络钓鱼&#xff0c;仅去年一年就有超过 33 万个账户被网络钓鱼&#xff0c;网络钓鱼占整体社会工程攻击的 41%。 不能将所有责任归咎于员工&#xff0c;因为薄弱的安全意识建设及宣教是大部分漏…

XSS跨站攻击漏洞

XSS跨站攻击漏洞 一 概述 1 XSS概述 xss全称为&#xff1a;Cross Site Scripting&#xff0c;指跨站攻击脚本&#xff0c;XSS漏洞发生在前端&#xff0c;攻击的是浏览器的解析引擎&#xff0c;XSS就是让攻击者的JavaScript代码在受害者的浏览器上执行。 XSS攻击者的目的就是…

Windows 中 Chrome / Edge / Firefox 浏览器书签文件默认存储路径

1. Chrome 浏览器 按组合键 Win R&#xff0c;打开运行对话框&#xff0c;输入 %USERPROFILE%\AppData\Local\Google\Chrome\User Data\Default或在Chrome 浏览器地址栏输入 chrome://version查看【个人资料路径】 2. Edge 浏览器 按组合键 Win R&#xff0c;打开运行对…

GD32F4xx 移植agile_modbus软件包与电能表通信

目录 1. agile_modbus1.1 简介1.2 下载2. agile_modbus使用2.1 源码目录2.2 移植3. 通信调试3.1 代码3.3 通信测试1. agile_modbus 1.1 简介 agile_modbus是一个轻量级的Modbus协议栈,主要特点: 支持RTU和TCP协议,采用纯C语言开发,不涉及任何硬件接口,可直接在任何形式的…

Java流程控制语句

Java流程控制语句有三种&#xff1a; 顺序结构、分支结构和循环结构。 顺序结构&#xff1a; 顺序结构语句是Java程序默认的执行流程&#xff0c;按照代码的先后顺序&#xff0c;从上到下依次执行。 原文链接&#xff1a; Java流程控制控制语句 - 红客网络编程与渗透技术 示例…

查看es p12证书文件过期方法

查看证书过期时间: openssl pkcs12 -in elastic-certificates.p12 -nokeys -out elastic-certificates.crt (需要输入证书生成时配置密码) openssl x509 -enddate -noout -in elastic-certificates.crt

乐鑫云方案研讨会回顾|ESP RainMaker® 引领创业潮,赋能科创企业

近日&#xff0c;乐鑫信息科技 (688018.SH) ESP RainMaker 云生态方案线下研讨会和技术沙龙在深圳成功举办&#xff0c;吸引了众多来自照明电工、新能源、安防、宠物等垂类领域的客户与合作伙伴。活动现场&#xff0c;与会嘉宾围绕产品研发、测试认证、品牌构建、跨境电商等多维…

【面试题】什么是等保2.0?

等保2.0&#xff0c;全称网络安全等级保护2.0制度&#xff0c;是我国网络安全领域的基本国策和基本制度&#xff0c;旨在加强网络系统的安全保护&#xff0c;确保关键信息不被泄露、篡改、破坏或丧失。以下是关于等保2.0的详细介绍&#xff1a; 实施时间与背景&#xff1a; 等…