kafka3.X集群安装(不使用zookeeper)

一、kafka集群实例角色规划

在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。

上图中黑色代表broker(消息代理服务),褐色/蓝色代表Controller(集群控制器服务)

  • 左图(kafka2.0):一个集群所有节点都是broker角色,kafka从三个broker中选举出来一个Controller控制器,控制器将集群元数据信息(比如主题分类、消费进度等)保存到zookeeper,用于集群各节点之间分布式交互。
  • 右图(kafka3.0):假设一个集群有四个broker,指定三个作为Conreoller角色(蓝色),从三个Controller中选举出来一个Controller作为主控制器(褐色),其他的2个备用。zookeeper不再被需要!相关的元数据信息以kafka日志的形式存在(即:以消息队列消息的形式存在)。
  • controller通信端口:9093, 作用与zk的2181端口类似 。

在搭建kafka3.0集群之前, 我们需要先做好kafka实例角色规划。(四个broker, 需要通过主动配置指定三个作为Controller, Controller需要奇数个, 这一点和zk是一样的)

主机名称ip角色node.id
kafka-vm1192.168.1.111broker,controller1
kafka-vm2192.168.1.112broker,controller2
kafka-vm3192.168.1.113broker,controller3
kafka-vm4192.168.1.114broker4

二、准备工作

  • kafka3.x不再支持JDK8,建议安装JDK11或JDK17。
  • 新建kafka持久化日志数据mkdir -p /data/kafka;并保证安装kafka的用户具有该目录的读写权限。

各个机器节点执行:

# 安装jdk(kafka3.x不再支持JDK8,建议安装JDK11或JDK17, 这里安装jdk11)
# 下载安装jdk11, 参考: https://blog.csdn.net/justlpf/article/details/127268046# 下载kafka
adduser kafka
cd /opt
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar -xf kafka_2.12-3.3.1.tgzchown -R kafka:kafka kafka_2.12-3.3.1*mkdir -p /data/kafka
chown -R kafka:kafka /data/kafka
vi /etc/hosts,各个节点,添加如下内容:
192.168.1.111 data-vm1
192.168.1.112 data-vm2
192.168.1.113 data-vm3
192.168.1.114 data-vm4

三、修改Kraft协议配置文件

在kafka3.x版本中,使用Kraft协议代替zookeeper进行集群的Controller选举,所以要针对它进行配置。

vi /opt/kafka_2.12-3.3.1/config/kraft/server.properties

具体配置参数如下:

# data-vm1节点
node.id=1
process.roles=broker,controller
listeners=PLAINTEXT://data-vm1:9092,CONTROLLER://data-vm1:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/# data-vm2节点
node.id=2
process.roles=broker,controller
listeners=PLAINTEXT://data-vm2:9092,CONTROLLER://data-vm2:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/# data-vm3节点
node.id=3
process.roles=broker,controller
listeners=PLAINTEXT://data-vm3:9092,CONTROLLER://data-vm3:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/
  • node.id:这将作为集群中的节点 ID,唯一标识,按照我们事先规划好的(上文),在不同的服务器上这个值不同。其实就是kafka2.0中的broker.id,只是在3.0版本中kafka实例不再只担任broker角色,也有可能是controller角色,所以改名叫做node节点。
  • process.roles:一个节点可以充当broker或controller或两者兼而有之。按照我们事先规划好的(上文),在不同的服务器上这个值不同。多个角色用逗号分开。
  • listeners: broker将使用9092端口,而kraft controller控制器将使用9093端口。
  • advertised.listeners: 这里指定kafka通过代理暴漏的地址,如果都是局域网使用,就配置PLAINTEXT://:9092即可。
  • controller.quorum.voters:这个配置用于指定controller主控选举的投票节点,所有process.roles包含controller角色的规划节点都要参与,即:zimug1、zimug2、zimug3。其配置格式为:node.id1@host1:9093,node.id2@host2:9093
  • log.dirs:kafka 将存储数据的日志目录,在准备工作中创建好的目录。

所有kafka节点都要按照上文中的节点规划进行配置,完成config/kraft/server.properties配置文件的修改。

四、格式化存储目录

生成一个唯一的集群ID(在一台kafka服务器上执行一次即可),这一个步骤是在安装kafka2.0版本的时候不存在的。

$ /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh random-uuid
SzIhECn-QbCLzIuNxk1A2A

使用生成的集群ID+配置文件格式化存储目录log.dirs,

所以这一步确认配置及路径确实存在,

并且kafka用户有访问权限(检查准备工作是否做对)。

每一台主机服务器都要执行命令:

/opt/kafka_2.12-3.3.1/bin/kafka-storage.sh format \
-t SzIhECn-QbCLzIuNxk1A2A \
-c /opt/kafka_2.12-3.3.1/config/kraft/server.properties

格式化操作完成之后,log.dirs​目录下多出一个Meta.properties文件​,存储了当前的kafka节点的id(node.id),当前节点属于哪个集群(cluster.id)

[root@data-vm2 ~]# ll /data/kafka/
总用量 8
-rw-r--r--. 1 root root 249 10月 11 18:23 bootstrap.checkpoint
-rw-r--r--. 1 root root  86 10月 11 18:23 meta.properties$ cat /data/kafka/meta.properties
#
#Tue Apr 12 07:39:07 CST 2022
node.id=1
version=1
cluster.id=SzIhECn-QbCLzIuNxk1A2A

五、 启动集群,完成基础测试

zimug1 zimug2 zimug3是三台应用服务器的主机名称(参考上文中的角色规划),实现方式已经在本专栏《linux主机与ip解析》中进行了说明。将下面的命令集合保存为一个shell脚本,并赋予执行权限。执行该脚本即可启动kafka集群所有的节点,前提是:你已经按照本专栏的《集群各节点之间的ssh免密登录》安装方式做了集群各节点之间的ssh免密登录。

启动命令:

bin/kafka-server-start.sh \
/opt/kafka_2.12-3.3.1/config/kraft/server.properties# 后台运行
nohup bin/kafka-server-start.sh \
/opt/kafka_2.12-3.3.1/config/kraft/server.properties 2>&1 &

脚本: 

#!/bin/bash
kafkaServers='data-vm1 data-vm2 data-vm3'
#启动所有的kafka
for kafka in $kafkaServers
dossh -T $kafka <<EOFnohup /opt/kafka_2.12-3.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-3.3.1/config/kraft/server.properties 1>/dev/null 2>&1 &
EOF
echo 从节点 $kafka 启动kafka3.0...[ done ]
sleep 5
done

六、一键停止集群脚本

一键停止kafka集群各节点的脚本,与启动脚本的使用方式及原理是一样的。

停止命令:

/opt/kafka_2.12-3.3.1/bin/kafka-server-stop.sh

执行脚本:

#!/bin/bash
kafkaServers='data-vm1 data-vm2 data-vm3'
#停止所有的kafka
for kafka in $kafkaServers
dossh -T $kafka <<EOFcd /opt/kafka_2.12-3.3.1bin/kafka-server-stop.sh
EOF
echo 从节点 $kafka 停止kafka...[ done ]
sleep 5
done

七、测试Kafka集群

7.1 创建topic

[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
--create \
--topic quickstart-events \
--bootstrap-server data-vm4:9092Created topic quickstart-events.
[root@data-vm1 kafka_2.12-3.3.1]## 
[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
--create \
--topic quickstart-events \
--bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092

7.2 查看topic列表

bin/kafka-topics.sh \
--list \
--bootstrap-server data-vm4:9092# 
bin/kafka-topics.sh \
--list \
--bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092,data-vm4:9092

7.3 查看消息详情

[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
--describe \
--topic quickstart-events \
--bootstrap-server data-vm3:9092Topic: quickstart-events        TopicId: zSOJC6wNRRGQ4MudfHLGvQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824Topic: quickstart-events        Partition: 0    Leader: 1       Replicas: 1     Isr: 1[root@data-vm1 kafka_2.12-3.3.1]#

7.4 生产消息

[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-console-producer.sh \
--topic quickstart-events \
--bootstrap-server data-vm1:9092# 参考: 创建并配置topic
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic my-topic \
--partitions 1 \
--replication-factor 1 \
--config max.message.bytes=64000 \
--config flush.messages=1# ------------------------- 参考 ------------------------ #
# 1: 修改已创建topic配置
# (Overrides can also be changed or set later using the alter configs command.)
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config max.message.bytes=128000# 2: 检查已修改的topic配置是否生效
# (To check overrides set on the topic you can do)
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--describe# 3. 恢复到原来的配置
# (To remove an override you can do)
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--delete-config max.message.bytes# 4. 增加分区数
# (To add partitions you can do)
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--alter \
--topic my_topic_name \
--partitions 40# 5. 添加配置
# (To add configs:)
bin/kafka-configs.sh \
--bootstrap-server broker_host:port \
--entity-type topics \
--entity-name my_topic_name \
--alter \
--add-config x=y# 6. 移除配置
# (To remove a config:)
bin/kafka-configs.sh \
--bootstrap-server broker_host:port \
--entity-type topics \
--entity-name my_topic_name \
--alter \
--delete-config x# 7. 删除topic
# (And finally deleting a topic:)
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--delete \
--topic my_topic_name

7.5 消费消息

bin/kafka-console-consumer.sh \
--topic quickstart-events \
--from-beginning \
--bootstrap-server data-vm4:9092

7.6 查看消费者组

# 检查消费者postition
# Checking consumer position
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-groupTOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-IDmy-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2

7.7 查看消费者组列表

# list all consumer groups across all topics
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--listtest-consumer-group# To view offsets, as mentioned earlier, 
# we "describe" the consumer group like this:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-groupTOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-IDtopic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4# 更多配置参考:
# https://kafka.apache.org/32/documentation.html#uses

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119302.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端3D规划

学习基础的3D概念&#xff1a;这包括向量、矩阵、几何、光照和材质等基本3D图形学的概念。这些是理解和使用3D技术的基础。学习WebGL&#xff1a;WebGL是一种在浏览器中实现3D图形的技术&#xff0c;它是OpenGL的Web版本&#xff0c;可以直接在浏览器中使用。学习WebGL可以帮助…

pnpm、npm、yarn的区别

pnpm、npm、yarn是三种不同的包管理器&#xff0c;它们之间有一些区别。 安装速度&#xff1a;pnpm的安装速度比npm和yarn快&#xff0c;因为它使用了只下载必需的模块&#xff0c;而不是下载整个依赖树。此外&#xff0c;pnpm还可以并行下载模块&#xff0c;从而进一步提高下…

【Python机器学习】零基础掌握BaggingClassifier集成学习

何提高分类模型的稳定性和准确性? 在金融风控、医疗诊断或者社交媒体推荐等场景中,分类问题是常见的难题。但是,单一的分类模型(如SVM)在处理复杂或不均衡的数据集时可能会表现不佳。那么,有没有一种方法能够提高模型的稳定性和准确性呢? 假设一家银行想要通过机器学习…

建立一个数据机房的条件

数据机房是一种专门用于存储和处理大量数据的设施。它通常由一系列服务器、存储设备和网络设备组成&#xff0c;用于支持各种应用程序和业务需求。它的的主要功能是确保应用程序的高可用性和数据的可靠存储&#xff0c;还提供备份和恢复系统&#xff0c;以确保在发生故障或灾难…

字节码进阶之JVM Attach API详解

字节码进阶之JVM Attach API详解 文章目录 字节码进阶之JVM Attach API详解附加到虚拟机加载代理和获取信息分离虚拟机 使用Attach API的基本步骤1. **获取虚拟机实例**&#xff1a;2. **附加到虚拟机**&#xff1a;3. **加载代理或获取信息**4. **从虚拟机分离**&#xff1a;…

C++初阶1

目录 介绍&#xff1a; 一&#xff0c;命名空间 1-1&#xff0c;命名空间的定义 1-2&#xff0c;命名空间的使用 1-3&#xff0c;C标准官方命名空间 二&#xff0c;缺省参数 2.1&#xff0c;缺省参数分类 三&#xff0c;函数重载 四&#xff0c;引用 4-1&#xff0c;…

ExoPlayer架构详解与源码分析(6)——MediaPeriod

系列文章目录 ExoPlayer架构详解与源码分析&#xff08;1&#xff09;——前言 ExoPlayer架构详解与源码分析&#xff08;2&#xff09;——Player ExoPlayer架构详解与源码分析&#xff08;3&#xff09;——Timeline ExoPlayer架构详解与源码分析&#xff08;4&#xff09;—…

【LeetCode】102. 二叉树的层序遍历

题目链接 文章目录 Python3方法一&#xff1a; 广度优先搜索 (BFS) ⟮ O ( n ) ⟯ \lgroup O(n) \rgroup ⟮O(n)⟯方法二&#xff1a; 深度优先搜索 (DFS) ⟮ O ( n ) ⟯ \lgroup O(n) \rgroup ⟮O(n)⟯ C方法一&#xff1a; 广度优先搜索 (BFS) ⟮ O ( n ) ⟯ \lgroup O(n…

“Can‘t open workbook - unsupported file type: XML“

java开发&#xff0c;增删改查&#xff0c;涉及到导入excel时&#xff0c;有的excel导入失败提示"Cant open workbook - unsupported file type: XML"。着急赶工期&#xff0c;告诉客户先把excel另存为xls格式&#xff0c;再重新导入。现在有点空余时间&#xff0c;好…

力扣每日一题66:加一

题目描述&#xff1a; 给定一个由 整数 组成的 非空 数组所表示的非负整数&#xff0c;在该数的基础上加一。 最高位数字存放在数组的首位&#xff0c; 数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外&#xff0c;这个整数不会以零开头。 示例 1&#xff1a; 输…

二十三种设计模式全面解析-单例设计模式:解密全局独一无二的实例创造者

在软件开发中&#xff0c;有一种设计模式被誉为"独一无二的实例创造者"&#xff0c;它就是单例设计模式。无论你是初学者还是有经验的开发人员&#xff0c;了解并掌握单例模式都是非常重要的。本文将以通俗易懂的方式&#xff0c;全面解析单例设计模式&#xff0c;并…

Android 中如何使用 App Links

1. 简介 什么是 App Links呢&#xff1f;App Links 是 Android 6.0 (API 级别23) 引入的新功能&#xff0c;它是基于 DeepLinking&#xff0c;允许应用自动处理网站的 URL&#xff0c;而无需提示用户启动相应的应用。 例如&#xff1a;如果你在手机浏览器中输入了某个网站&am…

esp32-S3 + visual studio code 开发环境搭建

一、首先在下面链接网页中下载esp-idf v5.1.1离线安装包 &#xff0c;并安装到指定位置。dl.espressif.cn/dl/esp-idf/https://dl.espressif.cn/dl/esp-idf/ 安装过程中会提示需要长路径支持&#xff0c;所以windows系统需要开启长路径使能 Step 1&#xff1a; 打开运行&…

springboot 拦截器 导致fastjson 大小写失效

实体类 public class Info{JsonFeild(“sDqClose”)private String sDqClose;private String sDqHigh; }拦截器 Configuration EnableWebMvc public class WebMvcConfigurerConfig implements WebMvcConfigurer {private CorsConfiguration buildConfig() {CorsConfiguration …

uniapp解决iOS切换语言——原生导航栏buttons文字不生效

uniapp 切换语言原生导航栏buttons文字不生效&#xff1f; 文章目录 uniapp 切换语言原生导航栏buttons文字不生效&#xff1f;效果图page.json配置解决方式 效果图 场景&#xff1a;在 tabbar 页面中&#xff0c;配置 原生导航栏 buttons &#xff0c;切换语言时&#xff0c;不…

设计大师都在用!电商设计素材网站大公开

双十一即将到来&#xff0c;想必各电商平台的设计师早已开启电商设计项目。找到合适的电商设计模板是电商设计成功的关键因素。高质量的电商设计模板素材不仅能够提升网站的吸引力&#xff0c;还能提升用户体验&#xff0c;从而增加商品的交易率。 Pixso资源社区 在寻找电商设计…

中央设备状态监控系统CMS如何帮助半导体晶圆厂提高产品良率

中央设备状态监控系统&#xff08;CMS&#xff09;在半导体晶圆厂中扮演着关键角色&#xff0c;帮助企业提高产品的良率。本文将介绍CMS是什么、当前半导体晶圆厂产品良率面临的挑战&#xff0c;并重点探讨CMS如何通过实时数据监控、故障预测和预警、以及统计分析和过程改进等方…

ESP32网络开发实例-将 ESP32 连接到 EMQX Cloud MQTT Broker

将 ESP32 连接到 EMQX Cloud MQTT Broker 文章目录 将 ESP32 连接到 EMQX Cloud MQTT Broker1、MQTT介绍2、软件准备3、硬件准备4、代码实现5、MQTT测试在本文中,将介绍使用 EMQX Cloud MQTT 服务器。 首先,我们将介绍如何将 ESP32 开发板连接到 EMQX Cloud MQTT 服务器。 我…

Stable Diffusion AI绘图

提示词&#xff1a; masterpiece, best quality, 1girl, (anime), (manga), (2D), half body, perfect eyes, both eyes are the same, Global illumination, soft light, dream light, digital painting, extremely detailed CGI anime, hd, 2k, 4k background 反向提示词&…

Django 实战开发(一)项目搭建

1.项目搭建 用pycharm 编辑器可以直接 New 一个 Django 项目 2.新建应用 python manage.py startapp demo项目结构如下: 3.编写第一个Django 视图函数 /demo/views: from django.http import HttpResponse def welcome(request):return HttpResponse("welcome to dja…