Apache Kafka 是一个分布式流处理平台,用于构建实时流数据管道和应用程序。在 Kafka 中,消费者(Consumer)用于从 Kafka 主题(Topic)中读取消息并进行处理。本文将介绍 Kafka 消费者的进阶用法,包括手动提交偏移量、消费者组、重新平衡等功能。
一. 创建 Kafka 消费者
首先,我们需要创建 Kafka 消费者,并配置消费者属性。
而创建消费在方式有俩种,一种是注解,另一种则是通过new KafkaConsumer()的方式来获取到消费者实例;
1.注解:
@KafkaListener(topic = "your-topic")public void handle(ConsumerRecord consumerRecord) {System.out.println("消费者消费消息:" + consumerRecord);System.out.println(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));System.out.println("消费内容:" + consumerRecord.value());}@KafkaListener(topics = {"your-topic1", "your-topic2"})public void handleCMDB(ConsumerRecord consumerRecord) {System.out.println("消费者消费消息:" + consumerRecord);System.out.println(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));logger.info(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));System.out.println("消费内容:" + consumerRecord.value());}
使用注解通过topic=指定需要监听的通道,同时可以使用topics监听多个通道,通过consumerRecord.value()即可获取到通道中的数据值;而kafka的配置需要在yml配置文件中指定
2.new KafkaConsumer()实例:
使用创建实例的方式需要指定properties配置,否则获取到的消费者实例为空,没有任何意义,而创建消费者来进行消费的方法有俩种,一种方式必须指定groupid,一种方式是默认会指定groupid不需要手动指定,以下是代码示例:
// 方式1:需要指定gruopid
public String useMsg(){Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourKafkaServers");props.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultConsumerGroup"); // 默认组props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafka_test_topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} catch (WakeupException e) {// Ignore exception for shutdown} finally {consumer.close();}return "消费成功";}// 方式2:无需手动指定gruopid
public String useOtherMsg(){Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourKafkaServers");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 使用TopicPartition指定消费者主题和分区TopicPartition topicPartition = new TopicPartition("yourtopic", 0);consumer.assign(Arrays.asList(topicPartition));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("消费消息!!!!");System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} catch (WakeupException e) {// Ignore exception for shutdown} finally {consumer.close();}return "无group消费者消费数据";}
二. 手动提交偏移量
手动提交偏移量可以确保消息被成功处理后再提交偏移量,避免消息丢失或重复消费。
设置偏移量有一种简单粗暴的方式:
consumer.seek(topicPartition, 0);
直接将偏移量设置为0,即从队列最开始的地方读取数据
第二种方式根据时间戳来设置偏移量,代码如下:
TopicPartition topicPartition = new TopicPartition("yourtopic", 0);consumer.assign(Arrays.asList(topicPartition));// 指定偏移量开始时间戳,默认为当天00:00的时间戳long timestampToSearch = LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toEpochSecond() * 1000;if (kafkaConfig.getLastRunTime() != null) {// 默认十秒提前量容错long advanceTime = kafkaConfig.getAdvanceTime() != null ? kafkaConfig.getAdvanceTime() : 10000;timestampToSearch = kafkaConfig.getLastRunTime().getTime() - advanceTime;}logger.info("设置当前偏移量开始时间戳:" + timestampToSearch);// 查找每个分区在指定时间的偏移量Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();timestampsToSearch.put(topicPartition, timestampToSearch);Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);// 将消费者指针移动到指定时间的偏移量for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {TopicPartition partition = entry.getKey();OffsetAndTimestamp offsetAndTimestamp = entry.getValue();if (offsetAndTimestamp != null) {consumer.seek(partition, offsetAndTimestamp.offset());}}
这里的getLastRunTime即为获取到上次从队列中退出时的时间,advanceTime为容错时间,即退出时间需要往前推迟多久,这样可能会读取到重复的一俩条数据,但并不会丢失数据,有利有弊且利大于弊;有细心读者会发现这里的队列设置偏移量是使用的不需要指定groupid的消费者,那么指定groupid的消费者该如何设置偏移量呢?别急,下面就给你们端上来:
consumer.subscribe(Collections.singletonList("yourtopic"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 分区被撤销时的处理,这里可以清空任何与这些分区相关的信息}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 分区被分配时的处理TopicPartition topicPartition = new TopicPartition("yourtopic", 0);// 指定偏移量开始时间戳,默认为当天00:00的时间戳long timestampToSearch = LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toEpochSecond() * 1000;if (kafkaConfig.getLastRunTime() != null) {// 默认十秒提前量容错long advanceTime = kafkaConfig.getAdvanceTime() != null ? kafkaConfig.getAdvanceTime() : 10000;timestampToSearch = kafkaConfig.getLastRunTime().getTime() - advanceTime;}logger.info("设置当前偏移量开始时间戳:" + timestampToSearch);Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(topicPartition, timestampToSearch);Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {OffsetAndTimestamp offsetAndTimestamp = entry.getValue();if (offsetAndTimestamp != null) {consumer.seek(entry.getKey(), offsetAndTimestamp.offset());}}}});
通过内部类,重写了ConsumerRebalanceListener
接口的实现来处理分区的重新分配情况,同时根据时间戳重新设置偏移量,这也是所谓的重新平衡,以实现从指定时间点开始消费消息的功能
三. 消费者组和重新平衡
消费者组允许多个消费者共同消费一个主题,并通过重新平衡(Rebalance)来分配分区给不同的消费者。
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);consumer.subscribe(Collections.singletonList(TOPIC_NAME));consumer.poll(0); // 触发消费者加入消费者组
consumer.seekToBeginning(consumer.assignment());consumeMessages(consumer);