kafka+Kraft模式集群+安全认证

Kraft模式安全认证

前章内容聊到了Kafka的Kraft集群的配置及使用。本篇再来说说kafka的安全认证方面的配置,。

Kafka提供了多种方式来进行安全认证,包括身份认证、授权和加密传输。一些常用的Kafka安全认证方式:

  1. SSL/TLS:使用SSL/TLS协议来加密Kafka与客户端之间的通信,保证数据的机密性和完整性。可以通过配置Kafka的SSL证书、密钥和信任的CA证书来启用SSL/TLS。客户端也需要使用相应的证书与Kafka进行通信。
  2. SASL(Simple Authentication and Security Layer):使用SASL进行身份认证。Kafka支持多种SASL机制,如PLAIN、GSSAPI等。可以通过配置Kafka的SASL机制和用户凭证(用户名和密码、密钥等)来启用SASL身份认证。
  3. ACL(Access Control List):使用ACL进行授权管理。ACL允许你配置哪些用户或组可以访问Kafka的哪些主题和分区,并对其进行读取或写入权限的控制。ACL的配置可以在Kafka的配置文件中进行。

这些安全认证方式可以单独使用,也可以组合使用,以实现更高级别的安全性。为了配置Kafka的安全认证,需要对Kafka和客户端进行相应的配置,并生成所需的证书和凭证。

本文针对SASL进行身份认证

开始配置

服务器数量有限,暂时使用单机部署kafka集群,此文给予配置参考,实际还是要按项目的真实情况去处理了。

准备3个kafka,分别是kafka01kafka02kafka03,分别到它们的config/kraft/server.properties中做配置:

kafka01的server.properties

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:19093,2@localhost:29093,3@localhost:39093
listeners=SASL_PLAINTEXT://:19092,CONTROLLER://:19093
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://192.168.8.122:19092
controller.listener.names=CONTROLLER
log.dirs=/wlh/kafka01/data

kafka02

process.roles=broker,controller
node.id=2
controller.quorum.voters=1@localhost:19093,2@localhost:29093,3@localhost:39093
listeners=SASL_PLAINTEXT://:29092,CONTROLLER://:29093
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://192.168.8.122:29092
controller.listener.names=CONTROLLER
log.dirs=/wlh/kafka02/data

kafka03

process.roles=broker,controller
node.id=3
controller.quorum.voters=1@localhost:19093,2@localhost:29093,3@localhost:39093
listeners=SASL_PLAINTEXT://:39092,CONTROLLER://:39093
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://192.168.8.122:39092
controller.listener.names=CONTROLLER
log.dirs=/wlh/kafka03/data

先确保你的kafka的数据目录是空的,执行下删除(后面初始化时会自动创建目录)

rm -rf /wlh/kafka01/data /wlh/kafka02/data /wlh/kafka03/data

创建一个kafka sasl认证的服务配置

可以在kafka的config目录下新建一个kafka_server_jaas.conf文件,然后认证信息写好:

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredserviceName="kafka"username="admin"password="eystar8888"user_kafka="kafka1234";
};

上面的配置中声明了管理员为admin,密码是eystar8888,并且声明了一个用户名为kafka,密码是kafka1234的用户,客户端连接时使用用户为kafka可以成功进行认证。

而需要注意的是:上面的配置中的分号;,不能少,否则就掉坑里了。

配置kafka服务的启动脚本

上面设置好sasl认证的配置后,我们需要在kafka启动的服务脚本中,将此配置加入进去。

可以直接修改bin/kafka-server-start.sh,亦或者拷贝一份kafka-server-start.sh命名为kafka-server-start-saal.sh(名称自定义即可)

在这里插入图片描述

export KAFKA_OPTS="-Djava.security.auth.login.config=/wlh/kafka01/config/kafka_server_jaas.conf"

kafka02和kafka03同样这样配置好

export KAFKA_OPTS="-Djava.security.auth.login.config=/wlh/kafka02/config/kafka_server_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=/wlh/kafka03/config/kafka_server_jaas.conf"

开始执行启动kafka集群

# 生成一个uuid,后面需要用
/wlh/kafka01/bin/kafka-storage.sh random-uuid# 格式化存储
/wlh/kafka01/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka01/config/kraft/server.properties
/wlh/kafka02/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka02/config/kraft/server.properties
/wlh/kafka03/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka03/config/kraft/server.properties

开始启动kafka(-daemon后台启动)

# 分别启动它们
/wlh/kafka01/bin/kafka-server-start-saal.sh -daemon /wlh/kafka01/config/kraft/server.properties
/wlh/kafka02/bin/kafka-server-start-saal.sh -daemon /wlh/kafka02/config/kraft/server.properties
/wlh/kafka03/bin/kafka-server-start-saal.sh -daemon /wlh/kafka03/config/kraft/server.properties

在这里插入图片描述

服务启动完成。。。

Tip:服务器端口要打开,服务器端口要打开,端口打开!!!或者关了防火墙也行。

使用java进行连接

无论是使用kafka的API还是直接使用spring集成kafka都是可以的。

我这里就采用kafka的API方式了。

导入kafka-clients依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.2</version>
</dependency>

application.properties中配置相关属性,注意spring.kafka.jaas-config是结尾是有一个分号;的,若不写,是连接不到kafka的。

spring.kafka.bootstrap-servers=192.168.8.122:19092,192.168.8.122:29092,192.168.8.122:39092
spring.kafka.jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka1234";
spring.kafka.topics=test

在java配置类中进行接收并且创建生产者和消费者

package xxx.xxx.xxx;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;import java.util.Properties;/*** @author wlh* @date 2023/8/10*/
@ConditionalOnProperty("spring.kafka.bootstrap-servers")
@Component
public class KafkaProperties {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServer;@Value("${spring.kafka.jaas-config}")private String jaasConfig;public static String topics;@Value("${spring.kafka.topics}")private void setTopics(String topics) {KafkaProperties.topics = topics;}/*** 获取生产者配置** @return 配置信息*/public Properties getProducerProperties() {Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServer);String SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";properties.put("key.serializer", SERIALIZER);properties.put("value.serializer", SERIALIZER);fillSecurityProperties(properties);return properties;}// 消费者配置public Properties getConsumerProperties() {Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServer);properties.put("group.id", "test");	// group.id可以自定义String DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";properties.put("key.deserializer", DESERIALIZER);properties.put("value.deserializer", DESERIALIZER);fillSecurityProperties(properties);return properties;}// 安全认证的配置private void fillSecurityProperties(Properties properties) {properties.setProperty("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);String SASL_MECHANISM = "PLAIN";properties.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);}}

创建生产者和消费者

package xxx.xxx.xxx;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Arrays;
import java.util.Collections;
import java.util.List;/*** @author wlh* @date 2023/08/10*/
@ConditionalOnProperty("spring.kafka.bootstrap-servers")
@Slf4j
@RequiredArgsConstructor
@Configuration
public class KafkaConfig {private final KafkaProperties kafkaProperties;// 创建生产者@Beanpublic KafkaProducer<String, String> kafkaProducer() {return new KafkaProducer<>(kafkaProperties.getProducerProperties());}// 创建消费者@Beanpublic KafkaConsumer<String, String> kafkaConsumer() {KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties.getConsumerProperties());List<String> topicList = Collections.singletonList("test"); // 这里写死了,可自行扩展kafkaConsumer.subscribe(topicList);log.info("消息订阅成功! topic:{}", topicList);log.info("消费者配置:{}", kafkaProperties.getConsumerProperties().toString());return kafkaConsumer;}}

信息发送的Util工具类

package xxx.xxx.xxx;import com.alibaba.excel.util.StringUtils;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.Arrays;
import java.util.Collections;
import java.util.List;@Component
@Slf4j
public class KafkaSendUtil {@AutowiredKafkaProducer<String, String> kafkaProducer;@Asyncpublic void sendMsg(String topic, String msg) {List<String> topics;if (StringUtils.isBlank(topic)) {topics = Arrays.asList(KafkaProperties.topics.split(","));} else {topics = Collections.singletonList(topic);}for (String sendTopic : topics) {ProducerRecord<String, String> record = new ProducerRecord<>(sendTopic, msg);log.info("正在发送kafka数据,数据=====>{}", msg);kafkaProducer.send(record);}}}

实例

简单做一个实例,调通一下数据。监听方式可以不按照本文的,本文只是做测试。

kafka消费者监听器

package xxx.xxx.xxx;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Component
public class KafkaListener implements ApplicationRunner {public static ExecutorService executorService = Executors.newFixedThreadPool(2);@Overridepublic void run(ApplicationArguments args) {log.info("监听服务启动!");executorService.execute(() -> {MessageHandler kafkaListenMessageHandler = SpringBeanUtils.getBean(MessageHandler.class);kafkaListenMessageHandler.onMessage(SpringBeanUtils.getBean("kafkaConsumer"), Arrays.asList("test"));	// 这里是监听的kafka的topic,这里写死了,自己扩展即可});}
}

Bean的工具类

package com.bjmetro.top.global.kafka;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;@SuppressWarnings("unchecked")
@Component
public class SpringBeanUtils implements ApplicationContextAware {private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeanUtils.applicationContext = applicationContext;}public static <T> T getBean(String beanName) {if (applicationContext.containsBean(beanName)) {return (T) applicationContext.getBean(beanName);} else {return null;}}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
}

消费者处理消息

package com.bjmetro.top.global.kafka;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.List;@Slf4j
@Component
public class MessageHandler {void onMessage(KafkaConsumer kafkaConsumer, List<String> topic) {log.info("队列开始监听:topic {}", topic);while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);for (ConsumerRecord<String, String> record : records) {log.info("partition:{} offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());try {String messageData = new String(record.value().getBytes(), StandardCharsets.UTF_8);System.out.println("收到消息:" + messageData);} catch (Exception e) {log.error("消息处理异常");}}}}}

做一个消息推送的接口

@Autowired
KafkaSendUtil sendUtil;
@PostMapping("/kafka/send")
public ResponseResult sendKafka(@RequestParam("msg") String msg) {sendUtil.sendMsg(null, msg);    // 这里topic传空,默认从application.properties中取了return new ResponseResult(ResponseConstant.CODE_OK, ResponseConstant.MSG_OK);
}

访问一下,看消费者日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/59920.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker技术--Docker镜像管理

1.Docker镜像特性 ①.镜像创建容器的特点 Docker在创建容器的时候需要指定镜像,每一个镜像都有唯一的标识:image_id,也可也使用镜像名称和版本号做唯一的标识,如果不指定版本号,那么默认使用的是最新的版本标签(laster)。 ②.镜像分层机制 Docker镜像是分层构建的,并通过…

【UniApp开发小程序】小程序首页完善(滑到底部数据翻页、回到顶端、基于回溯算法的两列数据高宽比平衡)【后端基于若依管理系统开发】

文章目录 说明细节一&#xff1a;首页滑动到底部&#xff0c;需要查询下一页的商品界面预览页面实现 细节二&#xff1a;当页面滑动到下方&#xff0c;出现一个回到顶端的悬浮按钮细节三&#xff1a;商品分列说明优化前后效果对比使用回溯算法实现ControllerService回溯算法 优…

使用 Netty 实现群聊功能的步骤和注意事项

文章目录 前言声明功能说明实现步骤WebSocket 服务启动Channel 初始化HTTP 请求处理HTTP 页面内容WebSocket 请求处理 效果展示总结 前言 通过之前的文章介绍&#xff0c;我们可以深刻认识到Netty在网络编程领域的卓越表现和强大实力。这篇文章将介绍如何利用 Netty 框架开发一…

基于非洲秃鹫算法优化的BP神经网络(预测应用) - 附代码

基于非洲秃鹫算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于非洲秃鹫算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.非洲秃鹫优化BP神经网络2.1 BP神经网络参数设置2.2 非洲秃鹫算法应用 4.测试结果&#xff1a;5…

实现带头双向循环链表

&#x1f308;带头双向循环链表 描述&#xff1a;一个节点内包含两个指针&#xff0c;一个指向上一个节点&#xff0c;另一个指向下一个节点。哨兵位指向的下一个节点为头节点&#xff0c;哨兵位的上一个指向尾节点。 结构优势&#xff1a;高效率找尾节点&#xff1b;高效率插入…

度矩阵、邻接矩阵

度矩阵&#xff08;degree matrix&#xff09; 度矩阵是对角阵&#xff0c;对角上的元素为各个顶点的度&#xff0c;顶点vi的度表示和该顶点相关联的变得数量。 在无向图中&#xff0c;顶点vi的度d(vi)N(i)&#xff08;即与顶点相连的边的数目&#xff09;有向图中&#xff0…

【力扣】55、跳跃游戏

var canJump function(nums){let cover 0;for(let i0;i<nums.length;i){if(i<cover){cover Math.max(nums[i]i,cover);if(cover >nums.length-1){return true;}}}}

stm32之DS18B20

DS18B20与stm32之间也是通过单总线进行数据的传输的。单总线协议在DHT11中已经介绍过。虽说这两者外设都是单总线&#xff0c;但时序电路却很不一样&#xff0c;DS18B20是更为麻烦一点的。 DS18B20 举例&#xff08;原码补码反码转换_原码反码补码转换_王小小鸭的博客-CSDN博客…

打开软件报错mfc100u.dll缺失是什么意思?简单式修复mfc100u.dll问题

首先&#xff0c;我们需要了解什么是MFC100U.dll文件以及它的作用。MFC100U.dll是一个Microsoft Foundation Class (MFC)库文件&#xff0c;它是Visual C应用程序开发的一部分。MFC库提供了许多通用的功能&#xff0c;如窗口管理、消息处理等&#xff0c;可以帮助开发者更快速地…

C++中前置++和后置++的详细讲解

参考链接&#xff08;链接讲的很全&#xff09;&#xff1a;C前置和后置的区别 对于迭代器和其他模板对象使用前缀形式 (i) 的自增, 自减运算符.&#xff0c;理由是 前置自增 (i) 通常要比后置自增 (i) 效率更高。 class Age { public: Age& operator() //前置 {…

fastjson-1.2.24-rce(CVE-2017-18349)fastjson-1.2.47-rce(CNVD-2019-22238)

一.fastjson 1.2.24 反序列化导致任意命令执行漏洞(CVE-2017-18349) fastjson在解析json的过程中&#xff0c;支持使用autoType来实例化某一个具体的类&#xff0c;并调用该类的set/get方法来访问属性。通过查找代码中相关的方法&#xff0c;即可构造出一些恶意利用链 影响范围…

Ansible学习笔记2

Ansible是Python开发的自动化运维工具&#xff0c;集合了众多运维工具&#xff08;Puppet、cfengine、chef、func、fabric&#xff09;的优点&#xff0c;实现了批量系统配置&#xff0c;批量程序部署、批量运行命令等功能。 特点&#xff1a; 1&#xff09;部署简单&#xff…

基于金枪鱼群算法优化的BP神经网络(预测应用) - 附代码

基于金枪鱼群算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于金枪鱼群算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.金枪鱼群优化BP神经网络2.1 BP神经网络参数设置2.2 金枪鱼群算法应用 4.测试结果&#xff1a;5…

【Java基础增强】Stream流

1.Stream流 1.1体验Stream流【理解】 案例需求 按照下面的要求完成集合的创建和遍历 创建一个集合&#xff0c;存储多个字符串元素 把集合中所有以"张"开头的元素存储到一个新的集合 把"张"开头的集合中的长度为3的元素存储到一个新的集合 遍历上一步得…

Swift使用PythonKit调用Python

打开Xcode项目。然后选择“File→Add Packages”&#xff0c;然后输入软件包依赖链接&#xff1a; ​https://github.com/pvieito/PythonKit.git https://github.com/kewlbear/Python-iOS.git Python-iOS包允许在iOS应用程序中使用python模块。 用法&#xff1a; import Pyth…

【项目 计网7】4.20 多进程实现并发服务器 4.22 多线程实现并发服务器

文章目录 4.20 多进程实现并发服务器server_process.cclient.c4.22 多线程实现并发服务器客户端代码&#xff1a;服务端代码&#xff1a; 4.20 多进程实现并发服务器 要实现TCP通信服务器处理并发的任务&#xff0c;使用多线程或者多进程来解决。 思路&#xff1a; 1、一个父进…

【leetcode 力扣刷题】字符串翻转合集(全部反转///部分反转)

字符串翻转合集 344. 反转字符串541. 反转字符串Ⅱ151. 反转字符串中的单词剑指 Offer 58 - II. 左旋转字符串反转单词思路循环挪动子串和子串的拼接 344. 反转字符串 题目链接&#xff1a;344. 反转字符串 题目内容&#xff1a; 题目中重点强调了必须原地修改输入数组&#…

2023_Spark_实验三:基于IDEA开发Scala例子

一、创建一个空项目&#xff0c;作为整个项目的基本框架 二、创建SparkStudy模块&#xff0c;用于学习基本的Spark基础 三、创建项目结构 1、在SparkStudy模块下的pom.xml文件中加入对应的依赖&#xff0c;并等待依赖包下载完毕。 在pom.xml文件中加入对应的依赖 ​<!-- S…

理论转换实践之keepalived+nginx实现HA

背景&#xff1a; keepalivednginx实现ha是网站和应用服务器常用的方法&#xff0c;之前项目中单独用nginx实现过负载均衡和服务转发&#xff0c;keepalived一直停留在理论节点&#xff0c;加之最近工作编写的一个技术文档用到keepalived&#xff0c;于是便有了下文。 服务组件…

基于MyBatis注解的学生管理程序--mybatis注解开发的练手项目

基于MyBatis注解的学生管理程序 需求&#xff1a;完成基于MyBatis注解的学生管理程序&#xff0c;能够用MyBatis注解实现查询操作、实现修改操作、实现一对多查询 &#xff08;1&#xff09;MyBatis注解开发实现查询操作。根据表1和表2在数据库分别创建一个学生表tb_student和…