kafka 集群 KRaft 模式搭建

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:https://kafka.apache.org/

Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群

本文讲解 Kafka KRaft 模式集群搭建

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、创建 KRaft 集群

4、启动 Kafka KRaft 集群

5、关闭 Kafka KRaft 集群

6、测试 KRaft 集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

下载完成

将kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft

编辑配置文件

vi server.properties

server.properties 配置说明

node.id 是kafka的broker节点id

controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址

advertised.listeners 是节点自己的监听地址

192.168.3.232 节点配置

node.id = 1

192.168.2.90 节点配置

node.id = 2

192.168.2.11节点配置

node.id = 3

3、创建 KRaft 集群

生成集群id

在任意一个节点上执行就行,笔者使用 192.168.3.232 节点

进入bin 目录

cd /usr/local/kafka/kafka_2.13-3.6.0/bin

执行生成集群 id 命令

./kafka-storage.sh random-uuid

生成后保存生成的字符串    82vqfbdSTO2QzS_M0Su1Bw

然后分别在3台机器上执行下面命令

为方便执行命令,先回到 kafka安装目录

cd /usr/local/kafka/kafka_2.13-3.6.0

再执行命令,完成集群元数据配置

bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties

192.168.3.232 节点

192.168.2.90 节点

192.168.2.11节点

上面命令执行完成后,开放防火墙端口

kafka 需要开放 9092 端口和 9093 端口

3台机器上分别开放 9092 和 9093 端口

查看开放端口

firewall-cmd --zone=public --list-ports

 开放9092 端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

  开放9093 端口

firewall-cmd --zone=public --add-port=9093/tcp --permanent

更新防火墙规则(无需断开连接,动态添加规则)

firewall-cmd --reload

4、启动 Kafka KRaft 集群

在3台机器上分别启动

下面2个命令均可启动

bin/kafka-server-start.sh -daemon config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties

笔者使用第二个启动命令 启动,效果看下图

当 3 个节点都出现 Kafka Server started,集群启动成功

5、关闭 Kafka KRaft 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

6、测试 KRaft 集群

新建 maven 项目,添加 Kafka 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名称是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消费分组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消费者订阅主题consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}

运行测试

效果图

消息成功发送并成功消费

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/172480.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

紫光展锐 展讯芯片 展讯处理器解锁BL 各分区结构 ROM 分区列表代表什么 bin img 表示什么意思

是展锐 Android 10.0、Android 9.0 平台 ROM 空间划分情况以及分区格式、分区大小和分区功能的 初步描述。 prodnv 开机后系统中的 productinfo 分区&#xff0c;保 存 adc 校准参数、eng.db 数据库。 Miscdata 保存 ota、recovery 时的一些数据 recovery 存放 recovery.i…

Horizon地平线财富一直坚持“创新、开放、协作、共享”的运营理念

在“寒风凛冽”的熊市&#xff0c;投资人需要一颗不断探索、勇于尝试的心。 勇气意味着即使你知道这条路很难&#xff0c;你仍然选择坚持。而信念则是相信&#xff0c;即使现在很多人不理解、甚至嘲笑&#xff0c;未来总会有一天他们会明白。 Horizon一直坚持着“创新、开放、…

058-第三代软件开发-文件Model

第三代软件开发-文件Model 文章目录 第三代软件开发-文件Model项目介绍文件Model 关键字&#xff1a; Qt、 Qml、 关键字3、 关键字4、 关键字5 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这个项目结合了 QML&#xff08;Qt Meta-Object Language&#xff09;…

【MATLAB源码-第90期】基于matlab的OQPSKsimulink仿真,对比初始信号和解调信号输出星座图。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 正交偏移二进制相移键控&#xff08;OQPSK, Orthogonal Quadrature Phase Shift Keying&#xff09;是一种数字调制技术&#xff0c;主要用于高效无线数据传输。它是传统二进制相移键控&#xff08;BPSK&#xff09;的一个变…

单链表原来是这样实现的!

文章目录 前言1. 链表的概念及结构1.1在链表里&#xff0c;每节“车厢”是什么样的呢&#xff1f;1.2为什么还需要指针变量来保存下⼀个节点的位置&#xff1f; 2. 单链表的实现1. 定义结构体(Seqlist)2. 打印函数(SLTPrint)小插曲&#xff0c;创建节点函数CreateNode3. 尾插函…

嵌入式的学习需要合理规划时间

低级的欲望放纵即可获得&#xff0c;高级的欲望只有克制才能达成。——卡耐基1、粉丝的误会 很多粉丝&#xff0c;问我&#xff0c; "胡老师我想报您的培训班。" ... 得知我知识业余时间写文章&#xff0c;紧接着又会问&#xff0c; "jg单位这么清闲啊&#…

预制构件二维码如何生成?

PC预制构件张贴二维码标识牌&#xff0c;可实现预制构建基本信息、设计图纸、安装说明书信息展示微信扫一扫即可查看预制件信息&#xff0c;大大提高施工的精度和效率&#xff1b;同时也可以实现预制生成过程管理、运输过程管理、安装过程管理、后期运维管理实现预制件的过程质…

Windows安装mysql8.0

官网地址&#xff1a;MySQL :: MySQL Community Downloads 选择相应版本信息下载 默认选择点击下一步 默认配置点击next 设置密码 默认配置

Linux系统管理:WinSCP 安装与使用

目录 一、实验 1.下载WinSCP 2.安装WinSCP 3.使用WinSCP 一、实验 1.下载WinSCP &#xff08;1&#xff09;地址 Downloading WinSCP-6.1.2-Setup.exe :: WinSCP 2.安装WinSCP &#xff08;1&#xff09;选择安装程序模式 &#xff08;2&#xff09;点击 &#xff08;3…

机器学习库:pandas

☁️主页 Nowl &#x1f525;专栏《机器学习实战》 《机器学习》 &#x1f4d1;君子坐而论道&#xff0c;少年起而行之 文章目录 写在开头 基本数据格式 DataFrame 数据选取 iloc 数据描述 head describe 数据合并 merge 数据删除 drop drop删除多列 处理缺失…

SpringBoot进阶——解释springboot的自动配置原理

相关的博客文章如下&#xff1a; SpringBootApplication注解的理解——如何排除自动装配 & 分布式情况下如何自动加载 & nacos是怎么被发现的 引出 1.spring.factories文件存储能够进行自动配置的Bean信息&#xff1b; 2.EnableAutoConfiguration关闭数据源的自动配置…

Flask学习二:项目拆分、请求与响应、cookie

教程 教程地址&#xff1a; 千锋教育Flask2框架从入门到精通&#xff0c;Python全栈开发必备教程 老师讲的很好&#xff0c;可以看一下。 项目拆分 项目结构 在项目根目录下&#xff0c;创建一个App目录&#xff0c;这是项目下的一个应用&#xff0c;应该类似于后端的微服…

灯塔的安装

Docker 安装 docker 安装参考&#xff1a;https://docs.docker.com/engine/install/ shell脚本: curl -fsSL https://get.docker.com -o get-docker.sh sudo sh get-docker.sh灯塔安装 mkdir docker-ARL;cd docker-ARL curl https://bootstrap.pypa.io/get-pip.py -o get-pip…

【Linux学习】基础IO

目录 八.系统文件IO 8.1 前言 8.2 C语言文件IO C语言常用的基本函数 C语言默认打开的的三个流 8.3 系统文件IO open接口 close接口 write接口 read接口 8.4 C语言文件IO与系统文件IO的关系 八.系统文件IO 8.1 前言 系统文件 I/O&#xff08;输入/输出&#xff09;是指在…

pandas 如何获取dataframe的行的数量

pandas的dataframe提供了多种方法获取其中数据的行的数量&#xff0c;本偏文章就是介绍几种获取dataframe行和列出量的方法。 为了能够详细说明如何通过代码获取dataframe的行数和列数&#xff0c;需要先创建一个dataframe如下&#xff1a; import pandas as pdtechnologies …

C++设计模式之策略模式

策略模式 介绍示例示例测试运行结果应用场景优点总结 介绍 策略模式是一种行为设计模式。在策略模式中&#xff0c;可以创建一些独立的类来封装不同的算法&#xff0c;每一个类封装一个具体的算法&#xff0c;每一个封装算法的类叫做策略(Strategy)&#xff0c;为了保证这些策…

NX二次开发UF_CURVE_ask_wrap_curve_parents 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_wrap_curve_parents Defined in: uf_curve.h int UF_CURVE_ask_wrap_curve_parents(tag_t curve_tag, tag_t * defining_face, tag_t * defining_plane, tag_t * defin…

18. Python 数据处理之 Numpy

目录 1. 简介2. 安装和导入Numpy3. ndarray 对象4. 基本运算5. 索引、切片和迭代6. 条件和布尔数组7. 变换形状8. 操作数组 1. 简介 数据分析的流程概括起来主要是&#xff1a;读写、处理计算、分析建模和可视化4个部分。 Numpy 是Python 进行科学计算&#xff0c;数据分析时…

MySQL进阶_10.锁

文章目录 一、概述二、MySQL并发事务访问相同记录2.1、读-读2.2、写-写2.3、读-写2.4、并发问题的解决方案 三、锁的不同角度分类3.1、 读锁、写锁3.1.1、 锁定读 3.2、表级锁、页级锁、行锁3.2.1、表锁3.2.2、意向锁3.2.2.1、意向锁的作用3.2.2.2、意向锁的互斥性 3.2.3、自增…

2019年全国硕士研究生入学统一考试管理类专业学位联考英语(二)试题

文章目录 2019年考研英语二真题SectionⅠ Use of EnglishSection II Reading ComprehensionText 121——细节信息题22——细节信息题23——细节信息题24——细节信息题25——词义题 Text 226——细节信息题27——细节信息题28——细节信息题29——细节信息题30——态度题 Text …