CDH Kerberos 认证下Kafka 消费方式

集群Kerberos认证安装参考:https://datamining.blog.csdn.net/article/details/98480008

目录

 

环境:

配置

Java Producer 代码

文件内容:

kafka_client_jaas.conf

krb5.conf   ( kerberos 配置文件复制过来即可)

kafka.keytab

Java Consumer 代码 

Linux 控制台消费

Linux 控制台发送数据数据

Linux 控制台创建、删除Topic


环境:

CDH 6.x

Kafka 1.0.1

 

        加入kerberos认证的Kafka是无法直接用Api进行消费,需要进行安全认证。

配置

查看CDH中配置是否和下面一样,不一样则修改

Java Producer 代码

这里只列出配置的代码,其他的与普通producer相同


import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaProducer  {private static BlockingQueue<Future<RecordMetadata>> queue = new ArrayBlockingQueue<Future<RecordMetadata>>(8192*2);private static long lastCommitTime = 0;private static boolean flag = true;Producer<String, String> producer = null;public KafkaProducer() {System.setProperty("java.security.auth.login.config", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");System.setProperty("java.security.krb5.conf", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");//这里也可以使用提交方式指定配置文件 -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.confProperties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("max.request.size", 8000000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);}}

resources目录下文件

文件内容:

kafka_client_jaas.conf

KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;
};

krb5.conf   ( kerberos 配置文件复制过来即可)

# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/[logging]default = FILE:/var/log/krb5libs.logkdc = FILE:/var/log/krb5kdc.logadmin_server = FILE:/var/log/kadmind.log[libdefaults]dns_lookup_realm = falseticket_lifetime = 24hrenew_lifetime = 7dforwardable = truerdns = falsepkinit_anchors = /etc/pki/tls/certs/ca-bundle.crtdefault_realm = JAST.COM
# default_ccache_name = KEYRING:persistent:%{uid}[realms]JAST.COM = {kdc = cs-1admin_server = cs-1
}[domain_realm].jast.com = JAST.COMjast.com = JAST.COM

 

kafka.keytab

        kerberos生成的keytab文件

生成文件方式:

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

具体可参考:

https://datamining.blog.csdn.net/article/details/98625330

Java Consumer 代码 

与Producer基本一致,文件说明参考Producer代码


import java.util.Arrays;
import java.util.Date;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;public class KafkaConsumer  {private static KafkaConsumer kafkaSink = null;org.apache.kafka.clients.consumer.KafkaConsumer consumer;private static int number;public KafkaConsumer(String topic,int count) {System.setProperty("java.security.auth.login.config", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");System.setProperty("java.security.krb5.conf", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");//这里也可以使用提交方式指定配置文件 -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.confProperties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);props.put("group.id", "y"	);props.put("zookeeper.session.timeout.ms", "600000");props.put("zookeeper.sync.time.ms", "200000");props.put("auto.commit.interval.ms", "100000");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG, count+"");//设置最大消费数props.put("security.protocol", "SASL_PLAINTEXT");consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic)); //"collectionInfo"}}

Linux 控制台消费

生成kafka用户keytab文件

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

生成kafka_client_jaas.conf文件,位置随意,内容如下

# cat config/kafka_jaas.conf 
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/root/jast/kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/root/jast/kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;
};

添加环境变量引用jaas文件

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_client_jaas.conf"

创建consumer.properties文件,内容如下

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
group.id=test11

此时就可以消费了

/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.200:9092 --topic test --from-beginning --consumer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/consumer.properties

成功消费数据

Linux 控制台发送数据数据

创建producer.properties文件,内容如下

# cat producer.properties 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

发送数据

/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-producer.sh --broker-list 192.168.0.200:9092 --topic test --producer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/producer.properties

Producer

查看Consumer 消费成功

Linux 控制台创建、删除Topic

在linux 系统配置上面设置的jaas环境变量后即可

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_jaas.conf"

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

glsl基础

目录[-] 变量 基本类型 结构体 数组 修饰符 内置变量 表达式 操作符 数组访问 构造函数 成分选择 控制流 循环 if/else discard 函数 变量 GLSL的变量命名方式与C语言类似。变量的名称可以使用字母&#xff0c;数字以及下划线&#xff0c;但变量名不能以数字开头&#xff0c;还…

1260 不一样的A+B

1260 不一样的AB Time Limit : 2000/1000 MS(Java/Others) | Memory Limit :65536/32768 KB(Java/Others) Submits : 641 | Solved : 357 Description 读入两个小于100的正整数A和B&#xff0c;计算AB的值并输出。需要注意的是&#xff1a;整数A和B的每一位数字由对应的英文单…

IntelliJ IDEA Maven jar包冲突解决,快速发现jar包冲突

IntelliJ IDEA 自带的Diagrams 去查看jar包冲突相当费劲 ~ &#xff0c;安装个插件即可快速解决该问题 打开Settings&#xff0c;搜索maven helper&#xff0c;并安装 安装完成后&#xff0c;直接打开pom文件&#xff0c;点击Dependency Analyzer 删除冲突jar包&#xff0c;在…

GLSL学习教程博客

http://blog.csdn.net/racehorse/article/details/6593719

1185 城市名排序

1185 城市名排序 Time Limit : 2000/1000 MS(Java/Others) | Memory Limit :65536/32768 KB(Java/Others) Submits : 3989 | Solved : 2049 Description 从键盘输入n个城市名&#xff0c;进行升序排序并输出。 Input 第一行输入一个整数n&#xff0c;表示有n个城市&#xff0…

osg基本图元

OSG绘制几何体学习总结&#xff08;超全&#xff09;在osg中&#xff0c;场景图形采用一种自顶向下的&#xff0c;分层的树状数据结构来组织空间数据集&#xff0c;以提高渲染的效率 场景图形树结构的顶部是一个根节点&#xff0c;从根节点向下延伸&#xff0c;各个组节点中均包…

kafka如何彻底删除topic及数据

前言&#xff1a; 删除kafka topic及其数据&#xff0c;严格来说并不是很难的操作。但是&#xff0c;往往给kafka 使用者带来诸多问题。项目组之前接触过多个开发者&#xff0c;发现都会偶然出现无法彻底删除kafka的情况。本文总结多个删除kafka topic的应用场景&#xff0c;总…

如何用css设计出商品购物图片

代码&#xff1a; 常见问题&#xff1a;背景填充时候图片大小不一致、重复等等问题&#xff0c;建议在插入图片的时候最好不用img标签直接在div中添加背景图片&#xff0c;有利于对背景图片的更改。 插入图片&#xff1a;background-image: url(../img/product-auto/benz-amg-s…

OLTP、OLAP与HTAP 区别

OLTP On-Line Transaction Processing联机事务处理过程(OLTP) 也称为面向交易的处理过程&#xff0c;其基本特征是前台接收的用户数据可以立即传送到计算中心进行处理&#xff0c;并在很短的时间内给出处理结果&#xff0c;是对用户操作快速响应的方式之一。 这样做的最大优点…

osg布告板技术(Billboard)

公告牌技术,即billboard技术,在3D游戏中有着广泛的应用.它的本质就是用预先做好的几幅 位图来代替3D物体,极大地节省资源和提高速度.仔细观察<<魔法门>>系列游戏,它的精灵,树木,物 品都是二维图象,但由于它始终朝向观察者,你根本看不到它"扁"的一面,所以…

1224 哥德巴赫猜想(2)

1224 哥德巴赫猜想&#xff08;2&#xff09; Time Limit : 2000/1000 MS(Java/Others) | Memory Limit :65536/32768 KB(Java/Others) Submits : 1564 | Solved : 629 Description 所谓哥德巴赫猜想&#xff0c;就是指任何一个大于2的偶数&#xff0c;都可以写成两个素数的和…

Hbase 查看 rowkey在哪个region中

首先我们先了解下 hbase:meta 表&#xff0c;hbase是指的namespace&#xff0c;meta是表名&#xff0c;这张表存储的是整个集群的Region信息。 hbase:meta表的一个rowkey对应一个region&#xff0c;rowkey设计如下: 表名,region的startRowkey,region创建时的时间戳,EcodedName …

osg坐标系转换

osg中将局部坐标系下的点坐标换算成全局坐标系下点的坐标 标签&#xff1a; matrixlist2012-05-17 16:27 2940人阅读 评论(1) 收藏 举报分类&#xff1a;osg&#xff08;7&#xff09; 坐标变换版权声明&#xff1a;本文为博主原创文章&#xff0c;未经博主允许不得转载。 今天…

1295 爱翻译

1295 爱翻译 Time Limit : 2000/1000 MS(Java/Others) | Memory Limit :65536/32768 KB(Java/Others) Submits : 390 | Solved : 80 Description 英语是现在世界第一大语言&#xff0c;所以学好英语是very important&#xff0c; 但是宁波大学的某个大牛说&#xff1a;“我不怕…

C#/WPF 播放音频文件

C#播放音频文件的方式&#xff1a; 播放系统事件声音使用System.Media.SoundPlayer播放wav使用MCI Command String多媒体设备程序接口播放mp3&#xff0c;wav&#xff0c;avi等使用WindowsMediaPlayer的COM组件来播放(可视化)使用DirectX播放音频文件使用Speech播放(朗读器&am…

Hbase 二级索引 Solr int字段排序问题 can not sort on multivalued field

Hbase Solr 同步二级索引后&#xff0c;进行int字段排序时报错 报错如下 {"responseHeader":{"zkConnected":true,"status":400,"QTime":75,"params":{"q":"*:*","sort":"hbase_index…

OpenGL渲染管线,着色器,光栅化等概念理解

卧槽&#xff0c;前些日子看这几个概念就十分想吐槽&#xff0c;这么难理解的概念窃以为纯属翻译的不够接地气。————首先&#xff0c;光栅化&#xff08;Rasterize/rasteriztion&#xff09;。这个词儿Adobe官方翻译成栅格化或者像素化。没错&#xff0c;就是把矢量图形转化…

Hbase Solr 二级索引 同步int数据报错com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte

二级索引实现方式&#xff1a;Hbase Key-Value Store Indexer Solr 同步int数据时提示异常 异常如下 2019-12-16 17:39:18,346 WARN com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte value 101 to int java.lang.IllegalArgumentException: off…

jQuery设计动画

一、显隐效果show() show(duration,[callback])show([duration],[easing],[callback]) 参数说明&#xff1a;duration&#xff1a;为一个字符串或者数字&#xff0c;决定动画将运行多久callback&#xff1a;表示在动画完成时执行的函数。easing&#xff1a;为一个字符串&#x…

值得推荐的C/C++框架和库

下次造轮子前先看看现有的轮子吧 值得学习的C语言开源项目 - 1. Webbench Webbench是一个在linux下使用的非常简单的网站压测工具。它使用fork()模拟多个客户端同时访问我们设定的URL&#xff0c;测试网站在压力下工作的性能&#xff0c;最多可以模拟3万个并发连接去测试网站的…