KRaft使用SASL_PLAINTEXT进行认证

需要有KRaft相关的基础,才行。可参阅之前学习记录Kafka

一、配置

首先需要了解SASL的含义,SASL全称为Simple Authentication and Security Layer,它主要是用于在客户端和服务器之间提供安全的身份验证机制。

Kafka 支持以下几种 SASL 验证机制如下

  • GSSAPI (Kerberos)
  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512
  • OAUTHBEARER

其中,PLAIN相对来说更简单,本文就记录SASL/PLAIN的配置与使用。

1.1 配置单机SASL/PLAIN认证

安装的过程参考一键安装的脚本,安装好Kafka后,按照如下操作进行配置

1.) 创建kafka_server_jaas.conf

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin-secret"user_admin="admin-secret"user_alice="alice-secret";
};

user_admin="admin-secret"表示用户名admin,对应的密码为admin-secretuser_alice同理。

usernamepassword表示节点建立集群时,需要验证的身份信息,只有验证通过的节点,方能成功建立集群。

2.)进入到kafka的bin路径中, 复制一份kafka-server-start.sh出来。

cp kafka-server-start.sh kafka-server-start-jaas.sh

并修改复制出来的文件,添加三行。

if [ "x$KAFKA_OPTS" ]; thenexport KAFKA_OPTS="-Djava.security.auth.login.config=/root/kafka_server_jaas.conf"
fi

3.) 将config/kraft/server.properties再复制一份。

cp server.properties server.jaas.properties

并修改复制出来的文件,修改参数如下。其中advertised.listeners中的ip要改为实际使用的ip地址。

sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAINlisteners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://10.0.0.10:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

其中,listener.security.protocol.map的格式为{别名}:{监听器类型},配置了别名,那么在配置listeners时,就可以通过{别名}://:{端口}来进行配置。前缀为 SASL 的两个参数是新增的,目的是为了配置 SASL 认证,其表示的含义如下

  • sasl.enabled.mechanisms:指定Kafka进行SASL验证的机制。PLAIN表示使用文本验证。
  • sasl.mechanism.inter.broker.protocol:指定broker之间通信所使用的SASL验证的机制。PLAIN表示使用文本验证。

Kafka支持不同的监听器类型,如下

监听器类型数据加密身份验证适用场景
PLAINTEXT————内网环境、安全性有保障
SSL/TLSSSL/TLS——在公网传输敏感数据
SASL_PLAINTEXT——SASL需要身份验证但不需要数据加密的环境
SASL_SSLSSL/TLSSASL安全性要求极高的环境

4.) 启动

格式化集群信息,并启动

/root/kafka_2.13-3.3.1/bin/kafka-storage.sh format -t T1CYXg2DQPmdSYSUI-FNFw -c /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties
/root/kafka_2.13-3.3.1/bin/kafka-server-start-jaas.sh /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties

会了单节点的配置,集群的配置只是简单调下参数,不过还是记录一下。

具体的配置参阅1.2即可。

1.2 配置集群SASL/PLAIN认证

准备三台机器

  • 10.0.0.101
  • 10.0.0.102
  • 10.0.0.102

按照单机Kafka的配置步骤进行配置,但是在第三步时,略有调整。

10.0.0.101的示例配置文件如下。每个节点中,需要单独修改三个参数,分别为node.idcontroller.quorum.votersadvertised.listeners

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@10.0.0.101:9093,2@10.0.0.102:9093,3@10.0.0.103:9093
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://10.0.0.101:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

其中node.id表示节点的编号,controller.quorum.voters表示集群中的所有具有controller角色的节点,其配置格式为{node.id}@{ip}:{port}

在三节点上均执行如下命令

/root/kafka_2.13-3.3.1/bin/kafka-storage.sh format -t T1CYXg2DQPmdSYSUI-FNFw -c /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties
/root/kafka_2.13-3.3.1/bin/kafka-server-start-jaas.sh /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties

启动后日志如图。

1.3 client接入授权

1.) 如果kafka未开启身份认证

./kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --all-groups

2.) 如果kafka开启了身份认证,则需要创建存储身份认证的文件kafka.properties

security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";

之后再执行命令

./kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --all-groups --command-config ./kafka.properties

1.4 集群为何是奇数

常见的分布式一致性算法

  1. Paxos
  2. Raft

KRaft全称为Kafka Raft,是基于Raft算法实现的分布式副本管理协议。

Raft的多数派原则,规定了集群的建立无需全员存活,只要存活多数(n个节点,存活节点数>n/2)即可视为集群为正常状态。

这个规定解决了集群中出现的脑裂问题。

比如4台节点组成的集群,其中2台为上海机房,另外2台为北京机房,当上海与北京的网络出现故障,那么一个集群就会分裂为2个集群,这就是脑裂现象。

由于这个规定,导致集群中n台和n+1台节点,他们的容灾能力是一样的(n为奇数),都只能坏一台。使用奇数个节点反而能节省资源。

二、监控

2.1 监控组件比较

常见的Kafka监控如下

  1. CMAK (previously known as Kafka Manager)
    • 优点:监控功能更强大
    • 缺点:a. 重量级 b. 不支持KRaft,参考Kafka 3.3.1 with KRaft Support · Issue #898 · yahoo/CMAK
  2. kafdrop
    • 优点:a. 轻量,开箱即用 b. 支持KRaft
    • 缺点:a. 实时性监控并不好 b. 交互做得并不好,当节点开启身份验证时,会存在严重卡顿情况

2.2 kafdrop使用

首先下载Release 4.0.1 · obsidiandynamics/kafdrop

执行命令,启动监控

java -jar kafdrop-4.0.1.jar --kafka.brokerConnect=10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092

kafdrop本身是一个springboot项目,对于javaer来说是很有好的。其他一些复杂的配置,可以直接下载源码查看application.yml

# 省略以上若干配置。详细内容可自行查看
kafka:brokerConnect: localhost:9092saslMechanism: "PLAIN"securityProtocol: "SASL_PLAINTEXT"truststoreFile: "${KAFKA_TRUSTSTORE_FILE:kafka.truststore.jks}"propertiesFile : "${KAFKA_PROPERTIES_FILE:kafka.properties}"keystoreFile: "${KAFKA_KEYSTORE_FILE:kafka.keystore.jks}"

“${KAFKA_PROPERTIES_FILE:kafka.properties}”

表示获取环境变量KAFKA_PROPERTIES_FILE,如果存在则使用环境变量值,否则使用默认值kafka.properties

比如我开启了授权,那么需要再kafdrop的同级目录下创建kafka.properties,然后再次启动。

security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";

最终kafdrop监控效果如图。

三、使用

3.1 SpringKafka

3.1.1 使用

创建springboot项目,添加依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

生产者application.yml配置

spring:kafka:
#    bootstrap-servers: 10.0.0.101:9092bootstrap-servers: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092# 接入授权properties:security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

消费者application.yml配置

spring:kafka:
#    bootstrap-servers: 10.0.0.10:9092bootstrap-servers: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092# 接入授权properties:security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: group # 默认的分组名,消费者都会有所属组

源码参考kafka-cluster-demo/spring-kafka-demo at master · meethigher/kafka-cluster-demo

3.1.2 重试机制

关于SpringKafka的重试机制,未进行深入探索,只参考了文章Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎。

感觉这篇文章讲得很透彻了。按照该文章的说明,验证了以下两种重试策略

  1. 默认重试策略
  2. 自定义重试策略
默认重试策略

快速重试10次,无间隔时间,如果最后还是失败,则自动commit。

@KafkaListener(topics = "test-retry")
public void test(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {int i=1/0;
}

不需要额外配置,这是默认的重试策略。

自定义重试策略

Spring单独提供了RetryableTopic注解,及重试后的回调注解DltHandler。底层逻辑是新建了topic对这些失败的数据进行存储,以及监听这些新建的topic再进行消费,细节的话还是参考文章Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎。

@KafkaListener(topics = "test-retry")
@org.springframework.kafka.annotation.RetryableTopic(attempts = "5", backoff = @org.springframework.retry.annotation.Backoff(delay = 5000, maxDelay = 10000, multiplier = 2))
public void test(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {int i = 1 / 0;
}@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.OFFSET) long offset) {log.error("DLT Received: {} from {} @ {}", in, topic, offset);
}

更多的案例可以参考spring-kafka/samples at main · spring-projects/spring-kafka

3.2 KafkaClient

添加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1</version>
</dependency>

生产者示例

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;public class Producer {private static Logger log = LoggerFactory.getLogger(Producer.class);public static void main(String[] args) throws Exception {// 配置生产者属性Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 配置 SASL_PLAINTEXT 认证props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");// 创建生产者实例KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息到指定主题String topic = "meethigher";String key = "timestamp";while (true) {// 发送消息String value= String.valueOf(System.currentTimeMillis());ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);producer.send(record, (metadata, exception) -> log.info("sent key={},value={} to topic={},partition={}", record.key(), record.value(), metadata.topic(), metadata.partition()));System.in.read();}// 关闭生产者
//        producer.close();}
}

消费者示例

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class Consumer {private static final Logger log = LoggerFactory.getLogger(Consumer.class);public static void main(String[] args) {// 配置消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_consumer_group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置 SASL_PLAINTEXT 认证props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题String topic = "meethigher";consumer.subscribe(Collections.singletonList(topic));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));records.forEach(record -> {log.info("Consumed record with key {} and value {}", record.key(), record.value());});}}
}

四、参考致谢

Apache Kafka

spring-projects/spring-kafka: Provides Familiar Spring Abstractions for Apache Kafka

Listeners in KAFKA

kafka+Kraft模式集群+安全认证_kafka认证机制_鸢尾の的博客-CSDN博客

我的 Kafka 旅程 - SASL+ACL 认证授权 · 配置 · 创建账号 · 用户授权 · 应用接入 - Sol·wang - 博客园

从k8s集群主节点数量为什么是奇数来聊聊分布式系统 - 知乎

万字长文解析raft算法原理 - 知乎

Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/194156.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MathType公式编辑器安装教程

一、下载 MathType7是一款可以帮助用户快速完成数学公式编辑的应用软件&#xff0c;这款软件适合在进行教育教学、科研机构、论文写作的时候使用。我们可以直接通过这款软件来获取到大量数学上使用到的函数、数学符号等内容&#xff0c;然后使用这些内容来完成公式编辑。 …

Shell循环:expect(一)

一、名词解释&#xff1a; 我们通过Shell可以实现简单的控制流功能&#xff0c;如&#xff1a;循环、判断等。但是对于需要交互的场合则必须通过人工来干预&#xff0c;有时候我们可能会需要实现和交互程序如ssh服务器等进行交互的功能。而Expect就使用来实现这种功能的工具。E…

qt-C++笔记之addItem(), addWidget(), addLayout()

qt-C笔记之addItem(), addWidget(), addLayout() code review! 文章目录 qt-C笔记之addItem(), addWidget(), addLayout()0.《官方文档》相关示例截图1.ChatGPT解释2.《Qt 5.12实战》相关示例截图《Qt 5.12实战》&#xff1a;5.6 组合框 3.addWidget()4.addWidget()和addChil…

矢量图形设计软件CorelDRAW 2023 mac界面说明

CorelDRAW 2023 mac是一款专业的矢量图形设计软件&#xff0c;由Corel公司开发。它提供了广泛的创意工具和功能&#xff0c;旨在满足设计师、艺术家和创意专业人士的需求。 CorelDRAW 2023具有直观的用户界面和工作流程&#xff0c;使用户能够轻松创建各种类型的图形设计&#…

XSS漏洞原理

XSS漏洞介绍&#xff1a; 跨站脚本攻击XSS(Cross Site Scripting)&#xff0c;为了不和层叠样式表(Cascading Style Sheets, CSS)的缩写混淆&#xff0c;故将跨站脚本攻击缩写为XSS。恶意攻击者往Web页面里插入恶意Script代码&#xff0c;当用户浏览该页面时&#xff0c;嵌入We…

基于Java SSM框架+Vue实现企业公寓后勤管理系统项目【项目源码+论文说明】

基于java的SSM框架Vue实现企业宿舍后勤管理网站演示 摘要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所…

【Linux】进程控制-进程终止

目录 一、进程终止&#xff0c;OS做了什么&#xff1f; 二、进程终止的常见方式 1、代码跑完&#xff0c;结果正确 2、代码跑完&#xff0c;结果不正确 补充 (1)、main函数的返回值的意义是什么&#xff1f; (2)、return 0的含义是什么&#xff1f; (3)、退出码是什么和…

基于OpenCV和改进深度学习网络的香菇分级图像分割系统

1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 研究背景与意义 近年来&#xff0c;随着计算机视觉和深度学习的快速发展&#xff0c;图像分割技术在各个领域中得到了广泛应用。图像分割是将图像划分为不同的区域或对象的过程&…

Visual Studio 2022+Python3.11实现C++调用python接口

大家好&#xff01;我是编码小哥&#xff0c;欢迎关注&#xff0c;持续分享更多实用的编程经验和开发技巧&#xff0c;共同进步。 查了一些资料&#xff0c;不是报这个错&#xff0c;就是报哪个错&#xff0c;没有找到和我安装的环境的一致的案例&#xff0c;于是将自己的摸索分…

SQL错题集1

1.找出选修课程成绩最差的选课记录 注&#xff1a; 聚合函数只能用在group by和&#xff08;&#xff09;括号中 找最值可用排序order bylimit 1 2. 查询选修成绩 合格的课程 超过2门的 学生编号 3.删除姓名为"LiMing"的学生信息 注&#xff1a; 删除一整行信息&…

CnosDB有主复制演进历程

分布式存储系统的复杂性涉及数据容灾备份、一致性、高并发请求和大容量存储等问题。本文结合CnosDB在分布式环境下的演化历程&#xff0c;分享如何将分布式理论应用于实际生产&#xff0c;以及不同实现方式的优缺点和应用场景。 分布式系统架构模式 分布式存储系统下按照数据复…

java消息中间件简介

一、为什么要使用消息中间件 消息中间件就是可以省去繁琐的步骤&#xff0c;直达目的&#xff0c;怎么讲呢&#xff0c;就是比如你想很多人&#xff0c;知道你的动态&#xff0c;而知道的人可能手机没电&#xff0c;可能手机信号不好&#xff0c;可能手机不在服务区&#xff0c…

智能优化算法应用:基于狮群算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于狮群算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于狮群算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.狮群算法4.实验参数设定5.算法结果6.参考文献7.MATLAB…

Maven的安装与配置本地仓库,镜像源,环境变量详细步骤

参考视频&#xff1a; 黑马程序员2023新版JavaWeb开发教程&#xff0c;实现javaweb企业开发全流程 【小飞非系列】最新Maven实战教程-项目实战构建利器 文章目录 一.下载Maven安装包二.配置Maven的本地仓库(本机仓库)三.配置镜像源&#xff08;加速jar包的下载)四.配置Maven的环…

Zookeeper 安装与部署

Zookeeper官网 目录 1 配置文件参数解读2 Zookeeper 单点安装3 Zookeeper 分布式安装 1 配置文件参数解读 Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下&#xff1a; &#xff08;1&#xff09;tickTime 2000&#xff1a;通信心跳数&#xff0c;Zookeeper 服务器与客户…

IdleStateHandler 心跳机制源码详解

优质博文&#xff1a;IT-BLOG-CN 一、心跳机制 Netty支持心跳机制&#xff0c;可以检测远程服务端是否存活或者活跃。心跳是在TCP长连接中&#xff0c;客户端和服务端定时向对方发送数据包通知对方自己还在线&#xff0c;保证连接的有效性的一种机制。在服务器和客户端之间一…

bean依赖属性配置

bean依赖属性配置 文章目录 bean依赖属性配置 Data ConfigurationProperties(prefix "cartoon") public class CartoonProperties {private Cat cat;private Mouse mouse; }cartoon:cat:name: whatage: 5mouse:name: howage: 6这样的话&#xff0c;业务bean无需在读…

FPC和PCB有哪些区别?

现在电子技术越来越先进&#xff0c;CPU可以做到5nm工艺&#xff0c;电路板可以做到几十层&#xff0c;可折叠屏应用多款手机中。 什么是FPC&#xff1f; FPC&#xff1a;Flexible Printed Circuit&#xff0c;柔性电路板&#xff0c;又被称为“软板” FPC 以聚酰亚胺或聚酯薄…

Active Stereo Without Pattern Projector论文精读

1.背景补充 主动立体相机和被动立体相机的主要区别在于它们获取立体视觉信息的方式 主动立体相机12&#xff1a; 主动立体视觉是指寻找最佳的视角去重建目标或者场景1。主动视觉的实现方式通常有&#xff1a;改变环境中的光照条件、改变相机的视角、移动相机自身位置等&…

利用 LD_PRELOAD劫持动态链接库,绕过 disable_function

目录 LD_PRELOAD 简介 程序的链接 动态链接库的搜索路径搜索的先后顺序&#xff1a; 利用LD_PRELOAD 简单的劫持 执行id命令 反弹shell 引申至 PHP 绕过disable_function 方法1&#xff1a;使用蚁剑的扩展工具绕过disable_function 方法2&#xff1a;利用 mail 函数…