Kafka集群与可靠性

Kafka集群与可靠性

1.Kafka集群搭建实战

使用两台Linux服务器:一台192.168.182.137 一台192.168.182.138

安装kafka首先,我们需要配置java环境变量(这里就略过了)

mkdir /opt/kafka
#上传压缩包kafka_2.13-3.3.1.tgz并解压
tar -zxvf kafka_2.13-3.3.1.tgz
#进入目录
cd /opt/kafka/kafka_2.13-3.3.1
# 修改config目录下的server.properties配置文件

修改config目录下的server.properties配置文件

  1. 修改broker.id,确保每个集群的节点broker的id都是不一样的

    broker.id=0
    

image-20240116155502101

  1. 修改监听的端口和ip

    # 192.168.182.137是我虚拟机的ip, 9092是kafka默认的端口
    listeners = PLAINTEXT://192.168.182.137: 
    

    image-20240116155646799

  2. 修改zookeeper的ip

    zookeeper.connect=192.168.182.137:2181
    

image-20240116155828472

  1. 编写脚本启动kafka和zk

    start_ZK.sh

    #!/bin/bash
    ./zookeeper-server-start.sh ../config/zookeeper.properties
    

    stop_Kafka.sh

    #!/bin/bash
    ./kafka-server-stop.sh ../config/server.properties
    

    start_Kafka.sh

    #!/bin/bash
    ./kafka-server-start.sh ../config/server.properties
    

启动zk

# 将启动日志打印到zk.log文件中,后台启动
nohup ./start_ZK.sh -> zk.log &

启动kafka

nohup ./start_Kafka.sh &

查看启动日志

tail -f nohup.out 

image-20240116160554154

ps:其他节点的配置类似,要确保每个集群的节点broker的id都是不一样的,这里138我使用的是137的zookeeper(确保多个broker注册到同一个zookeeper)

2.Kafka集群规模如何预估

吞吐量:

集群可以提高处理请求的能力。单个Broker的性能不足,可以通过扩展broker来解决。

磁盘空间:

比如,如果一个集群有10TB的数据需要保留,而每个broker可以存储2TB,那么至少需要5个broker。如果启用了数据复制,则还需要一倍的空间,那么这个集群需要10个broker。

3.Kafka集群原理

成员关系与控制器

控制器其实就是一个broker, 只不过它除了具有一般 broker的功能之外, 还负责分区首领的选举。

当控制器发现一个broker加入集群时, 它会使用 broker ID来检査新加入的 broker是否包含现有分区的副本。 如果有, 控制器就把变更通知发送给新加入的 broker和其他 broker, 新 broker上的副本开始从首领那里复制消息。

简而言之, Kafka使用 Zookeeper的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。 控制器负责在节点加入或离开集群时进行分区首领选举。

从下面的两台启动日志中看出,192.168.140.103 这台服务器是控制器。

image-20240116221553427

集群工作机制

复制功能是 Kafka 架构的核心。在 Kafka 的文档里, Kafka 把自己描述成“一个分布式的、可分区的、可复制的提交日志服务”。

复制之所以这么关键, 是因为它可以在个别节点失效时仍能保证 Kafka 的可用性和持久性。
Kafka 使用主题来组织数据, 每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在 broker 上, 每个 broker 可以保存成百上千个属于 不同主题和分区的副本。

replication-factor参数

比如我们创建一个llp的主题,复制因子是2,分区数是2

./kafka-topics.sh --bootstrap-server 192.168.140.103:9092  --create --topic llp --replication-factor 2 --partitions 2

replication-factor用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的 broker 上,也就是说副本的数量不能超过 broker 的数量。

./kafka-topics.sh --bootstrap-server 192.168.140.103:9092 --describe

image-20240116223301961

从这里可以看出,llp分区有两个分区,partition0和partition1 ,其中

在partition0 中,broker1(broker.id =0)是Leader,broker2(broker.id =1)是跟随副本。

在partition1 中,broker2(broker.id =1)是Leader,broker1(broker.id =0)是跟随副本。

image-20240116222641119

image.png

首领副本

每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本 。

跟随者副本

首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一一的任务就是从首领那里复制消息, 保持与首领一致的状态 。 如果首领发生崩溃, 其中的一个跟随者会被提升为新首领 。

auto.leader.rebalance.enable参数

是否允许定期进行 Leader 选举。

设置它的值为true表示允许Kafka定期地对一些Topic 分区进行Leader重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。

比如Leader A一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后Leader A就要被强行卸任换成Leader B。
换一次Leader 代价很高,原本向A发送请求的所有客户端都要切换成向B发送请求,而且这种换Leader本质上没有任何性能收益,因此建议在生产环境中把这个参数设置成false。

集群消息生产

复制系数、不完全的首领选举、最少同步副本

可靠系统里的生产者

发送确认机制

3 种不同的确认模式。

acks=0 意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka 。

acks=1 意味若首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。

acks=all 意味着首领在返回确认或错误响应之前,会等待(min.insync.replicas)同步副本都收到悄息。

ISR

image.png

Kafka的数据复制是以Partition为单位的。而多个备份间的数据复制,通过Follower向Leader拉取数据完成。从一这点来讲,有点像Master-Slave方案。不同的是,Kafka既不是完全的同步复制,也不是完全的异步复制,而是基于ISR的动态复制方案。

ISR,也即In-Sync Replica。每个Partition的Leader都会维护这样一个列表,该列表中,包含了所有与之同步的Replica(包含Leader自己)。每次数据写入时,只有ISR中的所有Replica都复制完,Leader才会将其置为Commit,它才能被Consumer所消费。

这种方案,与同步复制非常接近。但不同的是,这个ISR是由Leader动态维护的。如果Follower不能紧“跟上”Leader,它将被Leader从ISR中移除,待它又重新“跟上”Leader后,会被Leader再次加加ISR中。每次改变ISR后,Leader都会将最新的ISR持久化到Zookeeper中。

至于如何判断某个Follower是否“跟上”Leader,不同版本的Kafka的策略稍微有些区别。

从0.9.0.0版本开始,replica.lag.max.messages被移除,故Leader不再考虑Follower落后的消息条数。另外,Leader不仅会判断Follower是否在replica.lag.time.max.ms时间内向其发送Fetch请求,同时还会考虑Follower是否在该时间内与之保持同步。

示例

image.png

在第一步中,Leader A总共收到3条消息,但由于ISR中的Follower只同步了第1条消息(m1),故只有m1被Commit,也即只有m1可被Consumer消费。此时Follower B与Leader A的差距是1,而Follower C与Leader A的差距是2,虽然有消息的差距,但是满足同步副本的要求保留在ISR中。

在第二步中,由于旧的Leader A宕机,新的Leader B在replica.lag.time.max.ms时间内未收到来自A的Fetch请求,故将A从ISR中移除,此时ISR={B,C}。同时,由于此时新的Leader B中只有2条消息,并未包含m3(m3从未被任何Leader所Commit),所以m3无法被Consumer消费。

使用ISR方案的原因

由于Leader可移除不能及时与之同步的Follower,故与同步复制相比可避免最慢的Follower拖慢整体速度,也即ISR提高了系统可用性。

ISR中的所有Follower都包含了所有Commit过的消息,而只有Commit过的消息才会被Consumer消费,故从Consumer的角度而言,ISR中的所有Replica都始终处于同步状态,从而与异步复制方案相比提高了数据一致性。

ISR相关配置说明

Broker的min.insync.replicas参数指定了Broker所要求的ISR最小长度,默认值为1。也即极限情况下ISR可以只包含Leader。但此时如果Leader宕机,则该Partition不可用,可用性得不到保证。

只有被ISR中所有Replica同步的消息才被Commit,但Producer发布数据时,Leader并不需要ISR中的所有Replica同步该数据才确认收到数据。Producer可以通过acks参数指定最少需要多少个Replica确认收到该消息才视为该消息发送成功。acks的默认值是1,即Leader收到该消息后立即告诉Producer收到该消息,此时如果在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。而如果将该值设置为0,则Producer发送完数据后,立即认为该数据发送成功,不作任何等待,而实际上该数据可能发送失败,并且Producer的Retry机制将不生效。

更推荐的做法是,将acks设置为all或者-1,此时只有ISR中的所有Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。

总结

设置acks=all,且副本数为3
极端情况1:
默认min.insync.replicas=1,极端情况下如果ISR中只有leader一个副本时满足min.insync.replicas=1这个条件,此时producer发送的数据只要leader同步成功就会返回响应,如果此时leader所在的broker crash了,就必定会丢失数据!这种情况不就和acks=1一样了!所以我们需要适当的加大min.insync.replicas的值。

极端情况2:
min.insync.replicas=3(等于副本数),这种情况下要一直保证ISR中有所有的副本,且producer发送数据要保证所有副本写入成功才能接收到响应!一旦有任何一个broker crash了,ISR里面最大就是2了,不满足min.insync.replicas=3,就不可能发送数据成功了!

根据这两个极端的情况可以看出min.insync.replicas的取值,是kafka系统可用性和数据可靠性的平衡!

减小 min.insync.replicas 的值,一定程度上增大了系统的可用性,允许kafka出现更多的副本broker crash并且服务正常运行;但是降低了数据可靠性,可能会丢数据(极端情况1)。
增大 min.insync.replicas 的值,一定程度上增大了数据的可靠性,允许一些broker crash掉,且不会丢失数据(只要再次选举的leader是从ISR中选举的就行);但是降低了系统的可用性,会允许更少的broker crash(极端情况2)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629828.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Rust-Panic

什么是panic 在Rust中,有一类错误叫作panic。示例如下: 编译,没有错误,执行这段程序,输出为: 这种情况就引发了一个panic。在这段代码中,我们调用了Option::unwrap()方法,正是这个方…

SparkSQL初体验

SparkSQL初体验 命令式的 API RDD 版本的 WordCount val conf new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc new SparkContext(conf)sc.textFile("hdfs://master:9000/dataset/wordcount.txt").flatMap(_.split("…

设计一个抽奖系统

👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家📕系列专栏:Spring原理、JUC原理、Kafka原理、分布式技术原理、数据库技术🔥如果感觉博主的文章还不错的…

在uniapp Vue3版本中如何解决web/H5网页浏览器跨域的问题

问题复现 uniapp项目在浏览器运行,有可能调用某些接口会出现跨域问题,报错如下图所示: 什么是跨域? 存在跨域问题的原因是因为浏览器的同源策略,也就是说前端无法直接发起跨域请求。同源策略是一个基础的安全策略&a…

前端下载文件流,设置返回值类型responseType:‘blob‘无效的问题

前言: 本是一个非常简单的请求,即是下载文件。通常的做法如下: 1.前端通过Vue Axios向后端请求,同时在请求中设置响应体为Blob格式。 2.后端相应前端的请求,同时返回Blob格式的文件给到前端(如果没有步骤…

shell脚本 $0-$n $* $@ $# $? $$

各命令详解 1.$0-$n :表示脚本或函数的参数。$0 是脚本的名称,$1 到 $n 是位置参数,每个对应一个传递给脚本或函数的参数。 2.$* :表示所有传递给脚本或函数的参数。它将所有位置参数作为单个字符串显示。 3.$ :表示所…

时序预测 | MATLAB实现GRNN广义回归神经网络时间序列未来多步预测(程序含详细预测步骤)

时序预测 | MATLAB实现GRNN广义回归神经网络时间序列未来多步预测(程序含详细预测步骤) 目录 时序预测 | MATLAB实现GRNN广义回归神经网络时间序列未来多步预测(程序含详细预测步骤)预测效果基本介绍程序设计参考资料预测效果 基本介绍 MATLAB实现GRNN广义回归神经网络时间序列…

大语言模型面试问题【持续更新中】

自己在看面经中遇到的一些面试题,结合自己和理解进行了一下整理。 transformer中求和与归一化中“求和”是什么意思? 求和的意思就是残差层求和,原本的等式为y H(x)转化为y x H(x),这样做的目的是防止网络层数的加深而造成的梯…

Angular系列教程之观察者模式和RxJS

文章目录 引言RxJS简介RxJS中的设计模式观察者模式迭代器模式 示例代码RxJS 在 Angular 中的应用总结 引言 在Angular开发中,我们经常需要处理异步操作,例如从后端获取数据或与用户的交互。为了更好地管理这些异步操作,Angular中引入了RxJS&…

el-table嵌套两层el-dropdown-menu导致样式错乱

问题&#xff1a; 解决方式&#xff1a; <el-table-column label"操作" fixed"right" width"132" align"center"><template slot-scope"scope"><div v-if"scope.row._index ! 合计"><el-d…

【PWN · GOT表劫持 | 整数溢出】[HGAME 2023 week1]choose_the_seat

整数溢出&#xff0c;加之保护开的不全&#xff0c;可以反复越界修改got表&#xff0c;劫持puts函数实现利用 一、题目概述 限制&#xff1a;v0不可以大于9 理想中数组所在bss端地址&#xff1a; 注意到与got表项距离很近 危险函数只能执行一遍&#xff0c;然后回exit(0) 二…

Next.js 开发指​南(GitHub 115k star​)

Next.js 是一个构建于 Node.js 之上的开源 Web 开发框架&#xff0c;它扩展了最新的 React 特性&#xff0c;集成了基于 Rust 的 JavaScript 工具&#xff0c;可以帮助你快速创建全栈 Web 应用 &#xff08;full-stack Web applications&#xff09; 。 对于有一定 React 基础…

华为数通方向HCIP-DataCom H12-831题库(判断题:21-40)

第21题 OSPF的NSSA区域内,在ASBR路由器上不论路由表中是否存在缺省路由,都会自动产生描述缺省路由的Type7LSA,通告到整个NSSA区域 正确 错误 答案:错误 解析: 在NSSA区域中,ASBR默认情况下不会产生7类LSA表示的默认路由。 第22题 BFD单跳检测是指对两个直连接口进行IP连…

Express安装与基础使用

一、express 介绍 express 是一个基于 Node.js 平台的极简、灵活的 WEB 应用开发框架&#xff0c; 官方网站&#xff1a; Express - 基于 Node.js 平台的 web 应用开发框架 - Express中文文档 | Express中文网 中文文档&#xff1a; 路由 - Express 中文文档 简单来说&am…

Kafka生产消费流程

Kafka生产消费流程 1.Kafka一条消息发送和消费的流程图(非集群) 2.三种发送方式 准备工作 创建maven工程&#xff0c;引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1…

笔试面试题——继承和多态

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、什么是多态&#xff1f;二、什么是重载、重写(覆盖)、重定义(隐藏)&#xff1f;三、 inli…

QT发布成exe不能运行解决方案

原因 qt发布成exe后不会把依赖的dll自动拷贝到文件夹中. 解决方案&#xff1a; 输入&#xff1a;windeployqt 拖拉 生产的exe到命令行. 会自动copy依赖到文件夹中&#xff1a; 然后就可以单击运行了&#xff01;

电路原理1-线性电阻

前言&#xff1a;整理笔记基于清华大学于歆杰老师的《电路原理》&#xff0c;电路原理是基于无源负载和电源组成电路的分析方法。 1.基础数学知识 算术&#xff1a;数字之间的运算 代数&#xff1a;用变量和函数来代替数字 微积分&#xff1a;描述函数的累积效应&#xff0…

贝叶斯方法家族

贝叶斯方法 机器学习框架贝叶斯方法贝叶斯和其他推断方法的区别朴素贝叶斯分类五个 NB 分类器 贝叶斯推断马尔科夫-蒙特卡洛方法变分推断隐马尔科夫模型 贝叶斯网络贝叶斯置信网络 贝叶斯深度学习贝叶斯神经网络贝叶斯卷积神经网络贝叶斯图神经网络贝叶斯优化方法 机器学习框架…

shell简单截取curl GET返回的body消息体

目录 需求背景&#xff1a; 示例&#xff1a; 解决方式&#xff1a; 需求背景&#xff1a; 用shell解析 curl命令GET到的消息体&#xff0c;获取body消息体里的某个字段的值,只是个简单的示例&#xff0c;可以在此基础上更改满足自己的需求 示例&#xff1a; curl一个API…