Kafka KRaft 版本集群部署详细教程(附配置文件详细解释)

版本说明

  • Ubuntu 18.04.6
  • Kafka 3.6.0
  • JDK8

集群配置

操作系统ip域名Kafka Broker 端口Kafka Controller 端口
Ubuntu 18.04.6192.168.50.131kafka1.com90929093
Ubuntu 18.04.6192.168.50.132kafka2.com90929093
Ubuntu 18.04.6192.168.50.133kafka3.com90929093

安装 vim, curl

sudo apt update
sudo apt install vim
sudo apt install curl

配置静态 ip 和 hosts

为了使用域名,更加方便的进行配置,这里将虚拟机的 DHCP 改成了静态分配 IP,所以需要手动设置一下每台机器 IP 地址,这里以 192.168.50.131 为例。

  1. 找到网络接口名称,运行以下命令:

    ip addr
    

    查找以 enseth 开头的接口名称。例如,ens33eth0

    hedon@ubuntu:~$ ip addr
    1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever
    2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000link/ether 00:0c:29:82:9e:69 brd ff:ff:ff:ff:ff:ffinet 192.168.50.133/24 brd 192.168.50.255 scope global dynamic noprefixroute ens33valid_lft 1644sec preferred_lft 1644secinet6 fe80::c367:c7cc:3ad4:23b3/64 scope link valid_lft forever preferred_lft forever
    

    可以找到 ens33,其中 inet 192.168.50.133/24 表示 IP 地址为 192.168.50.133,子网掩码为 /24(等于 255.255.255.0)。

    这个 IP 地址是 DHCP 动态分配的,说明宿主机分配给虚拟机的 IP 范围就在 192.168.50.xxx,所以我们会将静态 IP 配置在这个范围内。

  2. 获取网关地址

    ip route | grep default
    

    输出:

    hedon@ubuntu:~$ ip route | grep default
    default via 192.168.50.2 dev ens33 proto dhcp metric 100
    

    说明默认网关是 192.168.50.2

  3. 编辑 /etc/network/interfaces 文件,配置静态 IP 地址,内容如下:

    auto ens33
    iface ens33 inet staticaddress 192.168.50.131netmask 255.255.255.0gateway 192.168.50.2dns-nameservers 8.8.8.8 8.8.4.4
    
  4. 重启

    su reboot
    
  5. 再次查看 ip 地址

    ip addr
    

    有以下输出便说明静态 IP 配置成功了。

    1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever
    2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000link/ether 00:0c:29:82:9e:69 brd ff:ff:ff:ff:ff:ffinet 192.168.50.131/24 brd 192.168.50.255 scope global ens33valid_lft forever preferred_lft foreverinet6 fe80::20c:29ff:fe82:9e69/64 scope link valid_lft forever preferred_lft forever
    
  6. 配置域名

    sudo vim /etc/hosts
    

    追加内容如下:

    192.168.50.131 kafka1.com
    192.168.50.132 kafka2.com
    192.168.50.133 kafka3.com
    
  7. ping 一下

    hedon@ubuntu:~$ ping kafka1.com
    PING kafka1.com (192.168.50.131) 56(84) bytes of data.
    64 bytes from kafka1.com (192.168.50.131): icmp_seq=1 ttl=64 time=0.024 ms
    64 bytes from kafka1.com (192.168.50.131): icmp_seq=2 ttl=64 time=0.021 ms
    64 bytes from kafka1.com (192.168.50.131): icmp_seq=3 ttl=64 time=0.029 ms
    ^C
    --- kafka1.com ping statistics ---
    3 packets transmitted, 3 received, 0% packet loss, time 2029ms
    rtt min/avg/max/mdev = 0.021/0.024/0.029/0.006 ms
    
  8. ping 一下百度,看看能不能访问外网

    hedon@ubuntu:~$ ping baidu.com
    ping: baidu.com: Name or service not known
    

    如果这里可以访问,则直接跳过进入下一步,不可以的话,需要配置一下域名解析系统。

  9. 配置域名解析系统

    Ubuntu 系统使用 systemd-resolved 服务来管理 DNS,你可以在 /etc/systemd/resolved.conf 文件中进行 DNS 配置。

    sudo vim /etc/systemd/resolved.conf
    

    取消或添加 DNS 的注释,并修改为:

    [Resolve]
    DNS=8.8.8.8 8.8.4.4
    

    重启启动 systemd-resolved

    sudo systemctl restart systemd-resolved
    

    再尝试 ping 一下百度:

    hedon@ubuntu:~$ ping www.baidu.com
    PING www.a.shifen.com (153.3.238.110) 56(84) bytes of data.
    64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=1 ttl=128 time=15.9 ms
    64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=2 ttl=128 time=15.9 ms
    64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=3 ttl=128 time=16.1 ms
    64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=4 ttl=128 time=15.3 ms
    ^C
    --- www.a.shifen.com ping statistics ---
    4 packets transmitted, 4 received, 0% packet loss, time 14104ms
    rtt min/avg/max/mdev = 15.368/15.850/16.145/0.291 ms
    

{% note info %}

补充说明:/etc/network/interfaces 文件的配置

这是一个用于配置 Linux 系统上网络接口的文件。在这个示例中,我们为名为 ens33 的网络接口配置了静态 IP 地址和相关的网络设置。下面是各行的解释:

  1. auto ens33: 这一行表示在系统启动时自动激活 ens33 网络接口。auto 关键字后面跟着接口名称。

  2. iface ens33 inet static: 这一行定义了 ens33 网络接口的配置。iface 关键字后面跟着接口名称,inet 表示我们正在配置 IPv4 地址,static 表示我们要为接口分配一个静态 IP 地址(而不是通过 DHCP 获得)。

  3. address 192.168.50.131: 这一行设置了网络接口的静态 IP 地址。在这个例子中,我们为 ens33 接口分配了 192.168.50.131 IP 地址。

    IP 地址是 Internet 协议(IP)用于在网络中唯一标识设备的数字标签。每个连接到网络的设备都需要一个唯一的 IP 地址,以便其他设备可以找到并与之通信。IP 地址通常分为两种版本:IPv4 和 IPv6。在此示例中,我们使用了一个 IPv4 地址。

  4. netmask 255.255.255.0: 这一行定义了子网掩码。在这个例子中,子网掩码是 255.255.255.0,表示前三个字节(24 位)是网络地址,最后一个字节(8 位)是主机地址。

    子网掩码用于划分 IP 地址的网络部分和主机部分。子网掩码与 IP 地址进行按位与操作,从而得到网络地址。这有助于确定哪些 IP 地址属于同一子网,以便正确地将数据包路由到目的地。子网划分有助于组织网络、提高安全性和管理性。

  5. gateway 192.168.50.2: 这一行设置了默认网关。在这个例子中,我们将默认网关设置为 192.168.50.2。默认网关是用于将数据包发送到其他网络的路由器或设备的 IP 地址。

    网关是一个充当网络中数据包传输的中继点的设备,通常是一个路由器。当一个设备需要将数据包发送到不同子网的另一个设备时,它会将数据包发送到网关。网关负责将数据包路由到正确的目的地。默认网关是设备用于将数据包发送到其他网络的首选网关。

  6. dns-nameservers 8.8.8.8 8.8.4.4: 这一行指定了 DNS 服务器的 IP 地址。在这个例子中,我们使用了谷歌的公共 DNS 服务器 8.8.8.88.8.4.4。DNS 服务器用于将主机名解析为 IP 地址。

    域名系统(DNS)是将人类可读的域名(例如 www.baidu.com)IP 地址的系统。DNS 服务器是负责执行此解析过程的服务器。当您在浏览器中输入一个网址时,计算机会向 DNS 服务器查询该域名对应的 IP 地址,然后将请求发送到该 IP 地址以获取网页内容。

配置文件中的这些设置将在系统启动时生效。要立即应用更改,您可以使用以下命令重启网络服务:

sudo systemctl restart networking

{% endnote %}

安装 jdk

sudo apt update
sudo apt install openjdk-8-jdk

验证 java8 是否已经安装成功:

java -version

有以下类似输出的话则表明安装成功:

openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-8u372-ga~us1-0ubuntu1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.362-b09, mixed mode)

安装 Kafka

  1. 下载并解压 Kafka

    wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
    tar -zxvf kafka_2.13-3.6.0.tgz
    
  2. 将解压缩后的文件夹移动到 /opt 目录中:

    sudo mv kafka_2.13-3.6.0 /opt/kafka-3.6.0
    
  3. 使用 Kafka 提供的脚本生成一个 ClusterID

    export KAFKA_CLUSTER_ID="$(/opt/kafka-3.6.0/bin/kafka-storage.sh random-uuid)"
    

    输出 ClusterID

    hedon@ubuntu:/opt/kafka-3.6.0$ echo $KAFKA_CLUSTER_ID
    XiMRcbJ-QEO694L7sfDdBQ
    

    在其他节点上将 KAFKA_CLUSTER_ID 设置为上面的值:

    export KAFKA_CLUSTER_ID=XiMRcbJ-QEO694L7sfDdBQ
    
  4. 备份配置文件,注意这里的配置文件是 config/kraft/server.properties,在 config 目录下的 kraft 目录中:

    cp /opt/kafka-3.6.0/config/kraft/server.properties /opt/kafka-3.6.0/config/kraft/server.properties.bak
    
  5. 修改配置

    vim /opt/kafka-3.6.0/config/kraft/server.properties
    

    主要修改内容如下:

    # 节点 ID,分别为 1,2,3
    node.id=1
    # 日志目录
    log.dirs=/opt/kafka-3.6.0/kafka-combined-logs
    # 可以成为控制器的节点和它们的端口
    controller.quorum.voters=1@kafka1.com:9093,2@kafka2.com:9093,3@kafka3.com:9093
    # 定义 Kafka Broker 如何向外部公布它的地址。
    # 这是 Kafka Broker 通知 Producer 和 Consumer 如何连接到自己的方式。
    # 例如,如果你设置 advertised.listeners=PLAINTEXT://my.public.ip:9092,
    # 那么 Kafka Broker 将告诉 Producer 和 Consumer 它的公共 IP 地址是 my.public.ip,并且它在 9092 端口上监听连接。
    # 这里我们需要在 3 个节点分别设置对应的地址
    advertised.listeners=PLAINTEXT://kafka1.com:9092
    
  6. 格式化日志目录

    /opt/kafka-3.6.0/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka-3.6.0/config/kraft/server.properties
    

    输出:

    Formatting /opt/kafka-3.6.0/kraft-combined-logs with metadata.version 3.6-IV2.
    
  7. 三个节点都启动 Kafka

    /opt/kafka-3.6.0/bin/kafka-server-start.sh -daemon /opt/kafka-3.6.0/config/kraft/server.properties
    
  8. 选择任意一个节点创建一个新 topic

    /opt/kafka-3.6.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --replication-factor 1 --partitions=2
    

    输出:

    Created topic test.
    
  9. 在其他节点获取 test 这个 topic 的信息

    /opt/kafka-3.6.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
    

    可以看到关于 test 这个 topic 的信息是可以获取到的,说明集群之前信息是互通的,集群搭建完毕。

    Topic: test	TopicId: svJClTUpSFa9Z6FWDvkARg	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824Topic: test	Partition: 0	Leader: 2	Replicas: 2	Isr: 2Topic: test	Partition: 1	Leader: 3	Replicas: 3	Isr: 3
    
  10. 随便选择一个节点,往 test 里面写入数据:

    /opt/kafka-3.6.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
    

    输入数据后按回车即发送一条数据,可以随时按 Ctrl + C 退出:

    hedon@ubuntu:~/Downloads$ /opt/kafka-3.6.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
    >msg1
    >msg2
    >msg 3
    >^
    
  11. 随便选择一个节点,启动消费者消费 topic 中的数据:

    /opt/kafka-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    输出:

    hedon@ubuntu:/opt/kafka-3.6.0$ /opt/kafka-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    msg1
    msg2
    msg 3
    ^CProcessed a total of 3 messages
    

至此,Kafka 的 KRaft 版本集群就部署完毕了!

补充说明 - KRaft 配置文件

下面是 Kafka KRaft 版本配置文件每个配置项的解释:

配置项说明
process.rolesKafka 服务器的角色,设置此项将 Kafka 置于 KRaft 模式。可能的值包括 “broker” 和 “controller”。
node.id与此实例关联的节点 ID。
controller.quorum.voters控制器选举的投票节点,格式为 node-id@host:port
listeners服务器监听的地址,格式为 listener_name://host_name:port
inter.broker.listener.name用于 broker 之间通信的监听器名称。
advertised.listeners服务器向客户端宣告的监听器名称、主机名和端口。
controller.listener.names控制器使用的监听器名称列表。
listener.security.protocol.map监听器名称到安全协议的映射。默认情况下,它们是相同的。
num.network.threads服务器用于从网络接收请求和向网络发送响应的线程数。
num.io.threads服务器用于处理请求(可能包括磁盘 I/O)的线程数。
socket.send.buffer.bytes服务器用于发送数据的缓冲区大小。
socket.receive.buffer.bytes服务器用于接收数据的缓冲区大小。
socket.request.max.bytes服务器接受的请求的最大大小(用于防止内存溢出)。
log.dirs用于存储日志文件的目录列表。
num.partitions每个主题的默认日志分区数。
num.recovery.threads.per.data.dir每个数据目录在启动时用于日志恢复和关闭时用于刷新的线程数。
offsets.topic.replication.factor内部主题 “__consumer_offsets” 和 “__transaction_state” 的复制因子。
transaction.state.log.replication.factor事务状态日志的复制因子。
transaction.state.log.min.isr事务状态日志的最小同步副本数。
log.flush.interval.messages强制将数据刷新到磁盘之前接受的消息数。
log.flush.interval.ms消息在日志中停留的最大时间,超过这个时间就会强制刷新到磁盘。
log.retention.hours由于年龄而使日志文件有资格被删除的最小年龄。
log.retention.bytes基于大小的日志保留策略。
log.segment.bytes日志段文件的最大大小。
log.retention.check.interval.ms检查日志段是否可以根据保留策略被删除的间隔。

请注意,这只是 Kafka 配置的一部分,Kafka 配置的完整列表可以在 Kafka 的官方文档中找到。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/185188.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hive 偏门函数

1.分位数函数percentile percentile(dau_days,0.5) as dau_days_50, percentile(dau_days,0.6) as dau_days_60, percentile(dau_days,0.8) as dau_days_80,2.窗口函数&#xff08;带滚动窗口&#xff09; #滚动30d dauavg(dau) over(order bydtm rows between 29 PRECEDINGa…

路由跳转到另一个页面

点击添加员工跳转到详情页 <el-button size"mini" type"primary" click"$router.push(/employee/detail)">添加员工</el-button>配置员工详情的路由信息 import layout from "/layout"; export default {path: "/em…

OBC、DCDC自动化测试解决方案!

OBC(车载充电机&#xff09;和DCDC&#xff08;直流-直流变换器&#xff09;是电动汽车的核心部件&#xff0c;DCDC和OBC的功能质量对于整车的性能和安全性至关重要。在OBC和DCDC&#xff0c;以及整车开发测试过程中&#xff0c;需要对OBC和DCDC进行功能和性能方面进行全面的测…

水溶性肥料行业分析:预计2028年将达到202亿美元

随着我国农业的集约化、规模化不断发展&#xff0c;以及大型农场涌现&#xff0c;水肥一体化面积将会不断扩大。同时&#xff0c;水溶肥是符合更加环保、更加可持续发展的新一代肥料&#xff0c;是中国肥料产业未来的重点发展课题。 水溶性肥料(Water Soluble Fertilizer&…

ChatGPT生成的一些有趣的文件管理用python小程序

1. 在前位置中的所有文件夹内增加一个名为 abc 的新文件夹 import osdef create_abc_directories(root_dir.):# 获取当前目录下的所有目录subdirectories [d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]# 在每个目录中创建名为abc的子目录f…

co域名是什么

co域名是什么本篇文章给大家分享的是有关co域名是什么&#xff0c;小编觉得挺实用的&#xff0c;因此分享给大家学习&#xff0c;希望大家阅读完这篇文章后可以有 所收获&#xff0c;话不多说&#xff0c;跟着小编一起来看看吧。 .co域名后缀是哥伦比亚的国家顶级域名后缀&…

解决两个依赖的全限定类名完全相同问题

介绍 我们会经常在maven项目中&#xff0c;依赖第三方的sdk或者api。当依赖的多了就会发生一个问题。例如有2个依赖的某个类的包名和类名完全相同&#xff0c;但这两个依赖的这个类的方法不完全相同。这就不好办了。用一个&#xff0c;另一个就会有问题。 解决 至于网上说的…

Android自动化测试中使用ADB进行网络状态管理!

技术分享&#xff1a;使用ADB进行Android网络状态管理 Android自动化测试中的网络状态切换是提高测试覆盖率、捕获潜在问题的关键步骤之一&#xff0c;本文将介绍 如何使用ADB检测和管理Android设备的网络状态。 自动化测试中的网络状态切换变得尤为重要。 网络状态查询 adb s…

【23真题】复录比高达2.24,但题目很棒!

今天分享的是23年广东工业837的信号与系统试题及解析。注意官方不公示真题&#xff0c;所以这套试卷为回忆版本。 本套试卷难度分析&#xff1a;22年广东工业837考研真题&#xff0c;我也发布过&#xff0c;若有需要&#xff0c;戳这里自取&#xff01;平均分107.93&#xff…

Java中的Lambda表达式

lambda表达式是一个可传递的代码块&#xff0c;可以在以后执行一次或多次。 1.lambda表达式的语法 eg&#xff1a;有如下lambda表达式&#xff1a; (int a, int b) -> {return ab}; 这个表达式的本质是一个函数。 一般的函数类似如下&#xff1a; int add(int a, int …

我的创作纪念日--成为创作者的 第1825天(5年) 啦

醉颜凉 &#xff0c;不知不觉今天已经是你成为创作者的 第1825天&#xff08;5年&#xff09; 啦。 机缘 1、作为一个创作者&#xff0c;我最初成为创作者的初心是出于对技术的热爱和对分享的渴望。我希望通过创作&#xff0c;将自己在实战项目中的经验分享给大家&#xff0c;…

ECONGU4280 Corporate Finance

ECONGU4280 Corporate Finance WeChat: zh6-86

P8649 [蓝桥杯 2017 省 B] k 倍区间(前缀和+优化(桶分类))

分析&#xff1a; &#xff08;1&#xff09;任意连续子序列可用两个前缀和的差来表示 &#xff08;2&#xff09;判断该子序列是否为k的倍数 p1-p2 模 0 (mod k) 等价于&#xff1a;前缀和模 k 是否同余 &#xff08;3&#xff09;同余的任意两前缀和组合的序列均满足…

el-select多选multiple数据无法删除,回显成功,但无法编辑,选中和删除都没反应

原因&#xff1a; 回显的数据是从后台接口得来&#xff0c;由于数据层次太多&#xff0c;导致render函数没有自动更新&#xff1b;需要手动强制刷新 解决方案&#xff1a; 使用 change “$forceUpdate()” 强制刷新视图 代码&#xff1a; <el-select multiple filtera…

WEB安全之Python

WEB安全之python python-pyc反编译 python类似java一样&#xff0c;存在编译过程&#xff0c;先将源码文件*.py编译成 *.pyc文件&#xff0c;然后通过python解释器执行 生成pyc文件 创建一个py文件随便输入几句代码(1.py) 通过python交互终端 >>>import py_compil…

CISO在2024年应该优先考虑七项安全任务

专业安全媒体CyberTalk.org主编Shira Landau日前表示&#xff1a;现代企业的CISO们在2024年必须做出改变&#xff0c;要更多关注于企业整体安全路线图的推进与实现&#xff0c;让网络安全工作与业务发展目标保持更紧密的一致性。 首席信息安全官&#xff08;CISO&#xff09;是…

采购业务中的组织概述

目录 一、采购和库存管理中组织单位的概览二、企业的组织结构三、采购中组织结构3.1采购组织3.2采购组 一、采购和库存管理中组织单位的概览 1、 客户端&#xff1a;在SAP ERP系统中&#xff0c;客户端通过三位数字定义&#xff0c;并代表这独立的数据记录和独立的业务流程。客…

MYSQL加密和压缩函数详解和实战(含示例)

MySQL提供了多种加密和压缩方式&#xff0c;可以帮助保护数据库中的敏感数据。以下是一些常见的MySQL加密和压缩方法参考&#xff1a; 建议收藏以备后续用到查阅参考。 目录 一、AES_ENCRYPT AES加密 二、AES_DECRYPT AES解密 三、COMPRESS 压缩字符串 四、UNCOMPRESS 解压…

JavaScript WebApi 一(详讲)

基础知识在前面的部分已经讲过了&#xff0c;大家如果没有学习过JavaScript的可以去看一下 1.DOM 引入 在JavaScript中&#xff0c;DOM&#xff08;文档对象模型&#xff09;提供了一种表示和操作HTML文档的方式。在DOM中&#xff0c;文档被表示为一个由节点组成的树形结构。…

nodejs最新电商jd m端h5st 4.2签名算法4.2版本逆向,jd API接口,jd商品数据采集

前言&#xff1a; jd m端使用最新的h5st 4.2签名算法&#xff0c;与h5st 4.1版本有很大的不同。在这儿分析一下&#xff0c;供大家参考。 一、目标地址(Base64解码) aHR0cHM6Ly9zby5tLmpkLmNvbS93YXJlL3NlYXJjaC5hY3Rpb24/a2V5d29yZD0lRTklOTklQTQlRTYlQjklQkYlRTYlOUMlQkEmc2…