Kafka【十二】消费者拉取主题分区的分配策略

【1】消费者组、leader和follower

消费者想要拉取主题分区的数据,首先必须要加入到一个组中。
在这里插入图片描述
但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办?

所以这里,我们需要学习Kafka中基本的消费者组中的消费者和分区之间的分配规则:

  • 同一个消费者组的消费者都订阅同一个主题,所以消费者组中的多个消费者可以共同消费一个主题中的所有数据。
  • 为了避免数据被重复消费,所以主题一个分区的数据只能被组中的一个消费者消费,也就是说不能两个消费者同时消费一个分区的数据。但是反过来,一个消费者是可以消费多个分区数据的。
    在这里插入图片描述
  • 消费者组中的消费者数量最好不要超出主题分区的数据,就会导致多出的消费者是无法消费数据的,造成了资源的浪费。
    在这里插入图片描述
    消费者中的每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者的Leader决定的,这个Leader我们称之为群主。群主是多个消费者中,第一个加入组中的消费者,其他消费者我们称之为Follower,称呼上有点类似与分区的Leader和Follower。
    在这里插入图片描述

当消费者加入群组的时候,会发送一个JoinGroup请求。群主负责给每一个消费者分配分区。每个消费者只知道自己的分配信息,只有群主知道群组内所有消费者的分配信息。

【2】leader指定分配策略的基本流程

(1) 假设第一个消费者设定group.id为test,向当前负载最小的节点发送请求查找消费调度器
在这里插入图片描述
(2) 找到消费调度器后,消费者向调度器节点发出JOIN_GROUP请求,加入消费者组
在这里插入图片描述
(3) 当前消费者当选为群主后,根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器
在这里插入图片描述
(4) 此时第二个消费者设定group.id为test,申请加入消费者组
在这里插入图片描述
(5) 加入成功后,kafka将消费者组状态切换到准备rebalance,关闭和消费者的所有链接,等待它们重新加入。客户端重新申请加入,kafka从消费者组中挑选一个作为leader,其它的作为follower。(步骤和之前相同,我们假设还是之前的消费者为Leader)

在这里插入图片描述
(6) Leader会按照分配策略对分区进行重分配,并将方案发送给调度器,由调度器通知所有的成员新的分配方案。组成员会按照新的方案重新消费数据
在这里插入图片描述

【3】四种分配策略

① RoundRobinAssignor(轮询分配策略)

每个消费者组中的消费者都会含有一个自动生产的UUID作为memberid。
在这里插入图片描述
轮询策略中会将每个消费者按照memberid进行排序,所有member消费的主题分区根据主题名称进行排序。
在这里插入图片描述
将主题分区轮询分配给对应的订阅用户,注意未订阅当前轮询主题的消费者会跳过。
在这里插入图片描述
在这里插入图片描述

从图中可以看出,轮询分配策略是存在缺点的,并不是那么的均衡,如果test1-2分区能够分配给消费者ccc是不是就完美了。

② RangeAssignor(范围分配策略)

按照每个topic的partition数计算出每个消费者应该分配的分区数量,然后分配,分配的原则就是一个主题的分区尽可能的平均分,如果不能平均分,那就按顺序向前补齐即可。

#所谓按顺序向前补齐就是:
假设【1,2,3,4,5】5个分区分给2个消费者:
5 / 2 = 2, 5 % 2 = 1 => 剩余的一个补在第一个中[2+1][2] => 结果为[1,2,3][4,5]假设【1,2,3,4,5】5个分区分到3个消费者:
5 / 3 = 1, 5 % 3 = 2 => 剩余的两个补在第一个和第二个中[1+1][1+1][1] => 结果为[1,2][3,4][5]

在这里插入图片描述
缺点: Range分配策略针对单个Topic的情况下显得比较均衡,但是假如Topic多的话, member排序靠前的可能会比member排序靠后的负载多很多。
在这里插入图片描述
还有就是如果新增或移除消费者成员,那么会导致每个消费者都需要去建立新的分区节点的连接,更新本地的分区缓存,效率比较低。
在这里插入图片描述

③ StickyAssignor(粘性分区)

在第一次分配后,每个组成员都保留分配给自己的分区信息。如果有消费者加入或退出,那么在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或退出消费者的分区进行分配。
在这里插入图片描述
在这里插入图片描述
从图中可以看出,粘性分区分配策略分配的会更加均匀和高效一些。

④ CooperativeStickyAssignor

前面的三种分配策略再进行重分配时使用的是EAGER协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从Kafka2.4版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是COOPERATIVE协议。

CooperativeStickyAssignor 是 Apache Kafka 中用于消费者组分区分配的一种策略。它旨在解决传统的轮询(RoundRobin)和粘性(Sticky)分配策略中的一些问题,特别是当消费者组中的消费者动态变化时,能够更有效地保持数据的局部性和均衡性。

什么是 CooperativeStickyAssignor?

CooperativeStickyAssignor 是一种改进的粘性分配策略,它试图在保持消费者组内部的分区分配尽可能稳定的同时,也能应对消费者组成员的变化。这种分配策略的目标是在消费者组成员动态变化的情况下,最小化重新分配对现有消费者的影响。

主要特点

  1. 粘性分配

    • 最小化重新分配:当消费者组中的消费者数量发生变化时,尽量减少重新分配的次数,以保持数据处理的连续性和一致性。
    • 保持局部性:尽量让每个消费者保留其已经处理过的分区,这样可以减少数据的重新加载和处理开销。
  2. 合作分配

    • 动态调整:在消费者组成员变化时,能够动态调整分区分配,使得新增加的消费者能够平滑地加入到处理过程中。
    • 均衡负载:确保每个消费者都能获得均衡的负载,避免过载或资源浪费的情况。

如何工作

CooperativeStickyAssignor 工作的基本原理如下:

  1. 初始化分配

    • 当消费者组首次启动时,分配策略会根据消费者组中的消费者数量和主题的分区数量来分配分区。
  2. 动态调整

    • 当有新的消费者加入时,CooperativeStickyAssignor 会尝试将新加入的消费者分配到负载较少的消费者所拥有的分区中,同时尽量保持原有消费者的分区不变。
    • 当有消费者离开时,CooperativeStickyAssignor 会重新分配离开消费者所拥有的分区,尽量将这些分区分配给其他负载较低的消费者。
  3. 均衡负载

    • 在每次重新分配时,分配器都会考虑消费者的负载情况,确保每个消费者承担的分区数量大致相等。

CooperativeStickyAssignor 是一种先进的分区分配策略,它在保持分区分配的稳定性和局部性的同时,也能有效应对消费者组成员的变化。通过使用这种策略,可以提高消费者组处理数据的效率和稳定性。

【5】不同版本默认分配策略

Kafka 0.10.x 及之前版本

在 Kafka 0.10.x 及之前的版本中,默认的分区分配策略是 范围分配策略(Range Assignor)。这种策略会根据消费者的加入顺序来分配分区,每个消费者会获得一定范围内的分区。

Kafka 0.11.x 及之后版本

从 Kafka 0.11.x 版本开始,默认的分区分配策略是 粘性分配策略(Sticky Assignor)。粘性分配策略旨在保持消费者组中分区分配的稳定性,即在消费者组中的消费者数量发生变化时,尽量保持分区分配的一致性,减少重新分配的频率,从而降低数据处理的开销。

然而,值得注意的是,从 Kafka 2.8 版本开始,引入了 合作粘性分配策略(Cooperative Sticky Assignor),这是一种改进版的粘性分配策略。虽然默认分配策略仍然是 Sticky Assignor,但 CooperativeStickyAssignor 作为一种更高级的分配策略,已经在很多场景中被推荐使用。

分配策略的比较

  1. Range Assignor

    • 在 Kafka 0.10.x 及之前版本中,默认使用 Range Assignor。
    • 这种策略按照消费者的加入顺序分配分区,每个消费者会获得一个连续的分区范围。
    • 缺点是当消费者组中的消费者数量发生变化时,可能会导致较大的重新分配,影响性能。
  2. Sticky Assignor

    • 从 Kafka 0.11.x 开始,默认使用 Sticky Assignor。
    • 目标是在消费者组中的消费者数量发生变化时,尽量保持分区分配的一致性,减少重新分配。
    • 适用于需要保持分区分配稳定性的场景。
  3. CooperativeStickyAssignor

    • 从 Kafka 2.8 版本开始引入。
    • 这种策略进一步优化了 Sticky Assignor,使得在消费者组中的消费者数量发生变化时,能够更平滑地进行分区重新分配。
    • 特别适用于需要动态调整消费者数量的场景。

总结

  • 在 Kafka 0.10.x 及之前的版本中,默认的分区分配策略是 Range Assignor。
  • 在 Kafka 0.11.x 及之后的版本中,默认的分区分配策略是 Sticky Assignor。
  • 从 Kafka 2.8 版本开始,引入了 CooperativeStickyAssignor,这是一种更高级的粘性分配策略,虽然不是默认的分配策略,但在很多场景中被推荐使用。

通过合理的分区分配策略选择,可以优化消费者组的性能和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/53495.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker容器创建时,无法访问镜像源:Could not connect to archive.ubuntu.com:80

1.问题描述 当基于dockerfile创建容器时,遇到Could not connect to ...、Failed to fetch ...等异常时,大概原因是没有配置好容器创建所需的镜像源。这里以Ubuntu基础镜像源为例。 dockerfile内容 FROM ubuntu RUN apt update && apt install…

MySQL 锁分类有哪些?一文带你详解!!

MySQL 锁 全局锁全局锁的应用场景全局锁的缺点 表级锁表锁元数据(MDL)锁MDL 锁的问题 意向锁AUTO-INC 锁 行级锁记录锁(Record Lock)间隙锁(Gap Lock)临键锁(Next-Key Lock)插入意向…

Opencv中的直方图(2)计算图像的直方图函数calcHist()的使用

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 计算一组数组的直方图。 函数 cv::calcHist 计算一个或多个数组的直方图。用于递增直方图bin的元组的元素是从相同位置的相应输入数组中获取的。…

vue多环境配置和打包

件名的后缀来指定它们仅在特定模式下被加载。 .env:所有环境下都会加载的通用配置。 .env.local:本地覆盖配置,不加入版本控制。 .env.[mode]:仅为指定的模式加载的配置文件,例如.env.development、.env.production、…

Cursor是什么?Cursor Pro Plus 如何订阅升级教程

一、Cursor是什么? Cursor 是一个基于 Visual Studio Code(VS Code)技术构建的高级代码编辑器,专为提高编程效率并更深度地整合 AI 功能而设计。它不仅继承了 VS Code 的强大功能和用户界面,还增加了专门针对 AI 支持…

Agent(智能体)和 MetaGPT,一句话实现整个需求应用代码

前面 2 篇文章,我们使用文生文、文生图和文生音频三个大模型共同实现了图文并茂的儿童绘本故事和绘本故事音频需求: 第一篇 根据主题生成儿童绘本故事:GLM-4-Flash 大模型 API 免费了,手把手构建“儿童绘本”应用实战&#xff08…

Nuxt3入门:过渡效果(第5节)

你好同学&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏、评论和关注。 Nuxt 利用 Vue 的 <Transition> 组件在页面和布局之间应用过渡效果。 一、页面过渡效果 你可以启用页面过渡效果&#xff0c;以便对所有页面应用自动过渡效果。 nuxt.config.js export defaul…

概率DP (由一道绿题引起的若干问题。目前为一些老题,蒟蒻的尝试学习1.0)

概率DP&#xff1a; 利用动态规划去解决 概率 期望 的题目。 概率DP 求概率&#xff08;采用顺推&#xff09; 从 初始状态推向结果&#xff0c;同一般的DP类似&#xff0c;只是经历了概率论知识的包装。 老题&#xff1a; 添加链接描述 题意&#xff1a; 袋子里有w只白鼠&am…

linux编译器——gcc/g++

1.gcc linux上先要安装&#xff0c; sudo yum install gcc gcc --version 可以查看当前的版本 &#xff0c;我们默认安装的是4.8.5的版本&#xff0c;比较低&#xff0c; gcc test.c -stdc99 可以使他支持更高版本的c标准 -o 可以殖指明生成文件的名字&#xff0c;可以自己…

上海市计算机学会竞赛平台2024年7月月赛丙组求和问题

题目描述 给定 nn 个整数 a1,a2,…,ana1​,a2​,…,an​&#xff0c;请问这个序列最长有多少长的前缀&#xff0c;满足元素的和大于或等于 00&#xff1f;如果任何长度大于 00 的前缀之和都为负数&#xff0c;则输出 00 输入格式 第一行&#xff1a;单个整数表示 nn第二行&a…

经验笔记:JSP(JavaServer Pages)

JSP&#xff08;JavaServer Pages&#xff09;经验笔记 JSP&#xff08;JavaServer Pages&#xff09;是一种用于创建动态网页的技术&#xff0c;它允许在HTML页面中嵌入Java代码&#xff0c;从而实现动态内容的生成。JSP与Servlet一样&#xff0c;都是Java EE平台的一部分&am…

随机森林的知识博客:原理与应用

随机森林&#xff08;Random Forest&#xff09;是一种基于决策树的集成学习算法&#xff0c;它通过组合多棵决策树的预测结果来提升模型的准确性和稳健性。随机森林具有强大的分类和回归能力&#xff0c;广泛应用于各种机器学习任务。本文将详细介绍随机森林的原理、构建方法及…

大数据系列之:Java8和java11查看进程堆内存使用情况

大数据系列之:Java8和java11查看进程堆内存使用情况 Java8查看进程堆内存java11查看进程堆内存进程堆内存使用情况评估Java8查看进程堆内存 jmap -F -heap 2723jmap -F -heap 2723是一个Java命令行工具jmap的使用示例。它用于生成Java进程2723的堆内存信息。其中,-F选项表示…

JavaSE-易错题集-001

1. AccessViolationException异常触发后&#xff0c;下列程序的输出结果为&#xff08; &#xff09; 1 2 3 4 5 6 7 8 9 10 11 12 13 static void Main(string[] args) { try { throw new AccessViolationException(); Console.Write…

OpenCV图像分割教程

OpenCV 图像分割教程 OpenCV 是一个非常强大的计算机视觉库&#xff0c;支持各种图像处理任务。图像分割是 OpenCV 支持的一个重要功能&#xff0c;它用于将图像划分为不同的区域&#xff0c;识别感兴趣的部分。我们将通过介绍 OpenCV 中的图像分割方法&#xff0c;包括基础功…

ubantu安装mysql + redis数据库并使用C/C++操作数据库

mysql 安装mysql ubuntu 安装 MySql_ubuntu安装mysql-CSDN博客 Ubuntu 安装 MySQL 密码设置_ubuntu安装mysql后设置密码-CSDN博客 service mysql restart1 C/C连接数据库 C/C 连接访问 MySQL数据库_c mysql-CSDN博客 ubuntu安装mysql的c开发环境_ubuntu 搭建mysql c开发…

测试一些概念

软件测试 软件测试流程 需求分析&#xff1a;在这个阶段&#xff0c;测试人员会审查和分析项目的需求文档&#xff0c;以确保他们理解需要测试的功能和特性。 制定测试计划&#xff1a;在这个阶段&#xff0c;测试人员会制定一个详细的测试计划&#xff0c;包括测试目标、测…

跨越技术壁垒:EasyCVR为何选择支持FMP4格式,重塑视频汇聚平台标准

随着物联网、大数据、云计算等技术的飞速发展&#xff0c;视频监控系统已经从传统的安防监控扩展到智慧城市、智能交通、工业制造等多个领域。视频流格式作为视频数据传输与存储的基础&#xff0c;其兼容性与效率直接影响到整个视频监控系统的性能。 在众多视频流格式中&#…

TCP Analysis Flags 之 TCP Port numbers reused

前言 默认情况下&#xff0c;Wireshark 的 TCP 解析器会跟踪每个 TCP 会话的状态&#xff0c;并在检测到问题或潜在问题时提供额外的信息。在第一次打开捕获文件时&#xff0c;会对每个 TCP 数据包进行一次分析&#xff0c;数据包按照它们在数据包列表中出现的顺序进行处理。可…

pytorch计算网络参数量和Flops

from torchsummary import summary summary(net, input_size(3, 256, 256), batch_size-1)输出的参数是除以一百万&#xff08;/1000000&#xff09;M&#xff0c; from fvcore.nn import FlopCountAnalysis inputs torch.randn(1, 3, 256, 256).cuda() flop_counter FlopCo…