一. 背景.
最近在验证kafka 开启kerberos的情况下, flink任务的支持情况.
但是验证的时候发现一个互斥的情况. 在读取数据的时候, 在开启kafka gruop id的权限控制的时候, flink sql 即使设置了gruop id , 竟然还能读取数据.
这个和预期不符. 所以才较真验证了一下.
二. kafka消费topic数据姿势
消费kafka的数据的时候首先要构造KafkaConsumer客户端, 然后KafkaConsumer客户端有两种方式读取topic 中的数据.
- 使用
subscribe
是最常见的,因为它支持动态分区再均衡和消费者组的管理,适合多数场景。 - 使用
assign
适合需要精确控制分区消费的特定场景,但不支持自动再均衡,因此需要开发者手动管理分区分配和调整。
2.1. subscribe
方法
- 目的:主要用于订阅一个或多个主题。消费者会自动分配这些主题的分区。
- 使用场景:适合使用消费者组(Consumer Group)的场景。Kafka 会自动进行分区的再均衡(rebalancing),确保同一消费者组内不会有多个消费者消费同一分区。
- 自动分配:使用
subscribe
时,Kafka 会自动为消费者分配它所订阅主题下的分区。 - 再均衡监听器:可以通过实现
ConsumerRebalanceListener
接口来自定义在分区再均衡时的行为。 - 动态性:如果新的分区被添加到主题中,消费者将自动开始消费新的分区。
- API 示例:
List<String> topics = Arrays.asList("topic1", "topic2"); consumer.subscribe(topics);
2.2. assign
方法
- 目的:用于手动分配消费者要消费的具体分区。
- 使用场景:适合需要对某些特定分区进行精确控制的场景。例如,需要单独处理特定分区时。
- 手动分配:通过
assign
方法,开发者显式指定消费者应该消费哪些分区。 - 无再均衡:使用
assign
时,Kafka 不会执行分区再均衡。消费者组的概念在这种模式下不适用。 - 静态性:如果主题增加了新的分区,消费者不会自动开始消费这些新分区,除非显式地调用
assign
方法来分配新的分区。 - API 示例:
List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)); consumer.assign(partitions);
三. 使用java client 验证.
3.1. 总结
- 无论subscribe 和assign 都需要授权topic .
- subscribe 方法需要指定group id , 所以需要group id 授权.
- assign 方法 group id 不是必填项, 不指定group id 的时候, group id 不生效, 指定了之后group id , 权限控制就会生效.
3.2. subscribe 方法
public static void main(String[] args) {System.setProperty("java.security.krb5.conf", "tmp/krb5.conf");Properties props = new Properties();// group.id,指定了消费者所属群组props.put("bootstrap.servers", "master01:9092");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("group.id", "kafka-group-01");props.put("auto.offset.reset","earliest");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "GSSAPI");props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +"useKeyTab=true " +"keyTab=\"/tmp/kafka.keytab\" " +"storeKey=true " +"useTicketCache=false " +"serviceName=\"kafka\" " +"principal=\"kafka/ALL@EXAMPLE.COM\";");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Collections.singletonList("kafka-validate-01"));ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));for (ConsumerRecord<String, String> record : records) {LOG.info("KafkaConsumerDemoSubscribe#ConsumerRecord -> KEY : {} , VALUE : {} ", record.key(),record.value());}}
3.3. assign 方法示例
public static void main(String[] args) {System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");Properties props = new Properties();// group.id,指定了消费者所属群组props.put("bootstrap.servers", "master01:9092");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// props.put("group.id", "kafka-group");props.put("auto.offset.reset","earliest");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "GSSAPI");props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +"useKeyTab=true " +"keyTab=\"/tmp/kafka.keytab\" " +"storeKey=true " +"useTicketCache=false " +"serviceName=\"kafka\" " +"principal=\"kafka/ALL@EXAMPLE.COM\";");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);String topic = "kafka-validate-01";topic= "kafka_kerberos";consumer.assign(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));for (ConsumerRecord<String, String> record : records) {LOG.info("KafkaConsumerDemoAssign#ConsumerRecord -> KEY : {} , VALUE : {} ", record.key(),record.value());}}
四. FLINK SQL 任务验证…
flink 官方文档:
FLINK 使用assign构建KafkaConsumer , scan.startup.mode
配置项决定了 Kafka consumer 的启动模式。
序号 | 参数 | 含义 | kafka gruop id 是否必填 |
---|---|---|---|
1 | group-offsets (默认) | 从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。 | 是 |
2 | earliest-offset | 从可能的最早偏移量开始 | 否 |
3 | latest-offset | 从最末尾偏移量开始 | 否 |
4 | timestamp | 从用户为每个 partition 指定的时间戳开始 | 否 |
4 | specific-offsets | 从用户为每个 partition 指定的偏移量开始 | 否 |
- 只有使用
scan.startup.mode
为group-offsets
flink任务运行的时候才会报gruop id 相关的权限异常.
异常信息:
Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: kafka-validate-group-xx