Kafka-TopicPartition

Kafka主题与分区

主题与分区

topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等

主题的管理

主题的管理

  • 创建主题

  • 查看主题信息

  • 修改主题

  • 删除主题

上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成,也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下 image

创建主题

创建主题的命令格式如下:

kafka-topics.sh --bootstrap-server <server:port> \--create --topic <topic> \--partitions <numPartitions> \--replication-factor <replicationFactor>

创建一个分区数为4、副本因子为2的主题

kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2

创建一个分区数为4、副本因子为2的主题,并且指定主题的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2 \--config max.message.bytes=128000

通过describe指令来查看分区副本的分配细节

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

使用replica-assignment参数手动指定分区副本的分配方案

使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列

例如:0:1:2,0:1:2,0:1:2,0:1:2

  • 分区与分区之间用逗号分隔

  • 分区与副本之间用冒号分隔

kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create-same \--replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2

注意:

  • 同一个分区内的副本不能有重复,比如0:0,1:1这样,就会报出AdminCommandFailedException异常

  • 分区之间所指定的副本数不同,比如0:0,1:1这样,就会报出AdminOperationException异常

主题命名规范

  • 主题名称只能包含ASCII字母、数字、点、减号和下划线

  • 主题名称长度不能超过249个字符

  • 主题名称不能以点开头

  • 不能以__开头,这是Kafka内部使用的主题前缀

  • 不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符

  • 主题名称应该全部小写,因为Kafka在区分主题名称时是不区分大小写的

  • 主题名称不能与Kafka保留的名称冲突,比如__consumer_offsets

  • 主题名称不能与已经存在的消费者组名称冲突

  • 主题名称不能与已经存在的主题名称冲突

查看主题信息

通过list指令来查看当前Kafka集群中所有可用的主题

kafka-topics.sh --bootstrap-server localhost:9092 --list

image

通过describe指令来查看主题的详细信息

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

image

修改主题

当主题被创建之后,依然允许我们对其做一定的修改,比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息

# 修改主题的最大消息字节数,配置值从10000修改为20000kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--config max.message.bytes=20000

通过alter指令来修改主题的分区数

kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-create \--partitions 6

删除主题

通过delete指令来删除主题

kafka-topics.sh --bootstrap-server localhost:9092 \--delete --topic topic-delete

通过delete-config参数来删除之前设置的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--delete-config max.message.bytes

手动删除主题

  • 主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下

  • 主题中的消息数据存储在log.dir或log.dirs配置的路径下,只需要手动删除这些地方的数据即可。

配置管理

kafka-configs.sh脚本用于管理Kafka的配置信息,该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令

# 变更主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 添加主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 查看主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--describe --entity-type topics --entity-name topic-config    

KafkaAdminClient

KafkaAdminClient是Kafka提供的一个管理客户端,用于管理Kafka集群中的资源,比如主题、分区、消费者组等。

TopicCommand基本使用

使用KafkaAdminClient来完成TopicCommand的基本操作

查看主题信息

public class demo{public static void describeTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--describe","--topic", "topic-create"};kafka.admin.TopicCommand.main(options);}
}

创建主题

public class demo{public static void createTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--create","--replication-factor", "1","--partitions", "1","--topic", "topic-create-api"};kafka.admin.TopicCommand.main(options);}
}

查看所有可用主题

public class demo{public static void listTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--list"};kafka.admin.TopicCommand.main(options);}
}

KafkaAdminClient基本使用

KafkaAdminClient可以用来管理broker、配置和ACL(Access Control List),以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient,提供了一系列的API来管理Kafka集群中的资源。

AdminClient常见的方法

  • createTopics:创建主题

    • CreateTopicsResult createTopics(Collection newTopics)
  • deleteTopics:删除主题

    • DeleteTopicsResult deleteTopics(Collection topics)
  • listTopics:列出所有可用的主题

    • ListTopicsResult listTopics()
  • describeTopics:查看主题的详细信息

    • DescribeTopicsResult describeTopics(Collection topicNames)
  • describeCluster:查看集群的详细信息

    • DescribeClusterResult describeCluster()
  • describeConfigs:查看配置的详细信息

    • DescribeConfigsResult describeConfigs(Collection resources)
  • alterConfigs:修改配置信息

    • AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)
  • describeConsumerGroups:查看消费者组的详细信息

    • DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds)
  • listConsumerGroups:列出所有可用的消费者组

    • ListConsumerGroupsResult listConsumerGroups()
  • createPartitions:创建分区

    • CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
使用KafkaAdminClient创建主题
public class KafkaAdminClientCreateTopic {/*** 使用AdminClient创建Topic** 创建完成之后使用如下脚本进行检查* 进入KAFKA_HOME/bin* 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list*/public static void createTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);NewTopic newTopic = new NewTopic("topic-create-api", 1, (short) 1);// 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createTopic();}
}
使用KafkaAdminClient查看主题信息
public class KafkaAdminClientDescribeTopic {/*** 使用AdminClient查看Topic信息*/public static void describeTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic-create-api"));try {Map<String, TopicDescription> map = result.all().get();for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopic();}
}
使用KafkaAdminClient查看所有可用的主题
public class KafkaAdminClientListTopic {/*** 使用AdminClient查看所有可用的Topic*/public static void listTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ListTopicsResult result = adminClient.listTopics();try {Set<String> set = result.names().get();for (String s : set) {System.out.println(s);}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {listTopic();}
}
使用KafkaAdminClient创建分区
public class KafkaAdminClientCreatePartition {/*** 使用AdminClient创建分区*/public static void createPartition(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);Map<String, NewPartitions> map = new HashMap<>();NewPartitions newPartitions = NewPartitions.increaseTo(2);map.put("topic-create-api", newPartitions);CreatePartitionsResult result = adminClient.createPartitions(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createPartition();}
}
使用KafkaAdminClient删除主题
public class KafkaAdminClientDeleteTopic {/*** 使用AdminClient删除Topic*/public static void deleteTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic-create-api"));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {deleteTopic();}
}
使用KafkaAdminClient修改主题配置
public class KafkaAdminClientAlterTopic {/*** 使用AdminClient修改Topic配置*/public static void alterTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigEntry configEntry = new ConfigEntry("max.message.bytes", "128000");Config config = new Config(Arrays.asList(configEntry));Map<ConfigResource, Config> map = new HashMap<>();ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");map.put(configResource, config);AlterConfigsResult result = adminClient.alterConfigs(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {alterTopic();}
}
使用KafkaAdminClient查看主题配置
public class KafkaAdminClientDescribeTopicConfig {/*** 使用AdminClient查看Topic配置*/public static void describeTopicConfig(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(configResource));try {Map<ConfigResource, Config> map = result.all().get();for (Map.Entry<ConfigResource, Config> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopicConfig();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/170934.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【H5 Canvas】【平面几何】特殊图形绘制(箭头/正多边/正多尖角形等)

文章目录 直线/弧线 箭头 直线/弧线 箭头 // startX,startY 起始坐标 // endX,endY 结束坐标 // angel 圆弧角度,取值[0&#xff0c;PI]; 0表示画直线箭头&#xff0c;否则画圆弧箭头 CanvasRenderingContext2D.prototype.drawArrow function(startX,startY,endX,endY,angel)…

openEuler Linux 部署 FineBi

openEuler Linux 部署 FineBi 部署环境 环境版本openEuler Linux22.03MySQL8.0.35JDK1.8FineBi6.0 环境准备 升级系统内核和软件 yum -y updatereboot安装常用工具软件 yum -y install vim tar net-tools 安装MySQL8 将 MySQL Yum 存储库添加到系统的存储库列表中 sudo…

JVM——垃圾回收算法(垃圾回收算法评价标准,四种垃圾回收算法)

目录 1.垃圾回收算法发展简介2.垃圾回收算法的评价标准1.吞吐量2.最大暂停时间3.堆使用效率 3.垃圾回收算法01-标记清除算法垃圾回收算法-标记清除算法的优缺点 4.垃圾回收算法02-复制算法垃圾回收算法-复制算法的优缺点 5.垃圾回收算法03-标记整理算法标记整理算法的优缺点 6.…

适用于 Mac 和 Windows 的顶级U 盘数据恢复软件

由于意外删除或设备故障而丢失 USB 驱动器中的数据始终是一件令人压力很大的事情&#xff0c;检索该信息的最佳选择是使用优质数据恢复软件。为了让事情变得更容易&#xff0c;我们已经为您完成了所有研究并测试了工具&#xff0c;并且我们列出了最好的 USB 记忆棒恢复软件&…

队列实现栈VS栈实现队列

目录 【1】用队列实现栈 思路分析 ​ 易错总结 Queue.c&Queue.h手撕队列 声明栈MyStack 创建&初始化栈myStackCreate 压栈myStackPush 出栈&返回栈顶元素myStackPop 返回栈顶元素myStackTop 判断栈空否myStackEmpty 释放空间myStackFree MyStack总代码…

赢麻了!义乌一个村有5000个网红,有人年收租就300万!

#义乌一村电商年成交额超300亿# ,在中国&#xff0c;电商行业的发展可谓是日新月异&#xff0c;而位于浙江省义乌市的江北下朱村&#xff0c;正是这股潮流的一个典型代表。这个村子&#xff0c;处处弥漫着“直播”的气息&#xff0c;仿佛每个人都在为这个新兴行业助力。 江北下…

软件开发中的抓大放小vs极致细节思维

最近在开发过程中&#xff0c;遇到了好多次 “这个需求点这次要不要做&#xff1f;” 的问题&#xff0c; 主要有两方阵营&#xff0c;比如以研发主导的 “这次先不做、等必要的时候再做” &#xff0c;另外一方是以PM主导的 “这个不做需求不完整&#xff0c;可能影响用户体验…

机器学习第14天:KNN近邻算法

☁️主页 Nowl &#x1f525;专栏《机器学习实战》 《机器学习》 &#x1f4d1;君子坐而论道&#xff0c;少年起而行之 文章目录 介绍 实例 回归任务 缺点 实例 分类任务 如何选择最佳参数 结语 介绍 KNN算法的核心思想是&#xff1a;当我们要判断一个数据为哪一类时…

Leetcode—15.三数之和【中等】

2023每日刷题&#xff08;四十一&#xff09; Leetcode—15.三数之和 实现代码 class Solution { public:vector<vector<int>> threeSum(vector<int>& nums) {sort(nums.begin(), nums.end());vector<vector<int>> ans;int i, j, k;int s,…

零基础学Linux内核:1、Linux源码组织架构

文章目录 前言一、Linux内核的特征二、Linux操作系统结构1.Linux在系统中的位置2.Linux内核的主要子系统3、Linux系统主要数据结构 三、linux内核源码组织1、下载Linux源码2、Linux版本号3、linux源码架构目录讲解 前言 这里将是我们从零开始学习Linux的第一节&#xff0c;这节…

Proteus仿真--高仿真数码管电子钟

本文介绍基于数码管的高仿真电子钟&#xff08;完整仿真源文件及代码见文末链接&#xff09; 仿真图如下 本设计中80C51单片机作为主控&#xff0c;用74LS138作为数码管显示控制&#xff0c;共有4个按键&#xff0c;其中分别用于12/24小时显示切换、时间设置、小时加减控制和…

国内20个大模型中文场景测评及体验

中文场景能力测评 SuperCLUE排行榜 大模型及网站 公司&#xff08;大模型&#xff09; 智能程度 借鉴点 体验网站 备注 1 百度文心一言 高   文心一言   2 百川智能 高   百川大模型-汇聚世界知识 创作妙笔生花-百川智能   3 商汤商量SenseChat&#xff…

Matplotlib网格子图_Python数据分析与可视化

Matplotlib网格子图 plt.subplot()绘制子图调整子图之间的间隔plt.subplots创建网格 plt.subplot()绘制子图 若干彼此对齐的行列子图是常见的可视化任务&#xff0c;matplotlib拥有一些可以轻松创建它们的简便方法。最底层且最常用的方法是plt.subplot()。 这个函数在一个网格…

JavaScript 表达式

JavaScript 表达式 目录 JavaScript 表达式 一、赋值表达式 二、算术表达式 三、布尔表达式 四、字符串表达式 表达式是一个语句的集合&#xff0c;计算结果是个单一值。 在JavaScript中&#xff0c;常见的表达式有4种&#xff1a; &#xff08;1&#xff09;赋值表达式…

企业计算机服务器中了mkp勒索病毒怎么办?Mkp勒索病毒解密数据恢复

网络技术的不断发展&#xff0c;为企业的生产运营提供了坚实的基础&#xff0c;但随之而来的网络安全威胁也不断增加&#xff0c;影响了企业的正常生产生活。近期&#xff0c;云天数据恢复中心陆续接到很多企业的求助&#xff0c;企业计算机服务器遭到了mkp勒索病毒攻击&#x…

中伟视界:AI智能分析盒子实现全方位人车监测,保障管道安全

在油气管道长又无人的场景下&#xff0c;人和车的监测问题一直是一个难题。传统的监测手段往往存在盲区和误报问题&#xff0c;给管道运行安全带来了一定的隐患。然而&#xff0c;随着人工智能技术的不断发展&#xff0c;利用AI盒子的智能分析算法可以有效解决这一问题。 首先&…

LeetCode Hot100 108.将有序数组转为二叉搜索树

题目&#xff1a; 给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵 高度平衡 二叉搜索树。 高度平衡 二叉树是一棵满足「每个节点的左右两个子树的高度差的绝对值不超过 1 」的二叉树。 方法&#xff1a; class Solution {public…

YOLOv5小目标检测层

目录 一、原理 二、yaml配置文件 一、原理 小目标检测层,就是增加一个检测头,增加一层锚框,用来检测输入图像中像素较小的目标 二、yaml配置文件 # YOLOv5 🚀 by Ultralytics, GPL-3.0 license# Parameters nc: 3 # number of classes depth_multiple: 0.33 # model…

【LeetCode:828. 统计子串中的唯一字符 | 贡献法 乘法原理】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

②⑩① 【MySQL】什么是分库分表?拆分策略有什么?什么是MyCat?

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ 目录 ②⑩① 【MySQL】什么是分库分表&#xf…