【大数据学习 | kafka】消费者的分区分配规则

1. 概述

上面我们提到过,消费者有的时候会少于或者多于分区的个数,那么如果消费者少了有的消费者要消费多个分区的数据,如果消费者多了,有的消费者就可能没有分区的数据消费。

那么这个关系是如何分配的呢?

现在我们知道kafka中存在一个coordinator可以管理这么一堆消费者,它可以帮助一个组内的所有消费者进行分区的分配和对应。

通过coordinator进行协调

这个分配规则分为以下几种。

2. range分配器

按照范围形式进行分配分区数量

# 为了演示分区的分配效果我们创建一个topic_d,设定为7个分区
[hexuan@hadoop106 bin]$ kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_f --partitions 7 --replication-factor 2
consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {}});

然后改版订阅代码,subscribe订阅信息的时候展示出来分区的对应映射关系,这个只是一个监控的作用没有其他的代码影响ConsumerRebalanceListener增加监视。

其中存在两个比较直观的方法

onPartitionsRevoked回收的分区。

onPartitionsAssigned分配的分区。

能够直观展示在分区分配的对应关系

其中我们需要知道两个比较重要的参数。

参数解释
offsets.topic.num.partitions__consumer_offset这个topic的分区数量默认50个
heartbeat.interval.ms消费者和协调器的心跳时间 默认3s
session.timeout.ms消费者断开的超时时间 默认45s,最小不能小于6000
partition.assignment.strategy设定分区分配策略

也就是说我们想要直观查看消费者变化后的映射对应关系需要停止消费者以后45s才可以,这个在代码中我们需要人为设定小点,更加快速查看变化

代码测试原理

首先设定topic_d的分区为7个,然后启动一个组内的两个消费者,可以看到他们的分配关系在onPartitionsAssigned这个方法中打印出来,同时我们关闭一个消费者,可以看到onPartitionsRevoked可以展示回收的分区,onPartitionsAssigned以及这个方法中分配的分区

整体代码如下:

package com.hainiu.kafka.consumer;/*** ClassName : rangeAssigned* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/4 22:04* Version 1.0*/
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class rangeAssigned {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RangeAssignor.class.getName());//设定分区分配策略为rangepro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);//设定consumer断开超时时间最小不能小于6sKafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_f");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {System.out.println("revoke-->"+partition.topic()+"-->"+partition.partition());}}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {System.out.println("assign-->"+partition.topic()+"-->"+partition.partition());}}});while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}

我们执行两个实例,两个实例代表两个消费者位于同一个组中,那么两个消费者的分配关系按照,范围进行分割

consumer0[0,1,2,3] consumer1[4,5,6]

执行第一个实例的时候,无需回收,并且七个分区都分配给第一个消费者实例。

执行第二个消费者的时候,需要对第一个消费者实例进行回收分区:

revoke-->topic_f-->0
revoke-->topic_f-->1
revoke-->topic_f-->2
revoke-->topic_f-->3
revoke-->topic_f-->4
revoke-->topic_f-->5
revoke-->topic_f-->6

然后由于一个消费者组中有两个消费者实例,则将分区重新分配个两个消费者实例。

因为coordinator的分配规则基于eager协议,这个协议的规则就是当分配关系发生变化的时候要全部回收然后打乱重分。

consumer1分配分区情况:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3

consumer2分配分区情况:

assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6

缺点:

这个协议只是按照范围形式进行重新分配分区,会造成单个消费者的压力过大问题,多个topic就会不均匀。

一个消费者组消费多个topic时可能会造成数据倾斜。

比如该消费者组有两个消费者:consumer1和consumer2。该消费者组消费两个topic分区:topic_1, topic_2,且假设两个topic都有7个分区,那么range分配规则可能会这么干:

consumer1分配topic_1-0,topic_1-1,topic_1-2, topic_1-3,topic_2-0,topic_2-1,topic_2-2, topic_2-3。

consumer3分配topic_1-4,topic_1-5,topic_1-6,topic_2-4,topic_2-5,topic_2-6。

consumer1要消费8个分区的数据,而consumer2要消费6个分区的数据,

当一个消费者出现消费多个topic主题的时候就可能出现这种数据倾斜的情况。

3. roundRobin轮训分配策略

轮训形式分配分区,一个消费者一个分区

整体代码如下:

pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RoundRobinAssignor.class.getName());

设定分配规则为roundRobin的

启动一个的效果:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6

启动两个应用

第一个消费者consumer实例:

回收所有的七个分区:

revoke-->topic_f-->0
revoke-->topic_f-->1
revoke-->topic_f-->2
revoke-->topic_f-->3
revoke-->topic_f-->4
revoke-->topic_f-->5
revoke-->topic_f-->6

再被分配到3个分区:

assign-->topic_f-->1
assign-->topic_f-->3
assign-->topic_f-->5

第二个消费者consumer2实例:

assign-->topic_f-->0
assign-->topic_f-->2
assign-->topic_f-->4
assign-->topic_f-->6

优点:

同range方式相比,在多个topic的情况下,可以保证多个consumer负载均衡

分配规则如上图,一人一个轮训形式

consumer0 [0 2 4 6 1 3 5]

consumer1 [1 3 5 0 2 4 6]

缺点

不管是range的还是roundRobin的分配方式都是全量收回打乱重新分配,这样的效率太低,所以我们使用下面的粘性分区策略。

4. sticky粘性分区

粘性分区它的重新分区原理和原来的roundRobin的分区方式差不多,但是又不相同,主要是分区逻辑一样,但是重新分配分区的时候优先保留原分区,然后重新分配其他分区,从而不需要全部打乱重分,减少重新分配分区消耗

第二次启动

第三次

分区分配方式一样,但是如果重新分配的话会有很多原来分区的预留,重新分配新的分区

# 为了演示效果再次创建新的topic topic_g 七个分区
kafka-topics.sh --bootstrap-server hadoop106:9092 --topic topic_g --create --partitions 7 --replication-factor 2

然后让复制代码,修改订阅两个topic

  pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,StickyAssignor.class.getName());
//修改为粘性分区List<String> topics = Arrays.asList("topic_f","topic_g");
//订阅两个topic

并且运行应用实例分别运行1 ,2 ,3 多种个数的实例

执行第一个消费者实例consumer1,无需回收,分配14个分区(topic_f和topic_g都是七个分区)

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6
assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

执行第二个消费者实例consumer2时:

consumer1:

回收了14个分区:

revoke-->topic_f-->0
revoke-->topic_f-->1
revoke-->topic_f-->2
revoke-->topic_f-->3
revoke-->topic_f-->4
revoke-->topic_f-->5
revoke-->topic_f-->6
revoke-->topic_g-->0
revoke-->topic_g-->1
revoke-->topic_g-->2
revoke-->topic_g-->3
revoke-->topic_g-->4
revoke-->topic_g-->5
revoke-->topic_g-->6

优先保留原来分区,所以分配七个分区:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6

consumer2:

分配了七个分区;

assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

执行第三个实例consumer3:

consumer1将会回收七个分区,consumer2将会回收七个分区。

14 / 3 = 4 ---->  4 + 1       4  +  1       4

comsumer1将会被分配:[0, 1, 2, 3, 4]

consumer2将会被分配 : [0, 1, 2, 3, 4]

consumer3将会被分配:[5, 6, 5, 6]

尽量不改变原分区的规则的前提下进行分区分配。

以上三种都基于eager协议,也就是想要重新分配分区一定要将原来的所有分区回收,全部打乱重新,即使保留原来的分区规则,也需要全部都回收分区,这样效率非常低下,最后一种CooperativeSticky分区策略完全打破以上三种的分区关系。

5. CooperativeSticky分区

以粘性为主,但是不全部收回分区,只是将部分需要重新分配的分区重新调配,效率高于以上三种分区策略。

 pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,CooperativeStickyAssignor.class.getName());
//设定分区策略

运行两个实例,查看控制台信息发现:

运行第一个消费者实例:

consumer1:

分配了14个分区:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6
assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

运行第二个消费者实例:

consumer1:

回收7个分区:与前三种分区规则不同,前三种是分配分区的时候将所有分区全部收回

revoke-->topic_g-->0
revoke-->topic_g-->1
revoke-->topic_g-->2
revoke-->topic_g-->3
revoke-->topic_g-->4
revoke-->topic_g-->5
revoke-->topic_g-->6

详细信息:

	Assigned partitions:                       [topic_f-0, topic_f-1, topic_f-2, topic_f-3, topic_f-4, topic_f-5, topic_f-6]Current owned partitions:                  [topic_f-0, topic_f-1, topic_f-2, topic_f-3, topic_f-4, topic_f-5, topic_f-6]Added partitions (assigned - owned):       []Revoked partitions (owned - assigned):     []

consumer2:

分配七个分区:

assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

详细情况:

	Assigned partitions:                       [topic_g-0, topic_g-1, topic_g-2, topic_g-3, topic_g-4, topic_g-5, topic_g-6]Current owned partitions:                  []Added partitions (assigned - owned):       [topic_g-0, topic_g-1, topic_g-2, topic_g-3, topic_g-4, topic_g-5, topic_g-6]Revoked partitions (owned - assigned):     []

整个分区分配规则和粘性分区策略一致,但是并不需要收回全部分区。

系统默认分区分配规则为:。

range+CooperativeSticky。

范围分区为主,优先粘性并且不急于eager协议。

6. 指定分区消费

在计算处理过程中,有时候我们需要指定一个消费者组消费指定的分区,计算其中的数据,这个时候以上的所有分区策略都不符合我们人为的要求,我们需要指定相应的分区进行消费。

consumer.assign();
//用指定的方式定向消费相应的分区数据

整体代码如下:

package com.hainiu.kafka.consumer;/*** ClassName : rangeAssigned* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/4 22:04* Version 1.0*/
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class Assigned {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());pro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<TopicPartition> list = Arrays.asList(new TopicPartition("topic_f", 0),new TopicPartition("topic_g", 0));consumer.assign(list);while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}

我们只消费topic_e的0号分区和topic_d的0号分区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60011.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python接口自动化测试自学指南(项目实战)

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 接口自动化测试是指通过编写程序来模拟用户的行为&#xff0c;对接口进行自动化测试。Python是一种流行的编程语言&#xff0c;它在接口自动化测试中得到了广…

【Python进阶】Python网络协议与套接字编程:构建客户端和服务器

1、网络通信基础与网络协议 1.1 网络通信模型概述 网络通信是信息时代基石&#xff0c;它如同现实世界中的邮递系统&#xff0c;将数据从一处传递到另一处。其中&#xff0c;OSI七层模型与TCP/IP四层或五层模型是理解和构建网络通信的基础。 1.1.1 OSI七层模型与TCP/IP四层/…

Redis - 哨兵(Sentinel)

Redis 的主从复制模式下&#xff0c;⼀旦主节点由于故障不能提供服务&#xff0c;需要⼈⼯进⾏主从切换&#xff0c;同时⼤量 的客⼾端需要被通知切换到新的主节点上&#xff0c;对于上了⼀定规模的应⽤来说&#xff0c;这种⽅案是⽆法接受的&#xff0c; 于是Redis从2.8开始提…

UE5 metahuman 头发物理模拟

https://www.youtube.com/watch?vyYmdgtP6cQA 头发梳理物理 打开Unreal Engine&#xff0c;选择一个角色模型 点击“Groom”选项卡&#xff0c;在“Physics”部分下&#xff0c;找到“Custom Solver”&#xff0c;点击下拉菜单&#xff0c;选择“WindDrivenSystem”。 在“…

大数据 ETL + Flume 数据清洗 — 详细教程及实例(附常见问题及解决方案)

大数据 ETL Flume 数据清洗 — 详细教程及实例 1. ETL 和 Flume 概述1.1 ETL&#xff08;Extract, Transform, Load&#xff09;1.2 Flume 概述 2. Flume 环境搭建2.1 下载并安装 Flume2.2 启动 Flume 3. Flume 配置和常见 Source、Sink、Channel3.1 Flume Source3.2 Flume Si…

24年配置CUDA12.4,Pytorch2.5.1,CUDAnn9.5运行环境

没什么好介绍的&#xff0c;直接说了。 下载 首先打开命令行&#xff0c;输入代码查看显卡最高支持的cuda版本&#xff0c;下载的版本不要高于该版本 nvidia-smi PyTorch 插件这个是PyTorch下载地址&#xff0c;就按照我这么选CUDA版本就选最新的&#xff0c;看好绿框里的CU…

架构师备考-概念背诵(系统架构)

软件架构概念 一个程序和计算系统软件体系结构是指系统的一个或者多个结构。结构中包括软件的构件,构件的外部可见属性以及它们之间的相互关系。体系结构并非可运行软件。确切地说,它是一种表达,使软件工程师能够: (1)分析设计在满足所规定的需求方面的有效性:(2)在设计变…

Linux服务器软件包管理的使用

在 Linux 系统中&#xff0c;软件包管理器是用于安装、升级、删除和管理软件包的工具。不同的 Linux 发行版使用不同的软件包管理器&#xff0c;通常根据使用的包格式和包管理系统&#xff08;如 .deb 或 .rpm&#xff09;来区分。下面将介绍几种常见的 Linux 软件包管理器及其…

debian系统安装qt的时候 显示xcb相关文件缺失

如果是安装之后的问题 我们可以选择使用ldd的命令查看当前依赖的so那些文件确实 ldd /home/yinsir/Qt/5.15.2/gcc_64/plugins/platforms/libqxcb.so 本人在进行打包的时候 出现则会个报错 ERROR: ldd outputLine: “libxcb-util.so.1 > not found” ERROR: for binary: “/…

esp32s3连接控制HC-08蓝牙设备

趁双十一买了一块esp32S3单片,尝试之后发现他的蓝牙只支持ble,我的机器人以前是使用手机控制的,我想借此机会,为他添加一个esp32S3的大脑。 查了一下资料,写了一个demo,记录一下代码: #include "BLEDevice.h" //#include "BLEScan.h" //hc-08的ble…

WordPress文章自动提交Bing搜索引擎:PHP推送脚本教程

随着网站SEO优化的重要性日益增加,将新发布的内容快速提交到搜索引擎显得尤为重要。尤其对于Bing站长平台,自动化推送能让Bing尽快发现和索引我们网站的新内容。本文将详细介绍如何通过PHP脚本自动推送WordPress当天发布的文章至Bing站长平台,确保新文章被Bing及时收录。 前…

指令重排序:Java程序中的隐秘优化

什么是重排序&#xff1f; 在编写Java程序时&#xff0c;我们通常会期望代码的执行顺序与编写顺序一致。然而&#xff0c;为了优化性能&#xff0c;编译器、JVM或CPU可能会对指令的实际执行顺序进行调整&#xff0c;这种现象被称为重排序。重排序是现代计算机系统中常见的优化…

开源大模型推理引擎现状及常见推理优化方法总结

原文&#xff1a;https://zhuanlan.zhihu.com/p/755874470 前言 前一段时间sglang-v0.3.0和vllm-v0.6.0前后脚发布之后&#xff0c;就一直想总结梳理一下现在主流的大模型推理引擎。因为我觉得这也算是一个有意义的节点吧&#xff0c;从此开源大模型推理引擎总算是由"非…

【信号处理】绘制IQ信号时域图、星座图、功率谱

时域图 # 导入相关的库 import pickle import matplotlib.pyplot as plt import numpy as np from pathlib import Path import oswith open(r"C:\0-数据集\公开\RML2016\RML2016.10a_dict.pkl", rb) as file:Xd pickle.load(file, encodingbytes) snrs, mods map…

第 1 章 - Go语言简介

第 1 章 - Go语言简介 1.1 什么是Go语言 Go语言&#xff0c;又称 Golang&#xff0c;是一种静态类型的编译型语言&#xff0c;由 Google 公司的 Robert Griesemer、Rob Pike 和 Ken Thompson 于 2007 年开始设计&#xff0c;并在 2009 年正式对外发布。Go 语言的设计目标是提…

C++优选算法十二 栈

在C中&#xff0c;stack 是一种标准模板库&#xff08;STL&#xff09;容器适配器&#xff0c;它提供了后进先出&#xff08;LIFO, Last In First Out&#xff09;的数据结构。stack 适配器基于其他底层容器&#xff08;如 deque 或 vector&#xff09;来实现&#xff0c;但只提…

找工作就上万码优才,海量技术岗位等你来

已至岁末&#xff0c;不论你将实习&#xff0c;或正在求职&#xff0c;求职平台千千万万&#xff0c;但简历如落叶般无人问津。 是否因未找到理想职位而心生焦虑&#xff1f;别急&#xff0c;万码优才在这里&#xff0c;为你点亮职业之路的明灯&#xff01; 今天给大家推荐一…

⭐SmartControl: Enhancing ControlNet for Handling Rough Visual Conditions

目录 0 Abstract 1 Motivation 2 Related Work 2.1 Text-to-Image Diffusion Model 2.2 Controllable Text-to-Image Generation 2.3 ControlNet 2.4 Control Scale Exploration 3 Method 3.1 Framework 3.2 Control Scale Predictor 3.3 Unaligned Data Constructi…

Vue数据响应式原理

前言 Vue是一个结构的框架,也就是 数据层、视图层、数据-视图层&#xff1b;响应式的原理就是实现当数据更新时&#xff0c;视图层也要相应的更新 响应式实现 基于发布订阅模式和数据劫持实现 1.发布订阅模式&#xff1a;vue使用发布订阅模式来实现数据变动的通知和更新 2…

python函数小练习(三)

main.py import testwhile True:test.kdc_menu()ch int(input("请选择>>"))match ch:case 1:test.show_menu()case 2:test.sale_menu()case 3:test.money_menu()case 4:test.mess_menu()case -1:breakcase _:print("请重新输入")test.py menu {…