Kafka-创建topic源码

一、命令创建topic

kafka-topics --create --topic quickstart-events --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

二、kafka-topics脚本

exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$@"

脚本中指定了处理它的主类:TopicCommand

三、TopicCommand

public abstract class TopicCommand {public static void main(String... args) {Exit.exit(mainNoExit(args));}private static int mainNoExit(String... args) {try {execute(args);return 0;} catch (Throwable e) {return 1;}}static void execute(String... args) throws Exception {//解析命令行参数TopicCommandOptions opts = new TopicCommandOptions(args);//创建TopicServiceTopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());try {if (opts.hasCreateOption()) {//这是处理topic创建的,我们主要分析它topicService.createTopic(opts);} else if (opts.hasAlterOption()) {//更高topic逻辑topicService.alterTopic(opts);} else if (opts.hasListOption()) {//获取topictopicService.listTopics(opts);} else if (opts.hasDescribeOption()) {//topi相关描述信息topicService.describeTopic(opts);} else if (opts.hasDeleteOption()) {//删除topictopicService.deleteTopic(opts);}}catch(...){...}finally {topicService.close();}}public static class TopicService implements AutoCloseable {public void createTopic(TopicCommandOptions opts) throws Exception {CommandTopicPartition topic = new CommandTopicPartition(opts);if (Topic.hasCollisionChars(topic.name)) {//由于度量名称的限制,带有句点(“.”)或下划线(“_”)的主题可能会发生冲突。为了避免问题,最好使用其中之一,但不要两者都使用System.out.println(".........");}createTopic(topic);}public void createTopic(CommandTopicPartition topic) throws Exception {if (topic.replicationFactor.filter(rf -> rf > Short.MAX_VALUE || rf < 1).isPresent()) {//复制因子必须介于1和“+Short.MAX_VALUE+”之间throw new IllegalArgumentException("...");}if (topic.partitions.filter(p -> p < 1).isPresent()) {//分区必须大于0throw new IllegalArgumentException("...");}try {NewTopic newTopic;//取决于创建 topic 时 是否指定了   replica-assignmentif (topic.hasReplicaAssignment()) {newTopic = new NewTopic(topic.name, topic.replicaAssignment);} else {newTopic = new NewTopic(topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue));}//给topic设置参数Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream().collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));newTopic.configs(configsMap);//批量创建topicCreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),new CreateTopicsOptions().retryOnQuotaViolation(false));//等待所有topic都创建成功createResult.all().get();System.out.println("Created topic " + topic.name + ".");} catch (ExecutionException e) {//......}}}}

TopicCommandOptions中有对创建topic所有参数的解读,我们下面来详细看下这些参数

四、创建Topic参数

bootstrap-server

        必选项:连接Kafka server用

command-config

        包含要传递给Admin Client的配置的属性文件。这仅与--bootstrap-server选项一起使用,用于描述和更改broker配置

list

        列出所有可用的topic

create

        创建一个新的topic

delete

        删除一个topic

alter

        更改分区数量和副本分配,通过--alter更新现有主题的配置

describe

        列出给定topic的详细信息

topic

        要创建、更改、描述或删除的主题。它还接受正则表达式,但--create选项除外。将主题名称放在双引号中,并使用“\\”前缀转义正则表达式符号;例如 \"test\\.topic\"

topic-id

        仅与用于描述主题的--bootstrap-server选项一起使用

config

        正在创建的主题的主题配置覆盖。

delete-config

        要删除现有主题的主题配置覆盖

partitions

        正在创建或更改的主题的分区数量(警告:如果为具有键的主题增加分区,则分区逻辑或消息顺序将受到影响)。如果未提供用于,则为集群默认值

replication-factor

        正在创建的主题中每个分区的复制因子。如果未提供,则为群集默认值

replica-assignment

        正在创建或更改的topic的手动分区到broker分配列表

under-replicated-partitions

        如果在描述主题时设置,则仅在复制分区下显示

unavailable-partitions

        如果在描述主题时设置,则仅显示其leader不可用的分区

under-min-isr-partitions

        如果在描述主题时设置,则仅显示isr计数 < 配置的最小值的分区。

at-min-isr-partitions

        如果在描述主题时设置,则仅显示isr计数 = 配置的最小值的分区        

topics-with-overrides

        如果在描述主题时设置,则仅显示已覆盖配置的topic

if-exists

        如果在更改、删除或描述主题时设置,则仅当主题存在时才会执行该操作

if-not-exists

        如果在创建主题时设置,则仅当主题不存在时才会执行该操作。

exclude-internal

        运行list或describe命令时排除内部topic。默认情况下,内部topic将被列出

partition-size-limit-per-response

        一个DescribeTopicPartitions响应中包含的最大分区大小

五、AdminClient

从第二步的源码中看到最终将topic的创建交给了AdminClient来完成,下面我们继续往下分析

1、创建

在TopicService的构造方法中创建的AdminClient

它是Kafka的管理客户端,支持管理和检查topic、broker、配置和ACL。

AdminClient的创建用到了bootstrap.servers,它里面有连接KafkaServer的host:port列表。

bootstrap.servers配置仅用于发现群集中的broker,然后AdminClient将根据需要连接到这些broker。因此,只包括两个或三个经纪人地址就足以应对broker不可用的风险。

TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());
    public static class TopicService implements AutoCloseable {private final Admin adminClient;public TopicService(Properties commandConfig, Optional<String> bootstrapServer) {this.adminClient = createAdminClient(commandConfig, bootstrapServer);}private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) {if (bootstrapServer.isPresent()) {commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());}return Admin.create(commandConfig);}}

2、交由子类KafkaAdminClient处理

public class KafkaAdminClient extends AdminClient {private final AdminClientRunnable runnable;//创建一批topicpublic CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,final CreateTopicsOptions options) {final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());final CreatableTopicCollection topics = new CreatableTopicCollection();for (NewTopic newTopic : newTopics) {//判断名字是否符合规范if (topicNameIsUnrepresentable(newTopic.name())) {KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();future.completeExceptionally(new InvalidTopicException("The given topic name '" +newTopic.name() + "' cannot be represented in a request."));topicFutures.put(newTopic.name(), future);} else if (!topicFutures.containsKey(newTopic.name())) {//topicFutures 装的是还没有创建的 topicnametopicFutures.put(newTopic.name(), new KafkaFutureImpl<>());topics.add(newTopic.convertToCreatableTopic());}}if (!topics.isEmpty()) {final long now = time.milliseconds();final long deadline = calcDeadlineMs(now, options.timeoutMs());//里面封装了 ApiKeys.CREATE_TOPICS 请求final Call call = getCreateTopicsCall(options, topicFutures, topics,Collections.emptyMap(), now, deadline);//实现了Runnable接口runnable.call(call, now);}return new CreateTopicsResult(new HashMap<>(topicFutures));}
}

从这里我们看到,这里会用一个线程向broker发送ApiKeys.CREATE_TOPICS 请求。下面我们来看broker端怎么处理topics的创建请求的。按照我们之前的经验,要去看KafkaApis中对应ApiKeys.CREATE_TOPICS的处理逻辑

class KafkaApis(...){request.header.apiKey match {//....case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)//.....}
}

六、CREATE_TOPICS的处理逻辑

从KafkaApi中我们看到很多请求都调用了maybeForwardToController()方法来处理,但是传入的参数不同,从名称上我们可以猜测这些请求可能交由Controller来处理,回想下《Kafka-Controller角色需要做什么?》中当一个broker当选为Controller时第一件事就是注册监听器,去监听broker改变、topic改变、topic删除、isr改变等,并分别准备好了响应的处理逻辑。因此这里只要让topic发生改变就可以自动触发让Controller处理了。下面看下handleCreateTopicsRequest()中都做了什么?

1、获取ZooKeeper

val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))

2、判断集群当下是否有Controller

如果集群当下没有Controller,直接向客户端返回Errors.NOT_CONTROLLER错误。我们按照集群当下有Controller继续分析。

    if (!zkSupport.controller.isActive) {//如果没有contorller,直接向客户端发送响应信息(集群当下没有controller),且这个时候时创建不了topic的,createTopicsRequest.data.topics.forEach { topic =>results.add(new CreatableTopicResult().setName(topic.name).setErrorCode(Errors.NOT_CONTROLLER.code))}sendResponseCallback(results)} else {//正常逻辑}

3、检查topic名称

集群元数据topic是一个具有不同实现的内部topic。不应允许用户创建同名的topic。

          if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {//拒绝创建内部主题  __cluster_metadatainfo(s"Rejecting creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME}")}topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))

4、调用ZkAdminManager创建topic

      zkSupport.adminManager.createTopics(createTopicsRequest.data.timeoutMs,createTopicsRequest.data.validateOnly,toCreate,authorizedForDescribeConfigs,controllerMutationQuota,handleCreateTopicsResults)}

1、循环校验每个topic是否符合规则

1、topic是否已经存在

2、topic是否为null

3、numPartitions或replicationFactor和replicasAssignments都已设置。两者不能同时使用

2、确定分区分配列表

如果用户指定了列表,那么就直接用用户的,否则使用Kafka自己的分配策略(下篇博客分析)

        val assignments = if (topic.assignments.isEmpty) {CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokers.asJavaCollection, resolvedNumPartitions, resolvedReplicationFactor))} else {val assignments = new mutable.HashMap[Int, Seq[Int]]//注意:我们不会检查replicaAssignment是否包含未知的代理——与添加分区的情况不同,这遵循TopicCommand中的现有逻辑topic.assignments.forEach { assignment =>assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)}assignments}

3、topics目录下创建指定的topic

//ConfigType.TOPIC : topics 目录
//topic :要创建的topic名称
zkClient.setOrCreateEntityConfigs(ConfigType.TOPIC, topic, config)

4、topic目录下创建分区目录和对应信息

    writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },isUpdate = false, usesTopicId)

5、创建对应的元数据

CreatePartitionsMetadata(topic.name, assignments.keySet)

七、Controller端处理逻辑

我们找到TopicChange对应的处理逻辑

  override def process(event: ControllerEvent): Unit = {try {event match {case TopicChange =>processTopicChange()//......}}}private def processTopicChange(): Unit = {if (!isActive) return//从 brokers/topics/目录下获取所有的topicval topics = zkClient.getAllTopicsInCluster(true)//从controllerContext 获取当下缓存中所有的 topic//两者相减获取 新增加的 topicval newTopics = topics -- controllerContext.allTopics// 获取删除的topic (既topics目录没有,但是缓存中有)val deletedTopics = controllerContext.allTopics.diff(topics)//设置新的topic到缓存controllerContext.setAllTopics(topics)//检测zk中 每个topic 目录的变化registerPartitionModificationsHandlers(newTopics.toSeq)//现在要添加分区和副本了,也就是从topic下获取 topic_id、adding_replicas、removing_replicas、partitions 信息val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentAndTopicIdForTopics(newTopics)deletedTopics.foreach(controllerContext.removeTopic)processTopicIds(addedPartitionReplicaAssignment)addedPartitionReplicaAssignment.foreach { case TopicIdReplicaAssignment(_, _, newAssignments) =>newAssignments.foreach { case (topicAndPartition, newReplicaAssignment) =>//controllerContext 的缓存中 更新分区、副本、leder信息controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)}}info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +s"[$addedPartitionReplicaAssignment]")if (addedPartitionReplicaAssignment.nonEmpty) {val partitionAssignments = addedPartitionReplicaAssignment.map { case TopicIdReplicaAssignment(_, _, partitionsReplicas) => partitionsReplicas.keySet }.reduce((s1, s2) => s1.union(s2))//更高topic下的分区、副本为可用状态 OnlineReplica//此时 往topic 生产数据就ok了 onNewPartitionCreation(partitionAssignments)}}

 从源码中我们可以看到,Controller这端会不断的将新的topic以及其下的topic_id、adding_replicas、removing_replicas、partitions 信息加载到缓存,并使用它们的状态机将它们更新至可用状态。并剔除掉删除的topic。始终保持,当向topic生产数据时,它这里都时最新的状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/888683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【AI系统】GhostNet 系列

GhostNet 系列 本文主要会介绍 GhostNet 系列网络&#xff0c;在本文中会给大家带来卷积结构的改进方面的轻量化&#xff0c;以及与注意力(self-attention)模块的进行结合&#xff0c;部署更高效&#xff0c;更适合移动计算的 GhostNetV2。让读者更清楚的区别 V2 与 V1 之间的…

YOLOv8改进,YOLOv8引入CARAFE轻量级通用上采样算子,助力模型涨点

摘要 CARAFE模块的设计目的是在不增加计算复杂度的情况下,提升特征图的质量,特别是在视频超分辨率任务中,提升图像质量和细节。CARAFE结合了上下文感知机制和聚合特征的能力,通过动态的上下文注意力机制来提升细节恢复的效果。 理论介绍 传统的卷积操作通常依赖于局部区域…

大型制造企业IT蓝图、信息化系统技术架构规划与实施路线方案

关注 获取ppt​​​​​​全文&#xff0c;请关注作者

HTTP 长连接(HTTP Persistent Connection)简介

HTTP长连接怎么看&#xff1f; HTTP 长连接&#xff08;HTTP Persistent Connection&#xff09;简介 HTTP 长连接&#xff08;Persistent Connection&#xff09;是 HTTP/1.1 的一个重要特性&#xff0c;它允许在一个 TCP 连接上发送多个 HTTP 请求和响应&#xff0c;而无需为…

001集—— 创建一个WPF项目 ——WPF应用程序入门 C#

本例为一个WPF应用&#xff08;.NET FrameWork&#xff09;。 首先创建一个项目 双击xaml文件 双击xaml文件进入如下界面&#xff0c;开始编写代码。 效果如下&#xff1a; 付代码&#xff1a; <Window x:Class"WpfDemoFW.MainWindow"xmlns"http://schema…

微信小程序配置less并使用

1.在VScode中下载Less插件 2.在微信小程序中依次点击如下按钮 选择 从已解压的扩展文件夹安装… 3.选中刚在vscode中下载安装的插件文件 如果没有修改过插件的安装目录&#xff0c;一般是在c盘下C:\用户\用户名.vscode\extensions\mrcrowl.easy-less-2.0.2 我的路径是&#xf…

Vue网页屏保

Vue网页屏保 在vue项目中&#xff0c;如果项目长时间未操作需要弹出屏幕保护程序&#xff0c;以下为网页屏保效果&#xff0c;看板内容为连接的资源。 屏保组件 <template><div v-if"isActive" class"screensaver" click"disableScreens…

【SpringBoot】使用IDEA创建SpringBoot项目

1、使用SpringBoot脚手架创建 我们使用SpringBoot的脚手架Spring Initializr创建&#xff0c;如图所示&#xff1a; 2、选择SpringBoot版本 最开始做项目时候&#xff0c;组长说创建一个 springboot 2.5.4 的项目&#xff0c;mysql使用 5.6.X &#xff0c;maven使用是3.6.X…

如何在鸿蒙API9和x86模拟器中使用MQTT

目录 引言 安装MQTT软件包 避免MQTT软件包自动升级 程序的编写 运行测试 结语 引言 虽然我的课主要是OpenHarmony南向开发的&#xff0c;但是结课时有个同学说他在写鸿蒙APP时无法将MQTT库加入到设备中&#xff0c;希望我帮忙看看。由于他没有鸿蒙的真机&#xff0c;只能…

保姆级教程用vite创建vue3项目并初始化添加PrimeVue UI踩坑实录

文章目录 一、什么是PrimeVue二、详细教程1.添加PrimeVue2.配置main.js3.添加自动引入4.配置vite.config.js5.创建测试页面 一、什么是PrimeVue PrimeVue 是一个用于 Vue.js 3.x 开发的一款高质量、广受欢迎的 Web UI 组件库。 官网地址&#xff1a;https://primevue.org/ 二、…

QT的ui界面显示不全问题(适应高分辨率屏幕)

//自动适应高分辨率 QCoreApplication::setAttribute(Qt::AA_EnableHighDpiScaling);一、问题 电脑分辨率高&#xff0c;默认情况下&#xff0c;打开QT的ui界面&#xff0c;显示不全按钮内容 二、解决方案 如果自己的电脑分辨率较高&#xff0c;可以尝试以下方案&#xff1a;自…

超级详细,如何手动安装python第三方库?

文章目录 1&#xff0c;python第三方库安装包有3种类型2&#xff0c;python第三方库安装包whl文件如何安装&#xff1f;3&#xff0c;python第三方库安装包zip和tar.gz文件如何安装&#xff1f;4&#xff0c; python第三方库安装包exe文件如何安装&#xff1f; 手动安装第三方库…

Alibaba EasyExcel 导入导出全家桶

一、阿里巴巴EasyExcel的优势 首先说下EasyExcel相对 Apache poi的优势&#xff1a; EasyExcel也是阿里研发在poi基础上做了封装&#xff0c;改进产物。它替开发者做了注解列表解析&#xff0c;表格填充等一系列代码编写工作&#xff0c;并将此抽象成通用和可扩展的框架。相对p…

什么叫自动获得ip地址?自动获得的ip地址怎么设置

在数字化时代&#xff0c;网络连接已成为我们日常生活和工作中不可或缺的一部分。然而&#xff0c;对于非技术用户而言&#xff0c;复杂的网络配置常常令人感到困惑。幸运的是&#xff0c;自动获得IP地址技术的出现&#xff0c;极大地简化了网络配置过程。本文将详细介绍自动获…

流媒体之linux下离线部署FFmpeg 和 SRS

前言 用户对网络做了限制&#xff0c;只能访问指定的网址&#xff0c;和没网没啥区别&#xff0c;导致无法连接外网&#xff0c;无法获取安装包&#xff0c;还有一些编译需要的开源工具 用户需要用平台查看库房的海康摄像头实时监控&#xff0c;只能在库房里一台纯净的ubantu…

数字时代的文化宝库:存储技术与精神生活

文章目录 1. 文学经典的数字传承2. 音乐的无限可能3. 影视艺术的数字化存储4. 结语 数字时代的文化宝库&#xff1a;存储技术与精神生活 在数字化的浪潮中&#xff0c;存储技术如同一座桥梁&#xff0c;连接着过去与未来&#xff0c;承载着人类文明的瑰宝。随着存储容量的不断增…

渗透测试之Web基础之Linux病毒编写——泷羽sec

声明&#xff1a; 学习视频来自B站UP主泷羽sec,如涉及侵权马上删除文章。本文只涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负 泷羽sec的个人空间-泷羽sec个人主页-哔哩哔哩视频 (bilibili.com)https://space.bilibili.com/350329294 导读&#xff1a; 时刻…

基于神经网络的弹弹堂类游戏弹道快速预测

目录 一、 目的... 1 1.1 输入与输出.... 1 1.2 隐网络架构设计.... 1 1.3 激活函数与损失函数.... 1 二、 训练... 2 2.1 数据加载与预处理.... 2 2.2 训练过程.... 2 2.3 训练参数与设置.... 2 三、 测试与分析... 2 3.1 性能对比.... 2 3.2 训练过程差异.... 3 四、…

Xlsxwriter生成Excel文件时TypeError异常处理

在使用 XlsxWriter 生成 Excel 文件时&#xff0c;如果遇到 TypeError&#xff0c;通常是因为尝试写入的值或格式与 XlsxWriter 的限制或要求不兼容。 1、问题背景 在使用 Xlsxwriter 库生成 Excel 文件时&#xff0c;出现 TypeError: “expected string or buffer” 异常。此…

MATLAB期末复习笔记(下)

目录 五、数据和函数的可视化 1.MATLAB的可视化对象 2.二维图形的绘制 3.图形标识 4.多子图绘图 5.直方图的绘制 &#xff08;1&#xff09;分类 &#xff08;2&#xff09;垂直累计式 &#xff08;3&#xff09;垂直分组式 &#xff08;4&#xff09;水平分组式 &…