Kafka配置Kerberos安全认证及与Java程序集成

Background

  • 本文主要介绍在 Kafka 中如何配置 Kerberos 认证,以及 java 使用 JAAS 来进行 Kerberos 认证连接。
  • 本文演示为单机版。

所用软件版本

查看 Kerberos 版本命令:klist -V

软件名称版本
jdk1.8.0_202
kafka2.12-2.2.1
kerberos1.15.1

1、Kerberos Server 安装

  • Kerberos 是一种由 MIT(麻省理工大学)提出的网络身份验证协议,它旨在通过使用密钥加密技术为客户端和服务器应用程序提供强身份验证。
  • Kerberos 是一种基于加密 Ticket 的身份认证协议,主要由三个部分组成:Key Distribution Center (即KDC)、Client 和 Service:
    客户端会先访问两次 KDC,然后再访问目标服务,如:HTTP 服务、Zookeeper 服务、Kafka 服务等。

12c395d0ab9fd49425fcfe07f585060.png

  • 在线安装
yum install krb5-server krb5-workstation krb5-libs -y
  • 配置主机名映射

/etc/hosts文件中新增本机映射(我这里的主机名是monkey)。

127.0.0.1 monkey
  • 配置 krb5.conf

根据需要修改 /etc/krb5.conf文件,其中WLF.COM你可以改成你需要的,还有monkey是你的主机映射。

# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/[logging]default = FILE:/var/log/krb5libs.logkdc = FILE:/var/log/krb5kdc.logadmin_server = FILE:/var/log/kadmind.log[libdefaults]dns_lookup_realm = falseticket_lifetime = 24hrenew_lifetime = 7dforwardable = truerdns = falsepkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crtdefault_realm = WLF.COM#default_ccache_name = KEYRING:persistent:%{uid}[realms]
WLF.COM = {kdc = monkeyadmin_server = monkey
}[domain_realm]
.monkey = WLF.COM
monkey = WLF.COM
  • 配置 kdc.conf
  • 修改/var/kerberos/krb5kdc/kdc.conf,kdc的专属配置文件。
  • Java 使用 aes256-cts 验证方式需要安装额外的 jar 包,所以为了方便不用哈。
[kdcdefaults]kdc_ports = 88kdc_tcp_ports = 88[realms]WLF.COM = {#master_key_type = aes256-ctsacl_file = /var/kerberos/krb5kdc/kadm5.acldict_file = /usr/share/dict/wordsadmin_keytab = /var/kerberos/krb5kdc/kadm5.keytabmax_file = 24hmax_renewable_life = 7dsupported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal}
  • 配置 kadm5.acl
  • 修改权限相关配置文件 /var/kerberos/krb5kdc/kadm5.acl
  • 其中前一个*号是通配符,表示像名为“abc/admin”或“xxx/admin”的人都可以使用此工具(远程或本地)管理kerberos数据库,后一个跟权限有关,*表示所有权限。WLF.COM是上面配置的realm。
*/admin@WLF.COM *
  • 初始化KDC数据库
kdb5_util create -r WLF.COM -s
ll -a /var/kerberos/krb5kdc/

image.png

  • 启动 Kerberos 服务
systemctl start krb5kdc kadmin
systemctl status krb5kdc kadmin

image.png

Kerberos 服务机器上可以使用 kadmin.local 来执行各种管理的操作!

2、Kafka 开启 Kerberos 认证

所有Kerberos 相关配置文件(java连接所需),我们都放在 Kafka 的 config/kerberos/目录下的(kerberos 目录需新建),把krb5.conf也拷贝过去。

  • 创建 keytab
cd /opt/kafka_2.12-2.2.1/
cp /etc/krb5.conf config/kerberos/cd
kadmin.local -q "add_principal -randkey kafka-server/monkey@WLF.COM"
kadmin.local -q "add_principal -randkey kafka-client@WLF.COM"
kadmin.local -q "xst -k config/kerberos/kafka-server.keytab kafka-server/monkey@WLF.COM"
kadmin.local -q "xst -k config/kerberos/kafka-client.keytab kafka-client@WLF.COM"
  • Kafka 服务端配置

修改config/server.properties配置文件,新增如下内容:

listeners=SASL_PLAINTEXT://monkey:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka-server
  • KafkaServer 配置

新建config/kerberos/kafka-server-jaas.conf文件,内容如下:

KafkaServer {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/opt/kafka_2.12-2.2.1/config/kerberos/kafka-server.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka-server/monkey@WLF.COM";
};
  • KafkaClient 配置
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/opt/kafka_2.12-2.2.1/config/kerberos/kafka-client.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka-client@WLF.COM";
};
  • 修改bin/kafka-server-start.sh,倒数第二行增加如下配置:
export KAFKA_OPTS="-Dzookeeper.sasl.client=false -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/opt/kafka_2.12-2.2.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/kafka_2.12-2.2.1/config/kerberos/kafka-server-jaas.conf"
  • 修改bin/kafka-topics.sh、kafka-console-producer.sh、bin/kafka-console-consumer.sh ,倒数第二行增加如下配置:
export KAFKA_OPTS="-Djava.security.krb5.conf=/opt/kafka_2.12-2.2.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/kafka_2.12-2.2.1/config/kerberos/kafka-client-jaas.conf"
  • 新建 config/kerberos/client.properties 文件,内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka-server
  • 附赠kafka操作脚本operate.sh
#/bin/bash:<<!
【脚本说明】
1、此脚本用于操作kafka:启动、停止、查看运行状态、重启、查看日志、查询所有主题、创建主题、删除主题、订阅或消费主题数据;
2、建议把脚本放在kafka安装目录下;
3、适用单机版。
注意安装kafka修改配置文件:
# IP替换为Kafka所在主机的IP
sed -i '31 a listeners=PLAINTEXT://localhost:9092' config/server.properties
!# kafka安装目录
KAFKA_HOME=/opt/kafka_2.12-2.2.1
# zookeeper地址
ZK_SERVER=monkey
# kafka地址
KAFKA_SERVER=monkey
# zk启动日志
LOG_ZK=$KAFKA_HOME/logs/zookeeper-run.log
# kafka启动日志
LOG_KAFKA=$KAFKA_HOME/logs/kafka-run.log
# sasl
CONF_SASL=config/kerberos/client.properties# 操作
operate=$1
# 参数
param=$2# 进程
pids=`ps -ef | egrep "Kafka|QuorumPeerMain" | egrep -v grep | awk '{print $2}'`# 提示信息
msg='Please input params [<run>|<kil>|<res>|<sta>|<log> [zk]|<list>|<add> <{topic}>|<del> [{topic}]|<consume> <{topic}>|<produce> <{topic}>]'# 定制化shell输出
function custom_print(){echo -e "\033[5;34m ***** \033[0m"echo -e "\033[32m $@ ! \033[0m"echo -e "\033[5;34m ***** \033[0m"
}function run(){rm -rf $LOG_ZK $LOG_KAFKA# 先启动zkecho "start zookeeper ..."nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties > $LOG_ZK 2>&1 &sleep 5# 再启动kafkanohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_KAFKA 2>&1 &
}function stop(){if [[ $pids ]]; thenkill -9 $pidsmsg='Stopped success'custom_print $msgelsemsg='The service is already down'custom_print $msgfi
}function restart(){if [[ $pids ]]; thenkill -9 $pidsfirunmsg='Restart success'custom_print $msg
}function status(){jps | egrep "Kafka|QuorumPeerMain"if [[ $pids ]]; then# 黄底蓝字msg='RUNNING'custom_print $msgelse# 蓝底黑字echo -e "\033[5;34m ***** \033[0m"echo -e "\033[31m STOPPED ! \033[0m"echo -e "\033[5;34m ***** \033[0m"fi
}function log(){if [[ -e $1 ]]; thentail -f $1elsemsg='No log has been generated yet'custom_print $msgfi
}# 判断输入参数
if [[ $operate = "run" || $operate = "start" ]]; thenif [[ $pids ]]; thenmsg='The service is already running'custom_print $msgelserunmsg='Start success'custom_print $msgfi
elif [[ $operate = "kil" || $operate = "stop" ]]; thenstop
elif [[ $operate = "res" || $operate = "restart" ]]; thenrestart
elif [[ $operate = "sta" || $operate = "status" ]]; thenstatus
elif [[ $operate = "log" ]]; thenif [[ $param = "zk" ]]; thenlog $LOG_ZKelselog $LOG_KAFKAfi
elif [[ $operate = "list" ]]; then$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --list
elif [[ $operate = "add" && ! -z $param ]]; then$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server $KAFKA_SERVER:9092 --replication-factor 1 --partitions 1 --topic $parammsg="$param create success"custom_print $msg
elif [[ $operate = "del" ]]; thenif [[ -z $param ]]; thentopics=`$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --list`for topic in $topics; doif [[ $topic != "__consumer_offsets" ]]; then$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --delete --topic $topic> /dev/nullmsg="$topic delete success"custom_print $msgfidoneelse$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --delete --topic $param > /dev/nullmsg="$param delete success"custom_print $msgfi
elif [[ $operate = "consume" && ! -z $param ]]; thenif [[ -z $CONF_SASL ]]; then$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER:9092 --from-beginning --topic $paramelse$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER:9092 --from-beginning --topic $param --consumer.config $CONF_SASLfi
elif [[ $operate = "produce" && ! -z $param ]]; thenif [[ -z $CONF_SASL ]]; then$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_SERVER:9092 --topic $param else$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_SERVER:9092 --topic $param --producer.config $CONF_SASLfi
elsecustom_print $msg
fi

3、java 程序连接 Kafka

  • 防火墙放行88/udp端口
  • kdc服务默认端口是88。
firewall-cmd --zone=public --add-port=88/udp --permanent
firewall-cmd --reload
  • 引入maven依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version>
</dependency>
  • 程序示例

注意:需要修改 kafka-client-jaas.conf配置文件中配置的kafka-client.keytab路径!

package com.cloudansys;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;public class TestKafkaKerberos {public static void main(String[] args) {// 消费者testConsumer();// 生产者testProducer();}private static void testConsumer() {System.setProperty("java.security.auth.login.config", "F:\\test\\kerberos\\kafka-client-jaas.conf");System.setProperty("java.security.krb5.conf", "F:\\test\\kerberos\\krb5.conf");Properties props = new Properties();props.put("bootstrap.servers", "monkey:9092");props.put("group.id", "test_group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// saslprops.put("sasl.mechanism", "GSSAPI");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.kerberos.service.name", "kafka-server");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "test";consumer.subscribe(Collections.singletonList(topic));while (true) {try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",record.offset(), record.partition(), record.key(), record.value());}} catch (Exception e) {e.printStackTrace();}}}private static void testProducer() {// JAAS配置文件路径和Kerberos配置文件路径System.setProperty("java.security.auth.login.config", "F:\\test\\kerberos\\kafka-client-jaas.conf");System.setProperty("java.security.krb5.conf", "F:\\test\\kerberos\\krb5.conf");// kafka属性配置Properties props = new Properties();props.put("bootstrap.servers", "monkey:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// kerberos安全认证props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "GSSAPI");props.put("sasl.kerberos.service.name", "kafka-server");String topic = "test";String msg = "this is a test msg";KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);// 发送消息记录Future<RecordMetadata> future = kafkaProducer.send(record);try {RecordMetadata metadata = future.get();System.out.printf("Message sent to Kafka topic=%s, partition=%d, offset=%d\n", metadata.topic(), metadata.partition(), metadata.offset());} catch (Exception e) {e.printStackTrace();}kafkaProducer.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617149.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vivado 使用项目摘要、配置项目设置、仿真设置

使用项目摘要 Vivado IDE包括一个交互式项目摘要&#xff0c;可根据设计动态更新命令被运行&#xff0c;并且随着设计在设计流程中的进展。项目摘要包括概览选项卡和用户可配置的仪表板&#xff0c;如下图所示。有关信息&#xff0c;请参阅《Vivado Design Suite用户指南&…

Python基础知识:整理11 模块的导入、自定义模块和安装第三方包

1 模块的导入 1.1 使用import 导入time模块&#xff0c;使用sleep功能&#xff08;函数&#xff09; import time print("start") time.sleep(3) print("end")1.2 使用from 导入time的sleep功能 from time import sleep print("start") slee…

高级分布式系统-第6讲 分布式系统的容错性--可靠的组通信

可靠的组通信 组内通信最好是每个进程之间都建立点到点的通信&#xff0c; 但实际中这样的组织结构不是有效的&#xff0c; 因为会浪费很大的通信带宽。 在平等组中&#xff0c; 多播是主要的组织结构。 但多播是具有同步性质的容错结构&#xff0c; 并不适用拜占庭模型。 多…

LeetCode刷题笔记

面试经典150题 1. 数组/字符串 1.1 合并两个有序数组 题目 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺…

spring Data Elasticsearch入门

1.Elasticsearch Elasticsearch提供了两种连接方式&#xff1a; transport&#xff1a;通过TCP方式访问ES。&#xff08;已废弃&#xff09; rest&#xff1a;通过HTTP API 方式访问ES。 描述&#xff1a; Spring Data Elasticsearch 项目提供了与Elasticsearch 搜索引擎的集成…

17. 电话号码的字母组合(回溯)

从第一个数字开始遍历其对应的字母&#xff0c;将其加入StringBuffer中&#xff0c;继续深度优先搜索&#xff0c;当访问到最后一个数字的时候&#xff0c;将StringBuffer存储到ans中&#xff0c;然后回溯到下一个对应字母。 class Solution {public List<String> lette…

instanceof、对象类型转化、static关键字

instanceof 与 对象类型转换 instanceof是判断一个对象是否与一个类有关系的关键字 先看引用类型&#xff0c;再看实际类型 *例子&#xff1a;obj instanceof A 先看obj的类型是否与A有关联&#xff0c;无关联则报错&#xff0c;有关联则判断obj的实际类型 因为obj的实际类…

系分笔记数据库反规范化、SQL语句和大数据

文章目录 1、概要2、反规范化3、大数据4、SQL语句5、总结 1、概要 数据库设计是考试重点&#xff0c;常考和必考内容&#xff0c;本篇主要记录了知识点&#xff1a;反规范化、SQL语句及大数据。 2、反规范化 数据库遵循范式的设计&#xff0c;使得多表查询和连接表查询较多的时…

【面试突击】网关系统面试实战

&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308; 欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术 的推送 发送 资料 可领取 深入理…

基于JAVA+ssm开发的中草药智能采购管理系统设计与实现【附源码】

基于JAVAssm开发的中草药智能采购管理系统设计与实现【附源码】 &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.…

stable diffusion代码学习笔记

前言&#xff1a;本文没有太多公式推理&#xff0c;只有一些简单的公式&#xff0c;以及公式和代码的对应关系。本文仅做个人学习笔记&#xff0c;如有理解错误的地方&#xff0c;请指出。 本文包含stable diffusion入门文献和不同版本的代码。 文献资源 本文学习的代码&…

Android基于Matrix绘制PaintDrawable设置BitmapShader,以手指触点为中心显示原图像圆图,Kotlin(2)

Android基于Matrix绘制PaintDrawable设置BitmapShader&#xff0c;以手指触点为中心显示原图像圆图&#xff0c;Kotlin&#xff08;2&#xff09; 在 https://zhangphil.blog.csdn.net/article/details/135374279 基础上&#xff0c;增加一个功能&#xff0c;当手指在上面的图片…

【DevOps-08-3】Jenkins容器内部使用Docker

一、简要描述 构建镜像和发布镜像到harbor都需要使用到docker命令。而在Jenkins容器内部安装Docker官方推荐直接采用宿主机带的Docker即可。 设置Jenkins容器使用宿主机Docker。 二、配置和操作步骤 1、修改宿主机docker.sock权限 # 修改docker.sock 用户和用户组都为root $ …

2024年甘肃省职业院校技能大赛信息安全管理与评估 样题二 模块二

竞赛需要完成三个阶段的任务&#xff0c;分别完成三个模块&#xff0c;总分共计 1000分。三个模块内容和分值分别是&#xff1a; 1.第一阶段&#xff1a;模块一 网络平台搭建与设备安全防护&#xff08;180 分钟&#xff0c;300 分&#xff09;。 2.第二阶段&#xff1a;模块二…

vue-virtual-scroll-list(可单选、多选、搜索查询、创建条目)

element-ui-解决下拉框数据量过多问题&#xff08;vue-virtual-scroll-list&#xff09;_element-ui下拉框数据太多如何优化-CSDN博客 的升级版 参考链接&#xff1a;封装el-select&#xff0c;实现虚拟滚动,可单选、多选、搜索查询、创建条目-CSDN博客 1.封装组件 select.v…

HTTP 常见协议:选择正确的协议,提升用户体验(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

Vulnhub-HACKSUDO: PROXIMACENTAURI渗透

文章目录 一、前言1、靶机ip配置2、渗透目标3、渗透概括 开始实战一、信息获取二、端口敲门三、web密码爆破四、getShell五、获取新用户六、提权 一、前言 由于在做靶机的时候&#xff0c;涉及到的渗透思路是非常的广泛&#xff0c;所以在写文章的时候都是挑重点来写&#xff0…

也谈人工智能——AI科普入门

文章目录 1. 科普入门人工智能的定义人工智能的类型 - 弱 AI 与强 AI人工智能、深度学习与机器学习人工智能的应用和使用场景语音识别计算机视觉客户服务建议引擎数据分析网络安全 行业应用人工智能发展史![img](https://img-blog.csdnimg.cn/img_convert/66aeaaeac6870f432fc4…

error: undefined reference to ‘cv::imread(std::__ndk1::basic_string<char

使用android studio编译项目时&#xff0c;由于用到了 cv::imread&#xff08;&#xff09;函数&#xff0c;编译时却报错找不到该函数的定义。 cv::imread一般是在highgui.hpp中定义&#xff0c;因此我加上了该头文件&#xff1a; #include “opencv2/highgui/highgui.hpp” 但…

webtim开源即时通讯平台第三版发布

webtim是Web开源通讯平台。服务器是 Tim 。前端使用tim的js客户端 timjs 调用tim服务器接口渲染页面。 webtim开发目的是通过界面来显式表达tim接口功能。tim是去中心化的分布式IM引擎。支持多种基础通讯模式&#xff0c;对端到端的数据流传输支持非常全面&#xff0c;几乎涵…