集群Kerberos认证安装参考:https://datamining.blog.csdn.net/article/details/98480008
目录
环境:
配置
Java Producer 代码
文件内容:
kafka_client_jaas.conf
krb5.conf ( kerberos 配置文件复制过来即可)
kafka.keytab
Java Consumer 代码
Linux 控制台消费
Linux 控制台发送数据数据
Linux 控制台创建、删除Topic
环境:
CDH 6.x
Kafka 1.0.1
加入kerberos认证的Kafka是无法直接用Api进行消费,需要进行安全认证。
配置
查看CDH中配置是否和下面一样,不一样则修改
Java Producer 代码
这里只列出配置的代码,其他的与普通producer相同
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaProducer {private static BlockingQueue<Future<RecordMetadata>> queue = new ArrayBlockingQueue<Future<RecordMetadata>>(8192*2);private static long lastCommitTime = 0;private static boolean flag = true;Producer<String, String> producer = null;public KafkaProducer() {System.setProperty("java.security.auth.login.config", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");System.setProperty("java.security.krb5.conf", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");//这里也可以使用提交方式指定配置文件 -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.confProperties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("max.request.size", 8000000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);}}
resources目录下文件
文件内容:
kafka_client_jaas.conf
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;
};
krb5.conf ( kerberos 配置文件复制过来即可)
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/[logging]default = FILE:/var/log/krb5libs.logkdc = FILE:/var/log/krb5kdc.logadmin_server = FILE:/var/log/kadmind.log[libdefaults]dns_lookup_realm = falseticket_lifetime = 24hrenew_lifetime = 7dforwardable = truerdns = falsepkinit_anchors = /etc/pki/tls/certs/ca-bundle.crtdefault_realm = JAST.COM
# default_ccache_name = KEYRING:persistent:%{uid}[realms]JAST.COM = {kdc = cs-1admin_server = cs-1
}[domain_realm].jast.com = JAST.COMjast.com = JAST.COM
kafka.keytab
kerberos生成的keytab文件
生成文件方式:
kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"
具体可参考:
https://datamining.blog.csdn.net/article/details/98625330
Java Consumer 代码
与Producer基本一致,文件说明参考Producer代码
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;public class KafkaConsumer {private static KafkaConsumer kafkaSink = null;org.apache.kafka.clients.consumer.KafkaConsumer consumer;private static int number;public KafkaConsumer(String topic,int count) {System.setProperty("java.security.auth.login.config", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");System.setProperty("java.security.krb5.conf", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");//这里也可以使用提交方式指定配置文件 -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.confProperties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);props.put("group.id", "y" );props.put("zookeeper.session.timeout.ms", "600000");props.put("zookeeper.sync.time.ms", "200000");props.put("auto.commit.interval.ms", "100000");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG, count+"");//设置最大消费数props.put("security.protocol", "SASL_PLAINTEXT");consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic)); //"collectionInfo"}}
Linux 控制台消费
生成kafka用户keytab文件
kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"
生成kafka_client_jaas.conf文件,位置随意,内容如下
# cat config/kafka_jaas.conf
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/root/jast/kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/root/jast/kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;
};
添加环境变量引用jaas文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_client_jaas.conf"
创建consumer.properties文件,内容如下
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
group.id=test11
此时就可以消费了
/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.200:9092 --topic test --from-beginning --consumer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/consumer.properties
成功消费数据
Linux 控制台发送数据数据
创建producer.properties文件,内容如下
# cat producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
发送数据
/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-producer.sh --broker-list 192.168.0.200:9092 --topic test --producer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/producer.properties
Producer
查看Consumer 消费成功
Linux 控制台创建、删除Topic
在linux 系统配置上面设置的jaas环境变量后即可
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_jaas.conf"