aws msk加密方式和问控制连接方式

msk加密方式

msk提供了两种加密方式

  • 静态加密
  • 传输中加密

创建集群时可以指定加密方式,参数如下

aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --kafka-version "{YOUR MSK VERSION}" --number-of-broker-nodes 3// encryptioninfo.json
{"EncryptionAtRest": {"DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/abcdabcd-1234-abcd-1234-abcd123e8e8e"},"EncryptionInTransit": {"InCluster": true,"ClientBroker": "TLS"}
}

查看证书位置

$ pwd
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.amzn2.0.1.x86_64/jre/lib/security
$ ls -al ../../../../../../../etc/pki/java/cacerts
lrwxrwxrwx 1 root root 40 May  8 09:44 ../../../../../../../etc/pki/java/cacerts -> /etc/pki/ca-trust/extracted/java/cacerts
$ cp /etc/pki/ca-trust/extracted/java/cacerts /tmp/kafka.client.truststore.jks

测试tls加密,创建client.properties

security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

列出端点

$ aws kafka get-bootstrap-brokers --cluster-arn arn:aws-cn:kafka:cn-north-1:037047667284:cluster/mytest/93d5cf51-9e82-4049-a4bc-cefb6bd61716-3
{"BootstrapBrokerString": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092","BootstrapBrokerStringTls": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094","BootstrapBrokerStringSaslScram": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096","BootstrapBrokerStringSaslIam": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098"
}

测试tls连接

$ ./kafka-topics.sh --bootstrap-server b-2.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
__amazon_msk_canary
__consumer_offsets
first# 连接string端点报错
$ ./kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --command-config client.properties --list[2023-07-20 12:40:04,944] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)

指定iam的客户端配置,之后连接tls端口会报错

  • 可见这个tls broker连接仅仅是给Unauthenticated用的,并且如果开了iam认证会失败
./kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list[2023-07-20 12:26:31,401] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.14.174:9094) failed authentication due to: Unexpected handshake request with client mechanism AWS_MSK_IAM, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
[2023-07-20 12:26:31,403] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)

msk访问控制

https://docs.amazonaws.cn/msk/latest/developerguide/kafka_apis_iam.html

kafka客户端配置,https://kafka.apache.org/documentation/#security_configclients

msk可选的访问控制/加密组合如下,访问控制方式决定了能够选择的加密方式

AuthenticationClient-broker encryption optionsBroker-broker encryption
UnauthenticatedTLS, PLAINTEXT, TLS_PLAINTEXTCan be on or off
mTLSTLS, TLS_PLAINTEXTMust be on
SASL/SCRAMTLSMust be on
SASL/IAMTLSMust be on

集群完毕后提供了多种连接终端节点

端口信息,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/port-info.html

在这里插入图片描述

plaintext

采取Unauthenticated方式,客户端使用PLAINTEXT

在这里插入图片描述

查找bootstrap-server端点

在这里插入图片描述

(可选)在bin/client.properties中加入客户端配置

security.protocol=PLAINTEXT

测试连接,不需要特意配置tls连接

./bin/kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --list

java客户端开启ssl连接

// 开启 tls 连接
properties.put("security.protocol", "SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512");// 创建kafka对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

IAM认证

msk对kafka的源码进行了修改,允许使用iam进行认证,访问事件会发送到cloudtrail中。注意事项

  • 不适用于zk节点

  • 开启iam认证后,allow.everyone.if.no.acl.found配置无效

  • 使用iam认证后创建的kafka acl(存储在zk中),对iam认证无效

  • client和broker之间必须启用tls加密

  • 和连接kafka相关的权限以kafka-cluster作为前缀,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/iam-access-control.html

  • 需要使用9098和9198端口

shell连接

需要在客户端配置如下参数

# config/client.properties
# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE> # if don't specify a value for ssl.truststore.location, the Java process uses the default certificate.
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
awsProfileName="admin";

下载客户端依赖jar到/libs目录下

https://github.com/aws/aws-msk-iam-auth/releases

aws s3 cp s3://zhaojiew/software/aws-msk-iam-auth-1.1.7-all.jar .

测试连接

./bin/kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list

可能出现以下报错

./bin/kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list                     Error while executing topic command : Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[2023-07-20 10:57:39,293] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics(kafka.admin.TopicCommand$)
[2023-07-20 10:57:39,316] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap spaceat java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)at org.apache.kafka.common.network.Selector.poll(Selector.java:481)at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)at java.lang.Thread.run(Thread.java:750)

指定client配置后成功连接

  • .aws/config中的profile需要写成[profile prod]
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --list
[2023-07-20 11:21:06,517] WARN The configuration 'awsProfileName' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
__amazon_msk_canary
__consumer_offsets

创建topic

./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --topic first --create --partitions 2 --replication-factor 2

发送消息

./kafka-console-producer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --producer.config client.properties --topic first

消费信息

./kafka-console-consumer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --consumer.config client.properties --from-beginning --topic first

java代码连接

加入依赖

<dependency><groupId>software.amazon.msk</groupId><artifactId>aws-msk-iam-auth</artifactId><version>1.0.0</version>
</dependency>
// 完整配置
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "AWS_MSK_IAM");
properties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
properties.put("sasl.client.callback.handler.class",IAMClientCallbackHandler.class.getName());
properties.put("awsProfileName","admin");

相关报错

// 没有导上面包的报错如下
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModuleat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)... 4 more// 如果没有找到凭证
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.DefaultAWSCredentialsProviderChain@7e8d5309: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: To use assume role profiles the aws-java-sdk-sts module must be on the class path.,

mTLS

目前中国区不可用,需要依赖于Private CA

SASL/SCRAM

https://docs.amazonaws.cn/en_us/msk/latest/developerguide/msk-password.html

使用secret manager保存username和password

在这里插入图片描述

创建secret

  • 名称必须以AmazonMSK_开头

  • 不能使用默认kms加密secret

    在这里插入图片描述

  • 密钥内容必须为以下格式

    {"username": "alice","password": "alice-secret"
    }
    

    在这里插入图片描述

shell连接

创建配置文件users_jaas.conf,导出为环境变量

# KafkaClient首字母大写
cat > /tmp/users_jaas.conf << EOF
KafkaClient {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="alice"password="alice-secret";
};
EOFexport KAFKA_OPTS=-Djava.security.auth.login.config=/tmp/users_jaas.conf

bin目录下创建客户端配置文件

cat > client_sasl.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
EOF

链接集群

./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096  --command-config client_sasl.properties --list

相关报错

# 密码错误
[2023-07-21 09:40:44,467] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.23.61:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512

java代码连接

java代码连接配置

System.setProperty("java.security.auth.login.config", "/tmp/users_jaas.conf");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512"); //仅支持SCRAM-SHA-512

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/15525.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android四大组件 Broadcast广播机制

一 概述 广播 (Broadcast) 机制用于进程或线程间通信&#xff0c;广播分为广播发送和广播接收两个过程&#xff0c;其中广播接收者 BroadcastReceiver 是 Android 四大组件之一。BroadcastReceiver 分为两类&#xff1a; 静态广播接收者&#xff1a;通过 AndroidManifest.xm…

flutter 实现旋转星球

先看效果 planet_widget.dart import dart:math; import package:flutter/material.dart; import package:vector_math/vector_math_64.dart show Vector3; import package:flutter/gestures.dart; import package:flutter/physics.dart;class PlanetWidget extends StatefulW…

echarts-树图、关系图、桑基图、日历图

树图 树图主要用来表达关系结构。 树图的端点也收symbol的调节 树图的特有属性&#xff1a; 树图的方向&#xff1a; layout、orient子节点收起展开&#xff1a;initialTreeDepth、expandAndCollapse叶子节点设置&#xff1a; leaves操作设置&#xff1a;roam线条&#xff1a…

告别 Dart 中的 Future.wait([])

作为 Dart 开发人员&#xff0c;我们对异步编程和 Futures 的强大功能并不陌生。过去&#xff0c;当我们需要同时等待多个 future 时&#xff0c;我们依赖 Future.wait([]) 方法&#xff0c;该方法返回一个 List<T>。然而&#xff0c;这种方法有一个显着的缺点&#xff1…

2、xss-labs之level2

1、打开页面 2、传入xss代码 payload&#xff1a;<script>alert(xss)</script>&#xff0c;发现返回<script>alert(xss)</script> 3、分析原因 打开f12&#xff0c;没什么发现 看后端源码&#xff0c;在这form表单通过get获取keyword的值赋给$str&am…

跑大模型的经验

LLama2: 1. 使用torchrun来跑&#xff1a; torchrun --nproc_per_node 1 example_text_completion.py \--ckpt_dir llama-2-7b/ \--tokenizer_path tokenizer.model \--max_seq_len 128 --max_batch_size 4 关于集群分布式torchrun命令踩坑记录&#xff08;自用&#xff09;…

【Vue】input框自动聚焦且输入验证码后跳至下一位

场景&#xff1a;PC端 样式&#xff1a; <div class"verification-code-input"><input v-model"code[index]" v-for"(_, index) in 5" :key"index" type"text" maxlength"1" input"handleInput(i…

渲染管线——应用阶段

知识必备——CPU和GPU 应用阶段都做了什么 应用阶段为渲染准备了什么 1.把不可见的数据剔除 2.准备好模型相关数据&#xff08;顶点、法线、切线、贴图、着色器等等&#xff09; 3.将数据加载到显存中 4.设置渲染状态&#xff08;设置网格需要使用哪个着色器、材质、光源属性等…

说些什么好呢

大一&#xff1a;提前学C和C。学完语法去洛谷或者Acwing二选一&#xff0c;刷300道左右题目。主要培养编程思维&#xff0c;让自己的逻辑能够通过代码实现出来。 现在对算法有点感兴趣但是没有天赋&#xff0c;打不了acm&#xff0c;为就业做准备咯。 大二(算法竞赛)&#xff1…

常用损失函数学习

损失函数&#xff08;Loss Function&#xff09;&#xff0c;在机器学习和统计学中&#xff0c;是用来量化模型预测输出与真实结果之间差异的函数。简而言之&#xff0c;损失函数衡量了模型预测的好坏&#xff0c;目标是通过最小化这个函数来优化模型参数&#xff0c;从而提高预…

简述js的事件循环以及宏任务和微任务

前言 在JavaScript中&#xff0c;任务被分为同步任务和异步任务。 同步任务&#xff1a;这些任务在主线程上顺序执行&#xff0c;不会进入任务队列&#xff0c;而是直接在主线程上排队等待执行。每个同步任务都会阻塞后续任务的执行&#xff0c;直到它自身完成。常见的同步任…

【机器学习】机器学习与大型预训练模型的前沿探索:跨模态理解与生成的新纪元

&#x1f512;文章目录&#xff1a; &#x1f4a5;1.引言 ☔2.跨模态理解与生成技术概述 &#x1f6b2;3.大型预训练模型在跨模态理解与生成中的应用 &#x1f6f4;4.前沿探索与挑战并存 &#x1f44a;5.未来趋势与展望 &#x1f4a5;1.引言 近年来&#xff0c;机器学习领…

著名书法家王杰宝做客央视频《笔墨写人生》艺坛人物经典访谈节目

印象网北京讯&#xff08;张春兄、冯爱云&#xff09;展示艺术风采&#xff0c;构建时代精神。5月25日&#xff0c;著名书法家、羲之文化传承人王杰宝&#xff0c;做客央视频《笔墨写人生》艺坛人物经典访谈节目&#xff0c;与中央电视台纪录频道主持人姚文倩一起&#xff0c;分…

MyBatis 中的动态 SQL 的相关使用方法(Javaee/MyBatis)

MyBatis 的动态 SQL 是一种强大的特性&#xff0c;它可以让你在 XML 映射文件内&#xff0c;根据不同的条件编写不同的 SQL 语句。MyBatis 动态 SQL 主要元素有&#xff1a; <if>: 根据提供的条件来动态拼接 SQL。 接口定义 Integer insertUserByCondition(UserInfo u…

c++ list容器

std::list 是 C 标准库中的一个双向链表容器。与 std::vector&#xff08;动态数组&#xff09;和 std::deque&#xff08;双端队列&#xff09;不同&#xff0c;std::list 的元素在内存中不是连续存储的&#xff0c;而是分散存储并通过节点进行连接。这使得 std::list 在插入和…

SpringBoot 集成 ChatGPT(附实战源码)

建项目 项目结构 application.properties openai.chatgtp.modelgpt-3.5-turbo openai.chatgtp.api.keyREPLACE_WITH_YOUR_API_KEY openai.chatgtp.api.urlhttps://api.openai.com/v1/chat/completionsopenai.chatgtp.max-completions1 openai.chatgtp.temperature0 openai.cha…

全局平均池化笔记

全局平均池化&#xff08;Global Average Pooling, GAP&#xff09;是一种用于卷积神经网络&#xff08;CNN&#xff09;中的池化操作&#xff0c;其主要作用和优点包括&#xff1a; 减少参数数量&#xff1a;全局平均池化层将每个特征图通过取其所有元素的平均值&#xff0c;压…

ubuntu安装yum方法【最新可用】

一、安装命令 在根目录&#xff08;root&#xff09;下执行 sudo apt-get install build-essential sudo apt-get install yum二、出错处理 1、E: Package yum has no installation candidate 解决&#xff1a;更换镜像源&#xff0c;找到自己的系统版本用vim进行更换&#xff…

make是什么

make是什么工具 make是一个自动化编译工具,它本身并没有编译和链接的功能,而是用类似于批处理的方式——通过makefile文件中指示的依赖关系,调用makefile文件中使用的命令来完成编译和链接的。makefile文件中记录了源代码文件之间的依赖关系,并说明了如何编译各个源代码文…

GmSSL3.X编译iOS和Android动态库

一、环境准备 我用的Mac电脑编译&#xff0c;Xcode版本15.2&#xff0c;安卓的NDK版本是android-ndk-r21e。 1.1、下载国密源码 下载最新的国密SDK源码到本地。 1.2、安装Xcode 前往Mac系统的AppStore下载安装最新Xcode。 1.3、安卓NDK下载 下载NDK到本地&#xff0c;选…