Kafka第四篇——生产数据总体概括,源码解析分区策略,数据收集器,Sender发送线程,key值

目录

流程图以及总体概述

拦截器

分区器以及分区计算策略

为啥进行分区计算?

producer生产者怎么知道有哪些分区?

分区计算

如何自定义实现分区器?

想说的在图里啦!宝宝!💡 ​编辑

如果key值忘记传递了呢!?

数据校验

数据收集器

注意

Sender发送线程


流程图以及总体概述

producer进行发送record,record对象包含topic,key,value,partition,时间戳,通过拦截器,将数据信息发送给broker,但是咱们也不知道把数据信息发送给哪个broker,而我们的Metadata就可以获取出来这个,如下面代码就是获取到9092.获取到缓存,放在底层。然后经过key对象的序列化,value对象的序列化,对应在代码中就是,configMap.put()那两行,并且这个是必须写的。然后经过分区器,partition,每个数据需要发送到broker中,每个消息发送到特定的主题,主题分为多个分区。kafka在发送数据时候,可以将数据发送到指定主题的指定分区,kafka会自动决定将消息发送到那个分区。分区器有那种判断发送给那个broker。然后进行数据校验。在数据收集器当中,相当于一个缓冲池,将同一个主题的数据可以存放在一个队列中,按“批”为单位进行发送,提高效率,并且指定了每批的大小是16K,

数据已经缓存到数据收集器后,就可以进行发送数据喽!此时就不会按topic为单位进行发送了,就可以重新整合,以节点为主!(why??因为不同的topic可以发送给同一个节点呀傻瓜!也就是说,在缓冲区以topic为单位,在发送线程中以节点为单位)封装请求,然后放在缓冲区中。再由网络通信从缓冲区中取出,发送给socket。在缓冲区,需要注意概念,在途请求缓冲区为5,表示同一个节点同一时间处理的请求数量。

拦截器

数据的规范化处理。可以有多个,可以按顺序执行数据的被拦截。和框架那块的一样。

onsend方法就是主要进行执行拦截规则的,for(ProducerInterceptor<K,V> interceptor:this.intercept)就可以循环执行多个拦截器,并且,看try,catch内容,无论当前拦截器发生什么异常,都不会影响到下一个拦截器的执行,更不会影响整个数据的发送。

自定义实现拦截器,帮助自己更好地了解拦截器。

java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class ValueInterceptorTest implements ProducerInterceptor<String, String> {/*** 实现拦截器规则**/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {}/*** 当记录被Broker确认接收时调用** */@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 这个方法在记录被Broker确认接收时被调用// 根据确认情况实现自定义的处理逻辑}/*** 关闭拦截器时调用*/@Overridepublic void close() {}/*** 配置拦截器时调用**configs 配置信息*/@Overridepublic void configure(Map<String, ?> configs) {}
}

分区器以及分区计算策略

为啥进行分区计算?

 数据发送给某个主题,主题会有很多分区,会在不同的broker当中,所以要算分区编号,不然连数据要发送给主题哪个节点都不知道。但是分区标号也得有范围呀!

producer生产者怎么知道有哪些分区?

从元数据缓存中获取到producer需要的主题相关信息

意味着只要元数据信息缓存了,主题的相关信息我们就可以拿到。

 分区器通过Matadata获取到分区,副本id,leadid之类的,

分区计算

¹²³⁴ 如果参数中指定了分区编号就直接返回

如何自定义实现分区器?

1.实现partitioner接口, 重写相关方法。感觉主要就是实现partition方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;public class CustomPartitioner implements Partitioner {/*** 配置分区器** @param configs 配置信息*/@Overridepublic void configure(Map<String, ?> configs) {}/*** 计算分区** @param topic       主题名称* @param key         消息键,可以为null* @param keyBytes    消息键的字节数组表示,可以为null* @param value       消息值* @param valueBytes  消息值的字节数组表示* @param cluster     Kafka集群信息* @return 分配的分区ID*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 如果键为null,则使用轮询分区策略if (keyBytes == null) {return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;}// 使用键的hashCode来计算分区return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}/*** 关闭分区器*/@Overridepublic void close() {// 可以在这里进行资源的清理操作,通常分区器不需要进行额外的关闭操作}
}

想说的在图里啦!宝宝!💡 

嘿嘿,这里解决了之前的问题,key并不像之前学到的hashmap中消费者用来消费的key,它的核心作用就是用来进行分区计算

这个点就可以从:没有指定特定的分区标号,并且分区标号没有超过范围!序列化key以及分区器不忽略key的情况下看出来。partitionForKey()方法中就用不加密的hash算法并且对分区数量进行取余处理计算。

如果key值忘记传递了呢!?

return RecordMetadata.UNKNOWN_PARTITION;(这是一个表示未知分区的常量)。表明当前生产者无法确定消息发送到哪个分区,可能需要进一步处理或记录错误信息。那感觉也不太对啊,不知道把key发送到哪一个分区!

其实他是在数据收集器那一步追加了,看这个accumulator.append方法!

点进去哦!分区标号计算:粘性分区策略

如果没有进行传递key参数,也就是当前分区是未知分区,就会根据当前主题的分区负载情况来动态获取分区标号。这就是一种优化后的粘性分区策略!如图1.1

🤔图1.1  当前分区是未知分区,就会根据当前主题的分区负载因子来动态获取分区标号。

会根据当前分区负载情况判断去那个分区!如图1.2

✅当分区负载情况为空,就动态去随机选择分区,然后就尽可能的给这个分区追加数据(粘性分区策略),并且也不能超过数值batch.size=16K。如果超过这个阈值就会切换到下一个分区。并且更新分区负载情况。

✅当前主题分区负载情况不为空,那就不用随机生成了。会根据分区负载使用频率随机生成一个随机权重,然后利用二分查找算法找与权重相近的值,根据这个值获取到相应的分区,就可以得到我们的分区标号啦!

图1.2

数据校验

当数据校验成功,数据就到达了数据收集器当中。数据收集器,生产的数据作为一个临时的存储。

数据收集器

 

如果直接生产一条数据就通过网络通信来发送,这样做效率很低哦!像javaio流读取文件一样,读一个字节写一个字节,性能很低呀!

所以就有了ProducerBatch双端队列,从很减少频繁的网络交互,提高传输效率!

在神魔时候真正进行网络交互呢??

嘿嘿,看最大范围,batch.size=16K。在前面分区计算中,有一个粘性分区策略(一旦确定了一个分区,就尽可能往这个分区中追加数据,追加数据就是往producebatch中追加数据,当到达16K,就会被sender检测到),里面就有“没有传递key,如果没有分区负载情况,就会随机生成分区,不能超过最大

注意

🤔而且这里的16k意思是超过16k就不再接收数据了,不意味着数据不能超过16k!比如数据是20k,kafka要保证数据的完整性,发现这个数据值大于16k,就立马关闭,不再接收!

Sender发送线程

 kafka底层就采用了很多生产者消费者模型,一个放一个取。数据收集器是按照主题分区来放数据,而Sender发送线程会按照broker重新整合。(主题的不同分区会放在不同的节点当中,所以有可能存在不同主题的分区在同一个节点当中)。

当整合好之后,就会封装成produceRequest,进而发送给网络客户端。

默认发送时间0,也就是消息取过来就可以直接发送了!

注意这个在途请求缓冲区数量:5

  • Broker 和 Topic:每个 broker 可以存储一个或多个 topic 的数据分片,Kafka 集群的每个 broker 都可以服务于多个 topic
  • Topic 和 分区:每个 topic 可以被分为多个分区,分区内的消息顺序是有序的,而不同分区之间的消息顺序则不保证,分区允许 Kafka 横向扩展和提高并行处理能力。
  • Broker 和 分区:每个 broker 可能会存储多个 topic 的多个分区数据,这样在整个 Kafka 集群中就形成了数据的分布式存储和处理能力。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/870373.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《财经一线》实地探访|4000+伙伴力挺,格行随身WiFi全国布局加速!随身wifi官方正品推荐,口碑最好的随身wifi品牌

随着无线连接需求的井喷式增长&#xff0c;随身WiFi以其便携性、高效能迅速成为市场宠儿。在这片竞争激烈的蓝海中&#xff0c;格行随身WiFi凭借其独特的创新策略与卓越品质脱颖而出&#xff0c;成为行业内的佼佼者。近日&#xff0c;《财经一线》记者实地参观格行总部&#xf…

vue-使用Worker实现多标签页共享一个WebSocket

文章目录 前言一、SharedWorker 是什么SharedWorker 是什么SharedWorker 的使用方式SharedWorker 标识与独占 二、Demo使用三、使用SharedWorker实现WebSocket共享 前言 最近有一个需求&#xff0c;需要实现用户系统消息时时提醒功能。第一时间就是想用WebSocket进行长连接。但…

stm32——AD采集以及DMA

今天继续我们的STM32的内容学习&#xff0c;我使用的单片机是STM32F103VCT6,通过Keil Array Visualization软件来观测AD采样出来的波形。先来看看本次实验用到的硬件知识。 首先是ADC&#xff08;Analog-to-Digital Converter&#xff09;是模拟信号转数字信号的关键组件&#…

x264 编码器 AArch64 汇编函数模块关系分析

x264 编码器 AArch64 汇编介绍 x264 是一个流行的开源视频编码器,它实现了 H.264/MPEG-4 AVC 标准。x264 项目致力于提供一个高性能、高质量的编码器,支持多种平台和架构。对于 AArch64(即 64 位 ARM 架构),x264 编码器利用该架构的特性来优化编码过程。在 x264 编码器中,…

纹波电流与ESR:解析电容器重要参数与应用挑战

电解电容纹波电流与ESR&#xff08;Equivalent Series Resistance&#xff09;是电容器的重要参数&#xff0c;用来描述电容器对交流信号的响应能力和能量损耗。电解电容纹波电流是指电容器在工作时承受的交流信号电流&#xff0c;而ESR则是电容器内部等效电阻&#xff0c;影响…

下载设计免抠元素,就上这6个网站,免费下载!

寻找免费PNG免抠素材网站是创意设计者们探索的重要一环。这些网站提供了丰富的PNG格式素材&#xff0c;去除了背景&#xff0c;方便在不同项目中使用。精心挑选了6个免费PNG免抠素材网站&#xff0c;它们提供了高品质的素材资源&#xff0c;无论是个人设计还是商业项目&#xf…

CVE-2024-23692: Rejetto HTTP File Server 2.3m Unauthenticated RCE漏洞复现

目录 本文章仅供学习使用&#xff01;&#xff01;&#xff01; Rejetto HTTP介绍 漏洞简介 漏洞环境 漏洞复现 exp 复现 结果 如何修复 本文章仅供学习使用&#xff01;&#xff01;&#xff01; Rejetto HTTP介绍 Rejetto是一个流行的开源软件项目&#xff0c;主要…

python开发-创建项目

一、创建项目 1.1在终端 1. 进入某个目录&#xff08;项目放在哪&#xff09; 2. 执行命令创建项目 django-admin startproject 项目名称1.2 在pycharm中创建项目 二、创建app 创建app命令 django-admin startapp app01注册app 编写URL和视图函数对应关系 编写视图函数…

1950年-2021年中国历年民航航线里程统计报告

数据为1950年到2021年我国每年的民航航线总里程数据。 2021年&#xff0c;我国定期航班航线总里程为689.78万公里&#xff0c;相比2019年下降了258.44万公里。 数据统计单位为&#xff1a;公里. 数据说明&#xff1a; 2011年起民航航线里程改为定期航班航线里程 我国定期航班…

怎么将图片批量压缩处理?不牺牲图片清晰度的压缩秘诀

#北京city清凉walk指南# 夏日的北京&#xff0c;满目的绿色和清新空气让人沉醉。 然而&#xff0c;摄影爱好者们在记录这些美好瞬间的同时&#xff0c;也面临着大量图片的存储与管理难题。 随着手机和相机像素的提高&#xff0c;每张照片都可能成为存储空间的"大户&quo…

从0到1开发一个Vue3的新手引导组件(附带遇到的问题以及解决方式)

1. 前言: 新手引导组件,顾名思义,就是强制性的要求第一次使用的用户跟随引导使用应用,可以让一些第一次使用系统的新手快速上手,正好我最近也遇到了这个需求,于是就想着开发一个通用组件拿出来使用(写完之后才发现element就有,后悔了哈哈哈&#x1f62d;&#x1f62d;) 示例图…

【芯片方案】珠宝手机秤方案

珠宝手机秤作为一种便携式电子称重设备&#xff0c;因其小巧、便携、精度高等特点&#xff0c;广泛应用于各种需要精确称重的场景。可能这个目前在国内使用的人比较少&#xff0c;但在西方国家珠宝手机秤却是可以用来送礼的物品。因为珠宝手机秤的外观跟手机外观大多相似&#…

顶顶通呼叫中心中间件-打电话没声音检查步骤(mod_cti基于FreeSWITCH)

顶顶通呼叫中心中间件-电话没声音检查步骤(mod_cti基于FreeSWITH) 检查步骤 1、检查配置文件 检查配置文件&#xff1a;打开ccadmin -> 配置文件 -> vars -> external_ip$${local_ip_v4}看一下这个有没有配置正确的外网IP&#xff0c;如果没有配置正确就需要配置正…

PyCharm 2023.3.2 关闭时一直显示正在关闭项目

文章目录 一、问题描述二、问题原因三、解决方法 一、问题描述 PyCharm 2023.3.2 关闭时一直显示正在关闭项目 二、问题原因 因为PyCharm还没有加载完索引导致的 三、解决方法 方法一&#xff1a; 先使用任务管理器强制关闭&#xff0c;下次关闭时注意要等待PyCharm加载完索…

C语言-顺序表

&#x1f3af;引言 欢迎来到HanLop博客的C语言数据结构初阶系列。在这个系列中&#xff0c;我们将深入探讨各种基本的数据结构和算法&#xff0c;帮助您打下坚实的编程基础。本次我将为你讲解。顺序表&#xff08;也称为数组&#xff09;是一种线性表&#xff0c;因其简单易用…

ArcGIS Pro入门制图教程

地理信息系统 (GIS) 是一种使用地图显示和分析数据的方式。在本教程中&#xff0c;您将学习桌面 GIS 应用程序 ArcGIS Pro 的基础知识。 新加坡的一家旅行社希望制作一款宣传册&#xff0c;用于向游客介绍距离市中心热门目的地最近的火车站。该宣传册将与带有文本信息的地图相…

使用 `useAppConfig` :轻松管理应用配置

title: 使用 useAppConfig &#xff1a;轻松管理应用配置 date: 2024/7/11 updated: 2024/7/11 author: cmdragon excerpt: 摘要&#xff1a;本文介绍了Nuxt开发中useAppConfig的使用&#xff0c;它便于访问和管理应用配置&#xff0c;支持动态加载资源、环境配置切换、权限…

软考:软件设计师 — 2.操作系统

二. 操作系统 1. 操作系统概念 &#xff08;1&#xff09;操作系统的作用 操作系统是计算机硬件之上的第一层软件系统。 操作系统通常用来&#xff1a; 管理系统的硬件、软件、数据资源。控制程序运行。人机之间的接口。应用软件与硬件之间的接口。 可概括为&#xff1a; …

【Linux】内核文件系统系统调用流程摸索

内核层可以看到当前调用文件处理的进程ID 这个数据结构是非常大的&#xff1a; 我们打印的pid,tgid就是从这里来的&#xff0c;然后只需要找到pid_t的数据类型就好了。 下图这是运行的日志信息&#xff1a; 从上述日志&#xff0c;其实我也把write的系统调用加了入口的打印信…

CSS3实现彩色变形爱心动画【附源码】

随着前端技术的发展&#xff0c;CSS3 为我们提供了丰富的动画效果&#xff0c;使得网页设计更加生动和有趣。今天&#xff0c;我们将探讨如何使用 CSS3 实现一个彩色变形爱心加载动画特效。这种动画不仅美观&#xff0c;而且可以应用于各种网页元素&#xff0c;比如加载指示器或…