【kafka】使用kafka client连接 kerberos认证的 kafka,scala版

注意keytab路径中不要使用\\,都使用/作为分隔符

使用kerberos需要配置jaas如下日志打印,两个配置至少设置一个:

[DEBUG] org.apache.kafka.common.security.JaasContext:106 --- System property 'java.security.auth.login.config' and Kafka SASL property 'sasl.jaas.config' are not set, using default JAAS configuration.
import org.slf4j.Logger
trait Logging {val LOG: Logger = org.slf4j.LoggerFactory.getLogger(this.getClass)
}
import org.apache.kafka.clients.producer.ProducerRecord
import org.scalatest.FunSuite
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import java.time.Duration
import scala.collection.JavaConversions._class KafkaClientTest extends FunSuite with Logging {val JAAS_CONFIG_KEYTAB_TEMPLATE: String =s"""|com.sun.security.auth.module.Krb5LoginModule required|debug=true|doNotPrompt=true|storeKey=true|useKeyTab=true|keyTab="%s"|principal="%s";|""".stripMarginval topic = "tmp_test"val bootstrapServers = "01.xxx.com:6667,01.xxx..com:6667,01.xxx..com:6667"val principal = "xxx@XXXXX.COM"val keytab = "D:/xxx/xxx.keytab"val krb5conf = "D:/xxx/krb5.conf"def getProducerProps: Properties = {val properties = new Properties()properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI")properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)properties}def getConsumerProps: Properties = {val properties = new Properties()properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI")properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tester")properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")properties}def setEnv(props: Properties) = {System.setProperty("java.security.krb5.conf", krb5conf)// 以下二者选其中之一就可以了。// 方式一:System.setProperty("java.security.auth.login.config", "D:/configs/kafka_client_jaas.conf")// 方式二:val jaasStr = JAAS_CONFIG_KEYTAB_TEMPLATE.format(keytab, principal).trimLOG.warn(s"format str: \n${jaasStr}")props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasStr)}test("produce") {val props = getConsumerPropssetEnv(props)try {val producer = new KafkaProducer[String, String](props)var counter = 0;while (true) {val record = new ProducerRecord[String, String](topic, s"hello ${counter}")val fu = producer.send(record)Thread.sleep(3000L)if (counter % 10 == 0) {producer.flush()}counter += 1}} catch {case e: Exception =>throw new RuntimeException(e)}}test("consumer") {val props = getConsumerPropssetEnv(props)val consumer = new KafkaConsumer(props)consumer.subscribe(List(topic))while (true) {val record = consumer.poll(Duration.ofSeconds(3))val it = record.iterator()while (it.hasNext) {LOG.info(s"${it.next().value()}")}}}
}

kafka_client_jaas.conf 文件内容:
文件模板参考:KAFKA_HOME/conf/kafka_client_jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab = trueuseTicketCache=falsestoreKey = truekeyTab="D:/***/xxx.keytab"principal="xxx@XXXXX.COM"serviceName="kafka";
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/669753.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity3D判断屏幕中某个坐标点的位置是否在指定UI区域内

系列文章目录 unity工具 文章目录 系列文章目录前言一、使用rect.Contains()判断1-1、转换坐标1-2、代码如下:1-3、注意事项1-3、测试效果如下 二、使用坐标计算在不在区域内2-1、方法如下:2-2、注意事项 三、使用RectTransformUtility.ScreenPointToLo…

2024-02-05 Linux shell 脚本检查检测某个环境变量是否已设置

一、这段脚本定义了一个函数check_config,用于检查传递给函数的环境变量或shell变量是否已设置(即非空)。如果所有变量都已设置,函数返回0(成功);否则,它打印一条信息,指…

MongoDB从入门到实战之Docker快速安装MongoDB

前言 在上一篇文章中带领带同学们快速入门MongoDB这个文档型的NoSQL数据库,让大家快速的了解了MongoDB的基本概念。这一章开始我们就开始实战篇教程,为了快速把MongoDB使用起来我将会把MongoDB在Docker容器中安装起来作为开发环境使用。然后我这边MongoD…

SQL Server数据库日志查看若已满需要清理的三种解决方案

首先查看获取实例中每个数据库日志文件大小及使用情况,根据数据库日志占用百分比来清理 DBCC SQLPERF(LOGSPACE) 第一种解决方案: 在数据库上点击右键 → 选择 属性 → 选择 文件,然后增加数据库日志文件的文件大小。 第二种解决方案 手动…

宝塔+php+ssh+vscode+虚拟机 远程调试

远程(虚拟机)宝塔 安装扩展 配置文件添加,zend_extension看你虚拟机的具体位置 [Xdebug] zend_extension/www/server/php/74/lib/php/extensions/no-debug-non-zts-20190902/xdebug.so xdebug.modedebug xdebug.start_with_requesttrigger xdebug.client_host&quo…

gunicorn日志--access-logformat不起作用

使用gunicornsupervisor启动fastapi服务,命令如下: gunicorn main:app --workers 2 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000 --access-logfile - 其中access log记录到的内容如下: 176.123.7.11:0 - "GE…

服务器与电脑的区别

1. PC机和专业服务器是完全是两种东西,硬件不同,当然驱动也不可能相同。比如说对服务器/工作站主板而言,最重要的 是高可靠性和稳定性,其次才是高性能。因为大多数的服务器都要满足每天24小时、每周7天的满负荷工作要求。价格方面…

C/C++ - 容器list

目录 容器特性 list 容器特性 使用场景 构造函数 默认构造函数 填充构造函数 范围构造函数 复制构造函数 大小函数 函数:size 函数:empty​ 函数:max_size​ 增加函数 函数:​push_back​ 函数:push_f…

车位检测,YOLOV8,OPENCV调用

车位检测YOLOV8NANO,opencv调用 车位检测,YOLOV8NANO,训练得到PT模型,然后转换成ONNX,OPENCV的DNN调用,支持C,PYTHON,ANDROID

行业知识图谱是什么

行业知识图谱是一种将行业知识从业务场景抽象到计算机可读的知识结构。它是指在行业内业务领域中,对各种业务实体、概念、属性以及关系的一种结构化描述。是一种通过知识表示、推理和查询的手段,将行业知识以图形化方式表示出来。它可以被用来建立企业的…

C#用Array类的FindAll方法和List<T>类的Add方法按关键词在数组中检索元素并输出

目录 一、使用的方法 1. Array.FindAll(T[], Predicate) 方法 (1)定义 (2)示例 2.List类的常用方法 (1)List.Add(T) 方法 (2)List.RemoveAt(Int32) 方法 (3&…

Bytebase 签约 Vianova,助力欧洲城市交通智能平台中 Snowflake 和 PG 的变更自动化及版本控制

在数字化发展的浪潮中,自动化数据库变更管理成为提升产品上线效率、降低人为失误风险的关键工具,同时促进流程的一致性与标准化,确保合规性和变更的可追溯性。近日,数据库 DevOps 团队协同管理工具 Bytebase 签约欧洲交通数据管理…

免费在线绘图工具有哪些好用的?线画图工具是比较好的选择?

据说一张图胜过千言万语。一张好的图片可以帮助你快速表达自己的想法,让自己的想法更直观清晰,无论是产品分析、方案选项还是技术交流。市面上有很多绘图工具。这里有一些好用又免费的工具,绝对会让你在几分钟内坠入爱河。 即时设计 - 可实时…

Android~集成opencv问题

dlopen failed: library “libopencv_java4.so” not found E FATAL EXCEPTION: mainProcess: com.cv.monitor, PID: 4371java.lang.UnsatisfiedLinkError: dlopen failed: library "libopencv_java4.so" not found: needed by /data/app/~~KwL8rjD8_by_YVAyU82UCA…

YUM | 起源 | 发展 | 运行逻辑

介绍 YUM(Yellowdog Updater, Modified)起源于 Red Hat Linux 发行版 up2date 工具。 最初,up2date 是由 Red Hat 公司提供的用于管理系统更新的工具。然而,社区逐渐对 up2date 出现一些不满,主要是由于其使用体验和…

ubuntu20安装mysql8

1.安装 sudo apt update sudo apt install mysql-server-8.0 -y2.查看运行状态 yantaoubuntu20:~$ sudo systemctl status mysql ● mysql.service - MySQL Community ServerLoaded: loaded (/lib/systemd/system/mysql.service; enabled; vendor preset:>Active: active …

10英寸安卓车载平板电脑丨ONERugged车载工业平板:解决农业工作效率

农业是人类社会的基石之一,而农业工作效率的提升一直是农民和农业专业人士关注的重要议题。随着技术的不断进步,车载工业平板成为了解决农业工作效率的创新解决方案。本文将探讨车载工业平板如何为农业带来巨大的改变,提高农民的工作效率和农…

常用数字处理格式校验

1、前端校验 1.1 要求为数字类型&#xff08;不限位数与正负&#xff09; input输入框添加 type“number” <el-input type"number"/>当typenumber时&#xff0c;仍然可以输入字母e或E。解决方法是&#xff1a;给typenumber的输入框添加一个正则表达式&…

Android Studio 安装Flutter插件但是没法创建项目

Android Studio 安装Flutter插件但是没法创建项目 如果你在Android Studio已经安装了Dart、Flutter插件&#xff0c;但是不能创建Flutter项目。 原因是因为Android Studio的版本更新&#xff0c;Android APK Support这个插件没被选中。 一旦勾选这个插件之后&#xff0c;就能…

python创建pdf文件

目录 一&#xff1a;使用reportlab库 二&#xff1a;使用使pdf库 在Python中生成PDF文件可以使用多种库&#xff0c;其中最常用的是reportlab和fpdf。以下是使用这两个库生成PDF文件的示例代码&#xff1a; 一&#xff1a;使用reportlab库 1&#xff1a;写入文字信息 from r…