Kafka生产者消息发送流程原理及源码分析

Kafka是一个分布式流处理平台,它能够以极高的吞吐量处理数据。在Kafka中,生产者负责将消息发送到Kafka集群,而消费者则负责从Kafka集群中读取消息。本文将探讨Kafka生产者消息发送流程的细节,包括消息的序列化、分区分配、记录提交等关键步骤。

先看一个生产者发送消息的代码样例

public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<>();// 指定初始连接用到的broker地址configs.put("bootstrap.servers", "node164:9092");// 指定key的序列化类configs.put("key.serializer", IntegerSerializer.class);// 指定value的序列化类configs.put("value.serializer", StringSerializer.class);//borker集群消息持久化控制configs.put("acks", "all");//重试次数configs.put("reties", "3");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);// 用于设置用户自定义的消息头字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test_topic",0,0,"hello world 0",headers);// 消息异步确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息的主题:" + metadata.topic());System.out.println("消息的分区号:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());} else {System.out.println("异常消息:" + exception.getMessage());}}});// 关闭生产者producer.close();}
}

通过跟踪producer.send源码可知生产者发送消息的大体流程如下图,RecordAccumulator的消息发送到brokers实际上由Sender线程处理,下图暂时忽略,先看producer主线程处理的一些细节。

  • KafkaProducer构造函数根据客户端参数初始化拦截器、序列化器、分区器,创建Sender守护线程。
  • 调用send函数发送消息时,其内部使用异步消息发送,消息经过拦截器、序列化器、分区器后缓存到缓冲区。
  • 批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限。
  • 缓冲区消息发送到指定分区,落盘到broker。如果发送失败,客户端将根据设置的重试参数进行重试,如果超过了重试次数,抛出异常。
  • 发送成功,返回RecordMetadata元数据到客户端。如果是同步调用将阻塞等待元数据返回,如果是异步调用将通过Callback接口进行回调返回元数据

生产者拦截器

KafkaProducer调用send方法后,如果有设置拦截器,会先经过拦截器,默认是不会经过任何拦截器的,除非客户端配置了拦截器(interceptor.classes参数),send函数如下

    @Overridepublic Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptionsProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}

可见,拦截器列表会被首先执行,而拦截器的初始化则是在KafkaProducer的 构造函数中,部分源码如下

List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

可见,拦截器是通过客户端配置的ProducerConfig.INTERCEPTOR_CLASSES_CONFIG来初始化的,拦截器必须实现ProducerInterceptor接口。

public interface ProducerInterceptor<K, V> extends Configurable {public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();
}

拦截器接口共有三个接口,第一个onSend接口把ProducerRecord直接传了进来,我们可以在实现接口时,对原消息进行统一处理,比如添加一些业务相关的头部信息等。onAcknowledgement接口则可以在确认消息发送成功后做一些操作,最后close接口则可以在拦截器关闭时清理一些资源。

如需要自定义拦截器则直接实现ProducerInterceptor接口,实现相关方法,在客户端进行配置即可,客户端配置示例:

 // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.xxx.CustomInterceptorOne,com.xxx.CustomInterceptorTwo");

生产者序列化器

拦截器处理完后,将进入到doSend方法,在发送消息前,首先会根据客户端配置的序列化器对key和value进行序列化。

序列化接口如下:

public interface Serializer<T> extends Closeable {/*** Configure this class.* @param configs configs in key/value pairs* @param isKey whether is for key or value*/void configure(Map<String, ?> configs, boolean isKey);/*** Convert {@code data} into a byte array.** @param topic topic associated with data* @param data typed data* @return serialized bytes*/byte[] serialize(String topic, T data);/*** Close this serializer.** This method must be idempotent as it may be called multiple times.*/@Overridevoid close();
}

在Kafka中,消息可以是任何类型的数据,如字符串、JSON、二进制数据等。为了将这些数据存储到Kafka集群中,Kafka需要对它们进行序列化。Kafka提供了多种序列化器,如StringSerializer、JsonSerializer等。生产者可以根据自己的需求选择合适的序列化器来序列化消息。如果默认提供的序列化器仍未满足需求,实现上面的Serializer接口,然后在客户端配置自己的序列化器即可。通过接口可以看出,序列化器最终将key和value序列化成字节数组。

doSend方法使用序列化器的部分源码:

byte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/27661.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LaDM3IL:多实例学习用于免疫库分类

一个人的免疫组库由某一时间点的大量适应性免疫受体组成&#xff0c;代表了该个体的适应性免疫状态。免疫组库分类和相关受体识别有可能为新型疫苗的开发做出贡献。大量的实例对免疫组库分类提出了挑战&#xff0c;这可以表述为大规模多实例学习 (MMIL&#xff0c;Massive Mult…

通信协议—Modbus

1、modbus简介 Modbus服务器&#xff1a;接收处理来自客户端的请求&#xff0c;并返回相应的响应&#xff1b; Modbus客户端&#xff1a;向Modbus服务器发送请求&#xff0c;并接收服务器返回的响应的设备或程序&#xff1b; 2、modbus poll调试工具下载 modbus poll用于测…

Python基础教程(二十):SMTP发送邮件

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

微信监控销售防飞单系统,让你的团队业绩稳如泰山!

团队中偶尔出现的私单、飞单问题而烦恼不已&#xff1f;你是否渴望拥有一个神器&#xff0c;能够实时监控销售过程&#xff0c;确保团队业绩的稳健增长&#xff1f;今天&#xff0c;就让我们一起探索这款神奇的“微信监控销售防飞单系统”&#xff0c;让你的销售团队如虎添翼&a…

React 渲染流程分析

React 页面是由组件组成的&#xff0c;从根组件直到叶组件&#xff0c;内部的组件数通过 Fiber 来保存并触发并发更新。页面的展示分为两部分&#xff0c;首先是初始化&#xff0c;所有组件首次展示&#xff0c;都要进行渲染&#xff0c;之后是更新流程&#xff0c;也就是页面产…

实况:老菜鸟自力更生从零开始重学spring目标是画出一张唬人大图(二、源码下载编译)

前情提要&#xff1a;调试前的基础知识梳理 速览 “Spring”包含哪些东西源码下载源码编译1、编译工具选择&#xff1a;gradle2、使用gradle编译spring并导入idea预编译spring-oxm导入IDEA确认合适的jdk版本排除spring-aspects模块 开始调试 “Spring”包含哪些东西 可以明确的…

代码随想录算法训练营第二十四天| (回溯) 77. 组合、 216.组合总和III、17.电话号码的字母组合

77. 组合 题目链接&#xff1a;77. 组合 文档讲解&#xff1a;代码随想录 状态&#xff1a;很多细节忘了 思路&#xff1a;先画图&#xff0c;然后可以发现&#xff0c;从1到n中选择k个数&#xff0c;可以看成是一个递归过程&#xff0c;这个递归的深度就是k。然后遍历当前这层…

Centos7安装jdk8或11以及切换方案

目录 jdk安装 安装OpenJDK11 安装OpenJDK8 配置默认的 Java 版本 验证 全局环境变量&#xff08;选配&#xff09; 个人版&#xff08;自己可以用&#xff0c;公司不建议&#xff09; 公司版本 /etc/profile 和 ~/.bash_profile 区别 前言-与正文无关 生活远不止眼前的苦…

【动态规划】| 路径问题之不同路径 力扣62

&#x1f397;️ 主页&#xff1a;小夜时雨 &#x1f397;️ 专栏&#xff1a;动态规划 &#x1f397;️ 如何活着&#xff0c;是我找寻的方向 目录 1. 题目解析2. 代码 1. 题目解析 题目链接: https://leetcode.cn/problems/unique-paths/description/ 通常动态规划的题目有…

建筑电工精选最新模拟试题(含答案)

一、填空题 1、我国安全生产的基本方针是 安全 第一&#xff0c;预防 为主&#xff0c;综合治理。 2、特种作业人员&#xff0c;必须积极主动参加培训与考核 。既是法律法规的规定&#xff0c;也是自身工作&#xff0c;生产及生命安全 的需要 3、触电急救&#x…

【Tkinter界面】Canvas 图形绘制(03/5)

文章目录 一、说明二、画布和画布对象2.1 画布坐标系2.2 鼠标点中画布位置2.3 画布对象显示的顺序2.4 指定画布对象 三、你应该知道的画布对象操作3.1 什么是Tag3.2 操作Tag的函数 https://www.cnblogs.com/rainbow-tan/p/14852553.html 一、说明 Canvas&#xff08;画布&…

【Windows】配置Flutter开发环境

一、下载 flutter sdk 点此跳至下载官网 下载好flutter sdk&#xff0c;并解压到自定义的位置。 二、配置环境变量 此电脑 --> 右键 选择 属性 --> 点击 高级系统设置 --> 会弹出系统属性的窗口&#xff0c;点击 环境变量 按钮 1.配置加速镜像地址 PUB_HOSTED_…

【python】python指南(四):typing静态类型注解综述

一、引言 对于算法工程师来说&#xff0c;语言从来都不是关键&#xff0c;关键是快速学习以及解决问题的能力。大学的时候参加ACM/ICPC一直使用的是C语言&#xff0c;实习的时候做一个算法策略后台用的是php&#xff0c;毕业后做策略算法开发&#xff0c;因为要用spark&#x…

大话C语言:第24篇 预处理

1 C语言编译流程 C语言的编译流程包括&#xff1a; 预编译&#xff1a;将.c 中的头文件展开、宏展开&#xff0c;生成的文件是.i 文件。gcc指令&#xff1a;gcc -E file.c -o file.i 编译&#xff1a;将预处理之后的.i 文件生成 .s 汇编文件。gcc指令&#xff1a;gcc -S file…

AI影像时代来临,联发科天玑以专业无畏精神重新定义手机专业影像

近期&#xff0c;联发科与Discovery探索频道联合举办了一场以“越极境&#xff0c;见芯境”为主题的天玑影像展&#xff0c;活动地点位于我国桂林阳朔。活动现场展示了阳朔壮美山水的画卷&#xff0c;以及救援队员在岩壁上进行训练的极限瞬间。令人意想不到的是&#xff0c;这些…

【课程系列01】某乎的AI大模型全栈工程师-第4期

网盘链接 链接&#xff1a;https://pan.baidu.com/s/1QLkRW_DmIm1q9XvNiOGwtQ --来自百度网盘超级会员v6的分享 课程目标 AI大模型全栈工程师是指具备人工智能领域全方位能力的工程师&#xff0c;特别是在大模型开发和应用方面具有深厚的专业知识和技能。以下是关于AI大模型…

第12章.STM32标准库简介

目录 0. 《STM32单片机自学教程》专栏 12.1 CMSIS 标准 12.2 STM32标准库文件结构 12.2.1 主结构 12.2.2 Libraries固件库文件 CMSIS文件夹 1.core_cm3.c&core_cm3.h 2.startup启动文件 3.Stm32f10x.h 4.system_stm32f10x.c&system_stm32f10…

Linux常用命令及或g++(或gcc)编辑器运用

一. 实验内容 1&#xff0e;打开VMware Workstation虚拟机进入Ubuntu系统&#xff0c;打开终端。 练习使用常用的Linux命令&#xff0c;主要包括如下命令&#xff1a; mkdir, rmdir, cd, pwd, ls, clear, cat, rm等。&#xff08;其中&#xff0c;cat、rm命令请在下面实验内容3…

IIC通信总线

文章目录 1. IIC总线协议1. IIC简介2. IIC时序1. 数据有效性2. 起始信号和终止信号3. 数据格式4. 应答和非应答信号5. 时钟同步6. 写数据和读数据 2. AT24C023. AT24C02读写时序4. AT24C02配置步骤5. 代码部分1. IIC基本信号2. AT24C02驱动代码3. 实验结果分析 1. IIC总线协议 …

【C++提高编程-05】----C++之Deque容器实战

&#x1f3a9; 欢迎来到技术探索的奇幻世界&#x1f468;‍&#x1f4bb; &#x1f4dc; 个人主页&#xff1a;一伦明悦-CSDN博客 ✍&#x1f3fb; 作者简介&#xff1a; C软件开发、Python机器学习爱好者 &#x1f5e3;️ 互动与支持&#xff1a;&#x1f4ac;评论 &…