Producer源码解读

Producer源码解读

在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer之整体流程

image.png

但是站在源码的核心角度,我们可以把Producer分成以下几个核心部分:

1、Producer之初始化

2、Producer之发送流程

3、Producer之缓冲区

4、Producer之参数与调优

Producer源码解读

从生产流程可以知道,Producer里面的核心有序列化器,分区器,还有缓冲,所以初始化的流程肯定是围绕这几个核心来处理。

image.png

KafkaProducer之初始化

image.png

image.png

1)、设置分区器

设置分区器(partitioner),分区器是支持自定义的

image.png

2)、设置每次重试间隔时间

设置每次重试间隔时间(retryBackoffMs)默认100ms

如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

3)、设置序列化器

设置序列化器(Serializer)

image.png

4)、设置拦截器

设置拦截器(interceptors),关于拦截器,这个后面会有讲解和介绍。

image.png

5)、设置缓冲区

image.png

在之前,还有一些参数的设置。

image.png

1、设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

2、设置缓存大小(totalMemorySize) 默认是32M

3、设置压缩格式(compressionType)

4、初始化RecordAccumulator也就是缓冲区指定为32M

6)、设置消息累加器

因为生产者是通过缓冲的方式发送,发送的条件之前的课程讲过,所以这里需要一个消息累加器配合才能完成消息的发送。

image.png

5、初始化集群元数据(metadata),刚开始空的

image.png

6)、创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

拦截器使用及介绍

这里讲一讲拦截器的使用和基本作用,拦截器一般用得不多,所以这里只是讲一讲案例,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

1、想要把发送的数据都带上时间戳image.png

2、实现统计发送消息的成功次数和失败次数

onAcknowledgement(RecordMetadata, Exception)里面,根据消息发送后返回的异常信息来判断是否发送成功。一般异常如果为空就说明发送成功了,反之就说明发送失败了。

然后定义两个变量,并根据Exception的值分别累加就可以统计到了

最后在close方法里输出两个变量的值,这样当producer发送数据结束并close后,会自动调用拦截器的close方法来输出咱们想要统计的成功和失败次数

image.png

image.png

不过这里要注意一个点:

onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。

3、拦截链路

   // 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");// 设置String的序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//设置自定义拦截器properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SelfInterceptor.class);//设置拦截链路(设置多个 SelfInterceptor  先执行   再执行SelfInterceptor2)ArrayList<String> interceptors = new ArrayList<>();interceptors.add("com.llp.interceptor.SelfInterceptor");//注意:这里是拦截器的全类名interceptors.add("com.llp.interceptor.SelfInterceptor2"); //这里假设有SelfInterceptor2properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

拦截器链里的拦截器是按照顺序组成的,因此我们要注意前后拦截器对彼此的影响,比如这里拦截器1的onsend方法不能返回null,不然拦截器2的onsend就丢失了信息,会发生异常。

Producer之发送流程

Producer之发送流程

Kafka Producer 发送消息流程如下:

1)、执行拦截器逻辑

执行拦截器逻辑,预处理消息, 封装 Producer Record

image.png

2)、集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

3)、序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

4)、分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

5)、消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

7)、消息发送

前面我们也知道真正的消息发送是Sender线程来做,并且这里还要结合缓冲区来处理。

批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个

Producer之缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销, 这样内存池可以对 RecordBatch 做到反复利用, 防止引起Full GC问题。那我们看看 Kafka 内存池是怎么设计的。

核心就是这段代码:

image.png

image.png

   Kafka 内存设计有两部分,下面的粉色的是可用的内存(未分配的内存,初始的时候是 32M),上面紫色的是已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存,  两部分加起来是 32M。
申请内存的过程

从 Producer 发送流程的第6步中可以看到会把消息放入 accumulator中, 即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送, 然后去申请内存(free.allocate())

image.png

image.png

(1)如果申请的内存大小超过了整个缓存池的大小,则抛异常出来

image.png

(2)对整个方法加锁:

this.lock.lock();

(3)如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。

if (size == poolableSize && !this.free.isEmpty())return this.free.pollFirst();

image.png

(4)如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存(即上图粉色的区域)申请一块内存。并且可用内存要去掉申请的那一块内存。

image.png

Sender线程

image.png

Producer之参数调优

Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。接下来我们就来看看生产端都有哪些重要的参数,及调优建议。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证, 详细可以查看

发送确认机制 3 种不同的确认模式。

acks=0 意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka 。
acks=1 意味若首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。
acks=all 意味着首领在返回确认或错误响应之前,会等待(min.insync.replicas)同步副本都收到悄息。

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

  调优建议:这个配置对于生产环境来说有点小, **为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)** 。

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

 调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, 主要跟 retries 配合使用, **在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

 调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。**对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。**

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

 调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

  调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/633189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TypeScript依赖注入框架Typedi的使用、原理、源码解读

简介 typedi是一个基于TS的装饰器和reflect-metadata的依赖注入轻量级框架&#xff0c;使用简单易懂&#xff0c;方便拓展。 使用typedi的前提是安装reflect-metadata&#xff0c;并在项目的入口文件的第一行中声明import ‘reflect-metadata’&#xff0c;这样就会在原生的R…

【图解数据结构】深度解析时间复杂度与空间复杂度的典型问题

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;图解数据结构、算法模板 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. ⛳️上期回顾二. ⛳️常见时间复杂度计算举例1️⃣实例一2️⃣实例二3️⃣实例三4️⃣实例四5…

FPGA引脚选择(Select IO)--认知1

主要考虑功能角度&#xff08;速度&#xff0c;电平匹配&#xff0c;内部程序编写&#xff09;去找研究芯片内部资源 1. 关键字 HP I/O Banks, High performance The HP I/O banks are deisgned to meet the performance requirements of high-speed memory and other chip-to-…

参照oracle按名称排序,用js在前端对附件封装排序方法

此前因客户需求需要附件按照名称排序 而后台无法对单个文件夹做单独处理。虽可以在每次点击之后重新调用接口&#xff0c;再组装数据&#xff0c;但效率太低&#xff0c;且无须存储&#xff0c;而存储在当前文件夹的排序方法也需要更新。索性自己写了一个通用的方法。经测试排序…

彩超框架EchoSight开发日志记录

EchoSight开发记录 蒋志强 我会不定期的更新 开发进展。最近更新进展于2024年1月15日 1.背景 由于某些不可抗逆的原因&#xff0c;离开了以前的彩超大厂&#xff0c;竞业在家&#xff0c;难得有空闲的时间。我计划利用这段时间 自己独立 从零开始 搭建一套 彩超系统的软件工…

【陈老板赠书活动 - 22期】- 人工智能(第三版)

陈老老老板&#x1f9d9;‍♂️ &#x1f46e;‍♂️本文专栏&#xff1a;赠书活动专栏&#xff08;为大家争取的福利&#xff0c;免费送书&#xff09; &#x1f934;本文简述&#xff1a;活就像海洋,只有意志坚强的人,才能到达彼岸。 &#x1f473;‍♂️上一篇文章&#xff…

浅谈CPU进入保护模式的方法

看程序要想思路不乱&#xff0c;最重要的就是要抓到程序的主线&#xff0c;不要被一些只是用来保护的代码打乱。如何抓到主线呢&#xff1f;比较法学习代码是比较有效的&#xff0c;比如对于CPU如何进入保护模式的理解。 不同的操作系统作者有自己的方法&#xff0c;代码看起来…

高级编程JavaScript中的数据类型?存储上能有什么差别?

在JavaScript中&#xff0c;我们可以分成两种类型&#xff1a; 基本类型复杂类型 两种类型的区别是&#xff1a;存储位置不同 一、基本类型 基本类型主要为以下6种&#xff1a; NumberStringBooleanUndefinednullsymbol Number 数值最常见的整数类型格式则为十进制&…

Liunx:线程控制

目录 创建线程&#xff1a;pthread_create(); 线程等待&#xff1a;pthread_join(); 线程退出&#xff1a;pthread_exit(); 线程取消&#xff1a;pthread_cancel() 说线程的时候说过&#xff0c;liunx没有选择单独定义线程的数据结构和适配算法&#xff0c;而是用轻量级进程…

【计算机网络】OSI七层模型与TCP/IP四层模型的对应与各层介绍

1 OSI七层模型与TCP/IP四层模型对应 2 OSI七层模型介绍 OSI&#xff08;Open Systems Interconnection&#xff09;模型是一个由国际标准化组织&#xff08;ISO&#xff09;定义的七层网络体系结构&#xff0c;用于描述计算机网络中的通信协议。每一层都有特定的功能&#xff…

基于arcgis js api 4.x开发点聚合效果

一、代码 <html> <head><meta charset"utf-8" /><meta name"viewport"content"initial-scale1,maximum-scale1,user-scalableno" /><title>Build a custom layer view using deck.gl | Sample | ArcGIS API fo…

启动低轨道卫星LEO通讯产业与6G 3GPP NTN标准

通讯技术10年一个大跃进&#xff0c;从1990年的2G至2000年的3G网路&#xff0c;2010年的4G到近期2020年蓬勃发展的5G&#xff0c;当通讯技术迈入融合网路&#xff0c;当前的 5G 技术不仅可提供高频宽、低延迟&#xff0c;同时可针对企业与特殊需求以 5G 专网的模式提供各式服务…

【.NET Core】 多线程之(Thread)详解

【.NET Core】 多线程之&#xff08;Thread&#xff09;详解 文章目录 【.NET Core】 多线程之&#xff08;Thread&#xff09;详解一、概述二、线程的创建和使用2.1 ThreadStart用于无返回值&#xff0c;无参数的方法2.2 ParameterizedThreadStart:用于带参数的方法 三、线程的…

使用 Python 第三方库 xlwt 写入数据到 Excel 工作表

1. 安装 xlwt 库 Python 写入数据到 Excel 工作簿中可以使用第三方库 xlwt. xlwt 拆分下来看就是 excel 和 write 的简化拼接&#xff0c;意思就是写数据到 Excel. 这个第三方库的 pip 安装命令如下所示&#xff1a; pip install xlwt -i https://mirrors.aliyun.com/pypi/si…

FairGuard游戏安全2023年度报告

导 读&#xff1a;2023年&#xff0c;游戏行业摆脱了疫情带来诸多负面影响&#xff0c;国内游戏市场收入与用户规模双双实现突破&#xff0c;迎来了历史新高点。但游戏黑灰产规模也在迅速扩大&#xff0c;不少游戏饱受其侵扰&#xff0c;游戏厂商愈发重视游戏安全问题。 为帮助…

WordPress怎么禁用文章和页面古腾堡块编辑器?如何恢复经典小工具?

现在下载WordPress最新版来搭建网站&#xff0c;默认的文章和页面编辑器&#xff0c;以及小工具都是使用古腾堡编辑器&#xff08;Gutenberg块编辑器&#xff09;。虽然有很多站长说这个编辑器很好用&#xff0c;但是仍然有很多站长用不习惯&#xff0c;觉得操作太难了&#xf…

C/C++ BM5 合并K个已排序的链表

文章目录 前言题目1 解决方案一1.1 思路阐述1.2 源码 2 解决方案二2.1 思路阐述2.2 源码 总结 前言 在接触了BM4的两个链表合并的情况&#xff0c;对于k个已排序列表&#xff0c;其实可以用合并的方法来看待问题。 这里第一种方法就是借用BM4的操作&#xff0c;只不过是多个合…

怎么处理vue项目中的错误详解

文章目录 一、错误类型二、如何处理后端接口错误代码逻辑问题全局设置错误处理生命周期钩子 三、源码分析小结参考文献 一、错误类型 任何一个框架&#xff0c;对于错误的处理都是一种必备的能力 在 Vue 中&#xff0c;则是定义了一套对应的错误处理规则给到使用者&#xff0…

【MATLAB源码-第117期】基于matlab的蜘蛛猴优化算法(SMO)机器人栅格路径规划,输出做短路径图和适应度曲线。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 蜘蛛猴优化算法&#xff08;Spider Monkey Optimization, SMO&#xff09;是一种灵感来源于蜘蛛猴觅食行为的群体智能优化算法。蜘蛛猴是一种生活在南美洲热带雨林中的灵长类动物&#xff0c;它们在寻找食物时展现出的社会行…

深入探究 JavaScript 中的 String:常用方法和属性全解析(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…