详解Kafka分区机制原理|Kafka 系列 二

Kafka 系列第二篇,详解分区机制原理。为了不错过更新,请大家将本号“设为星标”。

点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达

上一篇文章介绍了 Kafka 的基本概念和术语,里面有个概念是 分区(Partition)。

kafka 将 一个Topic 中的消息分成多份,分别存储在不同的 Broker 里,这每一段消息被 kafka 称为分区,其中每条消息只会保存在一个分区中。

如果不太理解请回顾上一篇:

5b2ad62e786f5c79be317c3bd9772c58.jpeg

开始学习 Kafka,一文掌握基本概念|Kafka 系列 一

 

为什么有分区?

为什么要有分区呢?

Kafka 的分区机制的本质就是将一个大的 Topic 进行拆分,将一组很大的队列拆分成了多组队列。这样做有以下几个好处:

  1. 因为一个 Topic 中的消息可能非常多,多到一台Broker存不下,因此需要拆分成多段存储在不同的机器里,实现负载均衡。

  2. 拆分成多个队列,可以在多个生产者和消费者的情况下发挥多机性能,可以分流和并行处理消息,从而提高读写性能,提升系统的吞吐力。

  3. 有利于系统扩缩容,提高系统的可扩展性。不同分区在不同的broker上,可以通过增加新机器提高吞吐,并且增加新机器的时候可以通过调整分区的分布来调配负载。

0de38ad040a27e5ddabd8449fac6d55e.png

但是分区数不是越多越好,需要根据系统具体情况来设置。比如3个Broker就应该至少有3个分区,如果broker性能之间有差异,可以调大分区数进行调配。也可以通过broker的倍数来设置分区数,并且进行性能压测,测试集群的吞吐量。

分区数过多会带来资源管理上的消耗,清除日志时间变长,集群broker故障后分区leader重选时间变长,客户端消费端线程数需求增加,甚至导致连接所需的socket消耗增加。

分区策略

分区策略就是决定生产者将会把消息发送到具体哪个分区的算法,分区策略由 Partitioner 接口实现。

自定义分区策略

用于分区的 partition 方法定义如下:

/*** Compute the partition for the given record.** @param topic topic名 The topic name* @param key 用于分区的key The key to partition on (or null if no key)* @param keyBytes 用于分区的序列号key The serialized key to partition on( or null if no key)* @param value 用于分区的值 The value to partition on or null* @param valueBytes 用于分区的序列号值 The serialized value to partition on or null* @param cluster 当前集群元数据 The current cluster metadata*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

可以看出,这里提供了 Topic 和一些跟消息有关的key参数,cluster 是集群信息,包含Kafka 当前的Node 数据以及Topic、partition数据等。有了这些数据,具体拿到一条消息该发往哪个分区,我们就可以根据已有信息制定自己的分区策略。

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

我们实现了自定义的 Partition 类之后,就可以设置 partitioner.class 为目标策略类,Producer 就会按照我们的自定义策略来对消息进行分区。

默认分区策略

Kafka 提供了默认分区策略 DefaultPartitioner,策略内容如下:

  1. 如果在消息中指定了分区,优先使用指定的分区。

  2. 如果没有指定分区,但存在分区键,则根据序列化key使用murmur2哈希算法对分区数取模。

  3. 如果没有指定分区或分区键,则会使用粘性分区策略。(关于粘性分区策略后面讲解)

在实际生产中,我们一般都默认使用此策略,无需修改。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

注意,这里指的分区键是序列化后的key,也就是变量 keyBytes,其他key、value、valueBytes 并没用到。

byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
default byte[] serialize(String topic, Headers headers, T data) {// data 变量return serialize(topic, data);
}

看到 key 等序列化方法我们可以明白,key 的序列号值只受到 record.key() 的影响,所以同样的key会被固定分配到同样的partition中。(注意这里的key是指用于分区的key,而不是topic)

e16317cdfef524b9779fd048bbc2e591.png

粘性分区策略

实现类为 UniformStickyPartitioner ,他与默认分区策略的区别是:

  • DefaultPartitionerd 默认分区策略:如果有分区键的话,会按照分区键来决定分区,这个时候并不会使用粘性分区策略。

  • UniformStickyPartitioner粘性分区策略:无论有没有分区键,都用粘性分区来分配。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return stickyPartitionCache.partition(topic, cluster);
}

什么是粘性分区策略?

我们需要知道,在Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 然后多条消息批量发送。这样可以减少网络请求次数,提高消息的发送效率。

所以批量发送消息有两个条件:

  1. 一个batch满了,与 batch.size有关,一般大小是16k。

  2. linger.ms时间到了。

满足任意一个条件,都会触发sender线程的发送。如果生产的消息较少,batch没有满,就必须等到等待时间到了,这就导致了较长的延迟。

因为ProducerBatch是多个,为了让消息尽可能快的发送,就需要让其中一个ProducerBatch先变满。

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

注意:一个分区对应一个双端队列Deque<ProducerBatch>>

粘性分区策略就是在相同的分区中,优先填满一个ProducerBatch,发送,再去填充另一个ProducerBatch。参见下图,第一个分区会被优先塞满并发送。

fb4f66d75ce57e9231f4ea8ed6c56c88.png

在一个 ProducerBatch 发送结束,选择新分区的时候,是随机选择的,之后便会继续优先填满新的分区。

  • 可用分区<1 ,所有分区中随机选择。

  • 可用分区=1,选择这个分区。

  • 可用分区>1,所有可用分区中随机选择。

public int nextPartition(String topic, Cluster cluster, int prevPartition) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);Integer oldPart = indexCache.get(topic);Integer newPart = oldPart;// Check that the current sticky partition for the topic is either not set or that the partition that // triggered the new batch matches the sticky partition that needs to be changed.if (oldPart == null || oldPart == prevPartition) {List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() < 1) {Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = random % partitions.size();} else if (availablePartitions.size() == 1) {newPart = availablePartitions.get(0).partition();} else {while (newPart == null || newPart.equals(oldPart)) {int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = availablePartitions.get(random % availablePartitions.size()).partition();}}// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.if (oldPart == null) {indexCache.putIfAbsent(topic, newPart);} else {indexCache.replace(topic, prevPartition, newPart);}return indexCache.get(topic);}return indexCache.get(topic);}

轮询分区策略

Kafka 中提供了轮训策略的实现 RoundRobinPartitioner。当用户希望将写操作均匀地分发到所有分区时,可以使用此分区策略。

举例,有三个分区,针对于同一个producer,第一条消息发送到partition1,第二条消息发送到partition2,第三条发送到partition3,以此类推。

7372d4c65e8afd01c61335a18caddd6b.png
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 分区数int numPartitions = partitions.size();// 下一个自增值int nextValue = nextValue(topic);// 获取此主题的可用分区列表List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (!availablePartitions.isEmpty()) {// topic可用分区不为空,取余int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// 没有可用的分区,给出一个不可用的分区// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}
}

hash 键的值并不会影响到数据的分布,这应该是数据均匀度最好的策略,可以保证消息最大程度的平均分配到所有分区。

除了官方提供的策略,我们还可以实现自己的分区策略,比如随机策略,实现起来也很简单;比如按照业务键去分区的策略;比如按照ip分区的策略等。

最后,欢迎大家提问和交流。

加入讨论群是升职加薪第一步!

回复:加群

9cc714a9272b10f02503d017f1df6ee5.jpeg

点赞是一种美德,如对您有帮助,欢迎评论和分享,感谢阅读!

实战总结|记一次消息队列堆积的问题排查

2023-07-18

445cd86f0993eb1268b6ff21b1866d61.jpeg

从二叉查找树到B*树,一文搞懂搜索树的演进!|原创

2023-05-23

69792976b8a1929fd83b907b1d130d17.jpeg

CAP、BASE理论真的很重要!|分布式事务系列(一)

2023-05-06

7a8d17512f5d9cd2d3cde5fa8560fac0.jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/27076.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高翔《自动驾驶中的SLAM技术》代码详解 — 第6章 2D SLAM

目录 6.2 扫描匹配算法 6.2.1 点到点的扫描匹配 6.2 扫描匹配算法 6.2.1 点到点的扫描匹配 // src/ch6/test_2dlidar_io.cc // Created by xiang on 2022/3/15. // #include <gflags/gflags.h> #include <glog/logging.h> #include <opencv2/highgui.hpp>…

解释器模式(Interpreter)

解释器模式是一种行为设计模式&#xff0c;可以解释语言的语法或表达式。给定一个语言&#xff0c;定义它的文法的一种表示&#xff0c;然后定义一个解释器&#xff0c;使用该文法来解释语言中的句子。解释器模式提供了评估语言的语法或表达式的方式。 Interpreter is a behav…

行业追踪,2023-08-07

自动复盘 2023-08-07 凡所有相&#xff0c;皆是虚妄。若见诸相非相&#xff0c;即见如来。 k 线图是最好的老师&#xff0c;每天持续发布板块的rps排名&#xff0c;追踪板块&#xff0c;板块来开仓&#xff0c;板块去清仓&#xff0c;丢弃自以为是的想法&#xff0c;板块去留让…

ros tf

欢迎访问我的博客首页。 tf 1. tf 命令行工具1.1 发布 tf1.2 查看 tf 2.参考 1. tf 命令行工具 1.1 发布 tf 我们根据 cartographer_ros 的 launch 文件 backpack_2d.launch 写一个 tf.launch&#xff0c;并使用命令 roslaunch cartographer_ros tf.launch 启动。该 launch 文件…

【Renpy】设置选项不满足条件禁止选择

【要求】如果某个属性不满足某个要求&#xff0c;则无法选择这个选项。 【版本】Renpy 8.1.1 【实现】 1.在options.rpy文件中添加 define config.menu_include_disabled True 2.在选项中增加if条件。 menu:"Yes" if money > 20: ##如果money小于20这个选项…

3.01 用户在确认订单页收货地址操作

用户在确认订单页面&#xff0c;可以针对收货地址做如下操作&#xff1a; 1. 查询用户的所有收货地址列表 2. 新增收货地址 3. 删除收货地址 4. 修改收货地址 5. 设置默认地址步骤1&#xff1a;创建对应用户地址BO public class AddressBO {private String addressId;private…

高抗干扰LCD液晶屏驱动芯片,低功耗的特性适用于水电气表以及工控仪表类产品

VK2C23是一个点阵式存储映射的LCD驱动器&#xff0c;可支持最大224点&#xff08;56SEGx4COM&#xff09;或者最大416点&#xff08;52SEGx8COM&#xff09;的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据&#xff0c;也可通过指令进入省电模式。其高抗干扰&#xff…

zookeeper+kafka

目录 Kafka概述 一、为什么需要消息队列&#xff08;MQ&#xff09; 二、使用消息队列的好处 三、消息队列的两种模式 四、Kafka 定义 五、Kafka 简介 六、Kafka 的特性 七、Kafka 系统架构 分区的原因 八、部署kafka 集群 1.下载安装包 2.安装 Kafka 3.修改…

风控安全产品系统设计的一些思考

背景 本篇文章会从系统架构设计的角度&#xff0c;分享在对业务安全风控相关基础安全产品进行系统设计时遇到的问题难点及其解决方案。 内容包括三部分&#xff1a;&#xff08;1&#xff09;风控业务架构&#xff1b;&#xff08;2&#xff09;基础安全产品的职责&#xff1…

mysql8查看执行sql历史日志、慢sql历史日志,配置开启sql历史日志general_log、慢sql历史日志slow_query_log

0.本博客sql总结 -- 1.查看参数 -- 1.1.sql日志和慢sql日志输出方式(TABLE/FILE)。global参数 SHOW GLOBAL VARIABLES LIKE log_output; -- 1.2.sql日志开关。global参数 SHOW GLOBAL VARIABLES LIKE general_log%; -- 1.3.慢sql日志开关。global参数 SHOW GLOBAL VARIABLE…

AWS Amplify 部署node版本18报错修复

Amplify env&#xff1a;Amazon Linux:2 Build Error : Specified Node 18 but GLIBC_2.27 or GLIBC_2.28 not found on build 一、原因 报错原因是因为默认情况下&#xff0c;AWS Amplify 使用 Amazon Linux:2 作为其构建镜像&#xff0c;并自带 GLIBC 2.26。不过&#xff0c;…

UNIX 入门

与 UNIX 建立连接启动会话登录命令提示符修改口令退出系统 简单的 UNIX 命令命令格式ls 命令who 命令虚拟终端 tty伪终端 ptywho am i 命令 cal 命令help 命令man 命令 shell 概述shell 命令更换 shell临时更改 shell永久更改 shell 登录过程 与 UNIX 建立连接 启动会话 要启…

RabbitMQ 备份交换机和死信交换机

为处理生产者生产者将消息推送到交换机中&#xff0c;交换机按照消息中的路由键即自身策略无法将消息投递到指定队列中造成消息丢失的问题&#xff0c;可以使用备份交换机。 为处理在消息队列中到达TTL的过期消息&#xff0c;可采用死信交换机进行消息转存。 通过上述描述可知&…

P1049 [NOIP2001 普及组] 装箱问题(背包)(内附封面)

[NOIP2001 普及组] 装箱问题 题目描述 有一个箱子容量为 V V V&#xff0c;同时有 n n n 个物品&#xff0c;每个物品有一个体积。 现在从 n n n 个物品中&#xff0c;任取若干个装入箱内&#xff08;也可以不取&#xff09;&#xff0c;使箱子的剩余空间最小。输出这个最…

数据库作业(一)

建立一张表&#xff1a; 表里面有多个字段&#xff0c;每一个字段对应一种数据类型 注意&#xff1a;表名&#xff0c;字段名都要起的有意义 1、首先mysql -uroot -p 进入MySQL 2、选择一个数据库并使用 3、创建一张表定义多个字段使用所有数据类型&#xff0c;数字&…

2. 内存分区模型

一、内存分区模型 C程序在执行时&#xff0c;将内存大方向划分为4个区域 代码区&#xff1a;存放函数体的二进制代码&#xff0c;由操作系统进行管理的全局区&#xff1a;存放全局变量和静态变量以及常量栈区&#xff1a;由编译器自动分配释放&#xff0c;存放函数的参数值&a…

【SQL应知应会】索引(一)• MySQL版

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习&#xff0c;有基础也有进阶&#xff0c;有MySQL也有Oracle 索引 • MySQL版 前言一、索引1.简介1.1 索引的优点…

Dockerfile部署golang,docker-compose

使用go镜像打包&#xff0c;运行在容器内 redis和mysql用外部的 项目目录结构 w1go项目&#xff1a; Dockerfile # 这种方式是docker项目加上 本地的mysql和redis环境 # go打包的容器 FROM golang:alpine AS builder# 为我们镜像设置一些必要的环境变量 ENV GO111MODULEon …

音视频技术开发周刊 | 305

每周一期&#xff0c;纵览音视频技术领域的干货。 新闻投稿&#xff1a;contributelivevideostack.com。 大神回归学界&#xff1a;何恺明宣布加入 MIT 「作为一位 FAIR 研究科学家&#xff0c;我将于 2024 年加入麻省理工学院&#xff08;MIT&#xff09;电气工程与计算机科学…

国联易安网页防篡改保护系统“渠道招募”启动啦!

作为业内专注于保密与非密领域的分级保护、等级保护、业务连续性安全和大数据安全的领军企业&#xff0c;国联易安网页防篡改保护系统基于“高效同步”、“安全传输”两项技术&#xff0c;具备了独特的“五重防护”新特性&#xff0c;支持网页的全自动发布、网页监控、报警和自…