kafka配置SASL/PLAIN 安全认证

1 zookeeper配置启动

1.1 zookeeper添加SASL支持

为zookeeper添加SASL支持,在配置文件zoo.cfg添加

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.2 zk_server_jaas.conf文件

新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home路径下。zk_server_jaas.conf文件的内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“cluster”
password=“clusterpasswd”
user_kafka=“kafkapasswd”;
};`
username和paasword是zk集群之间的认证密码。
user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。

1.3 引入jar包

由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样

-rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
-rw-r--r-- 1 root root  489884 Aug 29 11:14 log4j-1.2.17.jar
-rw-r--r-- 1 root root  370137 Aug 29 10:53 lz4-java-1.4.1.jar
-rw-r--r-- 1 root root   41203 Aug 29 10:53 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root   12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
-rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
[root@sm_qf-bj_hydgpt_192-168-151-168 home]# 

1.4.修改zkEnv.sh

在zkEnv.sh添加


for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
doCLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "

1.5.启动zk服务端

执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题

2 kafka配置和启动

2.1.新建kafka_server_jaas.conf,为kafka添加认证信息

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="cluster"password="cluster"user_cluster=“clusterpasswd”user_kafka="kafkapasswd" ;
};
Client{org.apache.kafka.common.security.plain.PlainLoginModule required  username="kafka"  password="kafkapasswd";  
};

KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。

2. 2.在kafka的配置文件开启SASL认证

listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN 
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true

2.3 .在server启动脚本JVM参数

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"

2.4 启动

./kafka-server-start.sh ../config/server.properties

2.5

kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功

Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)

3 kafka的SASL认证功能认证和使用

1.使用kafka脚本认证

我们使用kafka自带的脚本进行认证。

1.新建kafka_client_jaas.conf,为客户端添加认证信息

在/home下新建kafka_client_jaas.conf,添加以下信息

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapasswd";
};

2.修改客户端配置信息

修改producer.properties和consumer.properties,添加认证机制

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN 

3.修改客户端启动脚本

修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将

export KAFKA_HEAP_OPTS=“-Xmx512M”

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

kafka-console-consumer.sh的修改类似。

4.客户端启动并认证

启动consumer

./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties

启动producer

./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties

producer端发送消息,consumer端成功接收到消息。

4.Java客户端认证

package com.zte.sdn.oscp.jms.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;import java.util.Collections;
import java.util.Properties;public class KafkaTest {@Testpublic void testProduct() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "IP:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");Producer<String, String> producer = new KafkaProducer<>(props);while (true){long startTime = System.currentTimeMillis();for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));}System.out.println(System.currentTimeMillis()-startTime);Thread.sleep(5000);}}@Testpublic void testConsumer() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "(IP):9092");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("group.id", "kafka_test_group");props.put("session.timeout.ms", "6000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafkatest"));while (true) {long startTime = System.currentTimeMillis();ConsumerRecords<String, String> records = consumer.poll(1000);System.out.println(System.currentTimeMillis() - startTime);System.out.println("recieve message number is " + records.count());for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",record.offset(),record.key(),record.value(),record.partition());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/63907.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序活动报名管理系统设计与实现

摘 要 随着当下的移动互联网技术的不断发展壮大&#xff0c;现在人们对于手机的应用已经非常的成熟&#xff0c;当下的时代基本上达到了人手一部手机&#xff0c;数字化、信息化已经成为了人们的主流生活。有数据统计&#xff0c;截止到2020年末我国的手机网民人数已经接近10亿…

sql:SQL优化知识点记录(五)

&#xff08;1&#xff09;explain之例子 &#xff08;2&#xff09;索引单表优化案例 上面的功能已经实现&#xff0c;但是分析功能&#xff0c; 使用explain分析这条sql&#xff1a; 发现type为All Extra&#xff1a;有Using filesort &#xff08;文件内排序&#xff09; 这…

stable diffusion实践操作-宽高设置以及高清修复

系列文章目录 stable diffusion实践操作 文章目录 系列文章目录前言一、SD宽高怎么设置&#xff1f;1.1 宽高历史 二、高清修复总结 前言 主要介绍SD的宽高设置以及高清修复 一、SD宽高怎么设置&#xff1f; 1.1 宽高历史 SD生成256256图片效果最好。512512是SD一直使用的画…

修改文件/文件夹名如何自动修改引入路径

场景&#xff1a; 在项目中修改文件夹结构或者文件名称后需要到引入了该文件的地方手动修改引入的路径。不想一个个修改&#xff0c;为了让引用了该文件的地方进行自动修改。 解决方案&#xff1a; 在 Visual Studio Code (VSCode) 中&#xff0c;你可以使用一些扩展或功能来…

RT-Thread 中断管理学习(一)

中断管理 什么是中断&#xff1f;简单的解释就是系统正在处理某一个正常事件&#xff0c;忽然被另一个需要马上处理的紧急事件打断&#xff0c;系统转而处理这个紧急事件&#xff0c;待处理完毕&#xff0c;再恢复运行刚才被打断的事件。生活中&#xff0c;我们经常会遇到这样…

OBS Studio 30.0 承诺在 Linux 上支持英特尔 QSV,为 DeckLink 提供 HDR 回放功能

导读OBS Studio 30.0 现已推出公开测试版&#xff0c;承诺为这款广受欢迎的免费开源截屏和流媒体应用程序提供多项令人兴奋的新功能&#xff0c;以及大量其他更改和错误修复。 OBS Studio 30.0 承诺在 Linux 上支持英特尔 QSV&#xff08;快速同步视频&#xff09;、WHIP/WebRT…

Android Native Code开发学习(三)对java中的对象变量进行操作

Android Native Code开发学习&#xff08;三&#xff09; 本教程为native code学习笔记&#xff0c;希望能够帮到有需要的人 我的电脑系统为ubuntu 22.04&#xff0c;当然windows也是可以的&#xff0c;区别不大 对java中的对象变量进行操作 首先我们新建一个java的类 pub…

xsschallenge靶场练习1-13关

文章目录 第一关第二关第三关第四关第五关第六关第七关第八关第九关第十关第十一关第十二关第十三关 第一关 观察页面 http://192.168.80.139/xsschallenge/level1.php?nametest尝试在name后面输入最近基本的xss语法 <script>alert(1)</script>第二关 查看页面源…

el-date-picker 等 点击无反应不回显问题解决

如上图&#xff0c;编辑回显正常&#xff0c;但是时间控件在拖动过程中时间不会跟随改变。 解决办法&#xff1a; <el-date-picker input"onInput()" ...><el-input input"onInput()" ...>js中onInput() {this.$forceUpdate();},

2.单链表练习

1. 链表的基本概念 链表&#xff08;Linked List&#xff09;是一种常见的数据结构&#xff0c;用于存储一系列元素&#xff0c;这些元素可以是任意类型的数据。链表中的每个元素被称为节点&#xff08;Node&#xff09;&#xff0c;每个节点包含两部分&#xff1a;一个存储数…

【Docker】02-安装mysql

参考教程&#xff1a; https://www.bilibili.com/video/BV1Qa4y1t7YH/?p5&spm_id_frompageDriver&vd_source4964ba5015a16eb57d0ac13401b0fe77 docker安装Mysql 1、拉取最新版本的镜像 docker pull mysq:latestl 2、运行mysql服务 docker run --name mysql -e MYSQL_…

2018ECCV Can 3D Pose be Learned from2D Projections Alone?

摘要 在计算机视觉中&#xff0c;从单个图像的三维姿态估计是一个具有挑战性的任务。我们提出了一种弱监督的方法来估计3D姿态点&#xff0c;仅给出2D姿态地标。我们的方法不需要2D和3D点之间的对应关系来建立明确的3D先验。我们利用一个对抗性的框架&#xff0c;强加在3D结构…

【设计模式】Head First 设计模式——构建器模式 C++实现

设计模式最大的作用就是在变化和稳定中间寻找隔离点&#xff0c;然后分离它们&#xff0c;从而管理变化。将变化像小兔子一样关到笼子里&#xff0c;让它在笼子里随便跳&#xff0c;而不至于跳出来把你整个房间给污染掉。 设计思想 ​ 将一个复杂对象的构建与其表示相分离&…

ArcGIS Maps SDK for JS(一):概述与使用

文章目录 1 概述2 如何使用ArcGIS Maps SDK for JavaScript2.1 AMD 模块与 ES 模块2.2 AMD 模块和 ES 模块比较 3 几种安装方式3.1 通过 ArcGIS CDN 获取 AMD 模块3.2 通过 NPM 运行 ES 模块3.3 通过 CDN 获取 ES 模块3.4 本地构建 ES3.5 本地构建 AMD 3 VSCode下载与安装2.1 下…

Redis之分布式锁

背景 分布式应用中&#xff0c;经常会遇到并发问题。熟悉的朋友都知道这个时候就需要加锁。只有原子操作才能保证数据不会混乱。&#xff08;原子操作是不会被线程调度机制所打断的的操作&#xff0c;一旦开始就会执行到最后&#xff0c;要么做要么不做&#xff0c;不会被打断…

#systemverilog# 之 event region 和 timeslot 仿真调度(六)疑惑寄存器采样吗

一 象征性啰嗦 想必大家在刚开始尝试写Verilig HDL代码的时候,都是参考一些列参考代码,有些来自于参考书,有些来自于网上大牛的笔记,甚至有写来自于某宝FPGA开发板的授权代码。我还记得自己当时第一次写代码,参考的是一款Altera 芯片,结合Quartus 开发软件, 在上面练习…

常用框架分析(7)-Flutter

框架分析&#xff08;7&#xff09;-Flutter 专栏介绍Flutter核心思想Flutter的特点快速开发跨平台高性能美观的用户界面 Flutter的架构框架层引擎层平台层 开发过程使用Dart语言编写代码编译成原生代码热重载工具和插件 优缺点优点跨平台开发高性能美观的用户界面热重载强大的…

linux如何抓包数据

分析&回答 (1) 想要截获所有210.27.48.1 的主机收到的和发出的所有的分组&#xff1a; #tcpdump host 210.27.48.1 复制代码 (2) 想要截获主机210.27.48.1 和主机210.27.48.2或210.27.48.3的通信&#xff0c;使用命令(注意&#xff1a;括号前的反斜杠是必须的)&#xff…

225. 用队列实现栈

225. 用队列实现栈 \请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0c;并支持普通栈的全部四种操作&#xff08;push、top、pop 和 empty&#xff09;。 实现 MyStack 类&#xff1a; void push(int x) 将元素 x 压入栈顶。 int pop() 移除并…

Redis 7 第四讲 数据持久化

总体 RDB 介绍 RDB 持久化以指定的时间间隔执行数据集的时间点快照 。 把某一时刻的数据和状态以文件的形式写到磁盘上,即使出现故障宕机,快照文件也不会丢失,数据的可靠性得到保证。快照文件就是RDB(Redis DataBase)文件(dump.rdb) 作用 在指定的时间间隔内将内存中的数…