Flink SQL 支持 kafka 开启 kerberos 权限控制.

一. 背景.

最近在验证kafka 开启kerberos的情况下, flink任务的支持情况.
但是验证的时候发现一个互斥的情况. 在读取数据的时候, 在开启kafka gruop id的权限控制的时候, flink sql 即使设置了gruop id , 竟然还能读取数据.
这个和预期不符. 所以才较真验证了一下.

二. kafka消费topic数据姿势

消费kafka的数据的时候首先要构造KafkaConsumer客户端, 然后KafkaConsumer客户端有两种方式读取topic 中的数据.

  • 使用 subscribe 是最常见的,因为它支持动态分区再均衡和消费者组的管理,适合多数场景。
  • 使用 assign 适合需要精确控制分区消费的特定场景,但不支持自动再均衡,因此需要开发者手动管理分区分配和调整。

2.1. subscribe 方法

  1. 目的:主要用于订阅一个或多个主题。消费者会自动分配这些主题的分区。
  2. 使用场景:适合使用消费者组(Consumer Group)的场景。Kafka 会自动进行分区的再均衡(rebalancing),确保同一消费者组内不会有多个消费者消费同一分区。
  3. 自动分配:使用 subscribe 时,Kafka 会自动为消费者分配它所订阅主题下的分区。
  4. 再均衡监听器:可以通过实现 ConsumerRebalanceListener 接口来自定义在分区再均衡时的行为。
  5. 动态性:如果新的分区被添加到主题中,消费者将自动开始消费新的分区。
  6. API 示例:
    List<String> topics = Arrays.asList("topic1", "topic2");
    consumer.subscribe(topics);
    

2.2. assign 方法

  1. 目的:用于手动分配消费者要消费的具体分区。
  2. 使用场景:适合需要对某些特定分区进行精确控制的场景。例如,需要单独处理特定分区时。
  3. 手动分配:通过 assign 方法,开发者显式指定消费者应该消费哪些分区。
  4. 无再均衡:使用 assign 时,Kafka 不会执行分区再均衡。消费者组的概念在这种模式下不适用。
  5. 静态性:如果主题增加了新的分区,消费者不会自动开始消费这些新分区,除非显式地调用 assign 方法来分配新的分区。
  6. API 示例:
    List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1));
    consumer.assign(partitions);
    

三. 使用java client 验证.

3.1. 总结

  1. 无论subscribe 和assign 都需要授权topic .
  2. subscribe 方法需要指定group id , 所以需要group id 授权.
  3. assign 方法 group id 不是必填项, 不指定group id 的时候, group id 不生效, 指定了之后group id , 权限控制就会生效.

3.2. subscribe 方法

        public static void main(String[] args) {System.setProperty("java.security.krb5.conf", "tmp/krb5.conf");Properties props = new Properties();// group.id,指定了消费者所属群组props.put("bootstrap.servers", "master01:9092");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("group.id", "kafka-group-01");props.put("auto.offset.reset","earliest");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "GSSAPI");props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +"useKeyTab=true " +"keyTab=\"/tmp/kafka.keytab\" " +"storeKey=true " +"useTicketCache=false " +"serviceName=\"kafka\" " +"principal=\"kafka/ALL@EXAMPLE.COM\";");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Collections.singletonList("kafka-validate-01"));ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));for (ConsumerRecord<String, String> record : records) {LOG.info("KafkaConsumerDemoSubscribe#ConsumerRecord  -> KEY : {} , VALUE : {} ", record.key(),record.value());}}

3.3. assign 方法示例

public static void main(String[] args) {System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");Properties props = new Properties();// group.id,指定了消费者所属群组props.put("bootstrap.servers", "master01:9092");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("group.id", "kafka-group");props.put("auto.offset.reset","earliest");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "GSSAPI");props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +"useKeyTab=true " +"keyTab=\"/tmp/kafka.keytab\" " +"storeKey=true " +"useTicketCache=false " +"serviceName=\"kafka\" " +"principal=\"kafka/ALL@EXAMPLE.COM\";");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);String topic = "kafka-validate-01";topic= "kafka_kerberos";consumer.assign(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));for (ConsumerRecord<String, String> record : records) {LOG.info("KafkaConsumerDemoAssign#ConsumerRecord  -> KEY : {} , VALUE : {} ", record.key(),record.value());}}

四. FLINK SQL 任务验证…

flink 官方文档:
FLINK 使用assign构建KafkaConsumer , scan.startup.mode 配置项决定了 Kafka consumer 的启动模式。

序号参数含义kafka gruop id 是否必填
1group-offsets (默认)从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
2earliest-offset从可能的最早偏移量开始
3latest-offset从最末尾偏移量开始
4timestamp从用户为每个 partition 指定的时间戳开始
4specific-offsets从用户为每个 partition 指定的偏移量开始
  • 只有使用scan.startup.mode group-offsets flink任务运行的时候才会报gruop id 相关的权限异常.

异常信息:

Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: kafka-validate-group-xx

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/65165.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

KVM虚拟机管理脚本

思路&#xff1a; 在/opt/kvm下创建一个磁盘文件&#xff0c;做差异镜像&#xff0c;创建一个虚拟机配置文件&#xff0c;做虚拟机模版 [rootnode01 ~]# ls /opt/kvm/ vm_base.qcow2 vm_base.xml创建虚拟机的步骤&#xff1a;首先创建虚拟机的差异镜像&#xff0c;然后复制虚…

Null value was assigned to a property of primitive type setter of 的原因与解决方案

Null value was assigned to a property of primitive type setter of 的原因与解决方案 org.hibernate.PropertyAccessException: Null value was assigned to a property of primitive type setter of com.xxx.xxx.DealerUser.dealerId数据库表结构 实体类 当数据库的dealer…

【数据结构与算法】排序算法(下)——计数排序与排序总结

写在前面 书接上文&#xff1a;【数据结构与算法】排序算法(中)——交换排序之快速排序 文章主要讲解计数排序的细节与分析源码。之后进行四大排序的总结。 文章目录 写在前面一、计数排序(非比较排序)代码的实现&#xff1a; 二、排序总结 2.1、稳定性 3.2、排序算法复杂度及…

Multi移动端开发

Multi移动端开发 安装环境 安装功能 VS2022安装 【ASP.NET和Web开发】、【.NET Multi-platform App UI开发】、【.NET桌面开发】 配置程序源 【工具】–>【选项】–>【NuGet包管理器】–>【程序包源】&#xff0c;添加如下&#xff1a; 名称&#xff1a;MES_APP 源&…

若依plus apifox导入接口显示为空

项目已经正常启动 访问接口有些没问题&#xff0c;有些有问题 其他模块都可以正常导入 解决&#xff1a;

音视频入门基础:AAC专题(13)——FFmpeg源码中,获取ADTS格式的AAC裸流音频信息的实现

音视频入门基础&#xff1a;AAC专题系列文章&#xff1a; 音视频入门基础&#xff1a;AAC专题&#xff08;1&#xff09;——AAC官方文档下载 音视频入门基础&#xff1a;AAC专题&#xff08;2&#xff09;——使用FFmpeg命令生成AAC裸流文件 音视频入门基础&#xff1a;AAC…

英文学术会议海报poster模板【可编辑】

英文学术会议海报poster模板【可编辑】 下载链接&#xff1a;学术会议海报poster模板【可编辑】 横版海报 竖版海报 下载链接&#xff1a;学术会议海报poster模板【可编辑】 提供了一套学术海报的PPT模板&#xff0c;适用于学术会议、研讨会等场合。 竖版&#xff0c;包含11…

机器学习之KNN算法预测数据和数据可视化

机器学习及KNN算法 目录 机器学习及KNN算法机器学习基本概念概念理解步骤为什么要学习机器学习需要准备的库 KNN算法概念算法导入常用距离公式算法优缺点优点&#xff1a;缺点︰ 数据可视化二维界面三维界面 KNeighborsClassifier 和KNeighborsRegressor理解查看KNeighborsRegr…

Jmeter自学【8】- 使用JMeter模拟设备通过MQTT发送数据

今天使用jmeter推送数据到MQTT&#xff0c;给大家分享一下操作流程。 一、安装JMeter 参考文档&#xff1a;Jmeter自学【1】- Jmeter安装、配置 二、安装MQTT插件 1、下载插件 我的Jmeter版本是5.6.3&#xff0c;用到的插件是&#xff1a;mqtt-xmeter-2.0.2-jar-with-depe…

若依框架中的上传图片后如何实现回显到页面的

在日常开发中&#xff0c;总会遇到上传文件、图片等功能&#xff0c;然后本地开发的话&#xff0c;又没有像OSS、七牛等网络存储&#xff0c;这个时候通常将文件上传到本地&#xff0c;那么上传之后拿到的是本地的路径&#xff0c;存储到数据库中&#xff0c;查询的时候如何将本…

Linux 文件 I/O 基础

目录 前言 一、文件描述符&#xff08;File Descriptor&#xff09; 二、打开文件&#xff08;open 函数&#xff09; 三、读取文件&#xff08;read 函数&#xff09; 四、写入文件&#xff08;write 函数&#xff09; 五、关闭文件&#xff08;close 函数&#xff09; …

【CSS in Depth 2 精译_091】15.4:让 CSS 高度值过渡到自动高度 + 15.5:自定义属性的过渡设置(全新)+ 15.6:本章小结

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第五部分 添加动效 ✔️【第 15 章 过渡】 ✔️ 15.1 状态间的由此及彼15.2 定时函数 15.2.1 定制贝塞尔曲线15.2.2 阶跃 15.3 非动画属性 15.3.1 不可添加动画效果的属性15.3.2 淡入与淡出 15.4 过…

路由器的原理

✍作者&#xff1a;柒烨带你飞 &#x1f4aa;格言&#xff1a;生活的情况越艰难&#xff0c;我越感到自己更坚强&#xff1b;我这个人走得很慢&#xff0c;但我从不后退。 &#x1f4dc;系列专栏&#xff1a;网路安全入门系列 目录 路由器的原理一&#xff0c;路由器基础及相关…

spring专题笔记(七):spring如何引入外部属性文件?spring在xml配置bean时如何引入外部的properties属性文件内容?

目录 1、spring在xml配置bean时引入外部的properties属性文件内容作用是什么&#xff1f; 2、引入配置文件步骤 2.1、首先创建一个java类MyDataSource&#xff0c;主要包含四个属性。 2.2、准备一个myDataConfig.properties属性文件&#xff0c;里面配置MyDataSource类中需…

梳理你的思路(从OOP到架构设计)_认识框架(Framework) 01

目录 1、 是框架的核心要素​编辑&i> 范例1&#xff1a; 范例2&#xff1a; 范例3&#xff1a; 1、 <E&I>是框架的核心要素 在特定领域(Domain)里&#xff0c;将EIT造形的<E&I>部份有意义地组合起来&#xff0c;就成为框架(Framework)了。基本…

邮件白名单是什么?

邮件白名单是一种电子邮件过滤规则&#xff0c;用于指定哪些发件人、域名或IP地址的邮件被允许通过过滤系统&#xff0c;不受任何限制地进入收件人的邮箱。与黑名单&#xff08;用于阻止特定发件人的邮件&#xff09;相反&#xff0c;白名单确保了来自受信任来源的邮件能够畅通…

Maven项目中不修改 pom.xml 状况下直接运行OpenRewrite的配方

在Java 的Maven项目中&#xff0c;可以在pom.xml 中配置插件用来运行OpenRewrite的Recipe&#xff0c;但是有一些场景是希望不修改pom.xml 文件就可以运行Recipe&#xff0c;比如&#xff1a; 因为不需要经常运行 OpenRewrite&#xff0c;所以不想在pom.xml 加入不常使用的插件…

JVM实战—2.JVM内存设置与对象分配流转

大纲 1.JVM内存划分的原理细节 2.对象在JVM内存中如何分配如何流转 3.部署线上系统时如何设置JVM内存大小 4.如何设置JVM堆内存大小 5.如何设置JVM栈内存与永久代大小 6.问题汇总 1.JVM内存划分的原理细节 (1)背景引入 (2)大部分对象的存活周期都是极短的 (3)少数对象…

kong网关使用pre-function插件,改写接口的返回数据

一、背景 kong作为api网关&#xff0c;除了反向代理后端服务外&#xff0c;还可对接口进行预处理。 比如本文提及的一个小功能&#xff0c;根据http header某个字段的值&#xff0c;等于多少的时候&#xff0c;返回一个固定的报文。 使用到的kong插件是pre-function。 除了上…