Linux系统上搭建高可用Kafka集群(使用自带的zookeeper)

本次在CentOS7.6上搭建Kafka集群

Apache Kafka 是一个高吞吐量的分布式消息系统,被广泛应用于大规模数据处理和实时数据管道中。本文将介绍在CentOS操作系统上搭建Kafka集群的过程,以便于构建可靠的消息处理平台。

文件分享(KafkaUI、kafka3.3.2、jdk1.8)

链接:https://pan.baidu.com/s/1dn_mQKc1FnlQvuSgGjBc1w?pwd=t2v9 提取码:t2v9

也可以从官网自己下

Kafka官网

步骤概览

本次使用三台机器 hostname别名分别为res01(10.41.7.41)、res02(10.41.7.42)、res03(10.41.7.43)

在这个教程中,我们将覆盖以下主要步骤:

  1. 准备环境:安装和配置Java、Zookeeper和Kafka所需的依赖。
  2. 配置Zookeeper集群:确保Kafka有可靠的分布式协调服务。(本次使用Kafka自带的zookeeper)
  3. 配置Kafka集群:在每个节点上安装和配置Kafka,设置Kafka集群以实现高性能和高可用性。

步骤详解

步骤1:环境准备 (先对第一台服务器操作 其他的使用scp传输之后进行小修改就好)

安装Java

确保Java安装正确,并设置JAVA_HOME环境变量。

如果对于jdk安装有问题的可以看一下这篇Linux安装MySQL、JDK(含环境变量配置)、Tomcat

步骤2:安装Kafka

下载和解压Kafka
[root@res01 module]# clear
[root@res01 module]# ll
总用量 104124
drwxr-xr-x. 4 root root        40 11月 10 10:07 data
-rw-r--r--. 1 root root 106619987 11月 10 10:33 kafka_2.13-3.3.2.tgz
[root@res01 module]# tar -zxvf kafka_2.13-3.3.2.tgz 
解压之后通过mv改名

步骤3:配置zookeeper

进入文件夹kafka3.3.2中找到config

接下来主要修改zookeeper.properties和server.properties这两个文件

zookeeper.properties如下

# 需要去新建/opt/module/data/zookeeper下面这两个文件夹
dataDir=/opt/module/data/zookeeper/data
dataLogDir=/opt/module/data/zookeeper/logs
clientPort=12181
maxClientCnxns=0
admin.enableServer=false
tickTime=2000
initLimit=10
syncLimit=5
# server.X=hostname:peerPort:leaderPort
# peerPort 是服务器之间通信的端口。
# leaderPort 是用于选举 leader 的端口。
server.1=res01:12182:12183
server.2=res02:12182:12183
server.3=res03:12182:12183#res01、res02、res03是我本地设置过的主机名 如果没设置使用ip地址即可
在每个节点上zookeeper的配置文件中dataDir目录下创建一个名为myid的文件,并分别填入相应节点的ID号:123

步骤4:配置Kafka

编辑Kafka配置文件config/server.properties,设置broker.idzookeeper.connect

# 设置 broker.id 这个是 Kafka 集群区分每个节点的唯一标志符。 对应那个myid即可
broker.id=1
# 将监听端口设置为19091
listeners=PLAINTEXT://res01:19091# 将广告给客户端的地址也设置为19091
advertised.listeners=PLAINTEXT://res01:19091num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## 设置 Kafka 的数据存储路径 这个目录下不能有其他非 Kafka 目录,不然会导致 Kafka 集群无法启动。
log.dirs=/opt/module/data/kafka-log
# 默认的 Partition 的个数。
num.partitions=3
# 设置默认的复制因子为3
default.replication.factor=3num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Kafka 的数据保留的时间,默认是 7 天 168h。 这里使用24小时
log.retention.hours=24log.retention.check.interval.ms=300000
# Kafka 连接的 ZooKeeper 的地址和连接 Kafka 的超时时间。
zookeeper.connect=res01:12181,res02:12181,res03:12181
zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0
# 设置是否可以删除 Topic,默认 Kafka 的 Topic 是不允许删除的 这里打开了
delete.topic.enable=true# 这是用于启用或禁用日志清理的选项,默认值为 true,以确保 Kafka 持续进行日志清理。需要根据实际需求进行设置。
log.cleaner.enable=true
# 这个参数控制日志清理线程的数量。对于你的硬件配置,你可以考虑设置为 4 或 8 来充分利用服务器的性能。
log.cleaner.threads=4
# 这个参数用于控制日志清理线程的 IO 缓冲区大小。对于你的硬件配置,可以设置为 8192 或 16384。
log.cleaner.io.buffer.size=8192
# 这个参数是用来设置主题日志保留的最大字节数。对于控制磁盘空间的使用非常重要。例如,如果你希望限制每个主题的数据量不超过 100GB,可以设置为 107374182400
log.retention.bytes=107374182400
# 这个参数用于控制每个日志段文件的最大大小。对于你的硬件配置,你可以设置为 1073741824(即 1GB)。
log.segment.bytes=1073741824
# 这个参数用于设置 Zookeeper 会话的超时时间。对于较大的集群和连接较慢的网络,你可以考虑将其设置为 10000,即 10 秒。
zookeeper.session.timeout.ms=10000

重点是这个:

# 设置默认的复制因子为3
default.replication.factor=3

在Kafka集群的每个节点上,修改broker.id为对应的节点ID。

配置kafka环境变量

#java环境
export JAVA_HOME=/usr/local/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
#kafka环境
export KAFKA_HOME=/opt/module/kafka3.3.2
export PATH=$PATH:$KAFKA_HOME/bin

步骤5:复制虚拟机

(已有其他服务器的直接连网线scp就好 环境变量 配置小改一下就好 还有hosts、ip等等别忘了配置,我这里直接复制虚拟机了 )

配置ip、hostname以及hosts后尝试ping

res02修改kafka的config文件即可(zookeeper配置文件都一样 用res01配置的就好)

res03的kafka配置文件同理

以及各台机器的myid(对应上brokerId即可)

步骤6:配置Kafka集群

确保防火墙或安全组允许Kafka端口通过,通常是9092端口。(我这是修改过的为19091,我直接关防火墙了 方便。)

systemctl stop firewalld.service
#关闭运行的防火墙
systemctl disable firewalld.service
#永久关闭防火墙
本次使用的是绝对路径 各位可以到kafka目录下执行命令 去掉前面的绝对路径就好
zookeeper命令

在每个机器上,先启动zookeeper:

/opt/module/kafka3.3.2/bin/zookeeper-server-stop.sh
#停止命令
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh  /opt/module/kafka3.3.2/config/zookeeper.properties
#启动命令
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh -daemon /opt/module/kafka3.3.2/config/zookeeper.properties
#后台启动命令 常用~
统一启动后jps查看进程

Kafka命令
在每个Kafka节点上,启动Kafka服务器:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh  /opt/module/kafka3.3.2/config/server.properties
#kafka启动命令
/opt/module/kafka3.3.2/bin/kafka-server-start.sh -daemon /opt/module/kafka3.3.2/config/server.properties
#kafka后台启动命令 常用~
/opt/module/kafka3.3.2/bin/kafka-server-stop.sh
#停止命令
统一启动后jps查看进程
创建主题

使用kafka-topics.sh命令创建一个主题:这里设置的复制因子为3 

bin/kafka-topics.sh --create --topic 你的topic--bootstrap-server res01:19091--replication-factor 3 --partitions 3

验证Kafka集群

使用生产者和消费者验证Kafka集群的功能:

# 启动生产者
bin/kafka-console-producer.sh --topic myTopic --bootstrap-server res01:19091# 启动消费者
bin/kafka-console-consumer.sh --topic myTopic --bootstrap-server res01:19091--from-beginning

停止 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-stop.sh
启动 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh  /opt/module/kafka3.3.2/config/zookeeper.properties
后台启动 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh -daemon /opt/module/kafka3.3.2/config/zookeeper.properties
清空 Kafka 日志:
rm -rf //opt/module/data/kafka-logs/*
启动 Kafka 服务:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh  /opt/module/kafka3.3.2/config/server.properties
后台启动 Kafka 服务:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh -daemon /opt/module/kafka3.3.2/config/server.properties
停止 Kafka 服务:
/opt/module/kafka3.3.2/bin/kafka-server-stop.sh
创建 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --create --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT] --partitions [PARTITIONS_SIZE] --replication-factor [REPLICATION_FACTOR]
删除 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --delete --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT]
查看 Topic 信息:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --describe --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT]
列出所有的 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --list --bootstrap-server [SERVER_IP]:[PORT]
控制台生产消息:
/opt/module/kafka3.3.2/bin/kafka-console-producer.sh --bootstrap-server [SERVER_IP]:[PORT] --topic [TOPIC_NAME]
控制台消费信息:
/opt/module/kafka3.3.2/bin/kafka-console-consumer.sh --bootstrap-server [SERVER_IP]:[PORT] --topic [TOPIC_NAME] --from-beginning
查看副本:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --describe --bootstrap-server [SERVER_IP]:[PORT] | grep consumer_offsets
请记住替换 [TOPIC_NAME]、[SERVER_IP]:[PORT]、[PARTITIONS_SIZE]、[REPLICATION_FACTOR] 等位中的值为实际的值。

自己的做的总结如上:

运行Java代码生成topic 可以看到分区都是3符合集群要求

运行kafkaui查看详细情况(百度网盘链接里有,自己输入命令太累了 直接用别人封装好现成的看就好~)

结论

通过这个步骤,我们成功地搭建了一个基本的Kafka集群。在实际生产环境中,您可能需要进一步调整和优化配置,以满足特定需求和性能要求。

希望这个教程可以帮助您成功搭建Kafka集群,为您的数据处理和消息传递架构提供强大的基础设施。

最后温馨提示:如果你远程服务器起了别名,而自己电脑的hosts别名对应其他的服务器 也会发生报错 记得别名对应好ip即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/141224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

No193.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

ctfshow sql171-179

mysql 先打开我们本地的mysql,可以看到这些数据库 information_schema information_schema 库: 是信息数据库,其中保存着关于MySQL服务器所维护的所有其他数据库的信息比如数据库名,数据库表, SCHEMATA表: 提供了当前MySQL实例…

Golang 字符串处理汇总

1. 统计字符串长度:len(str) len(str) 函数用于统计字符串的长度,按字节进行统计,且该函数属于内置函数也不用导包,直接用就行,示例如下: //统计字符串的长度,按字节进行统计: str : "golang你好&qu…

​软考-高级-系统架构设计师教程(清华第2版)【第4章 信息安全技术基础知识(P160~189)-思维导图】​

软考-高级-系统架构设计师教程(清华第2版)【第4章 信息安全技术基础知识(P160~189)-思维导图】 课本里章节里所有蓝色字体的思维导图

postgresql实现job的六种方法

简介 在postgresql数据库中并没有想oracle那样的job功能,要想实现job调度,就需要借助于第三方。本人更为推荐kettle,pgagent这样的图形化界面,对于开发更为友好 优势劣势Linux 定时任务(crontab) 简单易用…

Redhat7设置国内可用yum源

问题: 因为最近安装了redhat7,在使用的时候提示系统未注册订阅,无法使用官方的yum源进行安装软件。为此,我使用centos7国内的yum源替换redhat的官方的yum源实现软件安装。 “This system is not registered with an entitlement …

【2011年数据结构真题】

41题 41题解答: (1)图 G 的邻接矩阵 A 如下所示: 由题意得,A为上三角矩阵,在上三角矩阵A[6][6]中,第1行至第5行主对角线上方的元素个数分别为5, 4, 3, 2, 1 用 “ 平移” 的思想,…

Outlook无法显示阅读窗格

Outlook无法显示阅读窗格 故障现象 Outlook主界面不显示阅读窗格 故障截图 故障原因 阅读窗格被关闭 解决方案 1、打开Outlook - 视图 – 阅读窗格 2、选择“靠右”或者“底部”,正常显示阅读窗格

同济 MBA 携手和鲸课程共建,以数智人才培养持续赋能企业数字化转型

数智化的浪潮席卷全球,我国产业界应如何做出应变?各企业又该如何深化数字化转型?在任重道远的持续探索中,数智人才培养作为企业实现成功转型的关键要素,已然成为大势所趋。 同济大学综合 MBA 项目高度重视工商管理人才…

什么是代理IP池?如何判断IP池优劣?

代理池充当多个代理服务器的存储库,提供在线安全和匿名层。代理池允许用户抓取数据、访问受限制的内容以及执行其他在线任务,而无需担心被检测或阻止的风险。代理池为各种在线活动(例如网页抓取、安全浏览等)提高后勤保障。 读完…

lua 时间差功能概略

简介 在进行程序设计过程中,经常需要对某些函数、某些程序片断从开始运行到运行结束所耗费的时间进行一些量化。这种量化实际上就是计算时间差。 获取函数耗时情景如下: function time_used() --开始计时-- do something at here. --结束计时--时间差&…

基于Python+Django的寻人失物失物招领系统

运行环境 开发语言:Python python框架:django 软件版本:python3.7 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:PyCharm/vscode 前端框架:vue.js 项目介绍 寻人失物失物招领系统交流平台的主要使用…

浅析网络协议-HTTP协议

1.HTTP简介 HTTP协议是Hyper Text Transfer Protocol(超文本传输协议)的缩写,是用于从万维网(WWW:World Wide Web )服务器传输超文本到本地浏览器的传送协议。 HTTP是一个基于TCP/IP通信协议来传递数据(HTML 文件, 图…

MyBatis 反射工具箱:带你领略不一样的反射设计思路

反射是 Java 世界中非常强大、非常灵活的一种机制。在面向对象的 Java 语言中,我们只能按照 public、private 等关键字的规范去访问一个 Java 对象的属性和方法,但反射机制可以让我们在运行时拿到任何 Java 对象的属性或方法。 有人说反射打破了类的封装…

Redis应用之一自增编号

一、前言 前段时间同事用Redis实现收银台商品排行榜,我们都知道Redis最基础的功能是用来缓存数据,但其实它还有很多特性能解决很多实际问题,接下来几篇文章我们就聊聊Reids一些特性的应用,今天先聊一下借助Reids生成不会重复的订…

电源基础元件

文章目录 电源基础元件理想电压源理想电流源受控电源 电源基础元件 理想电压源 定义 其两端电压总能保持定值或一定的时间函数,其值与流过它的电流i无关的元件叫理想电压源 理想电压源的电压、电流关系 1.电源两端电压由电源本身决定,与外电路无关&…

linux espeak语音tts;pyttsx3 ubuntu使用

整体使用espeak声音很机械不太自然 1、linux espeak语音tts 安装: sudo apt install espeak使用: #中文男声 espeak -v zh 你好 #中文女声 espeak -v zhf3 你好 #粤语男声 espeak -v zhy 你好注意:espeak -v zh 你好 (Full d…

什么是M-LAG?为什么需要M-LAG?

你们好,我的网工朋友。 今天给你说说M-LAG技术。 总有人把M-LAG技术和IRF堆叠技术放在一起做对比,说这是可以取代堆叠的技术。 那M-LAG和堆叠到底有没有关系? 其实关系并不是很大,M-LAG可以理解为IRF和链路聚合的结合体。 那…

开放领域问答机器人2——开发流程和方案

开放领域问答机器人是指在任何领域都能够回答用户提问的智能机器人。与特定领域问答机器人不同,开放领域问答机器人需要具备更广泛的知识和更灵活的语义理解能力,以便能够回答各种不同类型的问题。 开发开放领域问答机器人的流程和方案可以包括以下步骤…

SecureCRT 超时自动断开连接问题解决方法

很多人在使用SecureCRT时,会遇到这种情况:SecureCRT 超时自动断开连接,这种情况会给工作带来很多不便, 比如在做数据库还原操作时,连接突然断掉了,会导致还原操作失败,很是令人困扰。 那么我们…