CDH Kerberos 认证下Kafka 消费方式

集群Kerberos认证安装参考:https://datamining.blog.csdn.net/article/details/98480008

目录

 

环境:

配置

Java Producer 代码

文件内容:

kafka_client_jaas.conf

krb5.conf   ( kerberos 配置文件复制过来即可)

kafka.keytab

Java Consumer 代码 

Linux 控制台消费

Linux 控制台发送数据数据

Linux 控制台创建、删除Topic


环境:

CDH 6.x

Kafka 1.0.1

 

        加入kerberos认证的Kafka是无法直接用Api进行消费,需要进行安全认证。

配置

查看CDH中配置是否和下面一样,不一样则修改

Java Producer 代码

这里只列出配置的代码,其他的与普通producer相同


import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaProducer  {private static BlockingQueue<Future<RecordMetadata>> queue = new ArrayBlockingQueue<Future<RecordMetadata>>(8192*2);private static long lastCommitTime = 0;private static boolean flag = true;Producer<String, String> producer = null;public KafkaProducer() {System.setProperty("java.security.auth.login.config", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");System.setProperty("java.security.krb5.conf", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");//这里也可以使用提交方式指定配置文件 -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.confProperties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("max.request.size", 8000000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);}}

resources目录下文件

文件内容:

kafka_client_jaas.conf

KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;
};

krb5.conf   ( kerberos 配置文件复制过来即可)

# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/[logging]default = FILE:/var/log/krb5libs.logkdc = FILE:/var/log/krb5kdc.logadmin_server = FILE:/var/log/kadmind.log[libdefaults]dns_lookup_realm = falseticket_lifetime = 24hrenew_lifetime = 7dforwardable = truerdns = falsepkinit_anchors = /etc/pki/tls/certs/ca-bundle.crtdefault_realm = JAST.COM
# default_ccache_name = KEYRING:persistent:%{uid}[realms]JAST.COM = {kdc = cs-1admin_server = cs-1
}[domain_realm].jast.com = JAST.COMjast.com = JAST.COM

 

kafka.keytab

        kerberos生成的keytab文件

生成文件方式:

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

具体可参考:

https://datamining.blog.csdn.net/article/details/98625330

Java Consumer 代码 

与Producer基本一致,文件说明参考Producer代码


import java.util.Arrays;
import java.util.Date;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;public class KafkaConsumer  {private static KafkaConsumer kafkaSink = null;org.apache.kafka.clients.consumer.KafkaConsumer consumer;private static int number;public KafkaConsumer(String topic,int count) {System.setProperty("java.security.auth.login.config", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");System.setProperty("java.security.krb5.conf", "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");//这里也可以使用提交方式指定配置文件 -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.confProperties props = new Properties();props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);props.put("group.id", "y"	);props.put("zookeeper.session.timeout.ms", "600000");props.put("zookeeper.sync.time.ms", "200000");props.put("auto.commit.interval.ms", "100000");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG, count+"");//设置最大消费数props.put("security.protocol", "SASL_PLAINTEXT");consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic)); //"collectionInfo"}}

Linux 控制台消费

生成kafka用户keytab文件

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

生成kafka_client_jaas.conf文件,位置随意,内容如下

# cat config/kafka_jaas.conf 
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/root/jast/kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/root/jast/kafka.keytab"storeKey=trueuseTicketCache=falseprincipal="kafka@JAST.COM"serviceName=kafka;
};

添加环境变量引用jaas文件

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_client_jaas.conf"

创建consumer.properties文件,内容如下

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
group.id=test11

此时就可以消费了

/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.200:9092 --topic test --from-beginning --consumer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/consumer.properties

成功消费数据

Linux 控制台发送数据数据

创建producer.properties文件,内容如下

# cat producer.properties 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

发送数据

/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-producer.sh --broker-list 192.168.0.200:9092 --topic test --producer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/producer.properties

Producer

查看Consumer 消费成功

Linux 控制台创建、删除Topic

在linux 系统配置上面设置的jaas环境变量后即可

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_jaas.conf"

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IntelliJ IDEA Maven jar包冲突解决,快速发现jar包冲突

IntelliJ IDEA 自带的Diagrams 去查看jar包冲突相当费劲 ~ &#xff0c;安装个插件即可快速解决该问题 打开Settings&#xff0c;搜索maven helper&#xff0c;并安装 安装完成后&#xff0c;直接打开pom文件&#xff0c;点击Dependency Analyzer 删除冲突jar包&#xff0c;在…

osg基本图元

OSG绘制几何体学习总结&#xff08;超全&#xff09;在osg中&#xff0c;场景图形采用一种自顶向下的&#xff0c;分层的树状数据结构来组织空间数据集&#xff0c;以提高渲染的效率 场景图形树结构的顶部是一个根节点&#xff0c;从根节点向下延伸&#xff0c;各个组节点中均包…

如何用css设计出商品购物图片

代码&#xff1a; 常见问题&#xff1a;背景填充时候图片大小不一致、重复等等问题&#xff0c;建议在插入图片的时候最好不用img标签直接在div中添加背景图片&#xff0c;有利于对背景图片的更改。 插入图片&#xff1a;background-image: url(../img/product-auto/benz-amg-s…

OLTP、OLAP与HTAP 区别

OLTP On-Line Transaction Processing联机事务处理过程(OLTP) 也称为面向交易的处理过程&#xff0c;其基本特征是前台接收的用户数据可以立即传送到计算中心进行处理&#xff0c;并在很短的时间内给出处理结果&#xff0c;是对用户操作快速响应的方式之一。 这样做的最大优点…

osg布告板技术(Billboard)

公告牌技术,即billboard技术,在3D游戏中有着广泛的应用.它的本质就是用预先做好的几幅 位图来代替3D物体,极大地节省资源和提高速度.仔细观察<<魔法门>>系列游戏,它的精灵,树木,物 品都是二维图象,但由于它始终朝向观察者,你根本看不到它"扁"的一面,所以…

osg坐标系转换

osg中将局部坐标系下的点坐标换算成全局坐标系下点的坐标 标签&#xff1a; matrixlist2012-05-17 16:27 2940人阅读 评论(1) 收藏 举报分类&#xff1a;osg&#xff08;7&#xff09; 坐标变换版权声明&#xff1a;本文为博主原创文章&#xff0c;未经博主允许不得转载。 今天…

C#/WPF 播放音频文件

C#播放音频文件的方式&#xff1a; 播放系统事件声音使用System.Media.SoundPlayer播放wav使用MCI Command String多媒体设备程序接口播放mp3&#xff0c;wav&#xff0c;avi等使用WindowsMediaPlayer的COM组件来播放(可视化)使用DirectX播放音频文件使用Speech播放(朗读器&am…

Hbase 二级索引 Solr int字段排序问题 can not sort on multivalued field

Hbase Solr 同步二级索引后&#xff0c;进行int字段排序时报错 报错如下 {"responseHeader":{"zkConnected":true,"status":400,"QTime":75,"params":{"q":"*:*","sort":"hbase_index…

OpenGL渲染管线,着色器,光栅化等概念理解

卧槽&#xff0c;前些日子看这几个概念就十分想吐槽&#xff0c;这么难理解的概念窃以为纯属翻译的不够接地气。————首先&#xff0c;光栅化&#xff08;Rasterize/rasteriztion&#xff09;。这个词儿Adobe官方翻译成栅格化或者像素化。没错&#xff0c;就是把矢量图形转化…

Hbase Solr 二级索引 同步int数据报错com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte

二级索引实现方式&#xff1a;Hbase Key-Value Store Indexer Solr 同步int数据时提示异常 异常如下 2019-12-16 17:39:18,346 WARN com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte value 101 to int java.lang.IllegalArgumentException: off…

CDH 6 安装 Hbase 二级索引 Solr + Key-Value Store Indexer

目录 一、集群安装Solr Key-Value Store Indexer 二、创建Hbase二级索引 1.更改表结构&#xff0c;允许复制 2.创建相应的SolrCloud集合 3.创建 collection实例并将配置文件上传到 zookeeper 4.创建 Lily HBase Indexer 配置 5.配置Morphline文件 6.注册 Lily HBase I…

glClipPlane剪裁平面

glClipPlane裁剪平面 (2012-02-21 12:49:18) 转载▼标签&#xff1a; 半平面 裁剪 线框 球体 表示 杂谈 分类&#xff1a; OPENGL void glClipPlane(GLenum plane, const GLdouble *equation); 定义一个裁剪平面。equation参数指向平面方程Ax By Cz D …

html思维导图

网页版&#xff1a;https://www.processon.com/view/link/5a658afae4b010a6e728e492

c#事件和委托

一、委托(Delegate) 1、定义 delegate是C#中的一种类型&#xff0c;它实际上是一个能够持有对某个方法的引用的类。与其它的类不同&#xff0c;delegate类能够 拥有一个签名&#xff08;signature&#xff09;&#xff0c;并且它"只能持有与它的签名相匹配的方法的引用&qu…

MonoBehaviour常用方法

1.Start()在Update方法被调用之前开始调用Start方法&#xff0c;而且Start方法在整个MonoBehaviour生命周期内只被调用一次。Awake和Start不同的地方在于Start方法仅仅在脚本初始化后被调用&#xff0c;这样允许你延迟加载任何代码&#xff0c;直到代码真正被使用时。Awake方法…

CDH6.x Solr7.x 集成 Ik 分词

下载ik相关jar包&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/19fydKWw15g8rPg4LW1cOtw 提取码&#xff1a;f2l8 在CDH安装目录下 查找CDH6 solr 的启动目录 [roothostname1 ~]# find /opt -name WEB-INF |grep solr /opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p…

行为树的原理及实现

查阅了一些行为树资料&#xff0c;目前最主要是参考了这篇文章&#xff0c;看完后感觉行为树实乃强大&#xff0c;绝对是替代状态机的不二之选。但从理论看起来很简单的行为树&#xff0c;真正着手起来却发现很多细节无从下手。 总结起来&#xff0c;就是&#xff1a; 1、行为树…

Unity 3D中的射线与碰撞检测

在我们的游戏开发过程中&#xff0c;有一个很重要的工作就是进行碰撞检测。例如在射击游戏中子弹是否击中敌人&#xff0c;在RPG游戏中是否捡到装备等等。在进行碰撞检测时&#xff0c;我们最常用的工具就是射线&#xff0c;Unity 3D的物理引擎也为我们提供了射线类以及相关的函…

HugeGraphServer 部署安装

官方文档链接&#xff1a;https://hugegraph.github.io/hugegraph-doc HugeGraphServer Quick Start 1 概述 HugeGraph-Server 是 HugeGraph 项目的核心部分&#xff0c;包含Core、Backend、API等子模块。 Core模块是Tinkerpop接口的实现&#xff0c;Backend模块用于管理数…