Kafka Producer发送消息流程之分区器和数据收集器

文章目录

  • 1. Partitioner分区器
  • 2. 自定义分区器
  • 3. RecordAccumulator数据收集器

1. Partitioner分区器

在这里插入图片描述

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

int partition = partition(record, serializedKey, serializedValue, cluster);

private final Partitioner partitioner;private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {//当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数if (record.partition() != null)return record.partition();// 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据if (partitioner != null) {int customPartition = partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);if (customPartition < 0) {throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));}return customPartition;}// 如果没有分区器的情况if (serializedKey != null && !partitionerIgnoreKeys) {// hash the keyBytes to choose a partitionreturn BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());} else {return RecordMetadata.UNKNOWN_PARTITION;}}// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;}

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

public class MyKafkaPartitioner implements Partitioner {@Overridepublic int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {// Ensure the key is a non-null stringif (key == null || !(key instanceof String)) {throw new IllegalArgumentException("Key must be a non-null String");}// Parse the key as an integerint keyInt;try {keyInt = Integer.parseInt((String) key);} catch (NumberFormatException e) {throw new IllegalArgumentException("Key must be a numeric string", e);}// Determine the partition based on the key's odd/even natureif (keyInt % 2 == 0) {return 1; // Even keys go to partition 2} else {return 0; // Odd keys go to partition 0}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

新建一个存在多分区的Topic。

在这里插入图片描述

public class KafkaProducerPartitionorTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定拦截器config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());//指定分区器config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test1","key"+i,"我是你爹"+i);//发送recordproducer.send(record);Thread.sleep(500);}//关闭producerproducer.close();}
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。
在这里插入图片描述
在这里插入图片描述

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47140.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

书生浦语-大模型平台学习-环境搭建01

任务&#xff1a;完成SSH连接与端口映射并运行hello_world.py 详细步骤详见&#xff1a;https://github.com/InternLM/Tutorial/blob/camp3/docs/L0/Linux/readme.md 1、InternStudio介绍 InternStudio 是大模型时代下的云端算力平台。基于 InternLM 组织下的诸多算法库支持…

CentOS快速安装Docker(腾讯镜像源)

这里是引用"> 1、卸载旧版本的 Docker yum list installed | grep docker yum -y remove docker-ce-cli.x86_64 yum -y remove docker-ce.x86_64 yum -y remove containerd.io2、安装相关依赖 yum install -y yum-utils device-mapper-persistent-data lvm23、添加 …

嵌入式人工智能(9-基于树莓派4B的PWM-LED呼吸灯)

1、PWM简介 (1)、什么是PWM 脉冲宽度调制(PWM)&#xff0c;是英文“Pulse Width Modulation”的缩写&#xff0c;简称脉宽调制&#xff0c;是在具有惯性的系统中利用微处理器的数字输出来对模拟电路进行控制的一种非常有效的技术&#xff0c;广泛应用在从测量、通信到功率控制…

学习大数据DAY17 PLSQL基础语法6和Git的基本操作

目录 包 存储过程调试功能 作业 阶段复习作业 Git课程目录 什么是版本控制 没有版本控制的缺点 常见的版本工具 版本控制分类 1. 本地版本控制 2. 集中版本控制 3. 分布式版本控制 Git与SVN主要区别 Git软件安装及配置 Windows系统安装Git 安装Tortoise Git(乌龟…

降Compose十八掌之『震惊百里』| Animations

公众号「稀有猿诉」 原文链接 降Compose十八掌之『震惊百里』| Animations 动画对于UI来说无疑是最重要的核心功能&#xff0c;它能够让UI变得生动有吸引力。适当的使用动画可以提升UI的流畅性&#xff0c;让UI体验更为顺滑。在Jetpack Compose中有丰富的函数可以用来实…

六西格玛设计:以客户为中心,驱动企业持续创新

在当今竞争激烈的市场环境中&#xff0c;企业要想脱颖而出&#xff0c;就必须在产品质量、服务效率和客户满意度上不断追求卓越。六西格玛设计&#xff08;Six Sigma Design&#xff09;作为一种高度规范化的管理方法&#xff0c;正逐步成为众多企业实现这一目标的重要工具。张…

NSSCTF中24网安培训day2中web题目【下】

[NISACTF 2022]easyssrf 这道题目考察的是php伪协议的知识点 首先利用file协议进行flag查找 file:///flag.php 接着我们用file协议继续查找fl4g file:///fl4g 接着我们访问此文件&#xff0c;得到php代码如下 这里存在着stristr的函数&#x…

Linux中的环境变量

一、基本概念 环境变量(environment variables)一般是指在操作系统中用来指定操作系统运行环境的一些参数。 如&#xff1a;我们在编写C/C代码的时候&#xff0c;在链接的时候&#xff0c;从来不知道我们的所链接的动态静态库在哪里&#xff0c;但是照样可以链接成功&#xff…

Cesium能做啥,加载哪些数据源,开源免费用商用吗?这里告诉你。

很多小伙伴对Cesium是什么&#xff0c;一知半解&#xff0c;本文是基础知识的扫盲&#xff0c;为大家分享cesium是什么、能做什么、默认数据是什么&#xff0c;为什么首先要进行数据加载&#xff0c;要加载哪些数据&#xff0c;希望通过这些带你入个门&#xff0c;欢迎点赞评论…

vue仿甘特图开发工程施工进度表

前言 本文是根据项目实际开发中一个需求开发的demo&#xff0c;仅用了elementUI&#xff0c;可当作独立组件使用&#xff0c;C V即用。 当然没考虑其他的扩展性和一些数据的校验&#xff0c;主要是提供一个处理思路&#xff0c;有需要的小伙伴可以直接复制&#xff1b;本demo的…

高职院校人工智能人才培养成果导向系统构建、实施要点与评量方法

一、引言 近年来&#xff0c;人工智能技术在全球范围内迅速发展&#xff0c;对各行各业产生了深远的影响。高职院校作为培养高技能人才的重要基地&#xff0c;肩负着培养人工智能领域专业人才的重任。为了适应社会对人工智能人才的需求&#xff0c;高职院校需要构建一套科学、…

【node-RED 4.0.2】连接 Oracle 数据库踩坑解决,使用模组:node-red-contrib-agur-connector

关于 Oracle Oracle 就好像一张吸满水的面巾纸&#xff0c;你稍一用力它就烂了。 PS&#xff1a;我更新了更好的模组的教程&#xff0c;这篇已经是旧款的教程&#xff0c;但是它仍旧包含了必要的配置环境变量等操作。 最新的模组教程&#xff1a;node-red-contrib-agur-connec…

AI时代:探索个人潜能的新视角

文章目录 Al时代的个人发展1 AI的高速发展意味着什么1.1 生产力大幅提升1.2 生产关系的改变1.3 产品范式1.4 产业革命1.5 Al的局限性1.5.1局限一:大模型的幻觉1.5.2 局限二&#xff1a;Token 2 个体如何应对这种改变?2.1 职场人2.2 K12家长2.3 大学生2.4 创业者 3 人工智能发展…

解决vue3中el-input在form表单按下回车刷新页面

问题&#xff1a;在input框中点击回车之后不是调用我写的回车事件&#xff0c;而是刷新页面 原因&#xff1a; 如果表单中只有一个input 框则按下回车会直接关闭表单 所以导致刷新页面 解决方法 &#xff1a; 再写一个input 表单 &#xff0c;并设置style"display:none&…

SimMIM:一个类BERT的计算机视觉的预训练框架

1、前言 呃…好久没有写博客了&#xff0c;主要是最近时间比较少。今天来做一期视频博客的内容。本文主要讲SimMIM&#xff0c;它是一个将计算机视觉&#xff08;图像&#xff09;进行自监督训练的框架。 原论文&#xff1a;SimMIM&#xff1a;用于掩码图像建模的简单框架 (a…

解决虚拟机与主机ping不通,解决主机没有vmware网络

由于注册表文件缺失导致&#xff0c;使用这个工具 下载cclean 白嫖就行 https://www.ccleaner.com/ 是 点击修复就可以了

防火墙双机热备带宽管理综合实验

一、实验拓扑 二、实验要求 12&#xff0c;对现有网络进行改造升级&#xff0c;将当个防火墙组网改成双机热备的组网形式&#xff0c;做负载分担模式&#xff0c;游客区和DMZ区走FW3&#xff0c;生产区和办公区的流量走FW1 13&#xff0c;办公区上网用户限制流量不超过100M&am…

技术速递|Let’s Learn .NET Aspire – 开始您的云原生之旅!

作者&#xff1a;James Montemagno 排版&#xff1a;Alan Wang Let’s Learn .NET 是我们全球性的直播学习活动。在过去 3 年里&#xff0c;来自世界各地的开发人员与团队成员一起学习最新的 .NET 技术&#xff0c;并参加现场研讨会学习如何使用它&#xff01;最重要的是&#…

Java IO中的 InputStreamReader 和 OutputStreamWriter

Java IO 的流&#xff0c;有三个分类的维度&#xff1a; 输入流 or 输出流节点流 or 处理流字节流 or 字符流 在Java IO库中&#xff0c;InputStreamReader和OutputStreamWriter是两个非常重要的类&#xff0c;它们作为字符流和字节流之间的桥梁。 这两个类使得开发者可以方…

整数或小数点后补0操作

效果展示&#xff1a; 整数情况&#xff1a; 小数情况&#xff1a; 小编这里是以微信小程序举例&#xff0c;代码通用可兼容vue等。 1.在utils文件下创建工具util.js文本 util.js页面&#xff1a; // 格式…