Kafka安全认证 SASL/PLAINTEXT,账号密码认证

环境

操作系统:CentOS 7.3

Kafka Version:1.1.1

Zookeeper Version:3.4.14

一、Zookeeper集群配置SASL

zookeeper所有节点都是对等的,只是各个节点角色可能不相同。以下步骤所有的节点配置相同。

1、zoo.cfg文件配置

为zookeeper添加SASL支持,在配置文件zoo.cfg添加

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

2.新建zoo_jaas.conf文件,为Zookeeper添加账号认证信息
这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/home路径下。zk_server_jaas.conf文件的内容如下

Server {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapwd"user_kafka="kafkapwd";
};

username和paasword是zk集群之间的认证密码。
user_kafka="kafkaUser65#.com"定义了一个用户"kafka",密码是"kafkaUser65#.com"

3.将Kafka相关jar包导入到Zookeeper

Zookeeper的认证机制是使用插件,“org.apache.kafka.common.security.plain.PlainLoginModule”,所以需要导入Kafka相关jar包,kafka-clients相关jar包,在kafka服务下的lib目录中可以找到,根据kafka不同版本,相关jar包版本会有所变化。

所需要jar包如下,在zookeeper下创建目录zk_sasl_lib将jar包放入(目录名与位置可以随便,后续引用指定即可):

kafka-clients-1.1.1.jar
lz4-java-1.4.1.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.1.jar

4.修改zkEnv.sh,主要目的就是将这几个jar包使zookeeper读取到

在$KAFKA_HOME/bin目录下找到zkEnv.sh文件,添加如下代码

注意引用的目录下jar包,与之前创建的zoo_jaas.conf文件

for i in /opt/zookeeper-3.4.14/zk_sasl_lib/*.jar;
doCLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper-3.4.14/conf/zoo_jaas.conf"

5.重启Zookeeper服务

zkServer.sh restart

 查看状态

zkServer.sh status

二、Kafka集群配置SASL

  • 注:所有节点操作相同

1.创建kafka_server_jaas.conf文件,该文件名可以自己修改,为kafka添加认证信息

内容如下(这里的Client与Zookeeper相对应,KafkaServer与后期调用时读取的KafkaClient相对应,是消费生产的账号密码,不要弄混了):

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="zsh"password="zshpwd"user_zsh="zshpwd" ;
};
Client{org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapwd";
};

    先解释KafkaServer,使用user_<name>来定义多个用户,供客户端程序(生产者、消费者程序)认证使用,可以定义多个,后续配置可能还可以根据不同的用户定义ACL,这部分内容超出本文范围。这是目前我对配置的理解。上例我定义了三个用户,一个是admin,一个是producer,一个是consumer,等号后面是对应用户的密码(如user_producer定义了用户名为producer,密码为prod-sec的用户)。再选择一个用户,用于Kafka内部的各个broker之间通信,这里我选择admin用户,对应的密码是admin-sec。
    Client配置节则容易理解得多,主要是broker链接到zookeeper,从上文的Zookeeper JAAS文件中选择一个用户,填写用户名和密码即可。

2.在Kafka Server.properties添加、修改如下信息

listeners=SASL_PLAINTEXT://192.168.2.xxx:19092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

 

3.Kafka启动脚本中加入配置,读取第一步创建的文件,kafka_server_jaas.conf

修改kafka的kafka-server-start.sh文件,

在如下代码

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

添加

 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka_2.11-1.1.1/config/kafka_server_jaas.conf"

4.启动Kafka服务,查看日志是否正常

kafka-server-start.sh -daemon /opt/kafka_2.11-1.1.1/config/server.properties

 


三、Java客户端调用认证

这里只对配置进行举例,其他操作不变

  • 客户端文件-kafka_client_jaas.conf
KafkaClient{org.apache.kafka.common.security.plain.PlainLoginModule required  username="zsh"  password="zshpwd";  
};
  • 生产者:
public KafkaProducer() {System.setProperty("java.security.auth.login.config", "C://Kafkaanquan//kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP.getValue());props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("max.request.size", 8000000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);}
  •  消费者
public KafkaConsumer(String topic,int count) {Properties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP.getValue());//group 代表一个消费组props.put("group.id", "20190305"	);props.put("zookeeper.session.timeout.ms", "600000");props.put("zookeeper.sync.time.ms", "200000");props.put("auto.commit.interval.ms", "100000");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG, count+"");//设置最大消费数//        MAX_POLL_RECORDS_CONFIGprops.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"zsh\" password=\"zshpwd\";");//System.setProperty("java.security.auth.login.config", "C:/Kafkaanquan/kafka_client_jaas.conf"); 读取配置文件方式,与读取配置文件二选一consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic)); //"collectionInfo"
}

主要配置加入配置是以下三项

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
System.setProperty("java.security.auth.login.config", "C:/Kafkaanquan/kafka_server_jaas.conf");

 

  • 异常说明

1.未读取Kafka认证客户端文件

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumerat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)at com.xxx.kafka.KafkaConsumer.<init>(KafkaConsumer.java:32)at com.xxx.kafka.KafkaConsumer.getInstance(KafkaConsumer.java:38)at com.xxx.kafka.KafkaConsumer.main(KafkaConsumer.java:44)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not setat org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:710)... 5 more

2. 认证文件中的账号密码不正确

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumerat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)at com.xxx.kafka.KafkaConsumer.<init>(KafkaConsumer.java:32)at com.xxx.kafka.KafkaConsumer.getInstance(KafkaConsumer.java:38)at com.xxx.kafka.KafkaConsumer.main(KafkaConsumer.java:44)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not setat org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:710)... 5 more

ACL用户权限控制具体配置请查看:https://datamining.blog.csdn.net/article/details/90291813

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510035.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

预处理指令 #pragma 的使用

#pragma comment( comment-type [,"commentstring"] )该宏放置一个注释到对象文件或者可执行文件。comment-type是一个预定义的标识符&#xff0c;指定注释的类型&#xff0c;应该是compiler&#xff0c;exestr&#xff0c;lib&#xff0c;linker之一。commentstring…

Kafka ACL控制,用户权限能控制

本文章基于Kafka配置 SASL/PLAINTEXT后编写&#xff0c;如未配置请参考&#xff1a;https://datamining.blog.csdn.net/article/details/90264636 进行修改 配置kafka server.properties文件 super.users 指定超级用户&#xff0c;不受权限控制 listenersSASL_PLAINTEXT://ip…

ubuntu 系统U盘中 文件出现小锁子

1.插入不有问题的U盘&#xff0c;输入sudo fdisk -l指令&#xff0c;查看磁盘信息。 Disk /dev/sda: 57.6 GiB, 61872793600 bytes, 120845300 sectors Units: sectors of 1 * 512 512 bytes Sector size (logical/physical): 512 bytes / 512 bytes I/O size (minimum/optim…

mysql配置

MySQL5.6.11安装步骤&#xff08;Windows7 64位&#xff09; http://jingyan.baidu.com/article/f3ad7d0ffc061a09c3345bf0.html1. 下载MySQL Community Server 5.6.21&#xff0c;注意选择系统类型&#xff08;32位/64位&#xff09; 2. 解压MySQL压缩包 将以下载的MySQL压缩包…

Kafka JMX监控报错 Failed to get broker metrics for BrokerIdentity(128,192.168.2.128,9999,true,false,Map

KafkaManager报错 2019-05-19 14:21:53,817 - [ERROR] k.m.a.c.BrokerViewCacheActor - Failed to get broker metrics for BrokerIdentity(128,192.168.2.128,9999,true,false,Map(SASL_PLAINTEXT -> 19092)) java.rmi.ConnectException: Connection refused to host: 127.…

大数据技术讲解

HDFS的体系架构 整个Hadoop的体系结构主要是通过HDFS来实现对分布式存储的底层支持&#xff0c;并通过MR来实现对分布式并行任务处理的程序支持。 HDFS采用主从&#xff08;Master/Slave&#xff09;结构模型&#xff0c;一个HDFS集群是由一个NameNode和若干个DataNode组…

mfc常见面试题

http://www.mianwww.com/html/2014/05/21208.html 理解c&#xff0b;&#xff0b;语言中一些概念以及它们之间的区别&#xff08;需要深刻理解&#xff09;&#xff1a; &#xff08;1&#xff09;局部变量全局变量静态变量 const常量寄存器变量宏定义的常量 static变量注&…

ubuntu16.04安装gcc g++7.5.0及各个版本的切换

sudo add-apt-repository ppa:ubuntu-toolchain-r/test sudo apt-get update sudo apt-get install gcc-7 sudo apt-get install g-7 出现以下错误&#xff1a; Reading package lists... Done Building dependency tree Reading state information... Done You mi…

eclipse闪退打不开问题

起初在网上看了N多篇的解决文章&#xff0c;可惜都试过了没有一个解决我的问题&#xff0c;后来就一顿折腾&#xff0c;把c盘中出现的 .eclipse&#xff08;点eclipse&#xff09;删除之后重新运行eclipse等待一会儿就可以正常启动了&#xff0c;通过这次警告自己以后不能异常…

掌握技能

掌握技能&#xff1a; 熟悉C/C语言,&#xff0c;熟悉常用的设计模式及设计原则&#xff1b; 熟悉常用的重构手法&#xff0c;有良好的编码风格&#xff1b; 熟练使用VS2010集成开发环境和SVN版本控制系统&#xff1b; 熟悉MFC、Qt库的使用&#xff1b; 熟悉C标准程序库STL以及常…

kafka manager 2.0 工具下载 已打包完成

链接&#xff1a;https://pan.baidu.com/s/1YMj-9HzoJLKDEY5C47aOlQ 提取码&#xff1a;hhhr

linux系统备份和恢复

系统备份&#xff1a; sudo su cd / tar cvpzf backup.tgz --exclude/proc --exclude/lostfound --exclude/backup.tgz --exclude/mnt --exclude/sys --exclude/media / 系统恢复&#xff1a; ctrl alt F1 输入用户名和密码 cd / sudo tar xvpfz backup.tgz

多线程Runnable类创建多线程

package com.ajax; //多线程Runnable类创建多线程 public class Example01 {public static void main(String [] args){TicketWindow twnew TicketWindow();new Thread(tw,"窗口1").start();new Thread(tw,"窗口2").start();new Thread(tw,"窗口3&quo…

LPTSTR、LPCSTR、LPCTSTR、LPSTR的区别

UNICODE&#xff1a;它是用两个字节表示一个字符的方法。比如字符A在ASCII下面是一个字符&#xff0c;可A在UNICODE下面是两个字符,高字符用0填充&#xff0c;而且汉字程在ASCII下面是两个字节&#xff0c;而在UNICODE下仍旧是两个字节。UNICODE的用处就是定长表示世界文字&…

Hbase快照Snapshot 数据备份、恢复与迁移

场景 hbase数据迁移时我们需要统计迁移时的数据量&#xff0c;以确保迁移后的数据的完成&#xff0c;但是如果hbase表数据持续增加的话&#xff0c;迁移时无法统计出准确的数据量&#xff0c;此时我们使用快照的方式进行数据迁移&#xff0c;以确保迁移的数量可以和某一时间节点…

多线程Thread类创建多线程

package com.ajax; //多线程Thread类创建多线程 public class Example02 {public static void main(String[] args){new MyThread().start();new MyThread().start();new MyThread().start();//创建一个线程对象并开启new MyThread().start();} } class MyThread extends Threa…

C++面试题一

指针和引用的区别指针指向一块内存&#xff0c;它的内容是指向内存的地址&#xff1b;引用是某内存的别名 引用使用是无需解引用&#xff0c;指针需解引用 引用不能为空&#xff0c;指针可以为空 引用在定义是被初始化一次&#xff0c;之后不可变&#xff1b;指针可变 程序…

maven 打包报错 surefire-reports for the individual test results.

Eclipse Maven打包报错 [ERROR] [ERROR] Please refer to D:\File\workspace\izh-common-util\target\surefire-reports for the individual test results. [ERROR] -> [Help 1] [ERROR] 原因这是因为测试代码时遇到错误&#xff0c;它会停止编译。 解决方法 在pom.xm…

ubuntu 配置 静态ip

1.参看网卡名称 ifconfig lo Link encap:Local Loopback inet addr:127.0.0.1 Mask:255.0.0.0inet6 addr: ::1/128 Scope:HostUP LOOPBACK RUNNING MTU:65536 Metric:1RX packets:10210 errors:0 dropped:0 overruns:0 frame:0TX packets:10210 errors:0 dropped:…

eclipse maven项目 maven build 提示jdk版本不对

eclipse打包maven项目提示jdk版本不对&#xff0c;但eclipse版本已经修改jdk版本&#xff0c;可查看maven配置是否正确 maven项目中pom.xml 添加 <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven…