Kafka安全模式之身份认证

一、简介

Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要介绍在实际项目使用过程中遇到第三方kafka需身份认证时如何解决,以及对可能会碰到的问题进行总结。

二、原理介绍

Kafka身份认证主要分为以下几种:

(1)客户端与broker之间的连接认证

(2)broker与broker之间的连接认证

(3)broker与zookeeper之间的连接认证

日常项目中,无论是生产者还是消费者,我们都是作为客户端与kafka进行交互,因此使用的最多的是客户端与broker之间的连接认证。图1是客户端与服务端broker之间的认证过程图,客户端提交认证数据,服务端会根据认证数据对当前客户端进行身份校验,校验成功后的客户端即可成功登录kafka,进行后续操作。

图1 客户端与broker之间认证过程图

目前Kafka提供了SASL、SSL、Delegation Tokem三种安全认证机制,而SASL认证又分为了以下几种方式:

(1)基于Kerberos的GSSAPI

SASL-GSSAPI提供了一种非常安全的身份验证方法,但使用前提是企业中有Kerberos基础,一般使用随机密码的keytab认证方式,密码是加密的,在0.9版本中引入,目前是企业中使用最多的认证方式。

(2)SASL-PLAIN

SASL-PLAIN方式是一个经典的用户名/密码的认证方式,其中用户名和密码是以明文形式保存在服务端的JAAS配置文件中的,当客户端使用PLAIN模式进行认证时,密码是明文传输的,因此安全性较低,但好处是足够简单,方便我们对其进行二次开发,在0.10版本引入。

(3)SASL-SCRAM

SASL-SCRAM是针对SASL-PLAIN方式的不足而提供的另一种认证方式,它将用户名/密码存储在zookeeper中,并且可以通过脚本动态增减用户,当客户端使用SCRAM模式进行认证时,密码会经过SHA-256或SHA-512哈希加密后传输到服务器,因此安全性较高,在0.10.2版本中引入。

对Kafka集群来说,要想实现完整的安全模式,首先为集群中的每台机器生成密钥和证书是第一步,其次利用SASL对客户端进行身份验证是第二步,最后对不同客户端进行读写操作的授权是第三步,这些步骤即可以单独运作也可以同时运作,从而提高kafka集群的安全性。

三、具体实现

本文主要介绍作为kafka生产者,如何基于Kerberos进行身份认证给第三方kafka发送数据。

Kerberos主要由三个部分组成:密钥分发中心Key Distribution Center(即KDC)、客户端Client、服务端Service,大致关系图如下图2所示,其中KDC是实现身份认证的核心组件,其包含三个部分:

  1. Kerberos Database:储存用户密码以及其他信息
  2. Authentication Service(AS):进行用户身份信息验证,为客户端提供Ticket Granting Tickets(TGT)
  3. Ticket Granting Service(TGS):验证TGT,为客户端提供Service Tickets

我们作为生产者向第三方kafka发送数据,因此需要第三方提供以下安全认证文件:

  • 用户名principle:标识客户端的用户身份,也即用于登录的用户名
  • 指定用户名对应的秘钥文件xx.keytab:存储了用户的加密密码
  • 指定安全认证的服务配置文件krb5.conf:客户端根据该文件中的信息去访问KDC

获取以上安全认证文件后,即可编写java代码连接第三方kafka,步骤如下:

1、将安全认证文件xx.keytabkrb5.conf放置于某一路径下,确保后续java代码可进行读取

2、添加kafka配置文件,开启安全模式认证,其中kerberos.path是第一步中认证文件所在的目录

3、修改Kafka生产者配置,开启安全连接

4、调用认证工具类进行登录认证

LoginUtil认证工具类的核心是根据第一步中提供的安全认证文件自动生成jaas配置文件,该文件是kafka安全模式下认证的核心。代码如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.FileWriter;
import java.io.IOException;/*** @ProjectName: stdp-security-demo* @Package: * @ClassName: LoginUtil* @Author: stdp* @Description: ${description}*/
public class LoginUtil {public enum Module {KAFKA("KafkaClient"), ZOOKEEPER("Client");private String name;Module(String name) {this.name = name;}public String getName() {return name;}}private static final Logger LOGGER = LoggerFactory.getLogger(LoginUtil.class);/*** line operator string*/private static final String LINE_SEPARATOR = System.getProperty("line.separator");/*** jaas file postfix*/private static final String JAAS_POSTFIX = ".jaas.conf";private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf";public static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config";private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");/*** oracle jdk login module*/private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";public synchronized static void login(String userPrincipal, String userKeytabPath, String krb5ConfPath)throws IOException{// 1.check input parametersif ((userPrincipal == null) || (userPrincipal.length() <= 0)){LOGGER.error("input userPrincipal is invalid.");throw new IOException("input userPrincipal is invalid.");}if ((userKeytabPath == null) || (userKeytabPath.length() <= 0)){LOGGER.error("input userKeytabPath is invalid.");throw new IOException("input userKeytabPath is invalid.");}if ((krb5ConfPath == null) || (krb5ConfPath.length() <= 0)){LOGGER.error("input krb5ConfPath is invalid.");throw new IOException("input krb5ConfPath is invalid.");}// 2.check file exsitsFile userKeytabFile = new File(userKeytabPath);if (!userKeytabFile.exists()){LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");}if (!userKeytabFile.isFile()){LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");}File krb5ConfFile = new File(krb5ConfPath);if (!krb5ConfFile.exists()){LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");}if (!krb5ConfFile.isFile()){LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");}// 3.set and check krb5configsetKrb5Config(krb5ConfFile.getAbsolutePath());//        LOGGER.info("check zookeeper server Principal =============================================");setZookeeperServerPrincipal(userPrincipal);
//        LOGGER.info("check jaas.conf +++++++++++++++++++++++++++++++++++++++++++++++++");setJaasFile(userPrincipal,userKeytabPath);LOGGER.info("Login success!!!!!!!!!!!!!!");}public static void setKrb5Config(String krb5ConfigFile) throws IOException {System.setProperty(JAVA_SECURITY_KRB5_CONF_KEY,krb5ConfigFile);String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF_KEY);if (ret == null) {LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");}if (!ret.equals(krb5ConfigFile)){LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");}}public static void setJaasFile(String userPrincipal,String userKeytabPath) throws IOException {String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX;LOGGER.info("jaasPath = {}",jaasPath);//windows路径下分隔符替换jaasPath = jaasPath.replace("\\","\\\\");userKeytabPath = userKeytabPath.replace("\\","\\\\");//删除jaas文件deleteJaasFile(jaasPath);writeJaasFile(jaasPath,userPrincipal,userKeytabPath);System.setProperty(JAVA_SECURITY_LOGIN_CONF_KEY,jaasPath);}private static void deleteJaasFile(String jaasPath) throws IOException {File jaasFile = new File(jaasPath);if (jaasFile.exists()){if (!jaasFile.delete()){throw new IOException("failed to delete exists jaas file.");}}}private static void writeJaasFile(String jaasPath,String userPrincipal,String userKeytabPath) throws IOException {FileWriter writer = new FileWriter(new File(jaasPath));try{writer.write(getJaasConfContext(userPrincipal,userKeytabPath));writer.flush();}catch (IOException e){throw new IOException("Failed to create jaas.conf File.");}finally {writer.close();}}private static String getJaasConfContext(String userPrincipal,String userKeytabPath) throws IOException{Module[] allModule = Module.values();StringBuffer builder = new StringBuffer();for (Module module: allModule){String serviceName = null;if ("Client".equals(module.getName())){serviceName = "zookeeper";}else if ("KafkaClient".equals(module.getName())){serviceName = "kafka";}builder.append(getModuleContext(userPrincipal,userKeytabPath,module,serviceName));}return builder.toString();}private static String getModuleContext(String userPrincipal,String userKeytabPath,Module module,String serviceName) throws IOException {StringBuffer builder = new StringBuffer();if (IS_IBM_JDK){builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);builder.append("credsType=both").append(LINE_SEPARATOR);builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);builder.append("useKeytab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);builder.append("debug=true;").append(LINE_SEPARATOR);builder.append("};").append(LINE_SEPARATOR);}else {builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);builder.append("useKeyTab=true").append(LINE_SEPARATOR);builder.append("keyTab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);builder.append("useTicketCache=false").append(LINE_SEPARATOR);builder.append("storeKey=true").append(LINE_SEPARATOR);builder.append("debug=true;").append(LINE_SEPARATOR);builder.append("};").append(LINE_SEPARATOR);}return builder.toString();}public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {System.setProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY,zkServerPrincipal);String ret = System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY);if (ret == null) {LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");}if (!ret.equals(zkServerPrincipal)){LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");}}
}

经过以上四步的配置,启动项目后即可自动连接kafka进行身份校验,若登录成功,会输出如下提示信息:Login success,并且会将生成的jaas文件路径打印出来。

四、常见问题

1、认证文件找不到

这是因为步骤1中kerberos.path配置有问题,检查path路径下是否存在认证文件keytab和krb5.conf。

2、 principal和keytab不匹配

不同的用户名对应不同的密码,在身份校验时,需保证用户名principle和密码keytab的一致性,否则无法验证通过。而principal和keytab不匹配可能存在以下两种场景:

  •  配置文件中出现问题:检查kerberos.principle和kerberos.keytab中的用户名(即hkjj)是否一致。

  •  检查生成的jaas文件中用户名和配置的用户名是否相同

如果步骤1检查没用问题,则可根据日志中输出的jaas文件路径查看自动生成的jaas文件中的principal和配置文件中的kerberos.principle是否一致。比如我的这个项目中,就是由于现场技术配置kerberos.principle时后面多打了一个空格,导致自动生成的jaas文件中的principle后多一个空格,因此和keytab认证失败。

为了彻底解决这个误打空格的问题,可以直接修改认证工具类LoginUtil,在生成jaas文件的principle时去掉可能存在的空格。

3、用户密码keytab更新,导致出现checksum failed

这是由于principal对应的密码修改了,但是程序中使用的还是旧的密码,就会出现这个问题。解决办法是找第三方提供principal对应的最新的密码文件keytab

4jaas文件找不到

该问题是由于找不到jaas.conf 这个文件导致的,而基于kerberos认证时一般不会出现,这是因为kerberos认证时jaas文件是由LoginUtil工具类根据安全认证文件自动生成并且存储在指定路径下的。

该问题通常出现在SASL-PLAIN方式的认证中,因为该方式需要添加一个配置参数java.security.auth.login.config来标识jaas文件的路径,如果文件路径出错则会报以上错误。

五、总结

在kafka身份认证的过程中,需要的principal,keytab,ServiceName等信息均配置在jaas文件中,因此保证认证的服务可以读取到正确的文件及正确的配置是kafka安全模式下认证的核心。

基于kerberos认证时,可根据安全认证文件自动生成jaas配置文件,从而保证了密码加密传输,相比于SASL-PLAIN模式更具安全性,并且认证实现过程也较为简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706805.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android和Linux的开发差异

最近开始投入Android的怀抱。说来惭愧&#xff0c;08年就听说这东西&#xff0c;当时也有同事投入去看&#xff0c;因为恶心Java&#xff0c;始终对这玩意无感&#xff0c;没想到现在不会这个嵌入式都快要没法搞了。为了不中年失业&#xff0c;所以只能回过头又来学。 首先还是…

dcat admin 自定义页面

自定义用户详情页 整体分为两部分&#xff1a;用户信息、tab框 用户信息采用自定义页面加载&#xff0c;controller代码如下&#xff1a; protected function detail($id) {return Show::make($id, GameUser::with(finance), function (Show $show) {// 这段就是加载自定义页面…

git 中使用git clean删除未跟踪Untracked的文件

git clean -nf 是 Git 中的一个命令。让我们分解一下这个命令的意思&#xff1a; git clean: 这是一个命令&#xff0c;其功能是用来删除未被跟踪的文件。 -n&#xff1a;这是一个选项&#xff0c;也可以写作 --dry-run。添加这个选项后&#xff0c;命令将显示哪些文件会被删除…

go 的使用总结

go的内存逃逸&#xff1f; go语言在编辑阶段通过逃逸分析把分配在栈上变量 分配到堆上去。 栈内存&#xff1a; 一段连续的内存&#xff0c;便于高效运行指令过程中的临时变量存储。 堆内存&#xff1a; 主要由垃圾回收器 回收没有被引用的指针。 逃逸分析&#xff1a;栈内…

.NET Core Web API注册和发现实例

在.NET Core Web API中&#xff0c;服务注册和发现是实现微服务架构的重要组成部分。通过注册服务实例&#xff0c;客户端能够动态地找到可用的服务端点&#xff0c;从而实现服务的透明调用。在.NET Core中&#xff0c;有多种方式可以实现服务注册和发现&#xff0c;例如使用Co…

frp 内网穿透 linux部署版

frp 内网穿透 linux部署版 前提安装 frp阿里云服务器配置测试服务器配置访问公网 前提 使用 frp&#xff0c;您可以安全、便捷地将内网服务暴露到公网&#xff0c;通过访问公网 IP 直接可以访问到内网的测试环境。准备如下&#xff1a; 公网 IP已部署好的测试服务 IP:端口号阿…

【可实战】被测系统业务架构、系统架构、技术架构、数据流、业务逻辑分析

一、为什么要学习 更深的理解业务逻辑&#xff08;公司是做什么的&#xff1f;它最重要的商务决策是什么&#xff1f;它里面的数据流是怎么做的&#xff1f;有哪些业务场景&#xff1f;考验你对这家公司、对所负责业务的熟悉程度。公司背后服务器用什么软件搭建的&#xff1f;…

小程序框架(概念、工作原理、发展及应用)

引言 移动应用的普及使得用户对于轻量级、即时可用的应用程序需求越来越迫切。在这个背景下&#xff0c;小程序应运而生&#xff0c;成为一种无需下载安装、即点即用的应用形式&#xff0c;为用户提供了更便捷的体验。小程序的快速发展离不开强大的开发支持&#xff0c;而小程…

Cypher语句查询neo4j数据库教程

文章目录 Cypher介绍执行Cypher语句查询总结 Cypher介绍 NodeMatcher和RelationshipMatcher能够表达的匹配条件相对简单&#xff0c;更加复杂的查询还是需要用Cypher语句来表达。 Py2neo本身支持执行Cypher语句的执行&#xff0c;可以将复杂的查询写成Cypher语句&#xff0c;…

ubuntu20下使用 torchviz可视化计算图

安装 torchviz&#xff1a; pip install torchviz示例代码&#xff1a;下面是一个简单的示例代码&#xff0c;展示如何使用 torchviz 可视化计算图&#xff1a; python import torch from torchviz import make_dot# 创建一个简单的模型 model torch.nn.Sequential(torch.nn…

40+个适合高中生探索的计算机科学研究思路课题

作为一名对计算机科学感兴趣的高中生&#xff0c;应该如何提升对这个学科的的认知呢&#xff1f;进行研究就是一个不错的选择&#xff0c;通过研究&#xff0c;你可以加深对这个领域的理解&#xff0c;获得宝贵的技能&#xff0c;并为社会做出贡献&#xff01; 研究经验可以…

【办公类-21-05】20240227单个word按“段落数”拆分多个Word(成果汇编 只有段落文字 1拆5)

作品展示 背景需求 前文对一套带有段落文字和表格的word进行13份拆分 【办公类-21-04】20240227单个word按“段落数”拆分多个Word&#xff08;三级育婴师操作参考题目1拆13份&#xff09;-CSDN博客文章浏览阅读293次&#xff0c;点赞8次&#xff0c;收藏3次。【办公类-21-04…

Qt中控件pushbutton的使用

pushbutton控件的使用 pushbutton生成的方式 ①纯代码生成&#xff1a;通过代码实现pushbutton控件所有的属性&#xff0c;不推荐&#xff1b; ②纯UI界面生成&#xff1a;通过拖动工具栏中已有的pushbutton控件进行实现&#xff0c;但一些属性设置上&#xff0c;纯UI界面操作…

电商数据分析10——电商行业中的用户画像构建与数据分析应用

目录 写在开头1. 用户画像的概念和重要性1.1 用户画像定义1.2 用户画像在电商中的应用价值 2. 构建用户画像的数据分析方法2.1 数据收集与处理2.2 关键特征提取和用户分类2.3 用户行为和偏好分析 3. 用户画像在电商营销中的应用案例3.1 个性化推荐系统3.1.1 背景介绍3.1.2 问题…

【六袆-Golang】Golang:安装与配置Delve进行Go语言Debug调试

安装与配置Delve进行Go语言Debug调试 一、Delve简介二、win-安装Delve三、使用Delve调试Go程序[命令行的方式]四、使用Golang调试程序 Golang开发工具系列&#xff1a;安装与配置Delve进行Go语言Debug调试 摘要&#xff1a; 开发环境中安装和配置Delve&#xff0c;一个强大的G…

高通 AI Hub 上手指南

文章介绍 2月26日&#xff0c;高通在2024年世界移动通信大会&#xff08;MWC2024&#xff09;上发布高通AI Hub&#xff0c; AI Hub 简化了AI 模型部署到边缘设备的过程。可以利用AI-hub云端托管 Qualcomm 设备上&#xff0c;在几分钟内完成模型的优化、验证和部署。本文以Pyto…

RV32/64 特权架构 - 特权模式与指令

RV32/64 特权架构 - 特权模式与指令 1 特权模式2 特权指令2.1 mret&#xff08;从机器模式返回到先前的模式&#xff09;2.2 sret&#xff08;从监管模式返回到先前的模式&#xff09;2.3 wfi&#xff08;等待中断&#xff09;2.4 sfence.vma&#xff08;内存屏障&#xff09; …

idea 更新maven java版本变化

今天遇到个问题就是&#xff0c;点击maven的reload&#xff0c;会导致setting 里的java compiler 版本变化 这里的话&#xff0c;应该是settings.xml文件里面的这个限定死了&#xff0c;修改一下或者去掉就可以了 <profile><id>JDK-1.8</id><activatio…

华为OD机试真题-靠谱的车-2023年OD统一考试(C卷)---Python3-开源

题目&#xff1a; 考察内容&#xff1a; 思维转化&#xff0c;进制转化&#xff0c;9进制转为10进制&#xff0c;在4的位置1&#xff0c;需要判断是否大于4 代码&#xff1a; """ 题目分析&#xff1a; 9进制转化为10进制23-25 39-50 399-500输入&#xff1a…

06 基于单位脉冲信号的信号合成与分解

各位看官&#xff0c;大家好&#xff01;本讲为《数字信号处理理论篇》06 基于单位脉冲信号的信号合成与分解。&#xff08;特别提示&#xff1a;课程内容为由浅入深的特性&#xff0c;而且前后对照&#xff0c;不要跳跃观看&#xff0c;请按照文章或视频顺序进行观看。 笔者今…