SpringBoot项目连接,有Kerberos认证的Kafka

在连接Kerberos认证kafka之前,需要了解Kerberos协议
二、什么是Kerberos协议

Kerberos是一种计算机网络认证协议 ,其设计目标是通过密钥系统为网络中通信的客户机(Client)/服务器(Server)应用程序提供严格的身份验证服务,确保通信双方身份的真实性和安全性。不同于其他网络服务,Kerberos协议中不是所有的客户端向想要访问的网络服务发起请求,他就能建立连接然后进行加密通信,而是在发起服务请求后必须先进行一系列的身份认证,包括客户端和服务端两方的双向认证,只有当通信双方都认证通过对方身份之后,才可以互相建立起连接,进行网络通信。即Kerberos协议的侧重在于认证通信双方的身份,客户端需要确认即将访问的网络服务就是自己所想要访问的服务而不是一个伪造的服务器,而服务端需要确认这个客户端是一个身份真实,安全可靠的客户端,而不是一个想要进行恶意网络攻击的用户。

三、Kerberos协议角色组成
Kerberos协议中存在三个角色,分别是:

客户端(Client):发送请求的一方
服务端(Server):接收请求的一方
密钥分发中心(Key distribution KDC)

一,首先需要准备三个文件
(user.keytab,krb5.conf,jass.conf)

其中user.keytab和krb5.conf是两个认证文件,需要厂商提供,就是你连接谁的kafka,让谁提供

jass.conf文件需要自己在本地创建

jass.conf文件内容如下,具体路径和域名需要换成自己的:

debug: truefusioninsight:kafka:bootstrap-servers: 10.80.10.3:21007,10.80.10.181:21007,10.80.10.52:21007security:protocol: SASL_PLAINTEXTkerberos:domain:name: hadoop.798687_97_4a2b_9510_00359f31c5ec.comsasl:kerberos:service:name: kafka


其中kerberos.domain.name:hadoop.798687_97_4a2b_9510_00359f31c5ec.com

hadoop.798687_97_4a2b_9510_00359f31c5ec.com需要根据现场提供给你的域名

二、文件准备好后可以将三个配置文件,放在自己项目中,也可以放在服务器的某个目录下,只要确保项目启动后能读取到即可
我的目录结构如下:


pom依赖:
我用的是华为云的Kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>kafka-sample-01</artifactId><version>2.3.1.RELEASE</version><packaging>jar</packaging><name>kafka-sample-01</name><description>Kafka Sample 1</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0-hw-ei-302002</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 华为 组件 kafka  start -->
<!--        <dependency>-->
<!--            <groupId>com.huawei</groupId>-->
<!--            <artifactId>kafka-clients</artifactId>-->
<!--            <version>2.4.0</version>-->
<!--            <scope>system</scope>-->
<!--            <systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-302002.jar</systemPath>-->
<!--        </dependency>--></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build><repositories><repository><id>huaweicloudsdk</id><url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><repository><id>central</id><name>Mavn Centreal</name><url>https://repo1.maven.org/maven2/</url></repository></repositories>
</project>

然后再SpringBoot项目启动类如下:

package com.example;import com.common.Foo1;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;import java.io.File;
import java.util.HashMap;
import java.util.Map;/*** @author*/
@SpringBootApplication
public class Application {private final Logger logger = LoggerFactory.getLogger(Application.class);@Value("${fusioninsight.kafka.bootstrap-servers}")public String boostrapServers;@Value("${fusioninsight.kafka.security.protocol}")public String securityProtocol;@Value("${fusioninsight.kafka.kerberos.domain.name}")public String kerberosDomainName;@Value("${fusioninsight.kafka.sasl.kerberos.service.name}")public String kerberosServiceName;public static void main(String[] args) {
//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main"
//        String filePath = "D:\\Java\\workspace\\20231123MOSPT4eB\\sample-01\\src\\main\\resources\\";String filePath = "/home/yxxt/";System.setProperty("java.security.auth.login.config", filePath + "jaas.conf");System.setProperty("java.security.krb5.conf", filePath + "krb5.conf");SpringApplication.run(Application.class, args);}@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {System.out.println(boostrapServers);ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),new FixedBackOff(0L, 2))); // dead-letter after 3 triesreturn factory;}@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}// 指定消费监听,该topic有消息时立刻消费@KafkaListener(id = "fooGroup1", topics = "topic_ypgk")public void listen(ConsumerRecord<String, String> record) {System.out.println("监听到了消息-----");logger.info("Received:消息监听成功! " );System.out.println("监听到了-----");System.out.println(record);
//        if (foo.getFoo().startsWith("fail")) {
//            // 触发83行的 ErrorHandler,将异常数据写入 topic名称+.DLT的新topic中
//            throw new RuntimeException("failed");
//        }}// 创建topic,指定分区数、副本数
//    @Bean
//    public NewTopic topic() {
//        return new NewTopic("topic1", 1, (short) 1);
//    }@Beanpublic KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("kerberos.domain.name", kerberosDomainName);return new KafkaAdmin(configs);}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new DefaultKafkaConsumerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);return new KafkaTemplate<>(producerFactory);}
}

生产者:通过发送请求进行向主题里发送消息

package com.example;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import com.common.Foo1;/*** @author haosuwei**/
@RestController
public class Controller {@Autowiredprivate KafkaTemplate<String, String> template;@PostMapping(path = "/send/foo/{what}")public void sendFoo(@PathVariable String what) {Foo1 foo1 = new Foo1(what);this.template.send("topic1", foo1.toString());}}

运行成功,就可以监听到主题消息了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/166299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot 升级3.x 指南

Spring Boot 升级3.x 指南 1. 升级思路 先创建一个parent项目&#xff0c;打包类型为pom&#xff0c;继承自spring boot的parent项目 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId&…

历时三个月,我发布了一款外卖返钱小程序

近几年&#xff0c;推广外卖红包爆火&#xff0c;各种推广外卖红包的公众号层出不穷。于是&#xff0c;我就在想外卖红包究竟是怎么一回事。就这样&#xff0c;我带着问题开始了关于外卖红包的研究。 在研究的过程中&#xff0c;我开始了解商品联盟、推广分成、cps等一系列相关…

网络攻击当搭配什么产品比较好

网络攻击无处不在&#xff0c;当要时刻谨记 2014年&#xff0c;索尼影业受到黑客攻击&#xff0c;导致公司内部文件和电子邮件泄露。 2015年&#xff0c;美国联邦政府的办公人员信息遭到盗窃&#xff0c;影响了超过2100万人的个人信息。 2016年&#xff0c;Yahoo的3亿用户账…

java 中集合之一【map】,map循环

在Java中&#xff0c;常用的集合框架有以下几个&#xff1a; 1、List&#xff08;列表&#xff09;&#xff1a;List是有序的集合&#xff0c;允许包含重复元素。常用的实现类有ArrayList和LinkedList。ArrayList是基于动态数组实现的&#xff0c;支持快速随机访问&#xff1b;…

android之图片选择器--pictureselector

推荐一个安卓图片/视频/文件选择器。简单好用。 不多废话。直接上代码&#xff1a; 首先&#xff0c;添加依赖&#xff1a; //图片选择器api io.github.lucksiege:pictureselector:v3.11.1//图片压缩api io.github.lucksiege:compress:v3.11.1//图片裁剪api io.github.lucksie…

Springboot3+vue3从0到1开发实战项目(一)

一. 可以在本项目里面自由发挥拓展 二. 知识整合项目使用到的技术 后端开发 &#xff1a; Validation, Mybatis,Redis, Junit,SpringBoot3 &#xff0c;mysql&#xff0c;Swagger, JDK17 &#xff0c;项目部署 前端开发&#xff1a; Vue3&#xff0c;Vite&#xff0c;Router…

Java数组和集合

在Java中&#xff0c;数组和集合是两个重要的概念&#xff0c;它们用于存储和操作数据。本文将详细介绍Java中的数组和集合&#xff0c;包括它们的定义、初始化、访问和常见操作 一、数组&#xff08;Array&#xff09; 数组是一种用于存储相同类型数据的容器&#xff0c;它可…

DNS的各种进阶新玩法

你们好&#xff0c;我的网工朋友&#xff0c;今天和你聊聊DNS。 01 什么是DNS&#xff1f; mac地址诞生&#xff0c;可是太不容易记忆了&#xff0c;出现了简化了IP形式&#xff0c;它被直接暴露给外网不说&#xff0c;还让人类还是觉得比较麻烦&#xff0c;干脆用几个字母算了…

【Git】一文教你学会 submodule 的增、删、改、查

添加子模块 $ git submodule add <url> <path>url 为想要添加的子模块路径path 为子模块存放的本地路径 示例&#xff0c;添加 r-tinymaix 为子模块到主仓库 ./sdk/packages/online-packages/r-tinymaix 路径下&#xff0c;命令如下所示&#xff1a; $ git subm…

用自己热爱的事赚钱,是多么的幸福

挖掘天赋可能有些困难&#xff0c;但挖掘爱好就简单多啦&#xff01;最幸福的事情就是能用自己喜欢的事情赚钱。 我们要说的是一个博主&#xff0c;他非常喜欢骑自行车&#xff0c;虽然他的工作是在外贸公司做销售&#xff0c;但每当有空时&#xff0c;他都会骑自行车。而且他…

Nginx同时支持Http和Https的配置详解

当配置Nginx同时支持HTTP和HTTPS时&#xff0c;需要进行以下步骤&#xff1a; 安装和配置SSL证书&#xff1a; 获得SSL证书&#xff1a;从可信任的证书颁发机构&#xff08;CA&#xff09;或使用自签名证书创建SSL证书。 将证书和私钥保存到服务器&#xff1a;将SSL证书和私钥…

spring 的事务隔离;Spring框架的事务管理的优点

文章目录 说一下 spring 的事务隔离&#xff1f;Spring框架的事务管理有哪些优点&#xff1f;你更倾向用哪种事务管理类型&#xff1f; 聊一聊spring事务的隔离&#xff0c;事务的隔离对于一个系统来说也是非常重要的&#xff0c;直接上干货&#xff01;&#xff01;&#xff0…

Python与设计模式--享元模式

10-Python与设计模式–享元模式 一、网上咖啡选购平台 假设有一个网上咖啡选购平台&#xff0c;客户可以在该平台上下订单订购咖啡&#xff0c;平台会根据用户位置进行 线下配送。假设其咖啡对象构造如下&#xff1a; class Coffee:name price 0def __init__(self,name):se…

Go iota简介

当声明枚举类型或定义一组相关常量时&#xff0c;Go语言中的iota关键字可以帮助我们简化代码并自动生成递增的值。本文档将详细介绍iota的用法和行为。 iota关键字 iota是Go语言中的一个预定义标识符&#xff0c;它用于创建自增的无类型整数常量。iota的行为类似于一个计数器…

数据库基础入门 — SQL排序与分页

我是南城余&#xff01;阿里云开发者平台专家博士证书获得者&#xff01; 欢迎关注我的博客&#xff01;一同成长&#xff01; 一名从事运维开发的worker&#xff0c;记录分享学习。 专注于AI&#xff0c;运维开发&#xff0c;windows Linux 系统领域的分享&#xff01; 本…

[深度理解] 重启 Splunk Search Head Cluster

1: 背景: 关于释放Splunk search head 的job 运行压力:splunk search head cluster 要重启的话,怎么办? 答案是:splunk rolling-restart shcluster-members Initiate a rolling restart from the command line Invoke the splunk rolling-restart command from any me…

3款免费次数多且功能又强大的国产AI绘画工具

hi&#xff0c;同学们&#xff0c;本期是我们第55 期 AI工具教程 最近两个月&#xff0c;国内很多AI绘画软件被关停&#xff0c;国外绝大部分AI绘画工具费用不低&#xff0c;因此 这两天我 重新整理 国产 AI绘画 工具 &#xff0c; 最终 筛选了 3款功能强大&#xf…

LeeCode前端算法基础100题(3)- N皇后

一、问题详情&#xff1a; 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回所有不同的 n 皇后…

虚拟机系列:vmware和Oracle VM VirtualBox虚拟机的区别,简述哪一个更适合我?以及相互转换

一. VMware和Oracle VM VirtualBox虚拟机的区别主要体现在以下几个方面: 首先两种软件的安装使用教程如下: VMware ESXI 安装使用教程 Oracle VM VirtualBox安装使用教程 商业模式:VMware是一家商业公司,而Oracle VM VirtualBox是开源软件; 功能:VMware拥有更多的功能和…

Leetcode200. 岛屿数量

Every day a Leetcode 题目来源&#xff1a;200. 岛屿数量 解法1&#xff1a;深度优先搜索 设目前指针指向一个岛屿中的某一点 (i, j)&#xff0c;寻找包括此点的岛屿边界。 从 (i, j) 向此点的上下左右 (i1,j)&#xff0c;(i-1,j)&#xff0c;(i,j1)&#xff0c;(i,j-1) …