文章目录
- 前言
- Topic 管理
- 配置管理
- 消费者群组管理
- 查看消费者群组
- 修改消费者群组
- 为主题添加分区
- 从主题中删除消息
- 首领选举
前言
除了通过命令行和可视化界面对 kafka 进行管理,也可以通过 AdminClient
的 API 对 kafka 进行管理。
本文将介绍如何通过 AdminClient
进行 kafka 管理:主题管理、消费者群组管理和配置管理。
创建 AdminClient 对象
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// TODO: 用AdminClient做一些有用的事情
admin.close(Duration.ofSeconds(30));
Topic 管理
列出集群所有 Topic
ListTopicsResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);
验证主题是否存在,如果不存在就创建新的主题
DescribeTopicsResult demoTopic = admin.describeTopics(TOPIC_LIST); ➊try {topicDescription = demoTopic.values().get(TOPIC_NAME).get(); ➋System.out.println("Description of demo topic:" + topicDescription);if (topicDescription.partitions().size() != NUM_PARTITIONS) { ➌System.out.println("Topic has wrong number of partitions. Exiting.");System.exit(-1);}
} catch (ExecutionException e) { ➍// 对于大部分异常,提前退出if (! (e.getCause() instanceof UnknownTopicOrPartitionException)) {e.printStackTrace();throw e;}// 如果执行到这里,则说明主题不存在System.out.println("Topic " + TOPIC_NAME +" does not exist. Going to create it now");// 需要注意的是,分区和副本数是可选的// 如果没有指定,那么将使用broker的默认配置CreateTopicsResult newTopic = admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR))); ➎// 检查主题是否已创建成功:if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) { ➏System.out.println("Topic has wrong number of partitions.");System.exit(-1);}
}
❶ 为了检查主题是否配置正确,可以调用describeTopics()方法,并传入一组想要验证的主题名字作为参数。它会返回DescribeTopicResult对象,这个对象对map(主题名字到Future的映射)进行了包装。
❷ 如果一直等待Future完成,那么可以调用get()得到想要的结果——在这里是一个TopicDescription对象。但服务器也可能无法正确处理请求——如果主题不存在,那么服务器就不会返回我们想要的结果。在这种情况下,服务器将返回一个错误,Future将抛出ExecutionException,这个异常是服务器返回的错误导致的。因为我们想要处理主题不存在的情况,所以需要处理这些异常。
❸ 如果主题存在,那么Future将返回一个TopicDescription对象,其中包含了主题的所有分区、分区首领所在的broker、副本清单和同步副本清单。需要注意的是,这个对象并不包含主题的配置信息。本章后面会讨论配置管理。
❹ 如果Kafka返回错误,那么所有的AdminClient结果对象都会抛出ExecutionException异常,因为AdminClient结果对Future对象进行了包装,而Future又对异常进行了包装,所以需要检查ExecutionException的嵌套异常才能获取到Kafka返回的错误信息。
❺ 如果主题不存在,就创建一个新主题。在创建主题时,可以只指定主题名字,其他参数使用默认值。当然,也可以指定分区数量、副本数量和其他配置参数。
❻ 最后,等待主题创建完成并验证结果。这里只检查分区数量。因为在创建主题时指定了分区数量,所以要确保它是对的。如果我们在创建主题时使用了broker默认配置,则更加需要验证结果。再次调用get()获取CreateTopic的结果,这个方法也有可能抛出异常,最常见的是TopicExistsException,我们需要处理这个异常。
删除主题
admin.deleteTopics(TOPIC_LIST).all().get();// 检查主题是否已删除
// 需要注意的是,由于删除是异步操作,这个时候主题可能还存在
try {topicDescription = demoTopic.values().get(TOPIC_NAME).get();System.out.println("Topic " + TOPIC_NAME + " is still around");
} catch (ExecutionException e) {System.out.println("Topic " + TOPIC_NAME + " is gone");
}
异步回调获取执行结果
前边的操作都调用了 get()
获取执行结果,但是这个是阻塞等待的,有时候我们希望异步回调的方式获取结果:
vertx.createHttpServer().requestHandler(request -> { ➊String topic = request.getParam("topic"); ➋String timeout = request.getParam("timeout");int timeoutMs = NumberUtils.toInt(timeout, 1000);DescribeTopicsResult demoTopic = admin.describeTopics( ➌Collections.singletonList(topic),new DescribeTopicsOptions().timeoutMs(timeoutMs));demoTopic.values().get(topic).whenComplete( ➍new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {@Overridepublic void accept(final TopicDescription topicDescription,final Throwable throwable) {if (throwable != null) {request.response().end("Error trying to describe topic "+ topic + " due to " + throwable.getMessage()); ➎} else {request.response().end(topicDescription.toString()); ➏}}});
}).listen(8080);
❶ 用Vert.x创建一个简单的HTTP服务器。服务器在收到请求时会调用我们定义的requestHandler。
❷ 请求当中包含了一个主题名字,我们将用这个主题的描述信息作为响应。
❸ 像往常一样调用AdminClient.describeTopics,并得到一个包装好的Future对象。
❹ 这里没有调用get()方法,而是构造了一个函数,Future在完成时会调用这个函数。
❺ 如果Future抛出异常,就将错误返回给HTTP客户端。
❻ 如果Future顺利完成,就将主题描述信息返回给客户端。
配置管理
配置管理是通过描述和更新一系列配置资源(ConfigResource)来实现的。配置资源可以是broker、broker日志记录器和主题。我们通常会用kafka-config.sh或其他Kafka管理工具来检查和修改broker及broker日志配置,但主题配置管理是在应用程序中完成的。
例如,很多应用程序使用了压实的主题,它们会定期(为安全起见,要比默认的保留期限更加频繁一些)检查主题是否被压实,如果没有,就采取相应的行动来纠正主题配置:
ConfigResource configResource =new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME); ➊
DescribeConfigsResult configsResult =admin.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);
// 打印非默认配置
configs.entries().stream().filter(entry -> !entry.isDefault()).forEach(System.out::println); ➋// 检查主题是否被压实
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,TopicConfig.CLEANUP_POLICY_COMPACT);
if (!configs.entries().contains(compaction)) {// 如果主题没有被压实,就将其压实Collection<AlterConfigOp> configOp = new ArrayList<AlterConfigOp>();configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET)); ➌Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();alterConf.put(configResource, configOp);admin.incrementalAlterConfigs(alterConf).all().get();
} else {System.out.println("Topic " + TOPIC_NAME + " is compacted topic");
}
❶ 如上所述,ConfigResource有几种类型,这里检查的是主题配置。也可以在同一个请求中指定多个不同类型的资源。
❷ describeConfigs的结果是一个map(从ConfigResource到配置的映射)。每个配置项都有一个isDefault()方法,可以让我们知道哪些配置被修改了。如果用户为主题配置了非默认值,或者修改了broker级别的配置,而创建的主题继承了broker的非默认配置,那么我们便能知道这个配置不是默认的。
❸ 为了修改配置,这里指定了需要修改的ConfigResource和一组操作。每个修改操作都由一个配置条目(配置的名字和值,此处名字是cleanup.policy,值是compacted)和操作类型组成。Kafka的4种操作类型分别是:SET(用于设置值)、DELETE(用于删除值并重置为默认值)、APPEND和SUBSTRACT。后两种只适用于List类型的配置,用于向列表中添加值或从列表中移除值,这样就不用每次都把整个列表发送给Kafka了。
消费者群组管理
查看消费者群组
列出消费者群组
admin.listConsumerGroups().valid().get().forEach(System.out::println);
- 这里调用valid()方法,可以让get()返回的消费者群组只包含由集群正常返回的消费者群组。错误都将被忽略,不作为异常抛出。
查看更多描述信息
ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST).describedGroups().get(CONSUMER_GROUP).get();System.out.println("Description of group " + CONSUMER_GROUP+ ":" + groupDescription);
- 描述信息包括群组成员、它们的标识符和主机地址、分配给它们的分区、分配分区的算法以及群组协调器的主机地址。
获取分区最近偏移量和最近提交偏移量信息
Map<TopicPartition, OffsetAndMetadata> offsets =admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get(); ➊Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();for(TopicPartition tp: offsets.keySet()) {requestLatestOffsets.put(tp, OffsetSpec.latest()); ➋
}Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =admin.listOffsets(requestLatestOffsets).all().get();for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) { ➌String topic = e.getKey().topic();int partition = e.getKey().partition();long committedOffset = e.getValue().offset();long latestOffset = latestOffsets.get(e.getKey()).offset();System.out.println("Consumer group " + CONSUMER_GROUP+ " has committed offset " + committedOffset+ " to topic " + topic + " partition " + partition+ ". The latest offset in the partition is "+ latestOffset + " so consumer group is "+ (latestOffset - committedOffset) + " records behind");
}
- ❶ 获取消费者群组读取的所有主题和分区,以及每个分区最新的提交偏移量。与
describeConsumerGroups
不同,listConsumerGroupOffsets
只接受一个消费者群组而不是一个集合作为参数。 - ❷ 我们希望获取到结果集中每一个分区最后一条消息的偏移量。
OffsetSpec
提供了3个非常方便的实现:earliest()
、latest()
和forTimestamp()
,分别用于获取分区中最早和最近的偏移量,以及在指定时间或紧接在指定时间之后写入的消息的偏移量。 - ❸ 最后,遍历所有分区,将最近提交的偏移量、分区中最近的偏移量以及它们之间的差值打印出来。
修改消费者群组
AdminClient也提供了修改消费者群组的方法:删除群组、移除成员、删除提交的偏移量和修改偏移量。
显式地将提交的偏移量修改为最早的偏移量,可以强制消费者从主题开头位置开始读取,实际上就是“重置”消费者。
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =admin.listOffsets(requestEarliestOffsets).all().get(); ➊Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e:earliestOffsets.entrySet()) {resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset())); ➋
}try {admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get(); ➌
} catch (ExecutionException e) {System.out.println("Failed to update the offsets committed by group "+ CONSUMER_GROUP + " with error " + e.getMessage());if (e.getCause() instanceof UnknownMemberIdException)System.out.println("Check if consumer group is still active."); ➍
}
- ❶ 要重置消费者群组,并让它从最早的偏移量位置开始消费,需要先获取最早的偏移量。
- ❷ 在这个循环中,将
listOffsets
返回的ListOffsetsResultInfo
转成alterConsumerGroupOffsets
需要的OffsetAndMetadata
。 - ❸ 调用
alterConsumerGroupOffsets
之后等待Future
完成,这样便可知道是否执行成功。 - ❹ 导致
alterConsumerGroupOffsets
执行失败最常见的一个原因是没有停止消费者群组(只能直接关闭消费者应用程序,因为没有可用于关闭消费者群组的命令)。如果消费者群组仍然处于活跃状态,那么一旦修改了偏移量,群组协调器就会认为有非群组成员正在提交偏移量,并抛出UnknownMemberIdException
异常。
为主题添加分区
可以用createPartitions方法为主题添加分区。需要注意的是,如果一次性为多个主题添加分区,则可能会出现一些主题添加成功一些主题添加失败的情况。
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2)); ➊
admin.createPartitions(newPartitions).all().get();
- ❶ 在添加分区时,需要指定添加分区后主题将拥有的分区总数,而不是要添加的新分区的数量。
从主题中删除消息
deleteRecords方法会将所有偏移量早于指定偏移量的消息标记为已删除,使消费者无法读取这些数据。这个方法将返回被删除的消息的最大偏移量,这样我们就可以检查删除操作是否按预期执行了。从磁盘上彻底删除数据是异步进行的。需要注意的是,可以用listOffsets方法获取在特定时间点或之后写入的消息的偏移量。也可以组合使用这些方法来删除早于任意特定时间点的消息。
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =admin.listOffsets(requestOlderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e:olderOffsets.entrySet())recordsToDelete.put(e.getKey(),RecordsToDelete.beforeOffset(e.getValue().offset()));
admin.deleteRecords(recordsToDelete).all().get();
首领选举
首选首领选举
- 每一个分区都有一个可以被指定为首选首领的副本。如果所有分区的首领都是它们的首选首领副本,那么每个broker上的首领数量应该是均衡的。
- 在默认情况下,Kafka每5分钟会检查一次首领是否就是首选首领副本,如果不是,但它有资格成为首领,就会选择首选首领副本作为首领。
- 如果auto.leader.rebalance.enable被设置为false,或者你想快一点儿执行选举,则可以调用electLeader()方法。
不彻底的首领选举
如果一个分区的首领副本变得不可用,而其他副本没有资格成为首领(通常是因为缺少数据),那么这个分区将没有首领,也就不可用了。解决这个问题的一种方法是触发不彻底的首领选举,也就是选举一个本来没有资格成为首领的副本作为首领。这可能导致数据丢失——所有写入旧首领但未被复制到新首领的消息都将丢失。electLeader()方法也可以用来触发不彻底的首领选举。
Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {admin.electLeaders(ElectionType.PREFERRED, electableTopics).all().get(); ➊
} catch (ExecutionException e) {if (e.getCause() instanceof ElectionNotNeededException) {System.out.println("All leaders are preferred already"); ➋}
}
- ❶ 这里选举的是某个特定主题的某个分区的首选首领。我们可以指定任意数量的分区和主题。如果在调用这个方法时传入null而不是分区列表,则它将触发所有分区的首领选举。
- ❷ 如果集群的状态是健康的,那么这个方法将不执行任何操作。首选首领选举和不彻底的首领选举只在当前首领不是首选首领副本时才有效。