Kafka-生产者

Kafka在实际应用中,经常被用作高性能、可扩展的消息中间件。

Kafka自定义了一套网络协议,只要遵守这套协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息。

在实践生产过程中,一套API封装良好、灵活易用的客户端可以避免开发人员重复劳动,提高开发效率,也可以提高程序的健壮性和可靠性。

Kafka提供了Java版本的生产者的实现——KafkaProducer,使用KafkaProducer的API可以轻松实现同步/异步发送消息、批量发送、超时重发等复杂的功能,在业务模块向Kafka写入消息时,KafkaProducer就显得必不可少。

现在,Kafka的爱好者已经使用多种语言(诸如C++、Java、Python、Go等)实现了Kafka的客户端。

如果读者使用其他语言,可以到Kafka官方网站的wiki(https://cwiki.apache.org/confluence/display/KAFKA/Clients)查找相关资料。

在Kafka core模块的kafka.producer包中,新版本的生产者客户端实现KafkaProducer(Java实现)在Kafka clients模块的org.apache.kafka.clients.producer包中。

KafkaProducer分析

在图中简略描述了KafkaProducer发送消息的整个流程。

在这里插入图片描述
下面简述图中每个步骤的操作:

  1. Producerlnterceptors对消息进行拦截。
  2. Serializer对消息的key和value进行序列化。
  3. Partitioner为消息选择合适的Partition。
  4. RecordAccumulator收集消息,实现批量发送。
  5. Sender从RecordAccumulator获取消息。
  6. 构造ClientRequest。
  7. 将ClientRequest交给NetworkClient,准备发送。
  8. NetworkClient将请求放入KafkaChannel的缓存。
  9. 执行网络I/O,发送请求。
  10. 收到响应,调用ClientRequest的回调函数。
  11. 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数。

消息发送的过程中,涉及两个线程协同工作。主线程首先将业务数据封装成ProducerRecord对象,之后调用send方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程之间的缓冲区)中暂存。

Sender线程负责将消息信息构成请求,并最终执行网络IVO的线程,它从RecordAccumulator中取出消息并批量发送出去。

需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象。

KafkaProducer实现了Producer接口,在Producer接口中定义KafkaProducer对外提供的API,分为四类方法。

  • send方法:发送消息,实际是将消息放入RecordAccumulator暂存,等待发送。
  • flush方法:刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程。
  • partitionsFor方法:在KafkaProducer中维护了一个Metadata对象用于存储Kafka集群的元数据,Metadata中的元数据会定期更新。partitionsFor方法负责从Metadata中获取指定Topic中的分区信息。
  • close方法:关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭Sender线程。
    还有一个metrics方法,用于记录统计信息,与消息发送的流程无关,我们不做详细分析。
    了解了Producer接口的功能之后,我们下面就来分析KafkaProducer的具体实现。

首先,介绍KafkaProducer中比较重要的字段,在后面分析过程中,会逐个进行分析,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:此生产者的唯一标识。
  • partitioner:分区选择器,根据一定的策略,将消息路由到合适的分区。
  • maxRequestSize:消息的最大长度,这个长度包含了消息头、序列化后的key和序列化后的value的长度。
  • totalMemorySize:发送单个消息的缓冲区大小。
  • accumulator:RecordAccumulator,用于收集并缓存消息,等待Sender线程发送。
  • sender:发送消息的Sender任务,实现了Runnable接口,在ioThread线程中执行。
  • ioThread:执行Sender任务发送消息的线程,称为“Sender线程”。
  • compressionType:压缩算法,可选项有none、gzip、snappy、lz4。这是针对RecordAccumulator中多条消息进行的压缩,所以消息越多,压缩效果越好。
  • keySerializer:key的序列化器。
  • valueSerializer:value的序列化器。
  • Metadata metadata:整个Kafka集群的元数据。
  • maxBlockTimeMs:等待更新Kafka集群元数据的最大时长。
  • requestTimeoutMs:消息的超时时间,也就是从消息发送到收到ACK响应的最长时长。
  • interceptors:Producerlnterceptor集合,Producerlnterceptor可以在消息发送之前对其进行拦截或修改;也可以先于用户的Callback,对ACK响应进行预处理。
  • producerConfig:配置对象,使用反射初始化KafkaProducer配置的相对对象。

KafkaProducer构造完成之后,我们来关注KafkaProducer的send方法。图展示了整个send方法的调用流程。

在这里插入图片描述

Producerlnterceptors&Producerlnterceptor

Producerlnterceptors是一个Producerlnterceptor集合,其onSend方法、onAcknowledgement方法、onSendEror方法,实际上是循环调用其封装的Producerlnterceptor集合的对应方法。

Producerlnterceptor对象可以在消息发送之前对其进行拦截或修改,也可以先于用户的Callback,对ACK响应进行预处理。

如果熟悉Java Web开发,可以将其与Filter的功能做类比。

如果要使用自定义Producerlnterceptor类,只要实现Producerlnterceptor接口,创建其对象并添加到Producerlnterceptors中即可。

Producerlnterceptors与ProducerInterceptor之间的关系如图所示。

在这里插入图片描述

Kafka集群元数据

每个Topic中有多个分区,这些分区的Leader副本可以分配在集群中不同的Broker上。

我们站在生产者的角度来看,分区的数量以及Leader副本的分布是动态变化的。

通过简单的示例说明这种动态变化:在运行过程中,Leader副本随时都有可能出现故障进而导致Leader副本的重新选举,新的Leader副本会在其他Broker上继续对外提供服务。

当需要提高某Topic的并行处理消息的能力时,我们可以通过增加其分区的数量来实现。

当然,还有别的方式导致这种动态变化,例如,手动触发“优先副本”选举等。

我们创建的ProducerRecord中只指定了Topic的名称,并未明确指定分区编号。

KafkaProducer要将此消息追加到指定Topic的某个分区的Leader副本中,首先需要知道Topic的分区数量,经过路由后确定目标分区,之后KafkaProducer需要知道目标分区的Leader副本所在服务器的地址、端口等信息,才能建立连接,将消息发送到Kafka中。

因此,在KafkaProducer中维护了Kafka集群的元数据,这些元数据记录了:某个Topic中有哪几个分区,每个分区的Leader副本分配哪个节点上,Follower副本分配哪些节点上,哪些副本在ISR集合中以及这些节点的网络地址、端口。

在KafkaProducer中,使用Node、TopicPartition、PartitionInfo这三个类封装了Kafka集群的相关元数据,其主要字段如图所示。

在这里插入图片描述

  • Node表示集群中的一个节点,Node记录这个节点的host、ip、port等信息。
  • TopicPartition表示某Topic的一个分区,其中的topic字段是Topic的名称,partition字段则此分区在Topic中的分区编号(ID)。
  • PartitionInfo表示一个分区的详细信息。其中topic字段和partition字段的含义与TopicPartition中的相同,除此之外,leader字段记录了Leader副本所在节点的id,replica字段记录了全部副本所在的节点信息,inSyncReplicas字段记录了ISR集合中所有副本所在的节点信息。

通过这三个类的组合,我们可以完整表示出KafkaProducer需要的集群元数据。

这些元数据保存在了Cluster这个类中,并按照不同的映射方式进行存放,方便查询。Cluster类的核心字段如图所示。

在这里插入图片描述

  • nodes:Kafka集群中节点信息列表。
  • nodesById:Brokerld与Node节点之间对应关系,方便按照Brokerld进行索引。
  • partitionsBy TopicPartition:记录了TopicPartition与PartitionInfo的映射关系。
  • partitionsByTopic:记录了Topic名称和Partitionlnfo的映射关系,可以按照Topic名称查询其中全部分区的详细信息。
  • availablePartitionsByTopic:Topic与Partitionlnfo的映射关系,这里的List中存放的分区必须是有Leader副本的Partition,而partitionsByTopic中记录的分区则不一定有Leader副本,因为某些中间状态,例如Leader副本宕机而触发的选举过程中,分区不一定有Leader副本。
  • partitionsByNode:记录了Node与PartitionInfo的映射关系,可以按照节点Id查询其上分布的全部分区的详细信息。

Metadata中封装了Cluster对象,并保存Cluster数据的最后更新时间、版本号(version)、是否需要更新等待信息,如图所示。

在这里插入图片描述

  • topics:记录了当前已知的所有topic,在cluster字段中记录了Topic最新的元数据。
  • version:表示Kafka集群元数据的版本号。Kafka集群元数据每更新成功一次,version字段的值增1。通过新旧版本号的比较,判断集群元数据是否更新完成。
  • metadataExpireMs:每隔多久,更新一次。默认是300×1000,也就是5分种。
  • refreshBackoffMs:两次发出更新Cluster保存的元数据信息的最小时间差,默认为100ms。这是为了防止更新操作过于频繁而造成网络阻塞和增加服务端压力。在Kafka中与重试操作有关的操作中,都有这种“退避(backoff)时间”设计的身影。
  • lastRefreshMs:记录上一次更新元数据的时间戳(也包含更新失败的情况)。
  • lastSuccessfulRefreshMs:上一次成功更新的时间戳。如果每次都成功,则lastSuccessfulRefreshMs、lastRefreshMs相等。 否则,lastRefreshMs>lastSuccessulRefreshMs。
  • cluster:记录Kafka集群的元数据。
  • needUpdate:标识是否强制更新Cluster,这是触发Sender线程更新集群元数据的条件之一。
  • listeners:监听Metadata更新的监听器集合。自定义Metadata监听实现Metadata.Listener.onMetadataUpdate方法即可,在更新Metadata中的cluster字段之前,会通知listener集合中全部Listener对象。
  • needMetadataForAllTopics:是否需要更新全部Topic的元数据,一般情况下,KafkaProducer只维护它用到的Topic的元数据,是集群中全部Topic的子集。

Metadata的方法比较简单,主要是操纵上面的几个字段,这里着重介绍主线程用到的requestUpdate方法和awaitUpdate方法。

requestUpdate()方法将needUpdate字段修改为true,这样当Sender线程运行时会更新Metadata记录的集群元数据,然后返回version字段的值。

awaitUpdate方法主要是通过version版本号来判断元数据是否更新完成,更新未完成则阻塞等待。

在这里插入图片描述

下面回到KafkaProducer.waitOnMetadata方法的分析,它负责触发Kafka集群元数据的更新,并阻塞主线程等待更新完毕。它的主要步骤是:

  1. 检测Metadata中是否包含指定Topic的元数据,若不包含,则将Topic添加到topics集合中,下次更新时会从服务端获取指定Topic的元数据。
  2. 尝试获取Topic中分区的详细信息,失败后会调用requestUpdate)方法设置Metadata.needUpdate字段,并得到当前元数据版本号。
  3. 唤醒Sender线程,由Sender线程更新Metadata中保存的Kafka集群元数据。
  4. 主线程调用awaitUpdate()方法,等待Sender线程完成更新。
  5. 从Metadata中获取指定Topic分区的详细信息(即PartitionInfo集合)。若失败,则回到步骤2继续尝试,若等待时间超时,则抛出异常。

waitOnMetadata()方法的具体实现如下:

在这里插入图片描述

Serializer&Deserializer

客户端发送的消息的key和value都是byte数组,Serializer和Deserializer接口提供了将Java对象序列化(反序列化)为byte数组的功能。在KafkaProducer中,根据配置文件,使用合适的Serializer。

图展示了Serializer和Deserializer接口以及它们的实现类。

在这里插入图片描述
Kafka已经为我们提供了Java基本类型的Serializer实现和Deserializer实现,我们也可以对Java复杂类型的自定义Serializer和Deserializer实现,只要实现Serializer或Deserializer接口即可。

下面简单介绍Serializer,Deserializer是其逆操作。

在Serializer接口中,configure()方法是在执行序列化操作之前的配置,例如,在StringSerializer.configure()方法中会选择合适的编码类型(encoding),默认是UTF-8;IntegerSerializer.configure()方法则是空实现。

serializer方法是真正进行序列化的地方,将传入的Java对象序列化为byte[]。

close方法是在其后的关闭方法,多为空实现。

Partitioner

KafkaProducer.send()方法的下一步操作是选择消息的分区。

在有的应用场景中,由业务逻辑控制每个消息追加到合适的分区中,而有时候业务逻辑并不关心分区的选择。

在KafkaProducer.partition方法中,优先根据ProducerRecord中partition字段指定的序号选择分区,如果ProducerRecord.partition字段没有明确指定分区编号,则通过Partitioner.partition()方法选择Partition。

在这里插入图片描述

Kafka提供了Partitioner接口的一个默认实现——DefaultPartitioner,继承结构如图(左)所示,可以看到上面介绍的ProducerInterceptor接口也继承了Configurable接口。

在创建KafkaProducer时传人的key/value配置项会保存到AbstractConfig的originals字段中,如图(右)所示。AbstractConfig的核心方法是getConfiguredInstance方法,其主要功能是通过反射机制实例化originals字段中指定的类。在前面分析KafkaProducer的构造函数时,也看到过此方法的调用。

DefaultPartitioner.partition方法负责在ProduceRecord中没有明确指定分区编号的时候,为其选择合适的分区:如果消息没有key,会根据counter与Partition个数取模来确定分区编号,count不断递增,确保消息不会都发到同一个Partition里;如果消息有key的话,则对key进行hash(使用的是murmur2这种高效率低碰撞的Hash算法),然后与分区数量取模,来确定key所在的分区达到负载均衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/628639.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

跟着cherno手搓游戏引擎【8】按键和鼠标的KeyCode

自定义KeyCode 先把glfw3.h里的KeyCode的定义抄到咱这里来。 在YOTO下创建KeyCode.h: #pragma once#ifdef YT_PLATFORM_WINDOWS///从glfw3中拿的 #define YT_KEY_SPACE 32 #define YT_KEY_APOSTROPHE 39 /* */ #define YT_KEY_COMMA 44…

迈向2024:医疗机器人的市场前景与技术革新

原创 | 文 BFT机器人 医疗机器人技术正以前所未有的速度在主流医学领域取得卓越进展,新应用、新技术不断涌现,使得该领域在过去一年中取得了令人惊叹的增长。然而,这仅仅是冰山一角,未来的发展空间仍然广阔无垠。 展望2024年&…

海外云手机:跨境养号的新趋势

近年来,市场综合数据显示,利用海外云手机进行跨境养号已经成为跨境电商发展的新潮流。特别是在社交电商营销和短视频引流领域,海外云手机不仅能够提高流量的质量,还能让商家实现业务翻倍增长。接下来,本文将简要阐述海…

软件测试|使用Python生成PDF文件

简介 PDF(Portable Document Format)是一种常用的文档格式,具有跨平台兼容性、保真性、安全性和交互性等特点。我们日常生活工作中的合同、报告、论文等通常都采用PDF格式,以确保文档在不同的操作系统(例如 Windows、…

Leetcode刷题【每日n题】(2)

🥚今日鸡汤🥚 修仙之道,需有勇气和决心,方能战胜一切困难。 ——《斗破苍穹》 目录 1.题目一 2.思路分析 3.代码实现 4.题目二 5.思路分析 6.代码实现 1.题目一 16. 最接近的三数之和 给你一个长度为 n 的整数数组 nums 和 …

LabVIEW高效振动时效处理系统

概述:介绍了一个利用LabVIEW开发的振动时效处理系统,有效降低加工工件的内部残余应力,提升其质量和寿命。 项目背景 工业加工中的工件常存在残余应力问题,这直接影响工件的质量和寿命。虽然热时效和自然时效法被广泛应用于降低残余…

基于ssm的疫苗预约系统论文

摘 要 传统办法管理信息首先需要花费的时间比较多,其次数据出错率比较高,而且对错误的数据进行更改也比较困难,最后,检索数据费事费力。因此,在计算机上安装疫苗预约系统软件来发挥其高效地信息处理的作用&#xff0c…

基于Python的货币识别技术实现

目录 介绍本文的目的和意义货币识别技术的应用场景货币识别的基本原理图像处理技术在货币识别中的应用特征提取方法:SIFT、HOG等支持向量机(SVM)分类器的使用实现过程数据集的收集和预处理特征提取和训练分类器参考文献介绍 本文的目的和意义 本文的目的是介绍如何利用Pyt…

LLMs之GLM-4:GLM-4的简介、安装和使用方法、案例应用之详细攻略

LLMs之GLM-4:GLM-4的简介、安装和使用方法、案例应用之详细攻略 导读:2024年01月16日,智谱AI在「智谱AI技术开放日(Zhipu DevDay)」推出新一代基座大模型GLM-4。GLM-4 的主要亮点和能力如下:>> 性能与GPT-4相近:多模态、长文…

YOLOv5改进系列(26)——添加RFAConv注意力卷积(感受野注意力卷积运算)

【YOLOv5改进系列】前期回顾: YOLOv5改进系列(0)——重要性能指标与训练结果评价及分析 YOLOv5改进系列(1)——添加SE注意力机制 YOLOv5改进系列(2)——添加CBAM注意力机制 YOLOv5改进系列&…

VM安装群晖系统 挂载整个硬盘给群晖系统

前言 在我们日常业务需求中,经常需要把整个磁盘的空间分配给群晖使用,那么如何通过vm分配整个磁盘空间给群晖系统。 操作 需要用管理员运行VM虚拟机 然后添加硬盘 就可以有权限全部添加了。这样会清除要挂载的磁盘的全部的数据。

十五、CSS3DObject和CSS2DObject的区别

在3d场景中我们会遇到很多标签,主要分为三种,css3dobject、css2dobject、css3dsprite三种,今天主要说说css3dobject和css2dobject的相似点和区别。 Threejs版本:0.138.3 下面看实现的效果: 相似点 使用方法都是差不多的,下面讲解下css2dobject的使用方法在index.html增…

[ACM学习] 动态规划基础之一二三维dp

课内学习的动态规划 有记忆的迭代 优化解的结构:原始问题的一部分解是子问题的解 三要素:1.子问题 2.状态的定义 3.状态转移方程 定义 线性dp的一道例题 dp[i]表示以位置 i 结尾的方案总数,dp[4]2,因为:首先只放一…

【驱动】TI AM437x(内核调试-06):网卡(PHY和MAC)、七层OSI

1、网络基础知识 1.1 七层OSI 第一层:物理层。 1)需求: 两个电脑之间如何进行通信? 具体就是一台发比特流,另一台能够收到。于是就有了物理层:主要是定义设备标准,如网线的额接口类型、管线的接口类型、各种传输介质的传输速率等。它的主要作用是传输比特流,就是从1/0…

【GitHub项目推荐--一行命令下载全网视频】【转载】

项目地址:https://github.com/soimort/you-get 首先声明,请不要使用该项目从事违法活动哦~仅供学习使用! 解决痛点 如果你上网的时候看了一些东西不错,想下载下来,或者在线观看喜欢的视频,但是没有找到网…

xilinxi mulitboot 启动

xilinix在线更新有两种方式,一种是使用ICAP原语,另一中是在xdc中约束,根据应用的场景不同,选用不同的启动方式,第二种更为简单。 可参考官方提供的手册和实例 XAPP1247 链接: XAPP1247 golden和updata.b…

【零基础入门Python数据分析】Anaconda3 JupyterNotebookseaborn版

目录 一、安装环境 python介绍 anaconda介绍 jupyter notebook介绍 anaconda3 环境安装 解决JuPyter500:Internal Server Error问题-CSDN博客 Jupyter notebook快捷键操作大全 二、Python基础入门 数据类型与变量 数据类型 变量及赋值 布尔类型与逻辑运算…

基于时域有限差分法的FDTD的计算电磁学算法-YEE网格下的更新公式推导

基于时域有限差分法的FDTD的计算电磁学算法(含Matlab代码)-YEE网格下的更新公式推导 参考书籍:The finite-difference time-domain method for electromagnetics with MATLAB simulations(国内翻译版本:MATLAB模拟的电…

【MATLAB】 SSA奇异谱分析信号分解算法

有意向获取代码,请转文末观看代码获取方式~ 1 基本定义 SSA奇异谱分析(Singular Spectrum Analysis)是一种处理非线性时间序列数据的方法,可以对时间序列进行分析和预测。 它基于构造在时间序列上的特定矩阵的奇异值分解&#…

C#,入门教程(66)——枚举Enum的高等用法

前言:国内码农与国外优秀程序员的最大区别是,我们的专家、教授喜欢唾沫横飞地,夸夸其谈语言特性、框架、性能,唯一目的是带私货(书籍或教程),很少能写出真有用的程序。差距在哪呢?基…