Kubernetes kafka系列 | k8s部署kafka+zookeepe集群

一、kafka.zookeeper介绍

Kafka
简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。

特点:

高吞吐量: Kafka 能够处理大规模的消息流,并具有很高的吞吐量。
持久性: 它将消息持久化到磁盘上,因此即使消费者不在线,也能保证消息不会丢失。
可伸缩性: Kafka 可以很容易地水平扩展以处理大量数据。
实时性: Kafka 可以提供几乎实时的消息传递,适用于大多数实时数据处理需求。
用途:

日志收集: Kafka 可以用作集中式的日志收集系统,收集来自不同源头的日志数据。
消息队列: Kafka 可以用作分布式应用程序之间的消息队列,用于解耦和异步通信。
流处理: Kafka 可以与流处理框架(如Apache Spark、Apache Flink等)结合使用,用于实时数据处理和分析。
ZooKeeper
简介: Apache ZooKeeper 是一个开源的分布式协调服务,最初也是由Yahoo开发的,并于2010年成为Apache软件基金会的顶级项目。

特点:

分布式协调: ZooKeeper 提供了分布式应用程序的协调服务,包括配置管理、命名服务、分布式锁等。
高可用性: ZooKeeper 通过在集群中保持多个节点的复制来实现高可用性和容错性。
一致性: ZooKeeper 提供了严格的一致性,确保所有的客户端在同一时间看到相同的数据视图。
用途:

配置管理: ZooKeeper 可以用于分布式系统的配置管理,例如动态配置更新。
命名服务: ZooKeeper 可以提供命名服务,帮助分布式系统中的节点发现和通信。
分布式锁: ZooKeeper 可以用于实现分布式锁,确保在分布式系统中对共享资源的互斥访问。
Kafka 和 ZooKeeper 的关系

在 Kafka 中,ZooKeeper 主要用于管理集群的元数据(如主题、分区、副本分配等)、领导者选举以及生产者和消费者的协调。Kafka 依赖于 ZooKeeper 来确保分布式系统的稳定运行。通常情况下,Kafka 和 ZooKeeper 会一起部署,但它们是两个独立的项目,各自提供不同的功能。

二、创建存储卷

nfs动态供给直通车

三、搭建Kafka集群

# 操作系统
# CentOS Linux release 7.9.2009 (Core)
lsb_release -a# 内核版本
# 3.10.0-1160.90.1.el7.x86_64
uname -a
# k8s 版本 1.21
# zookeeper 版本 3.4.10  kafka镜像版本0.11(嫌低可以自己换)

kafka需要依赖zookeeper
kafka的生产者与消费者需要在zookeeper中注册,不然消费者怎么知道生产者是否存活之类的哈哈。废话不多说,直接上干货!

本文用的是statefulset和动态存储部署zookeeper和kafka集群。

zookeeper.yaml

apiVersion: v1
kind: Service
metadata:name: zk-hslabels:app: zk
spec:ports:- port: 2888name: server- port: 3888name: leader-electionclusterIP: Noneselector:app: zk
---
apiVersion: v1
kind: Service
metadata:name: zk-cslabels:app: zk
spec:ports:- port: 2181targetPort: 2181name: clientselector:app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: zk-pdb
spec:selector:matchLabels:app: zkmaxUnavailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zk
spec:selector:matchLabels:app: zkserviceName: zk-hsreplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: zkspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"containers:- name: kubernetes-zookeeperimagePullPolicy: IfNotPresentimage: "zhaoguanghui6/kubernetes-zookeeper:1.0-3.4.10"resources:requests:memory: "0.5Gi"cpu: "0.5"ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electioncommand:- sh- -c- "start-zookeeper \--servers=3 \--data_dir=/var/lib/zookeeper/data \--data_log_dir=/var/lib/zookeeper/data/log \--conf_dir=/opt/zookeeper/conf \--client_port=2181 \--election_port=3888 \--server_port=2888 \--tick_time=2000 \--init_limit=10 \--sync_limit=5 \--heap=512M \--max_client_cnxns=60 \--snap_retain_count=3 \--purge_interval=12 \--max_session_timeout=40000 \--min_session_timeout=4000 \--log_level=INFO"readinessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5livenessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5volumeMounts:- name: datadirmountPath: /var/lib/zookeepersecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirspec:storageClassName: nfs-clientaccessModes: [ "ReadWriteOnce" ]resources:requests:storage: 1Gi

for i in 0 1 2; do kubectl exec zk-$i – hostname -f; done
zk-0.zk-headless.default.svc.cluster.local

zk-1.zk-headless.default.svc.cluster.local

zk-2.zk-headless.default.svc.cluster.local
kafka.yaml

---
apiVersion: v1
kind: Service
metadata:name: kafka-hslabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka
--- 
apiVersion: v1
kind: Service
metadata:name: kafka-cslabels:app: kafka
spec:selector:app: kafkatype: NodePortports:- name: clientport: 9092nodePort: 30092
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: kafka-pdb
spec:selector:matchLabels:app: kafkaminAvailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:serviceName: kafka-hsreplicas: 3selector:matchLabels:app: kafkatemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"podAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"terminationGracePeriodSeconds: 300containers:- name: kafkaimagePullPolicy: IfNotPresentimage: registry.cn-hangzhou.aliyuncs.com/jaxzhai/k8skafka:v1resources:requests:memory: "1Gi"cpu: 500mports:- containerPort: 9092name: servercommand:- sh- -c- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} --override listeners=PLAINTEXT://:9092 --override zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181 --override log.dir=/var/lib/kafka --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10 --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300 --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000 --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168 --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824 --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1 --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1 --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1 --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000 --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50 --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500 --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1 --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000 --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000 --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400 --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000 --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000 --override group.min.session.timeout.ms=6000 --override inter.broker.protocol.version=0.10.2-IV0 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728 --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9 --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5 --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807 --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000 --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000 --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760 --override reserved.broker.max.id=1000 "env:- name: KAFKA_HEAP_OPTSvalue : "-Xmx1G -Xms1G"- name: KAFKA_OPTSvalue: "-Dlogging.level=INFO"volumeMounts:- name: datadirmountPath: /var/lib/kafkareadinessProbe:exec:command:- sh- -c- "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"timeoutSeconds: 5periodSeconds: 5initialDelaySeconds: 70securityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirannotations:volume.beta.kubernetes.io/storage-class: "nfs-client"spec:accessModes: [ "ReadWriteMany" ]resources:requests:storage: 5Gi

四、验证集群
验证kafka是否可用:

1、进入kafka-0命令: kubectl exec -it kafka-0 bash
进入容器目录:cd /opt/kafka/config

2、创建一个名为aaa的topc命令:kafka-topics.sh --create --topic aaa --zookeeper zk-0.zk-headless.default.svc.cluster.local:2181,zk-1.zk-headless.default.svc.cluster.local:2181,zk-2.zk-headless.default.svc.cluster.local:2181 --partitions 3 --replication-factor 2
结果为:
Created topic “aaa”.

3、进入topic为aaa的生产者消息中心:kafka-console-consumer.sh --topic aaa --bootstrap-server localhost:9092

4、复制新的会话,进入另一个容器kafka-1:kubectl exec -it kafka-1 bash

进入消费者,输入命令:kafka-console-producer.sh --topic aaa --broker-list localhost:9092

输入:

hello

i lovle you

回车后,可在生产者消息中心看到消息

最新文章链接,含镜像制作,v3.5.2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/750811.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【TB作品】MSP430,波形发生器,单片机,Proteus仿真

文章目录 题目效果梯形波100个点产生方法锯齿波100个点产生方法c代码和proteus仿真 题目 114 波形发生器的制作 设计要求 设计一个能产生正弦波、方波、三角波、梯形波、锯齿波的波形发生器。设置5个开关K1~K5(从 上到下),分别对应正弦波、方波、三角波、梯形波、锯齿波,按一下…

短剧分销怎么赚钱的?保姆级教程助你短剧cps推广赚大钱

短剧分销怎么赚钱的?小白也能月入过万/“蜂小推“保姆级教程,助你短剧分销赚大钱! 相信大家或多或少都在某些群里看到一些“霸道总裁爱上职场小菜鸟...”“这类链接,无利不起早,为什么会有那么多在群里分享这些狗血视…

习题11-2 查找星期

本题要求实现函数,可以根据下表查找到星期,返回对应的序号。 序号星期0Sunday1Monday2Tuesday3Wednesday4Thursday5Friday6Saturday 函数接口定义: int getindex( char *s ); 函数getindex应返回字符串s序号。如果传入的参数s不是一个代表…

Pycharm连接远程服务器Anoconda中的虚拟环境

在配置远程解释器时,踩过一些坑,现在记录一下配置过程: 步骤1: 打开pycharm的File里面的Settings 里面的Project:你的项目名称目录下的Python Interpreter。 步骤二: 点击右上角的“add interpreter”,选择…

String为什么要设计成不可变的

String被设计成不可变的主要有以下几个原因: 线程安全:不可变的String对象可以被多个线程安全地共享,因为它们的值是在创建时确定的,不会改变。这样就避免了多线程环境下的并发访问和修改带来的竞争问题。 缓存哈希值&#xff1a…

美光领跑HBM3e赛道,率先获得NVIDIA H200 AI GPU订单

据韩国《中央日报》的一篇报道,在美光于2024年2月启动最新高带宽内存HBM3e的大规模生产后,近日已成功获得NVIDIA为其H200 AI GPU的订单。据悉,NVIDIA即将推出的H200处理器将会搭载最新的HBM3e,性能超越了H100处理器所使用的HBM3。…

Tomcat内存马

Tomcat内存马 前言 描述Servlet3.0后允许动态注册组件 这一技术的实现有赖于官方对Servlet3.0的升级,Servlet在3.0版本之后能够支持动态注册组件。 而Tomcat直到7.x才支持Servlet3.0,因此通过动态添加恶意组件注入内存马的方式适合Tomcat7.x及以上。…

【设计模式】Java 设计模式之组合模式(Composite)

组合模式(Composite Pattern)深入讲解 一、组合模式概述 组合模式允许你将对象组合成树形结构以表示“部分-整体”的层次结构,使得客户端对单个对象和复合对象的使用具有一致性。组合模式使得用户可以对单个对象和复合对象的使用具有一致性…

【AI+编程】利用chatGPT编写python程序处理日常excel工作提升效率小技巧

之前写过一篇AI编程相关的文章 【人工智能】为啥我最近很少写python编程文章了,浅谈AI编程RPA提升工作效率 。 最近有同学私信我,怎么利用AI编程来提升工作效率,除了文章里讲的 使用AI帮忙写算法、代码提示、代码优化、不同语言转换(如J…

Axios异步框架和Json数据格式

一.Axios异步框架 对原生的Ajax进行封装,简化书写。 给大家提供一下axios的源码: 链接:https://pan.baidu.com/s/1ZSrUBe0B4dIq7d6NpUzqOQ 提取码:gr86 将源码粘贴到项目之中。 1.基础使用 首先单独创建一个Servlet&#xf…

Godot 学习笔记(2):信号深入讲解

文章目录 前言相关链接环境信号简单项目搭建默认的信号先在label里面预制接收函数添加信号 自定义无参数信号为了做区分,我们在label新增一个函数 自定义带参数信号Button代码label代码连接信号 自定义复杂参数信号自定义GodotObject类ButtonLabel连接信号 信号函数…

数字IC实践项目(9)—SNN加速器的设计和实现(tiny_ODIN)

数字IC实践项目(9)—基于Verilog的SNN加速器 写在前面的话项目整体框图完整电路框图 项目简介和学习目的软件环境要求 Wave&CoverageTiming,Area & Power总结 写在前面的话 项目介绍: SNN硬件加速器是一种专为脉冲神经网…

uniapp样式穿透修改uview的按钮button图标

需求&#xff1a; 想给按钮icon和text改字体颜色&#xff0c;结果发现图标颜色并没有改变 .u-button{width: 300rpx;background-color: aliceblue;color: #aaaa7f;margin-top: 20rpx; }接下来用样式穿透解决 1、首先&#xff0c;给UI组件包裹一层view <view class"t…

ElasticSearch:数据的魔法世界

​ 欢迎来到ElasticSearch的奇妙之旅&#xff01;在这个充满魔法的搜索引擎世界中&#xff0c;数据不再是沉闷的数字和字母&#xff0c;而是变得充满活力和灵动。无论你是刚刚踏入数据探索的小白&#xff0c;还是已经对搜索引擎有所了解的行者&#xff0c;本篇博客都将为你揭示…

unity内存优化之AB包篇(微信小游戏)

1.搭建资源服务器使用(HFS软件(https://www.pianshen.com/article/54621708008/)) using System.Collections; using System.Collections.Generic; using UnityEngine;using System;public class Singleton<T> where T : class, new() {private static readonly Lazy<…

【集成开发环境】-VS Code:C/C++ 环境配置

简介 VS Code&#xff0c;全称Visual Studio Code&#xff0c;是一款由微软开发的跨平台源代码编辑器。它支持Windows、Linux和macOS等操作系统&#xff0c;并且具有轻量级、高效、可扩展等特点&#xff0c;深受广大开发者的喜爱。 VS Code拥有丰富的功能特性&#xff0c;包括…

c语言(字符串函数和内存函数的模拟实现)

1、模拟strlen&#xff08;临时变量法&#xff09; #include <stdio.h> #include <assert.h> int my_strlen(const char* str); int main() {char str[] "abcdefh";int ret my_strlen(str);printf("%d", ret);return 0; } int my_strlen( c…

代码随想录算法训练营第四十七天|198.打家劫舍, 213.打家劫舍II , 337.打家劫舍III

198.打家劫舍 https://leetcode.com/problems/house-robber/description/ 思路&#xff1a; 经典的动态规划问题&#xff0c;首先确定dp 数组记录的是打劫到第i家时的收获&#xff0c; dp[0] 0&#xff0c; dp[1] values[0]. 然后到第i 家有两个选择&#xff0c; 一个是打劫…

koa2+vue3通过exceljs实现数据导出excel文档

服务端安装exceljs依赖 npm i exceljs服务端实现代码 实现导出excel文件工具方法 const ExcelJS require(exceljs); /*** description: 导出excel文件* param {*} fileHeader 导出的表头* param {*} data 导出的数据* param {*} ctx 上下文对象* return {*}*/ async funct…

计算机三级网络技术综合题第三题、第四题详细解析

第三大题 DHCP报文分析&#xff08;10分&#xff09; 一、DHCP工作流程&#xff08;一般情况下&#xff09; 报文摘要 对应上面报文1—4 报文1、3DHCP&#xff1a;Request&#xff1b; 报文2、4DHCP&#xff1a;Reply。 例题&#xff08;第三套&#xff09;&#xff1a;在一…