拒绝 Helm? 如何在 K8s 上部署 KRaft 模式 Kafka 集群?

首发:运维有术

今天分享的主题是:不使用 Helm、Operator,如何在 K8s 集群上手工部署一个开启 SASL 认证的 KRaft 模式的 Kafka 集群?

本文,我将为您提供一份全面的实战指南,逐步引导您完成以下关键任务:

  1. 配置 Kafka Secret:管理用户密码和集群 ID
  2. 配置 Kafka Service:使用 NodePort 对外发布 Kafka 服务
  3. 使用 StatefulSet 部署 KRaft 模式 Kafka 集群
  4. 如何测试 Kafka 集群的可用性

通过本文的指导,您将掌握在 Kubernetes 上部署 KRaft 模式 Kafka 集群的必备技能。

实战服务器配置(架构1:1复刻小规模生产环境,配置略有不同)

主机名IPCPU内存系统盘数据盘用途
ksp-control-1192.168.9.1214840100KubeSphere/k8s-control-plane
ksp-control-2192.168.9.1224840100KubeSphere/k8s-control-plane
ksp-control-3192.168.9.1234840100KubeSphere/k8s-control-plane
ksp-worker-1192.168.9.12481640100k8s-worker/CI
ksp-worker-2192.168.9.12581640100k8s-worker
ksp-worker-3192.168.9.12681640100k8s-worker
ksp-storage192.168.9.1272440100NFS Storage
合计33876280700

实战环境涉及软件版本信息

  • 操作系统:CentOS Linux 7.9
  • KubeSphere:v3.3.1
  • Kubernetes:v1.24.3
  • Kafka: 3.6.2

1. 部署方案规划

目前在 K8s 集群部署 Kafka 的主流方案有以下几种:

  • Kafka Helm chart (Bitnami 出品,简单可定制,但是需要花时间成本学习可配置参数,建议生产环境使用
  • 手写资源配置清单(麻烦,涉及的组件、配置多,适用于不想或是不能使用 Helm、Operator 只能手撸的场景

往期我们实战演练过,如何使用 Helm 部署 Kafka 集群,具体内容可以参考KubeSphere 部署 Kafka 集群实战指南。本文我们使用手撸资源配置清单的方式部署 Kafka 集群。

资源配置清单规划如下:

  • 创建 Secret 资源存储 集群 ID和用户密码
  • 创建 HeadLess 和 NodePort 服务用于创建集群和外部访问
  • 创建 StatefulSet 部署 Kafka 服务
  • 考虑应用程序 Kafka 版本兼容性,选择了 Bitnami 提供的 3.6.2 版本的镜像
  • 持久化存储选择 NFS,考虑到性能、稳定性等因素,生产环境不建议使用

1.1 部署架构图

1.2 准备持久化存储

本实战环境使用 NFS 作为 K8s 集群的持久化存储,新集群可以参考探索 Kubernetes 持久化存储之 NFS 终极实战指南 部署 NFS 存储。

1.3 命名空间

Kafka 集群所有资源部署在命名空间 opsxlab内。

2. 部署 Kafka 集群

2.1 创建 Secret

  1. 创建管理 Kafka 集群各 Listener 所需密码的保密字典

明文密码必须使用 base64 加密,echo -n "PleaseChangeMe" | base64 -w0,生产环境请生成不同的密码。

请使用 vi 编辑器,创建资源清单文件 kafka-sasl-passwords-secret.yaml,并输入以下内容:

kind: Secret
apiVersion: v1
metadata:name: kafka-sasl-passwordslabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka
data:client-passwords: UGxlYXNlQ2hhbmdlTWU=controller-password: UGxlYXNlQ2hhbmdlTWU=inter-broker-password: UGxlYXNlQ2hhbmdlTWU=
type: Opaque
  1. 创建 Kafka 集群 UUID 保密字典

使用下面的命令,创建一个临时 Pod,生成 UUID 后自动删除。

$ kubectl run app-kafka-client --rm -i --image registry.opsxlab.cn:8443/bitnami/kafka:3.6.2 -n opsxlab -- /opt/bitnami/kafka/bin/kafka-storage.sh random-uuidRpOTPIfMRTiPpmCYJHF9KQ

将生成的明文 UUID 使用 base64 加密,echo -n "RpOTPIfMRTiPpmCYJHF9KQ" | base64 -w0

请使用 vi 编辑器,创建资源清单文件 kafka-kraft-cluster-id.yaml,并输入以下内容:

kind: Secret
apiVersion: v1
metadata:name: kafka-kraft-cluster-idlabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka
data:kraft-cluster-id: UnBPVFBJZk1SVGlQcG1DWUpIRjlLUQ==
type: Opaque
  1. 创建资源

执行下面的命令,创建资源。

kubectl apply -f kafka-sasl-passwords-secret.yaml -n opsxlab
kubectl apply -f kafka-kraft-cluster-id.yaml -n opsxlab
  1. 验证资源

执行下面的命令,查看创建结果。

$ kubectl get secret -n opsxlab
NAME                     TYPE     DATA   AGE
kafka-kraft-cluster-id   Opaque   1      5s
kafka-sasl-passwords     Opaque   3      6s

2.2 创建服务

服务规划说明:

  • 3个 Kafka 节点,采用 NodePort 方式在 Kubernetes 集群外发布 Kafka 服务
  • 3个 Kafka 节点,共用一个 Headless 服务,作用是给 Internal 和 Controller 两个 LISTENERS 提供内部域名。
  1. 创建 HeadLess 服务

请使用 vi 编辑器,创建资源清单文件 kafka-controller-headless.yaml,并输入以下内容:

kind: Service
apiVersion: v1
metadata:name: kafka-controller-hslabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka
spec:ports:- name: tcp-internalprotocol: TCPport: 9092targetPort: internal- name: tcp-controllerprotocol: TCPport: 9093targetPort: controllerselector:app.kubernetes.io/instance: app-kafkaclusterIP: Nonetype: ClusterIP
  1. 创建 Kafka-controller 节点1的 NodePort 服务

请使用 vi 编辑器,创建资源清单文件 kafka-controller-0-external.yaml,并输入以下内容:

kind: Service
apiVersion: v1
metadata:name: kafka-controller-0-externallabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka
spec:ports:- name: tcp-externalprotocol: TCPport: 9094targetPort: 9094nodePort: 31211selector:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-0type: NodePort
  1. 创建 kafka-controller 节点2 的 NodePort 服务

请使用 vi 编辑器,创建资源清单文件 kafka-controller-1-external.yaml,并输入以下内容:

kind: Service
apiVersion: v1
metadata:name: kafka-controller-1-externallabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka
spec:ports:- name: tcp-externalprotocol: TCPport: 9094targetPort: 9094nodePort: 31212selector:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-1type: NodePort
  1. 创建 Kafka-controller 节点3 的 NodePort 服务

请使用 vi 编辑器,创建资源清单文件 kafka-controller-2-external.yaml,并输入以下内容:

kind: Service
apiVersion: v1
metadata:name: kafka-controller-2-externallabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka
spec:ports:- name: tcp-externalprotocol: TCPport: 9094targetPort: 9094nodePort: 31213selector:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-2type: NodePort
  1. 创建资源

执行下面的命令,创建资源。

kubectl apply -f kafka-controller-headless.yaml -n opsxlab
kubectl apply -f kafka-controller-0-external.yaml -n opsxlab
kubectl apply -f kafka-controller-1-external.yaml -n opsxlab
kubectl apply -f kafka-controller-2-external.yaml -n opsxlab
  1. 验证资源

执行下面的命令,查看创建结果。

$ kubectl get svc -n opsxlab
NAME                          TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
kafka-controller-0-external   NodePort    10.233.1.92    <none>        9094:31211/TCP      8s
kafka-controller-1-external   NodePort    10.233.18.62   <none>        9094:31212/TCP      8s
kafka-controller-2-external   NodePort    10.233.38.37   <none>        9094:31213/TCP      8s
kafka-controller-hs           ClusterIP   None           <none>        9092/TCP,9093/TCP   8s

2.3 创建 Kafka 集群

使用 StatefulSet 部署 Kafka 集群,3个 Kafka 节点使用内容大部分相同的配置文件,必须修改的参数如下:

  • KAFKA_CFG_ADVERTISED_LISTENERS: 修改 EXTERNAL 对应的 IP 地址
  • KAFKA_HEAP_OPTS:根据资源和并发需求调整
  1. 创建节点1 资源清单

请使用 vi 编辑器,创建资源清单文件 kafka-controller-0-sts.yaml,并输入以下内容:

kind: StatefulSet
apiVersion: apps/v1
metadata:name: kafka-controller-0labels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-0
spec:replicas: 1selector:matchLabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-0template:metadata:labels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-0spec:containers:- name: kafkaimage: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'ports:- name: intelrnalcontainerPort: 9092protocol: TCP- name: controllercontainerPort: 9093protocol: TCP- name: externalcontainerPort: 9094protocol: TCPenv:- name: BITNAMI_DEBUGvalue: 'false'- name: HOST_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.hostIP- name: KAFKA_HEAP_OPTSvalue: '-Xmx2048m -Xms1024m'- name: KAFKA_KRAFT_CLUSTER_IDvalueFrom:secretKeyRef:name: kafka-kraft-cluster-idkey: kraft-cluster-id- name: KAFKA_CLIENT_USERSvalue: user1- name: KAFKA_CLIENT_PASSWORDSvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: client-passwords- name: KAFKA_INTER_BROKER_USERvalue: inter_broker_user- name: KAFKA_INTER_BROKER_PASSWORDvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: inter-broker-password- name: KAFKA_CONTROLLER_USERvalue: controller_user- name: KAFKA_CONTROLLER_PASSWORDvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: controller-password- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOLvalue: PLAIN- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOLvalue: PLAIN- name: KAFKA_CFG_NODE_IDvalue: '0'- name: KAFKA_CFG_PROCESS_ROLESvalue: 'controller,broker'- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: >-0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093- name: KAFKA_CFG_LISTENERSvalue: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: >-INTERNAL://kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31211- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: >-INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: CONTROLLER- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: INTERNAL- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORvalue: '3'- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTORvalue: '3'- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISRvalue: '2'resources:limits:cpu: '1'memory: 2Girequests:cpu: 50mmemory: 512MivolumeMounts:- name: datamountPath: /bitnami/kafkalivenessProbe:exec:command:- pgrep- '-f'- kafkainitialDelaySeconds: 10timeoutSeconds: 5periodSeconds: 10successThreshold: 1failureThreshold: 3readinessProbe:tcpSocket:port: controllerinitialDelaySeconds: 5timeoutSeconds: 5periodSeconds: 10successThreshold: 1failureThreshold: 6terminationMessagePath: /dev/termination-logterminationMessagePolicy: FileimagePullPolicy: IfNotPresentrestartPolicy: AlwaysterminationGracePeriodSeconds: 30dnsPolicy: ClusterFirstaffinity:podAntiAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchLabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafkatopologyKey: kubernetes.io/hostnamevolumeClaimTemplates:- kind: PersistentVolumeClaimapiVersion: v1metadata:name: dataspec:accessModes:- ReadWriteOnceresources:requests:storage: 10GistorageClassName: nfs-scvolumeMode: FilesystemserviceName: kafka-controller-hs
  1. 创建节点2 资源清单

请使用 vi 编辑器,创建资源清单文件 kafka-controller-1-sts.yaml,并输入以下内容:

kind: StatefulSet
apiVersion: apps/v1
metadata:name: kafka-controller-1labels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-1
spec:replicas: 1selector:matchLabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-1template:metadata:labels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-1spec:containers:- name: kafkaimage: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'ports:- name: intelrnalcontainerPort: 9092protocol: TCP- name: controllercontainerPort: 9093protocol: TCP- name: externalcontainerPort: 9094protocol: TCPenv:- name: BITNAMI_DEBUGvalue: 'false'- name: HOST_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.hostIP- name: KAFKA_HEAP_OPTSvalue: '-Xmx2048m -Xms1024m'- name: KAFKA_KRAFT_CLUSTER_IDvalueFrom:secretKeyRef:name: kafka-kraft-cluster-idkey: kraft-cluster-id- name: KAFKA_CLIENT_USERSvalue: user1- name: KAFKA_CLIENT_PASSWORDSvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: client-passwords- name: KAFKA_INTER_BROKER_USERvalue: inter_broker_user- name: KAFKA_INTER_BROKER_PASSWORDvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: inter-broker-password- name: KAFKA_CONTROLLER_USERvalue: controller_user- name: KAFKA_CONTROLLER_PASSWORDvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: controller-password- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOLvalue: PLAIN- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOLvalue: PLAIN- name: KAFKA_CFG_NODE_IDvalue: '1'- name: KAFKA_CFG_PROCESS_ROLESvalue: 'controller,broker'- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: >-0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093- name: KAFKA_CFG_LISTENERSvalue: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: >-INTERNAL://kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31212- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: >-INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: CONTROLLER- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: INTERNAL- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORvalue: '3'- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTORvalue: '3'- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISRvalue: '2'resources:limits:cpu: '1'memory: 2Girequests:cpu: 50mmemory: 512MivolumeMounts:- name: datamountPath: /bitnami/kafkalivenessProbe:exec:command:- pgrep- '-f'- kafkainitialDelaySeconds: 10timeoutSeconds: 5periodSeconds: 10successThreshold: 1failureThreshold: 3readinessProbe:tcpSocket:port: controllerinitialDelaySeconds: 5timeoutSeconds: 5periodSeconds: 10successThreshold: 1failureThreshold: 6terminationMessagePath: /dev/termination-logterminationMessagePolicy: FileimagePullPolicy: IfNotPresentrestartPolicy: AlwaysterminationGracePeriodSeconds: 30dnsPolicy: ClusterFirstaffinity:podAntiAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchLabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafkatopologyKey: kubernetes.io/hostnamevolumeClaimTemplates:- kind: PersistentVolumeClaimapiVersion: v1metadata:name: dataspec:accessModes:- ReadWriteOnceresources:requests:storage: 10GistorageClassName: nfs-scvolumeMode: FilesystemserviceName: kafka-controller-hs
  1. 创建节点3 资源清单

请使用 vi 编辑器,创建资源清单文件 kafka-controller-2-sts.yaml,并输入以下内容:

kind: StatefulSet
apiVersion: apps/v1
metadata:name: kafka-controller-2labels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-2
spec:replicas: 1selector:matchLabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-2template:metadata:labels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafka-controller-2spec:containers:- name: kafkaimage: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'ports:- name: intelrnalcontainerPort: 9092protocol: TCP- name: controllercontainerPort: 9093protocol: TCP- name: externalcontainerPort: 9094protocol: TCPenv:- name: BITNAMI_DEBUGvalue: 'false'- name: HOST_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.hostIP- name: KAFKA_HEAP_OPTSvalue: '-Xmx2048m -Xms1024m'- name: KAFKA_KRAFT_CLUSTER_IDvalueFrom:secretKeyRef:name: kafka-kraft-cluster-idkey: kraft-cluster-id- name: KAFKA_CLIENT_USERSvalue: user1- name: KAFKA_CLIENT_PASSWORDSvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: client-passwords- name: KAFKA_INTER_BROKER_USERvalue: inter_broker_user- name: KAFKA_INTER_BROKER_PASSWORDvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: inter-broker-password- name: KAFKA_CONTROLLER_USERvalue: controller_user- name: KAFKA_CONTROLLER_PASSWORDvalueFrom:secretKeyRef:name: kafka-sasl-passwordskey: controller-password- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOLvalue: PLAIN- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOLvalue: PLAIN- name: KAFKA_CFG_NODE_IDvalue: '2'- name: KAFKA_CFG_PROCESS_ROLESvalue: 'controller,broker'- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: >-0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093- name: KAFKA_CFG_LISTENERSvalue: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: >-INTERNAL://kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31213- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: >-INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: CONTROLLER- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: INTERNAL- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORvalue: '3'- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTORvalue: '3'- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISRvalue: '2'resources:limits:cpu: '1'memory: 2Girequests:cpu: 50mmemory: 512MivolumeMounts:- name: datamountPath: /bitnami/kafkalivenessProbe:exec:command:- pgrep- '-f'- kafkainitialDelaySeconds: 10timeoutSeconds: 5periodSeconds: 10successThreshold: 1failureThreshold: 3readinessProbe:tcpSocket:port: controllerinitialDelaySeconds: 5timeoutSeconds: 5periodSeconds: 10successThreshold: 1failureThreshold: 6terminationMessagePath: /dev/termination-logterminationMessagePolicy: FileimagePullPolicy: IfNotPresentrestartPolicy: AlwaysterminationGracePeriodSeconds: 30dnsPolicy: ClusterFirstaffinity:podAntiAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchLabels:app.kubernetes.io/instance: app-kafkaapp.kubernetes.io/name: kafkatopologyKey: kubernetes.io/hostnamevolumeClaimTemplates:- kind: PersistentVolumeClaimapiVersion: v1metadata:name: dataspec:accessModes:- ReadWriteOnceresources:requests:storage: 10GistorageClassName: nfs-scvolumeMode: FilesystemserviceName: kafka-controller-hs
  1. 创建资源

执行下面的命令,创建资源。

kubectl apply -f kafka-controller-0-sts.yaml -n opsxlab
kubectl apply -f kafka-controller-1-sts.yaml -n opsxlab
kubectl apply -f kafka-controller-2-sts.yaml -n opsxlab
  1. 验证资源

执行下面的命令,查看创建结果(初次创建比较慢)。

$ kubectl get sts,pod -n opsxlab
NAME                                  READY   AGE
statefulset.apps/kafka-controller-0   1/1     25s
statefulset.apps/kafka-controller-1   1/1     25s
statefulset.apps/kafka-controller-2   1/1     24sNAME                       READY   STATUS    RESTARTS   AGE
pod/kafka-controller-0-0   1/1     Running   0          24s
pod/kafka-controller-1-0   1/1     Running   0          24s
pod/kafka-controller-2-0   1/1     Running   0          23s

3. 验证测试 Kafka 服务可用性

分别在 k8s 集群内和集群外验证 Kafka 服务的可用性。

3.1 k8s 集群内部验证

  1. 创建测试 Pod
kubectl run opsxlab-kafka-client --restart='Never' --image registry.opsxlab.cn:8443/bitnami/kafka:3.6.2 --namespace opsxlab --command -- sleep infinity
  1. 生成 client.properties
cat << EOF > /tmp/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="PleaseChangeMe";
EOF
  1. 复制到测试容器app-kafka-client内部
kubectl cp --namespace opsxlab /tmp/client.properties opsxlab-kafka-client:/tmp/client.properties
  1. 打开测试 Pod 终端
kubectl exec --tty -i opsxlab-kafka-client --namespace opsxlab -- bash
  1. 创建主题
kafka-topics.sh --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 --create --topic test-topic --partitions 3 --replication-factor 3 --command-config /tmp/client.properties
  1. 查看副本
$ kafka-topics.sh --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 --topic test-topic --describe --command-config /tmp/client.properties
Topic: test-topic       TopicId: yNWQQ6yKSBeLmvVUFf2IVw PartitionCount: 3       ReplicationFactor: 3    Configs:Topic: test-topic       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2Topic: test-topic       Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0Topic: test-topic       Partition: 2    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
  1. 执行命令,生产数据
kafka-console-producer.sh \--broker-list kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \--topic test-topic --producer.config /tmp/client.properties
  1. 再打开一个测试 Pod 终端,消费数据

再打开一个终端,然后再执行下面的命令。

kafka-console-consumer.sh \--bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \--topic test-topic \--from-beginning --consumer.config /tmp/client.properties
  1. 生产并消费数据测试

在生产者一侧随便输入测试数据,观察消费者一侧是否正确收到信息。

生产者侧:

I have no name!@opsxlab-kafka-client:/$ kafka-console-producer.sh \--broker-list kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \--topic test-topic --producer.config /tmp/client.properties
>cluster kafka test 1
>cluster kafka test 2
>cluster kafka test 3

消费者侧:

I have no name!@opsxlab-kafka-client:/$ kafka-console-consumer.sh \--bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \--topic test-topic \--from-beginning --consumer.config /tmp/client.properties
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3

3.2 K8s 集群外部验证

为了更严谨的测试 Kafka 在 K8s 集群外的可用性,我在 K8s 集群外找了一台机器,安装 JDK 和 Kafka。安装方式上 JDK 选择了 Yum 安装 openjdk,Kafka 则选用了官方提供的3.9.0最新版本的二进制包。

实际测试时还可以选择 Docker 镜像或是在 K8s 集群上再创建一个 Pod,测试时连接 K8s 节点的宿主机 IP 和 NodePort。

  1. 准备外部测试环境
# 安装 JDK
yum install java-1.8.0-openjdk# 下载 Kafka
cd /srv
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.9.0.tgz# 解压
tar xvf kafka_2.13-3.9.0.tgz
cd /srv/kafka_2.13-3.9.0/bin
  1. 获取 Kafka 外部访问配置信息

本文使用一个 Master 节点,作为 Kafka NodePort 的 IP,实际使用中建议使用多个 Worker 节点,每个 Pod 对应一个 Worker节点IP。

下面测试的 Broker Server 地址使用 192.168.9.121:31211

  1. 生成 client.properties
cat << EOF > /tmp/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="PleaseChangeMe";
EOF
  1. 外部节点连接 Kafka 测试

跟 K8s 集群内部验证测试过程一样,打开两个终端,运行生产者和消费者脚本。执行下面的命令验证测试(细节略过,直接上结果)。

外部生产者侧:

$ ./kafka-console-producer.sh --broker-list 192.168.9.121:31211 --topic test-topic --producer.config /tmp/client.properties
>external kafka test 10
>external kafka test 20
>external kafka test 30

外部消费者侧:

$ ./kafka-console-consumer.sh --bootstrap-server 192.168.9.121:31211 --topic test-topic --from-beginning --consumer.config /tmp/client.propertiesexternal kafka test 10
external kafka test 20
external kafka test 30
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3

注意: K8s 集群外部消费者能消费到所有数据,包括集群内部测试时生成的数据。

集群内消费者侧: 集群内的消费者,同样能获取外部生产者产生的数据。

I have no name!@opsxlab-kafka-client:/$ kafka-console-consumer.sh \--bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \--topic test-topic \--from-beginning --consumer.config /tmp/client.properties
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3
external kafka test 10
external kafka test 20
external kafka test 30
  1. 删除测试 Topic
./kafka-topics.sh --bootstrap-server 192.168.9.121:31211 --delete --topic test-topic --command-config /tmp/client.properties
  1. 查看 Topic
./kafka-topics.sh --bootstrap-server 192.168.9.121:31211 --list --command-config /tmp/client.properties

以上,就是我今天分享的全部内容。

免责声明:

  • 笔者水平有限,尽管经过多次验证和检查,尽力确保内容的准确性,但仍可能存在疏漏之处。敬请业界专家大佬不吝指教。
  • 本文所述内容仅通过实战环境验证测试,读者可学习、借鉴,但严禁直接用于生产环境由此引发的任何问题,作者概不负责

近期活动推荐

本文由博客一文多发平台 OpenWrite 发布!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/64799.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面向微服务的Spring Cloud Gateway的集成解决方案:用户登录认证与访问控制

&#x1f3af;导读&#xff1a;本文档详细描述了一个基于Spring Cloud Gateway的微服务网关及Admin服务的实现。网关通过定义路由规则&#xff0c;利用负载均衡将请求转发至不同的后端服务&#xff0c;并集成了Token验证过滤器以确保API的安全访问&#xff0c;同时支持白名单路…

浅析InnoDB引擎架构(已完结)

大家好&#xff0c;我是此林。 今天来介绍下InnoDB底层架构。 1. 磁盘架构 我们所有的数据库文件都保存在 /var/lib/mysql目录下。 由于我这边是docker部署的mysql&#xff0c;用如下命令查看mysql数据挂载。 docker inspect mysql-master 如下图&#xff0c;目前只有一个数…

Ajax中的axios

既然提到Ajax&#xff0c;那就先来说一说什么是Ajax吧 关于Ajax Ajax的定义 Asynchronous JavaScript And XML&#xff1a;异步的JavaScript和XML。 反正就是一句话总结&#xff1a; 使用XML HttpRequest 对象与服务器进行通讯。 AJAX 是一种在无需重新加载整个网页的情况下&…

苹果手机怎么清理空间:拯救你的拥挤手机

在数字生活的海洋中&#xff0c;我们的苹果手机就像一艘小船&#xff0c;载满了照片、应用、视频和各种下载的“宝贝”。随着时间的推移&#xff0c;这艘小船开始变得拥挤&#xff0c;航行速度放缓&#xff0c;甚至有时候直接卡壳。苹果手机怎么清理空间&#xff1f;是时候学会…

三、使用langchain搭建RAG:金融问答机器人--检索增强生成

经过前面2节数据准备后&#xff0c;现在来构建检索 加载向量数据库 from langchain.vectorstores import Chroma from langchain_huggingface import HuggingFaceEmbeddings import os# 定义 Embeddings embeddings HuggingFaceEmbeddings(model_name"m3e-base")#…

C语言 函数嵌套

#include <stdio.h> void new_line() {printf("hehe\n"); } void three_line() {int i 0;for (i 0; i < 3; i){new_line;} } int main() {three_line();return 0; } 函数可以嵌套调用&#xff0c;但不能嵌套定义 链式访问 main有三个参数 //main函数的…

问题解决:发现Excel中的部分内容有问题。是否让我们尽量尝试恢复? 如果您信任此工作簿的源,请单击“是”。

在开发同步导出功能是遇到了如标题所示的问题&#xff0c;解决后遂记录下来供大家参考。 RestController public class XxxController {PostMapping("/export")public BaseResponse export(RequestBody PolicyErrorAnalysisExportReq exportReq, HttpServletRespons…

基于ST STM32MP257FAK3的MP2控制器之工业PLC 方案

简介 1.可编程逻辑控制器&#xff08;PLC&#xff09;是种专门为在工业环境下应用而设计的数字运算操作电子系统。它采用一种可编程的存储器&#xff0c;在其内部存储执行逻辑运算、顺序控制、定时、计数和算术运算等操作的指令&#xff0c;通过数字式或模拟式的输入输出来控制…

golang自定义MarshalJSON、UnmarshalJSON 原理和技巧

问题出现的原因&#xff1a;在前后端分离的项目中&#xff0c;经常出现的问题是时间戳格式的问题。 后端的日期格式兼容性强&#xff0c;比较完善。前端由于各种原因&#xff0c;日期格式不完善。 就会产生矛盾。 ms int64比较通用&#xff0c;但是unix时间没有可读性&#xff…

初始Python篇(7)—— 正则表达式

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a; Python 目录 正则表达式的概念 正则表达式的组成 元字符 限定符 其他字符 正则表达式的使用 正则表达式的常见操作方法 match方法的…

使用 AI 辅助开发一个开源 IP 信息查询工具:一

本文将分享如何借助当下流行的 AI 工具,一步步完成一个开源项目的开发。 写在前面 在写代码时&#xff0c;总是会遇到一些有趣的机缘巧合。前几天&#xff0c;我在翻看自己之前的开源项目时&#xff0c;又看到了 DDNS 相关的讨论。虽然在 2021 年我写过两篇相对详细的教程&am…

Powershell学习笔记

声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团队无关&a…

《Java源力物语》-2.异常训练场

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” \quad 在java.lang古域的一处偏僻角落&#xff0c;矗立着一座古老的训练场。青灰色的围墙上布满了密密麻麻的源力符文&#xff0c;这些符文闪烁着…

一起学Git【第二节:创建版本库】

创建库 这个库相当于一个目录&#xff0c;目录中的文件都被Git管理&#xff0c;会记录每个文件的修改删除和添加工作&#xff0c;便于之后随时跟踪历史记录还原到之前的某一版本。如何创建库呢&#xff1f;有两种方式&#xff0c;本地创建库和云端克隆一个库。 1.本地创建库 …

HarmonyOS NEXT 技术实践-基于基础视觉服务的多目标识别

在智能手机、平板和其他智能设备日益智能化的今天&#xff0c;视觉识别技术成为提升用户体验和智能交互的重要手段。HarmonyOS NEXT通过基础视觉服务&#xff08;HMS Core Vision&#xff09;提供了一套强大的视觉识别功能&#xff0c;其中多目标识别作为其关键技术之一&#x…

nginx-静态资源部署

目录 静态资源概述 静态资源配置指令 listen指令 server_name指令 精确匹配 ​编辑 ​编辑 使用通配符匹配 使用正则表达式匹配 匹配执行顺序 default_server属性 location指令 root指令 alias指令 root与alisa指令的区别 index指令 error_page指令 直接使用 …

时空信息平台架构搭建:基于netty封装TCP通讯模块(IdleStateHandler网络连接监测,处理假死)

文章目录 引言I 异步TCP连接操作II 心跳机制:空闲检测(读空闲和写空闲)基于Netty的IdleStateHandler类实现心跳机制(网络连接监测)常规的处理假死健壮性的处理假死方案获取心跳指令引言 基于netty实现TCP客户端:封装断线重连、连接保持 https://blog.csdn.net/z92911896…

Linux之RPM和YUM命令

一、RPM命令 1、介绍 RPM(RedHat Package Manager).,RedHat软件包管理工具&#xff0c;类似windows里面的setup,exe是Liux这系列操作系统里而的打包安装工具。 RPMI包的名称格式&#xff1a; Apache-1.3.23-11.i386.rpm “apache’” 软件名称“1.3.23-11” 软件的版本号&am…

aosp15 - Activity生命周期切换

本文探查的是&#xff0c;从App冷启动后到MainActivity生命周期切换的系统实现。 调试步骤 在com.android.server.wm.RootWindowContainer#attachApplication 方法下断点&#xff0c;为了attach目标进程在com.android.server.wm.ActivityTaskSupervisor#realStartActivityLock…

【漫话机器学习系列】017.大O算法(Big-O Notation)

大 O 表示法&#xff08;Big-O Notation&#xff09; 大 O 表示法是一种用于描述算法复杂性的数学符号&#xff0c;主要用于衡量算法的效率&#xff0c;特别是随着输入规模增大时算法的运行时间或占用空间的增长趋势。 基本概念 时间复杂度 描述算法所需的运行时间如何随输入数…