Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、分区分配策略基础

二、Range 分区分配策略

(一)原理

(二)案例

(三)Range 分区分配再平衡案例

三、RoundRobin 分区分配策略

(一)原理

(二)案例

(三)RoundRobin 分区分配再平衡案例

四、Sticky 分区分配策略

(一)原理

(二)案例

(三)Sticky 分区分配再平衡案例

五、CooperativeSticky 分区分配策略

六、消费者事务

七、数据积压(消费者如何提高吞吐量)

八、总结


        在 Kafka 的消费任务处理中,分区的分配以及再平衡是至关重要的环节。合理的分区分配策略能够确保消费者高效地处理消息,而理解再平衡机制则有助于应对消费者组在运行过程中的动态变化。本文将深入探讨 Kafka 中不同的分区分配策略,包括 Range、RoundRobin、Sticky 和 CooperativeSticky,以及它们在各种场景下的再平衡表现,并结合实际案例进行详细分析,并对消费者事务和数据积压进行简单介绍。

一、分区分配策略基础

        在一个 Kafka 消费者组中,包含多个消费者,而一个主题则由多个分区组成。关键问题在于确定哪个消费者来消费哪个分区的数据。Kafka 提供了四种主流的分区分配策略,并且可以通过配置参数 partition.assignment.strategy 来修改分区的分配策略,默认策略是 Range + CooperativeSticky。同时,还有一些相关的重要参数:

参数名称

描述

heartbeat.interval.ms

Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。

session.timeout.ms

Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡

partition.assignment.strategy

消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。

可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

二、Range 分区分配策略

(一)原理

        Range 分区分配策略是基于主题的分区数量和消费者数量进行分配。它会按照顺序将连续的分区分配给每个消费者,尽可能平均地分配分区,但可能会导致不同消费者分配到的分区数量不一致。

(二)案例

首先,将主题 first 修改为 7 个分区:

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

注意,分区数可增加但不能减少,主题的副本数修改需要制定计划执行,不能直接修改。


         由三个消费者 CustomConsumerCustomConsumer1CustomConsumer2 组成消费者组,组名都为 “test”,同时启动这 3 个消费者。


        启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区(修改发送次数为 500 次)。

package com.bigdata.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.235.128:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 500; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first","bigdata " + i), new Callback() {// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(20);}// 5. 关闭资源kafkaProducer.close();}
}

说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

默认是Range,但是在经过一次升级之后,会自动变为CooperativeSticky。这个是官方给出的解释。

默认的分配器是[RangeAssignor, CooperativeStickyAssignor],默认情况下将使用RangeAssignor,但允许通过一次滚动反弹升级到CooperativeStickyAssignor,该滚动反弹会将RangeAssignor从列表中删除。


        观察消费情况,发现一个消费者消费了 5,6 分区,一个消费了 0,1,2 分区,一个消费了 3,4 分区。这是按照 Range 策略分配的结果。

此时并没有修改分区策略,原因是默认是Range.

(三)Range 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),此时 1 号消费者消费到 3、4 号分区数据,2 号消费者消费到 5、6 号分区数据,0 号的数据无人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 0、1、2、3 号分区数据,2 号消费者消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

 

三、RoundRobin 分区分配策略

(一)原理

        RoundRobin 分区分配策略以轮询的方式将分区分配给消费者,确保每个消费者尽可能均衡地获取分区,不考虑主题的因素,只要是消费者组内的分区都会按照轮询顺序分配。

(二)案例

        在 CustomConsumerCustomConsumer1CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin(指定 org.apache.kafka.clients.consumer.RoundRobinAssignor),并修改消费者组为 test2

package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerWithFenPei {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");// 指定分区的分配方案properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者订阅主题,主题有数据就会拉取数据// 指定消费的主题ArrayList<String> topics = new ArrayList<>();topics.add("first");// 一个消费者可以订阅多个主题kafkaConsumer.subscribe(topics);while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}}}
}修改一下消费者组为test2

 

重启 3 个消费者,重复发送消息步骤并观察分区结果。

 

 

(三)RoundRobin 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),1 号消费者消费到 2、5 号分区数据,2 号消费者消费到 4、1 号分区数据,0 号消费者以前对应的数据无人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 0、2、4、6 号分区数据,2 号消费者消费到 1、3、5 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

四、Sticky 分区分配策略

(一)原理

        粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

(二)案例

1)需求

设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察

消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

2)步骤

(1)修改分区分配策略为粘性。

注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等

会再重启,或者修改为全新的消费者组。

// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

(3)使用同样的生产者发送 500 条消息。

可以看到会尽量保持分区的个数近似划分分区。

(三)Sticky 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),1 号消费者消费到 2、5、3 号分区数据,2 号消费者消费到 4、6 号分区数据,0 号消费者的任务无人顶替。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需

要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 2、3、5 号分区数据,2 号消费者消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

五、CooperativeSticky 分区分配策略

        CooperativeSticky 是新添加的策略。在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,并且在运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。其好处是实现负载均衡,但多次平衡会浪费性能,它采用动态平衡,在消费过程中实施再平衡,而不是等到某个消费者退出再平衡。

六、消费者事务

        若要实现 Kafka 消费端的精准一次性消费,需要将消费过程和提交 offset 过程做原子绑定。此时可将 Kafka 的 offset 保存到支持事务的自定义介质(如 MySQL),这部分知识将在后续项目中深入涉及,事务具有 ACID 四大特征,例如转账场景(张三 --> 李四)就需要事务的保障来确保数据的准确性和完整性。

七、数据积压(消费者如何提高吞吐量)

        当面临数据积压问题时,消费者可以采取多种方式提高吞吐量,例如增加消费者数量、优化消费者代码处理逻辑、调整相关参数(如 max.poll.interval.ms 等)以适应更高的处理负载等。后续将深入探讨数据积压场景下的优化策略。

八、总结

        通过对 Kafka 分区分配以及再平衡策略的深入理解和实践,可以更好地构建和优化 Kafka 消费任务处理流程,确保系统的高效稳定运行。在实际应用中,需要根据具体的业务需求和场景特点选择合适的分区分配策略,并合理处理再平衡过程中的各种情况。

        消费者事务方面,为实现精准一次性消费,需将消费与提交 offset 原子绑定,可将 offset 存于支持事务的自定义介质如 MySQL 中。在数据积压场景下,消费者可通过增加数量、优化代码处理逻辑、调整参数等方式提高吞吐量,后续会深入探讨相关优化策略。这些知识对于深入理解和优化 Kafka 消费者的性能、可靠性和数据处理准确性具有极为重要的意义,有助于在实际应用中更好地构建和管理基于 Kafka 的系统架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60571.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【2024APMCM亚太赛A题】完整参考论文与代码分享

A题 一、问题重述二、问题分析问题一&#xff1a;水下图像分类问题二&#xff1a;退化原因建模问题三&#xff1a;针对单一退化的图像增强方法问题四&#xff1a;复杂场景的综合增强模型问题五&#xff1a;针对性增强与综合增强的比较 三、问题假设退化特征独立性假设物理模型普…

数据结构(初阶6)---二叉树(遍历——递归的艺术)(详解)

二叉树的遍历与练习 一.二叉树的基本遍历形式1.前序遍历(深度优先遍历)2.中序遍历(深度优先遍历)3.后序遍历(深度优先遍历)4.层序遍历&#xff01;&#xff01;(广度优先遍历) 二.二叉树的leetcode小练习1.判断平衡二叉树1&#xff09;正常解法2&#xff09;优化解法 2.对称二叉…

ChatGPT 与其他 AI 技术在短视频营销中的技术应用与协同策略

摘要&#xff1a; 本文深入探讨了 ChatGPT 及其他 AI 技术在短视频营销中的应用。从技术层面剖析了这些技术如何助力短视频内容创作、个性化推荐、用户互动以及营销效果评估等多方面&#xff0c;通过具体方法分析、数据引用与大模型工具介绍&#xff0c;旨在为短视频营销领域提…

先安装Ubuntu20.04,再安装win10实现双系统

准备 一个刻录好Ubuntu20.04系统u盘一个刻录了Ventory的U盘&#xff0c;其中有Windows10的iso系统文件。Ventory参考Gparted分区软件&#xff0c;用于腾出一块硬盘空间安装Win10 过程 给win10腾出一块硬盘空间&#xff0c;设置为NTFS格式 Ubuntu系统中其实已经有GParted软件…

数据结构------树(Java语言描述)

一、树的基本概念 树是一种非线性的数据结构&#xff0c;它由节点组成&#xff0c;有一个特定的节点称为根节点&#xff0c;其余节点可以分为多个互不相交的子树。 树中的节点具有以下特点&#xff1a; 1.每个节点有零个或多个子节点。 2.除了根节点外&#xff0c;每个节点…

查看浏览器的请求头

爬虫时用到了请求头&#xff0c;虽然可以用网上公开的&#xff0c;但是还是想了解一下本机浏览器的。以 Edge 为例&#xff0c;其余浏览器通用。 打开浏览器任一网页&#xff0c;按F12打开DevTools&#xff1b;或鼠标右键&#xff0c;选择“检查”。首次打开界面应该显示在网页…

如何在Python中进行数学建模?

数学建模是数据科学中使用的强大工具&#xff0c;通过数学方程和算法来表示真实世界的系统和现象。Python拥有丰富的库生态系统&#xff0c;为开发和实现数学模型提供了一个很好的平台。本文将指导您完成Python中的数学建模过程&#xff0c;重点关注数据科学中的应用。 数学建…

前后端分离,解决vue+axios跨域和proxyTable不生效等问题

看到我这篇文章前可能你以前看过很多类似的文章。至少我是这样的&#xff0c;因为一直没有很好的解决问题。 正文 当我们通过webstorm等IDE开发工具启动项目的时候&#xff0c;通过命令控制台可以观察到启动项目的命令 如下&#xff1a; webpack-dev-server --inline --prog…

ES6 、ESNext 规范、编译工具babel

ES6 、ESNext 规范、编译工具简介 ES6ES&#xff08;ECMAScript&#xff09; vs JS常量进一步探讨 obj对象的扩展面试&#xff1a;使对象属性也不能更改——Object.freeze(obj) 解构deconstruction变量的解构赋值&#xff1a;数组解构赋值&#xff1a;对象解构赋值&#xff1a;…

阿里数字人工作 Emote Portrait Alive (EMO):基于 Diffusion 直接生成视频的数字人方案

TL;DR 2024 年 ECCV 阿里智能计算研究所的数字人工作&#xff0c;基于 diffusion 方法来直接的从音频到视频合成数字人&#xff0c;避免了中间的三维模型或面部 landmark 的需求&#xff0c;效果很好。 Paper name EMO: Emote Portrait Alive - Generating Expressive Portra…

candence: 如何快速设置SUBCLASS 的颜色

如何快速设置SUBCLASS 的颜色 一、一般操作 正常情况下修改SUBCLASS&#xff0c;需要如下步骤进行设置&#xff1a; 二、快速操作 右键&#xff0c;选择一个颜色即可

多目标优化算法:多目标海星优化算法(MOSFOA)求解ZDT1、ZDT2、ZDT3、ZDT4、ZDT6,提供完整MATLAB代码

一、海星优化算法 海星优化算法&#xff08;Starfish Optimization Algorithm &#xff0c;SFOA&#xff09;是2024年提出的一种元启发式算法&#xff0c;该算法模拟了海星的行为&#xff0c;包括探索、捕食和再生。 算法灵感&#xff1a; SFOA的灵感来源于海星的捕食行为&…

实时质检-静音检测分析流程(运维人员使用)

前言 用户在实时质检时&#xff0c;开启了主叫或被叫静音检测功能&#xff0c;但是听录音时&#xff0c;主叫或被叫明明没有任何声音&#xff0c;但是通话没有被挂断。 说明主叫或被叫的静音阈值太低&#xff0c;导致系统没有把很小的声音认定为静音&#xff1b;或者检测非静音…

了解Redis(第一篇)

目录 Redis基础 什么事Redis Redis为什么这么快 除了 Redis&#xff0c;你还知道其他分布式缓存方案吗? 说-下 Redis 和 Memcached 的区别和共同点 为什么要用Redis? 什么是 Redis Module?有什么用? Redis基础 什么事Redis Redis &#xff08;REmote DIctionary S…

D77【 python 接口自动化学习】- python基础之HTTP

day77 postman接口请求 学习日期&#xff1a;20241123 学习目标&#xff1a;http 定义及实战&#xfe63;&#xfe63;postman接口请求 学习笔记&#xff1a; get请求 post请求 总结 get请求用于查询数据post请求用于添加数据

Element-Ui组件(icon组件)

一、前言 本篇文章主要是对官网的Icon组件进行总结归纳Icon 图标 | Element Plus 在现代Web应用开发中&#xff0c;图标是用户界面设计中不可或缺的一部分。它们不仅提升了用户体验&#xff0c;还使得信息的传达更加直观和高效。本文主要对Element Plus 官方提供的Icon组件进行…

SpringMVC——简介及入门

SpringMVC简介 看到SpringMVC这个名字&#xff0c;我们会发现其中包含Spring&#xff0c;那么SpringMVC和Spring之间有怎样的关系呢&#xff1f; SpringMVC隶属于Spring&#xff0c;是Spring技术中的一部分。 那么SpringMVC是用来做什么的呢&#xff1f; 回想web阶段&#x…

应急响应靶机——linux2

载入虚拟机&#xff0c;打开虚拟机&#xff1a; 居然是没有图形化界面的那种linux&#xff0c;账户密码&#xff1a;root/Inch957821.&#xff08;注意是大写的i还有英文字符的.&#xff09; 查看虚拟机IP&#xff0c;192.168.230.10是NAT模式下自动分配的 看起来不是特别舒服&…

《Python 股票交易分析:开启智能投资新时代》(二)

Python 进行股票交易分析的优势 简洁易读&#xff1a;Python 的语法简洁明了&#xff0c;即使是编程新手也能较快上手&#xff0c;降低了股票交易分析的门槛。 Python 的简洁易读是其在股票交易分析中受欢迎的重要原因之一。Python 的语法简洁明了&#xff0c;与其他编程语言相…

ECharts柱状图-带圆角的堆积柱状图,附视频讲解与代码下载

引言&#xff1a; 在数据可视化的世界里&#xff0c;ECharts凭借其丰富的图表类型和强大的配置能力&#xff0c;成为了众多开发者的首选。今天&#xff0c;我将带大家一起实现一个柱状图图表&#xff0c;通过该图表我们可以直观地展示和分析数据。此外&#xff0c;我还将提供…