SpringBoot项目连接,有Kerberos认证的Kafka

在连接Kerberos认证kafka之前,需要了解Kerberos协议
二、什么是Kerberos协议

Kerberos是一种计算机网络认证协议 ,其设计目标是通过密钥系统为网络中通信的客户机(Client)/服务器(Server)应用程序提供严格的身份验证服务,确保通信双方身份的真实性和安全性。不同于其他网络服务,Kerberos协议中不是所有的客户端向想要访问的网络服务发起请求,他就能建立连接然后进行加密通信,而是在发起服务请求后必须先进行一系列的身份认证,包括客户端和服务端两方的双向认证,只有当通信双方都认证通过对方身份之后,才可以互相建立起连接,进行网络通信。即Kerberos协议的侧重在于认证通信双方的身份,客户端需要确认即将访问的网络服务就是自己所想要访问的服务而不是一个伪造的服务器,而服务端需要确认这个客户端是一个身份真实,安全可靠的客户端,而不是一个想要进行恶意网络攻击的用户。

三、Kerberos协议角色组成
Kerberos协议中存在三个角色,分别是:

客户端(Client):发送请求的一方
服务端(Server):接收请求的一方
密钥分发中心(Key distribution KDC)

一,首先需要准备三个文件
(user.keytab,krb5.conf,jass.conf)

其中user.keytab和krb5.conf是两个认证文件,需要厂商提供,就是你连接谁的kafka,让谁提供

jass.conf文件需要自己在本地创建

jass.conf文件内容如下,具体路径和域名需要换成自己的:

debug: truefusioninsight:kafka:bootstrap-servers: 10.80.10.3:21007,10.80.10.181:21007,10.80.10.52:21007security:protocol: SASL_PLAINTEXTkerberos:domain:name: hadoop.798687_97_4a2b_9510_00359f31c5ec.comsasl:kerberos:service:name: kafka


其中kerberos.domain.name:hadoop.798687_97_4a2b_9510_00359f31c5ec.com

hadoop.798687_97_4a2b_9510_00359f31c5ec.com需要根据现场提供给你的域名

二、文件准备好后可以将三个配置文件,放在自己项目中,也可以放在服务器的某个目录下,只要确保项目启动后能读取到即可
我的目录结构如下:


pom依赖:
我用的是华为云的Kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>kafka-sample-01</artifactId><version>2.3.1.RELEASE</version><packaging>jar</packaging><name>kafka-sample-01</name><description>Kafka Sample 1</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0-hw-ei-302002</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 华为 组件 kafka  start -->
<!--        <dependency>-->
<!--            <groupId>com.huawei</groupId>-->
<!--            <artifactId>kafka-clients</artifactId>-->
<!--            <version>2.4.0</version>-->
<!--            <scope>system</scope>-->
<!--            <systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-302002.jar</systemPath>-->
<!--        </dependency>--></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build><repositories><repository><id>huaweicloudsdk</id><url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><repository><id>central</id><name>Mavn Centreal</name><url>https://repo1.maven.org/maven2/</url></repository></repositories>
</project>

然后再SpringBoot项目启动类如下:

package com.example;import com.common.Foo1;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;import java.io.File;
import java.util.HashMap;
import java.util.Map;/*** @author*/
@SpringBootApplication
public class Application {private final Logger logger = LoggerFactory.getLogger(Application.class);@Value("${fusioninsight.kafka.bootstrap-servers}")public String boostrapServers;@Value("${fusioninsight.kafka.security.protocol}")public String securityProtocol;@Value("${fusioninsight.kafka.kerberos.domain.name}")public String kerberosDomainName;@Value("${fusioninsight.kafka.sasl.kerberos.service.name}")public String kerberosServiceName;public static void main(String[] args) {
//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main"
//        String filePath = "D:\\Java\\workspace\\20231123MOSPT4eB\\sample-01\\src\\main\\resources\\";String filePath = "/home/yxxt/";System.setProperty("java.security.auth.login.config", filePath + "jaas.conf");System.setProperty("java.security.krb5.conf", filePath + "krb5.conf");SpringApplication.run(Application.class, args);}@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {System.out.println(boostrapServers);ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),new FixedBackOff(0L, 2))); // dead-letter after 3 triesreturn factory;}@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}// 指定消费监听,该topic有消息时立刻消费@KafkaListener(id = "fooGroup1", topics = "topic_ypgk")public void listen(ConsumerRecord<String, String> record) {System.out.println("监听到了消息-----");logger.info("Received:消息监听成功! " );System.out.println("监听到了-----");System.out.println(record);
//        if (foo.getFoo().startsWith("fail")) {
//            // 触发83行的 ErrorHandler,将异常数据写入 topic名称+.DLT的新topic中
//            throw new RuntimeException("failed");
//        }}// 创建topic,指定分区数、副本数
//    @Bean
//    public NewTopic topic() {
//        return new NewTopic("topic1", 1, (short) 1);
//    }@Beanpublic KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("kerberos.domain.name", kerberosDomainName);return new KafkaAdmin(configs);}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new DefaultKafkaConsumerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);return new KafkaTemplate<>(producerFactory);}
}

生产者:通过发送请求进行向主题里发送消息

package com.example;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import com.common.Foo1;/*** @author haosuwei**/
@RestController
public class Controller {@Autowiredprivate KafkaTemplate<String, String> template;@PostMapping(path = "/send/foo/{what}")public void sendFoo(@PathVariable String what) {Foo1 foo1 = new Foo1(what);this.template.send("topic1", foo1.toString());}}

运行成功,就可以监听到主题消息了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/166299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

历时三个月,我发布了一款外卖返钱小程序

近几年&#xff0c;推广外卖红包爆火&#xff0c;各种推广外卖红包的公众号层出不穷。于是&#xff0c;我就在想外卖红包究竟是怎么一回事。就这样&#xff0c;我带着问题开始了关于外卖红包的研究。 在研究的过程中&#xff0c;我开始了解商品联盟、推广分成、cps等一系列相关…

Springboot3+vue3从0到1开发实战项目(一)

一. 可以在本项目里面自由发挥拓展 二. 知识整合项目使用到的技术 后端开发 &#xff1a; Validation, Mybatis,Redis, Junit,SpringBoot3 &#xff0c;mysql&#xff0c;Swagger, JDK17 &#xff0c;项目部署 前端开发&#xff1a; Vue3&#xff0c;Vite&#xff0c;Router…

DNS的各种进阶新玩法

你们好&#xff0c;我的网工朋友&#xff0c;今天和你聊聊DNS。 01 什么是DNS&#xff1f; mac地址诞生&#xff0c;可是太不容易记忆了&#xff0c;出现了简化了IP形式&#xff0c;它被直接暴露给外网不说&#xff0c;还让人类还是觉得比较麻烦&#xff0c;干脆用几个字母算了…

【Git】一文教你学会 submodule 的增、删、改、查

添加子模块 $ git submodule add <url> <path>url 为想要添加的子模块路径path 为子模块存放的本地路径 示例&#xff0c;添加 r-tinymaix 为子模块到主仓库 ./sdk/packages/online-packages/r-tinymaix 路径下&#xff0c;命令如下所示&#xff1a; $ git subm…

用自己热爱的事赚钱,是多么的幸福

挖掘天赋可能有些困难&#xff0c;但挖掘爱好就简单多啦&#xff01;最幸福的事情就是能用自己喜欢的事情赚钱。 我们要说的是一个博主&#xff0c;他非常喜欢骑自行车&#xff0c;虽然他的工作是在外贸公司做销售&#xff0c;但每当有空时&#xff0c;他都会骑自行车。而且他…

Go iota简介

当声明枚举类型或定义一组相关常量时&#xff0c;Go语言中的iota关键字可以帮助我们简化代码并自动生成递增的值。本文档将详细介绍iota的用法和行为。 iota关键字 iota是Go语言中的一个预定义标识符&#xff0c;它用于创建自增的无类型整数常量。iota的行为类似于一个计数器…

3款免费次数多且功能又强大的国产AI绘画工具

hi&#xff0c;同学们&#xff0c;本期是我们第55 期 AI工具教程 最近两个月&#xff0c;国内很多AI绘画软件被关停&#xff0c;国外绝大部分AI绘画工具费用不低&#xff0c;因此 这两天我 重新整理 国产 AI绘画 工具 &#xff0c; 最终 筛选了 3款功能强大&#xf…

LeeCode前端算法基础100题(3)- N皇后

一、问题详情&#xff1a; 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回所有不同的 n 皇后…

虚拟机系列:vmware和Oracle VM VirtualBox虚拟机的区别,简述哪一个更适合我?以及相互转换

一. VMware和Oracle VM VirtualBox虚拟机的区别主要体现在以下几个方面: 首先两种软件的安装使用教程如下: VMware ESXI 安装使用教程 Oracle VM VirtualBox安装使用教程 商业模式:VMware是一家商业公司,而Oracle VM VirtualBox是开源软件; 功能:VMware拥有更多的功能和…

Leetcode200. 岛屿数量

Every day a Leetcode 题目来源&#xff1a;200. 岛屿数量 解法1&#xff1a;深度优先搜索 设目前指针指向一个岛屿中的某一点 (i, j)&#xff0c;寻找包括此点的岛屿边界。 从 (i, j) 向此点的上下左右 (i1,j)&#xff0c;(i-1,j)&#xff0c;(i,j1)&#xff0c;(i,j-1) …

“圆柱-计算公式“技术支持网址

该软件可以计算圆柱的底面圆周长、底面积、侧面积和体积。 您在使用中有遇到任何问题都可以和我们联系。我们会在第一时间回复您。 邮箱地址&#xff1a;elmo30zeongmail.com 谢谢&#xff01;

如何将本地websocket发布至公网并实现远程访问?

本地websocket服务端暴露至公网访问【cpolar内网穿透】 文章目录 本地websocket服务端暴露至公网访问【cpolar内网穿透】1. Java 服务端demo环境2. 在pom文件引入第三包封装的netty框架maven坐标3. 创建服务端,以接口模式调用,方便外部调用4. 启动服务,出现以下信息表示启动成功…

VR云游:让旅游产业插上数字化翅膀,打造地方名片

自多地入冬降温以来&#xff0c;泡温泉成了许多人周末度假的选择&#xff0c;在气温持续走低的趋势下&#xff0c;温泉游也迎来了旺季&#xff1b;但是依旧有些地区温度依旧温暖&#xff0c;例如南京的梧桐美景也吸引了不少游客前去打卡&#xff0c;大家穿着汉服与金黄的树叶合…

【AI考证笔记】NO.1人工智能的基础概念

以下部分内容来自于百度智能云人才认证培训讲义&#xff0c;腾讯等也有人工智能类似的讲义&#xff0c;限时免费&#xff0c;也就是不报考&#xff0c;也能系统学习&#xff0c;课程做的都是不错的。有感兴趣的朋友&#xff0c;可以去检索学习。 本系列是学习笔记&#xff0c;…

6个常用的聚类评价指标

评估聚类结果的有效性&#xff0c;即聚类评估或验证&#xff0c;对于聚类应用程序的成功至关重要。它可以确保聚类算法在数据中识别出有意义的聚类&#xff0c;还可以用来确定哪种聚类算法最适合特定的数据集和任务&#xff0c;并调优这些算法的超参数(例如k-means中的聚类数量…

C语言——从键盘输人三角形的三个边长 a、b、c,求出三角形的面积。

从键盘输人三角形的三个边长 a、b、c,求出三角形的面积。求三角形的面积用公式areasqrt(s*(s-a)*(s-b)*(s-c)),其中 s1/2(a十bc)。注:要求对输人三角形的三个边长做出有效性判断。 #define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> #include<math.h> int main…

前置微小信号放大器在生物医学中有哪些应用

前置微小信号放大器在生物医学领域中具有广泛的应用。生物医学信号通常具有较小的振幅和较低的幅频响应&#xff0c;因此需要借助放大器来增强信号以便进行准确的测量、监测和分析。以下是前置微小信号放大器在生物医学中的主要应用。 心电图&#xff08;ECG&#xff09;放大器…

Spring第一课,了解IDEA里面的文件,回顾Cookie和Session,获取Session,Cookie,Header的方式

目录 IDEA第一课&#xff08;熟悉里面内容&#xff09; 建立连接 -RequestMapping 路由映射 请求 1.传递单个参数​编辑 2.多个参数​编辑 3.传递数组 4.传递一个集合&#xff0c;但是这里我们传递的时候发生了500的错误 简单介绍JSON 回顾Cookie和S…

js检测dom变化的方法:MutationObserver

前言 检测一个原生dom的变化,如一个div的颜色,大小,所在位置,内部元素的属性是否变化,更深层dom树上的变化等等。 都可以使用一个window上暴露出来的一个api:MutationObserver 语法 官方地址:MutationObserver.MutationObserver() - Web API 接口参考 | MDN 使用new Mutat…

【大数据】Docker部署HMS(Hive Metastore Service)并使用Trino访问Minio

本文参考链接置顶&#xff1a; Presto使用Docker独立运行Hive Standalone Metastore管理MinIO&#xff08;S3&#xff09;_hive minio_BigDataToAI的博客-CSDN博客 一. 背景 团队要升级大数据架构&#xff0c;需要摒弃hadoop&#xff0c;底层使用Minio做存储&#xff0c;应用…