通过 API 接口管理 Kafka

文章目录

  • 前言
  • Topic 管理
  • 配置管理
  • 消费者群组管理
    • 查看消费者群组
    • 修改消费者群组
  • 为主题添加分区
  • 从主题中删除消息
  • 首领选举

前言

除了通过命令行和可视化界面对 kafka 进行管理,也可以通过 AdminClient的 API 对 kafka 进行管理。
本文将介绍如何通过 AdminClient 进行 kafka 管理:主题管理、消费者群组管理和配置管理。

创建 AdminClient 对象

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// TODO: 用AdminClient做一些有用的事情
admin.close(Duration.ofSeconds(30));

Topic 管理

列出集群所有 Topic

ListTopicsResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);

验证主题是否存在,如果不存在就创建新的主题

DescribeTopicsResult demoTopic = admin.describeTopics(TOPIC_LIST);try {topicDescription = demoTopic.values().get(TOPIC_NAME).get();System.out.println("Description of demo topic:" + topicDescription);if (topicDescription.partitions().size() != NUM_PARTITIONS) {System.out.println("Topic has wrong number of partitions. Exiting.");System.exit(-1);}
} catch (ExecutionException e) {// 对于大部分异常,提前退出if (! (e.getCause() instanceof UnknownTopicOrPartitionException)) {e.printStackTrace();throw e;}// 如果执行到这里,则说明主题不存在System.out.println("Topic " + TOPIC_NAME +" does not exist. Going to create it now");// 需要注意的是,分区和副本数是可选的// 如果没有指定,那么将使用broker的默认配置CreateTopicsResult newTopic = admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR)));// 检查主题是否已创建成功:if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) {System.out.println("Topic has wrong number of partitions.");System.exit(-1);}
}

❶ 为了检查主题是否配置正确,可以调用describeTopics()方法,并传入一组想要验证的主题名字作为参数。它会返回DescribeTopicResult对象,这个对象对map(主题名字到Future的映射)进行了包装。
❷ 如果一直等待Future完成,那么可以调用get()得到想要的结果——在这里是一个TopicDescription对象。但服务器也可能无法正确处理请求——如果主题不存在,那么服务器就不会返回我们想要的结果。在这种情况下,服务器将返回一个错误,Future将抛出ExecutionException,这个异常是服务器返回的错误导致的。因为我们想要处理主题不存在的情况,所以需要处理这些异常。
❸ 如果主题存在,那么Future将返回一个TopicDescription对象,其中包含了主题的所有分区、分区首领所在的broker、副本清单和同步副本清单。需要注意的是,这个对象并不包含主题的配置信息。本章后面会讨论配置管理。
❹ 如果Kafka返回错误,那么所有的AdminClient结果对象都会抛出ExecutionException异常,因为AdminClient结果对Future对象进行了包装,而Future又对异常进行了包装,所以需要检查ExecutionException的嵌套异常才能获取到Kafka返回的错误信息。
❺ 如果主题不存在,就创建一个新主题。在创建主题时,可以只指定主题名字,其他参数使用默认值。当然,也可以指定分区数量、副本数量和其他配置参数。
❻ 最后,等待主题创建完成并验证结果。这里只检查分区数量。因为在创建主题时指定了分区数量,所以要确保它是对的。如果我们在创建主题时使用了broker默认配置,则更加需要验证结果。再次调用get()获取CreateTopic的结果,这个方法也有可能抛出异常,最常见的是TopicExistsException,我们需要处理这个异常。

删除主题

admin.deleteTopics(TOPIC_LIST).all().get();// 检查主题是否已删除
// 需要注意的是,由于删除是异步操作,这个时候主题可能还存在
try {topicDescription = demoTopic.values().get(TOPIC_NAME).get();System.out.println("Topic " + TOPIC_NAME + " is still around");
} catch (ExecutionException e) {System.out.println("Topic " + TOPIC_NAME + " is gone");
}

异步回调获取执行结果
前边的操作都调用了 get()获取执行结果,但是这个是阻塞等待的,有时候我们希望异步回调的方式获取结果:

vertx.createHttpServer().requestHandler(request -> {String topic = request.getParam("topic");String timeout = request.getParam("timeout");int timeoutMs = NumberUtils.toInt(timeout, 1000);DescribeTopicsResult demoTopic = admin.describeTopics(Collections.singletonList(topic),new DescribeTopicsOptions().timeoutMs(timeoutMs));demoTopic.values().get(topic).whenComplete(new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {@Overridepublic void accept(final TopicDescription topicDescription,final Throwable throwable) {if (throwable != null) {request.response().end("Error trying to describe topic "+ topic + " due to " + throwable.getMessage());} else {request.response().end(topicDescription.toString());}}});
}).listen(8080);

❶ 用Vert.x创建一个简单的HTTP服务器。服务器在收到请求时会调用我们定义的requestHandler。
❷ 请求当中包含了一个主题名字,我们将用这个主题的描述信息作为响应。
❸ 像往常一样调用AdminClient.describeTopics,并得到一个包装好的Future对象。
❹ 这里没有调用get()方法,而是构造了一个函数,Future在完成时会调用这个函数。
❺ 如果Future抛出异常,就将错误返回给HTTP客户端。
❻ 如果Future顺利完成,就将主题描述信息返回给客户端。

配置管理

配置管理是通过描述和更新一系列配置资源(ConfigResource)来实现的。配置资源可以是broker、broker日志记录器和主题。我们通常会用kafka-config.sh或其他Kafka管理工具来检查和修改broker及broker日志配置,但主题配置管理是在应用程序中完成的。
例如,很多应用程序使用了压实的主题,它们会定期(为安全起见,要比默认的保留期限更加频繁一些)检查主题是否被压实,如果没有,就采取相应的行动来纠正主题配置:

ConfigResource configResource =new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);DescribeConfigsResult configsResult =admin.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);
// 打印非默认配置
configs.entries().stream().filter(entry -> !entry.isDefault()).forEach(System.out::println);// 检查主题是否被压实
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,TopicConfig.CLEANUP_POLICY_COMPACT);
if (!configs.entries().contains(compaction)) {// 如果主题没有被压实,就将其压实Collection<AlterConfigOp> configOp = new ArrayList<AlterConfigOp>();configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();alterConf.put(configResource, configOp);admin.incrementalAlterConfigs(alterConf).all().get();
} else {System.out.println("Topic " + TOPIC_NAME + " is compacted topic");
}

❶ 如上所述,ConfigResource有几种类型,这里检查的是主题配置。也可以在同一个请求中指定多个不同类型的资源。
❷ describeConfigs的结果是一个map(从ConfigResource到配置的映射)。每个配置项都有一个isDefault()方法,可以让我们知道哪些配置被修改了。如果用户为主题配置了非默认值,或者修改了broker级别的配置,而创建的主题继承了broker的非默认配置,那么我们便能知道这个配置不是默认的。
❸ 为了修改配置,这里指定了需要修改的ConfigResource和一组操作。每个修改操作都由一个配置条目(配置的名字和值,此处名字是cleanup.policy,值是compacted)和操作类型组成。Kafka的4种操作类型分别是:SET(用于设置值)、DELETE(用于删除值并重置为默认值)、APPEND和SUBSTRACT。后两种只适用于List类型的配置,用于向列表中添加值或从列表中移除值,这样就不用每次都把整个列表发送给Kafka了。

消费者群组管理

查看消费者群组

列出消费者群组

admin.listConsumerGroups().valid().get().forEach(System.out::println);
  • 这里调用valid()方法,可以让get()返回的消费者群组只包含由集群正常返回的消费者群组。错误都将被忽略,不作为异常抛出。

查看更多描述信息

ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST).describedGroups().get(CONSUMER_GROUP).get();System.out.println("Description of group " + CONSUMER_GROUP+ ":" + groupDescription);
  • 描述信息包括群组成员、它们的标识符和主机地址、分配给它们的分区、分配分区的算法以及群组协调器的主机地址。

获取分区最近偏移量和最近提交偏移量信息

Map<TopicPartition, OffsetAndMetadata> offsets =admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();for(TopicPartition tp: offsets.keySet()) {requestLatestOffsets.put(tp, OffsetSpec.latest());}Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =admin.listOffsets(requestLatestOffsets).all().get();for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {String topic = e.getKey().topic();int partition =  e.getKey().partition();long committedOffset = e.getValue().offset();long latestOffset = latestOffsets.get(e.getKey()).offset();System.out.println("Consumer group " + CONSUMER_GROUP+ " has committed offset " + committedOffset+ " to topic " + topic + " partition " + partition+ ". The latest offset in the partition is "+ latestOffset + " so consumer group is "+ (latestOffset - committedOffset) + " records behind");
}
  • ❶ 获取消费者群组读取的所有主题和分区,以及每个分区最新的提交偏移量。与describeConsumerGroups不同,listConsumerGroupOffsets只接受一个消费者群组而不是一个集合作为参数。
  • ❷ 我们希望获取到结果集中每一个分区最后一条消息的偏移量。OffsetSpec提供了3个非常方便的实现:earliest()latest()forTimestamp(),分别用于获取分区中最早和最近的偏移量,以及在指定时间或紧接在指定时间之后写入的消息的偏移量。
  • ❸ 最后,遍历所有分区,将最近提交的偏移量、分区中最近的偏移量以及它们之间的差值打印出来。

修改消费者群组

AdminClient也提供了修改消费者群组的方法:删除群组、移除成员、删除提交的偏移量和修改偏移量。
显式地将提交的偏移量修改为最早的偏移量,可以强制消费者从主题开头位置开始读取,实际上就是“重置”消费者。

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =admin.listOffsets(requestEarliestOffsets).all().get();Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e:earliestOffsets.entrySet()) {resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));}try {admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get();} catch (ExecutionException e) {System.out.println("Failed to update the offsets committed by group "+ CONSUMER_GROUP + " with error " + e.getMessage());if (e.getCause() instanceof UnknownMemberIdException)System.out.println("Check if consumer group is still active.");}
  • ❶ 要重置消费者群组,并让它从最早的偏移量位置开始消费,需要先获取最早的偏移量。
  • ❷ 在这个循环中,将listOffsets返回的ListOffsetsResultInfo转成alterConsumerGroupOffsets需要的OffsetAndMetadata
  • ❸ 调用alterConsumerGroupOffsets之后等待Future完成,这样便可知道是否执行成功。
  • ❹ 导致alterConsumerGroupOffsets执行失败最常见的一个原因是没有停止消费者群组(只能直接关闭消费者应用程序,因为没有可用于关闭消费者群组的命令)。如果消费者群组仍然处于活跃状态,那么一旦修改了偏移量,群组协调器就会认为有非群组成员正在提交偏移量,并抛出UnknownMemberIdException异常。

为主题添加分区

可以用createPartitions方法为主题添加分区。需要注意的是,如果一次性为多个主题添加分区,则可能会出现一些主题添加成功一些主题添加失败的情况。

Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2)); ➊
admin.createPartitions(newPartitions).all().get();
  • ❶ 在添加分区时,需要指定添加分区后主题将拥有的分区总数,而不是要添加的新分区的数量。

从主题中删除消息

deleteRecords方法会将所有偏移量早于指定偏移量的消息标记为已删除,使消费者无法读取这些数据。这个方法将返回被删除的消息的最大偏移量,这样我们就可以检查删除操作是否按预期执行了。从磁盘上彻底删除数据是异步进行的。需要注意的是,可以用listOffsets方法获取在特定时间点或之后写入的消息的偏移量。也可以组合使用这些方法来删除早于任意特定时间点的消息。

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =admin.listOffsets(requestOlderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>  e:olderOffsets.entrySet())recordsToDelete.put(e.getKey(),RecordsToDelete.beforeOffset(e.getValue().offset()));
admin.deleteRecords(recordsToDelete).all().get();

首领选举

首选首领选举

  • 每一个分区都有一个可以被指定为首选首领的副本。如果所有分区的首领都是它们的首选首领副本,那么每个broker上的首领数量应该是均衡的。
  • 在默认情况下,Kafka每5分钟会检查一次首领是否就是首选首领副本,如果不是,但它有资格成为首领,就会选择首选首领副本作为首领。
  • 如果auto.leader.rebalance.enable被设置为false,或者你想快一点儿执行选举,则可以调用electLeader()方法。

不彻底的首领选举
如果一个分区的首领副本变得不可用,而其他副本没有资格成为首领(通常是因为缺少数据),那么这个分区将没有首领,也就不可用了。解决这个问题的一种方法是触发不彻底的首领选举,也就是选举一个本来没有资格成为首领的副本作为首领。这可能导致数据丢失——所有写入旧首领但未被复制到新首领的消息都将丢失。electLeader()方法也可以用来触发不彻底的首领选举。

Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {admin.electLeaders(ElectionType.PREFERRED, electableTopics).all().get();} catch (ExecutionException e) {if (e.getCause() instanceof ElectionNotNeededException) {System.out.println("All leaders are preferred already");}
}
  • ❶ 这里选举的是某个特定主题的某个分区的首选首领。我们可以指定任意数量的分区和主题。如果在调用这个方法时传入null而不是分区列表,则它将触发所有分区的首领选举。
  • ❷ 如果集群的状态是健康的,那么这个方法将不执行任何操作。首选首领选举和不彻底的首领选举只在当前首领不是首选首领副本时才有效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/40626.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Vue学习]生命周期及其各阶段举例

当我们运行vue项目&#xff0c;看到了屏幕上显示的界面&#xff0c;看到了界面上显示的数据和标签&#xff0c;之后将这个界面叉掉&#xff0c;这一过程其实经历了一整个vue的生命周期的四个阶段&#xff0c;即创建阶段、挂载阶段、更新阶段以及销毁阶段, 而对于每个阶段的启动…

使用 pyecharts 渲染成图片程序报错: echarts is not defined问题处理

背景 之前写的使用 snapshot_selenium 来保存pyeacharts渲染成的网页截图&#xff0c;可以正常运行。程序搁置了半年&#xff0c;不知道动了电脑哪里&#xff0c;再次运行程序时&#xff0c;程序开始报错&#xff1a;JavascriptException: javascript error: echarts is not d…

【SQL】已解决:SQL分组去重并合并相同数据

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决&#xff1a;SQL分组去重并合并相同数据 在数据库操作中&#xff0c;数据的分组、去重以及合并是常见需求。然而&#xff0c;初学者在编写SQL语句时&#xff0c;可能会遇到一…

正弦波与单位圆关系的可视化 包括源码

正弦波与单位圆关系的可视化 包括源码 flyfish 正弦波与单位圆的关系 正弦波可以通过单位圆上的点在直线&#xff08;通常是 y 轴&#xff09;上的投影来表示。具体来说&#xff0c;考虑一个单位圆&#xff0c;其半径为 1&#xff0c;圆心在原点。我们可以通过旋转一个角度 …

每日一道算法题 判断子序列

题目 判断子序列_牛客题霸_牛客网 (nowcoder.com) Python # # 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可 # # # param S string字符串 # param T string字符串 # return bool布尔型 # class Solution:def isSubseq…

【全网最全流程+所有代码】企业微信回调联调,开通企微回调和收到企微回调

流程图: 只是这里的消息回调,仅作为提示,群内有消息了。不是具体的消息,而是类似这样的结构,: 如果需要获取消息,还需要拉取企微群内消息方法,这个后续再更新。 好了,我们开始吧。 开启消息回调和接收消息回调,地址是一样的,只是 开启消息回调,get请求, 接受消…

人工智能在日常生活中的十大应用:从医疗到智能家居

人工智能已成为当今人类日常生活的重要组成部分&#xff0c;无论您是否意识到&#xff0c;它几乎在所有场景中都能提供帮助。每次您进行网络搜索、在线预订旅行、接收来自京东等购物平台的产品推荐又或是打开您的新浪、抖音时&#xff0c;都能看到影子&#xff0c;这些只是一些…

代码随想录算法训练营第51天 [115.不同的子序列 583. 两个字符串的删除操作 72. 编辑距离 ]

代码随想录算法训练营第51天 [115.不同的子序列 583. 两个字符串的删除操作 72. 编辑距离 ] 一、115.不同的子序列 链接: 代码随想录. 思路&#xff1a;dp[i][j] 以t[j-1]为结尾的字符串在 以s[i-1]为结尾的字 符串出现个数 相等的时候 dp[i][j] dp[i - 1][j - 1] dp[i - 1][…

JAVA案例模拟电影信息系统

一案例要求&#xff1a; 二具体代码(需要在同一个包下创建三个类) Ⅰ&#xff1a;实现类 package 重修;import java.util.Random; import java.util.Scanner;public class first {public static void main(String[] args) {javabean[]moviesnew javabean[4];movies[0] new ja…

加密与安全_ Jasypt (Java Simplified Encryption)不完全指北

文章目录 官网功能概述Code附 官网 http://www.jasypt.org/ 功能概述 Jasypt 是一个 Java 库&#xff0c;它允许开发人员以最小的努力添加基本的加密功能&#xff0c;并且不需要深入了解密码学的工作原理。 高安全性、基于标准的加密技术&#xff0c;适用于单向和双向加密。…

AIGC对设计师积极性的影响

随着科技的迅猛发展&#xff0c;生成式人工智能&#xff08;AIGC&#xff09;工具正逐渐深入设计的每个角落&#xff0c;对设计师的工作方式和思维模式产生了深远的影响。AIGC不仅极大提升了设计师的工作效率&#xff0c;更激发了他们的创新思维&#xff0c;为设计行业带来了翻…

Spring Boot在java领域中有哪些优势

哈喽&#xff0c;大家好呀&#xff0c;淼淼又来和大家见面啦&#xff0c;随着云计算、微服务架构的兴起&#xff0c;Java开发领域迫切需要一套高效、灵活且易于上手的框架来应对日益复杂的业务需求。正是在这样的背景下&#xff0c;Spring Boot应运而生&#xff0c;以其独特的魅…

Dungeonborne联机失败、延迟高、卡顿的解决方法

Dungeonborne将第一人称动作的即时性与经典的西幻RPG职业设计巧妙融合&#xff0c;为玩家带来了一场前所未有的游戏体验。在这款沉浸式第一人称PvPvE地下城探险游戏中&#xff0c;我们可以独自深入探索&#xff0c;也可以与值得信赖的伙伴并肩作战&#xff0c;共同揭开地下城的…

移动端UI风格营造舒适氛围

移动端UI风格营造舒适氛围

中服云数字孪生平台引领工业物联仿真新纪元!

中服云数字孪生平台3.0是基于中服云物联网平台和数据中台打造的一款实时数据2D/3D集成展示监控平台。 旨在解决工业物联网数据的直观展示、实虚互动、仿真模拟、故障诊断、告警、预警、预测、实时观测、实时监控等问题。提供了数据采集、数据底座、监控逻辑、建模工具、展示互…

android 国内下载Gradle源

在中国使用 Gradle 时&#xff0c;可以配置使用一些国内的镜像源&#xff0c;以提高下载速度和稳定性。以下是几个常用的 Gradle 镜像源地址&#xff1a; 配置 gradle-wrapper.properties 文件: 阿里云: distributionUrlhttps\://services.gradle.org/distributions/gradle-7.…

数据结构 —— 图的遍历

数据结构 —— 图的遍历 BFS&#xff08;广度遍历&#xff09;一道美团题DFS&#xff08;深度遍历&#xff09; 我们今天来看图的遍历&#xff0c;其实都是之前在二叉树中提过的方法&#xff0c;深度和广度遍历。 在这之前&#xff0c;我们先用一个邻接矩阵来表示一个图&#…

220千伏变电站辅助设备智能监控平台 无人化与自动化升级改造工程

220千伏变电站特点 高电压等级&#xff1a;220千伏变电站的最大特点是其高压传输能力&#xff0c;能够将发电厂产生的电能高效地传输到较远的地区&#xff0c;满足大型城市及工业区域的用电需求。 输电能力大&#xff1a;220千伏变电站在输电能力上远大于普通的110千伏或更低…

Mybatis框架的集成使用

1_框架概述 框架是一个半成品&#xff0c;已经对基础的代码进行了封装并提供相应的API&#xff0c;开发者在使用框架时直接调用封装好的api可以省去很多代码编写&#xff0c;从而提高工作效率和开发速度,框架是一种经过校验、具有一定功能的半成品软件. 经过校验&#xff1a;指…

【超万卡GPU集群关键技术深度分析 2024】

文末有福利&#xff01; 1. 集群高能效计算技术 随着大模型从千亿参数的自然语言模型向万亿参数的多模态模型升级演进&#xff0c;超万卡集群吸需全面提升底层计算能力。 具体而言&#xff0c;包括增强单芯片能力、提升超节点计算能力、基于 DPU (Data Processing Unit) 实现…