Kafka-生产者(producer)发送信息流程详解

Kafka概述

在这里插入图片描述

在 Kafka 消息发送的过程中,涉及到了两个重要的线程:主线程(main thread)和发送线程(Sender thread)。

1.主线程(main thread):

  • 应用程序在主线程中创建 Kafka 生产者实例。
  • 这个生产者实例负责与 Kafka 集群通信,发送消息到指定的主题。
  • 主线程还会创建一个称为 RecordAccumulator 的缓冲区。
  • 这个缓冲区是 Kafka 生产者内部用来暂存待发送消息的地方。
  • 主线程将生产的消息写入 RecordAccumulator 中。

在这里插入图片描述

2.RecordAccumulator 缓冲区:

  • RecordAccumulator 是 Kafka 生产者的一个重要组件,用于收集和管理待发送的消息记录(ProducerRecord)。
  • 主线程通过调用 Kafka 生产者的 send() 方法将消息记录发送给 RecordAccumulator。
  • RecordAccumulator 管理多个分区的消息队列,并根据配置的分区器(Partitioner)将消息分配到相应的分区队列中。

3.发送线程(Sender thread)的作用:

  • 发送线程是 Kafka 生产者内部的一个后台线程,它负责从 RecordAccumulator缓冲区中拉取待发送的消息,并将这些消息批量发送到 Kafka Broker
  • 发送线程会周期性地检查 RecordAccumulator 中是否有待发送的消息,如果有则获取这些消息并准备发送。
  • 发送线程的主要任务是通过网络与 Kafka Broker 进行通信,将消息推送到目标主题的分区中。

在这里插入图片描述


4.消息发送的具体流程:

(1)消息发送请求产生:

  • 应用程序创建 Kafka 生产者实例,并对发送的消息进行封装成 ProducerRecord 对象。
  • ProducerRecord 中包含了消息的主题、键、值等信息。

(2)消息分区:

  • 如果消息没有指定分区,分区器(Partitioner)将为消息选择一个目标分区。
  • Partitioner 可以根据消息的键、消息内容等信息选择分区,以确保消息被均匀地分配到不同的分区中。

(3)消息缓冲:

  • Kafka 生产者将消息发送到 RecordAccumulator(记录累加器)中缓冲,等待批量处理和发送。
  • RecordAccumulator 是用来批量处理和管理待发送消息的缓冲区,可以在内存中暂存一段时间的消息。

在这里插入图片描述

(4)批量处理:

  • 根据配置的批处理大小和等待时间**,RecordAccumulator 中的消息可以被批量处理。**
  • 批量处理有利于提高性能和吞吐量,减少单独发送消息的开销。
    在这里插入图片描述

(5)消息序列化与压缩:

  • 在发送之前,消息会被序列化为字节数组。
  • 可选地,消息还可以被压缩以减少网络传输的数据量。

(6)请求到达发送器:

  • 发送器(Sender)线程周期性地或根据条件触发,从 RecordAccumulator 中拉取待发送的消息。

(7)消息发送到 Broker:

  • Sender 线程将消息批量发送到 Kafka Broker。
  • 发送器与 Broker 建立连接,将消息发送到指定分区的 Leader 副本。

(8)消息持久化:

  • 消息被 Leader 副本持久化到磁盘。
  • Leader 副本将消息复制到 ISR(In-Sync Replicas)集合中的其他副本。

(9)消息确认:

  • Broker 在成功持久化消息后会向生产者发送确认信息。
  • 生产者可以配置不同的确认级别(acks)来控制消息的可靠性,例如等待 Leader 确认或等待所有 ISR 集合中的副本都确认。

(10)消息发送完成:

  • 一旦收到确认,生产者可以选择提交下一批消息或处理其他逻辑。
  • 在接收到确认之前,生产者可以选择等待重试或处理发送失败的情况。

通过以上步骤,Kafka 生产者实现了高效、可靠的消息传递机制,确保消息被安全地发送到 Kafka Broker,并最终持久化到磁盘以供消费者消费。


5.异步发送和确认机制:

  • Kafka 生产者支持异步发送消息的方式,即主线程在发送消息后不必等待发送的结果即可继续执行其他操作。
  • 生产者可以配置消息确认机制(acks),以确保消息是否成功发送到 Kafka Broker。确认机制可以是无需确认、Leader 确认或者 Leader 和 ISR 集合中的所有副本都确认。

6.错误处理:

  • 在发送消息的过程中,如果发生网络故障、Broker 不可用等异常情况,发送线程会尝试重试发送消息,以确保消息的可靠性。
  • Kafka 生产者提供了一些配置选项来控制重试次数、重试间隔等参数,以应对不同的故障情况。
    在这里插入图片描述
    在这里插入图片描述

Tips:想了解更多相关知识,可以移步我的主页哦~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/15012.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

详解CSS(三)及案例实现

目录 1.弹性布局 1.1 弹性布局案例 1.2flex 布局基本概念 1.3常用属性 1.3.1justify-content 1.3.2align-items 2.案例实现:小广告 3.案例实现:百度热榜 1.弹性布局 弹性布局(Flex布局)是一种用于创建自适应和响应式布局的…

“AIGC行业投资时机分析:评估当前市场发展阶段与未来需求趋势“

文章目录 每日一句正能量前言行业前景当前发展前景相关领域的发展趋势行业潜力竞争情况结论 市场需求人才需求情况机会挑战结论 选择与规划自我评估行业调研职业规划风险管理个人陈述示例 后记 每日一句正能量 胖了就减,没钱就赚,不会就学,不…

男士内裤什么材质的好?推荐男士内裤的注意事项

天气已经逐渐热了起来,广大男士们在夏天难免会出一身的汗,不少男士朋友都觉得一些吸湿性、透气性不好的内裤会在夏天穿着很不适,想挑选一些比较适合夏天的男士内裤,但现在的男士内裤品牌和材质分类却比较多,看得大家眼…

Python游戏编程:一步步用Python打造经典贪吃蛇小游戏

贪吃蛇作为一款极其经典且广受欢迎的小游戏,是早期 Windows 电脑和功能手机(特别是诺基亚手机)流行度极高的小游戏,是当时功能手机时代最具代表性的游戏之一。游戏的基本规则和目标十分简单,但却极具吸引力&#xff0c…

共享单车(八):数据库

实现后台数据库访问模块的框架&#xff0c;能够实现验证请求并响应&#xff08;支持数据库操作&#xff09;。 数据库设计 class SqlTabel //负责数据库表的创建 { public:SqlTabel(std::shared_ptr<MysqlConnection> sqlconn) :sqlconn_(sqlconn) {}bool CreateUserI…

详细分析crontab定时执行任务(附Demo | 定时清空Tomcat的实战)

目录 前言1. 基本知识2. Demo3. 实战3.1 错误版本3.2 正确版本 前言 由于用户量大&#xff0c;且导出的日志以及缓存特别多&#xff0c;急需定期删除文件 1. 基本知识 crontab 是一个用于定时执行任务的命令行工具&#xff0c;通常在 Unix 和类 Unix 系统中可用&#xff0c;表…

【微信小程序开发】小程序前后端交互--发送网络请求实战解析

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

三、自定义信号和槽函数(无参和有参)

需求&#xff1a; 下班后&#xff0c;小明说请小红吃好吃的&#xff0c;随便吃&#xff0c;吃啥买啥 无参&#xff1a;小红没有提出吃啥 有参&#xff1a;小红提出自己想吃的东西&#xff0c;吃啥取决于一时兴起&#xff08;emit触发&#xff09; 思路&#xff1a; 1&#xff…

Unreal Engine5 Landscape地形材质无法显示加载

UE5系列文章目录 文章目录 UE5系列文章目录前言一、解决办法 前言 在使用ue5做地形编辑的时候&#xff0c;明明刚才就保存的Landscape地形完全消失不见&#xff0c;或者是地形的材质不见了。重新打开UE5发现有时候能解决&#xff0c;但大多数时候还是没有解决&#xff0c;我下…

如何在 ASP.NET Core 中实现中间件管道

概述:借助 ASP.NET Core,中间件流水线可以作为一种轻量级、灵活的机制,使开发人员能够在请求流水线的不同阶段插入功能。这些中间件组件可以执行各种任务,例如日志记录、身份验证、授权、异常处理等。它们提供了一种封装和组织代码的方法,促进了更简洁、更易于维护的应用程…

聚观早报 | 华为畅享 70S真机图赏;vivo Y200 GT开售

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 5月25日消息 华为畅享 70S真机图赏 vivo Y200 GT开售 一加13部分细节曝光 马斯克谈AI未来 三星Galaxy Z Fold6将…

有一个3x4的矩阵,要求编写程序求出其中值最大的那个元素,以及其所在的行号和列号

解题思路&#xff1a; 先考虑解此问题的思路。从若干数中求最大数的方法很多&#xff0c;现在采用"打擂台"的算法。如果有若干人比武&#xff0c;先有一人站在台上&#xff0c;再上去一人与其交手&#xff0c;败者下台&#xff0c;胜者留在台上。第3个人再上…

Font shape `U/rsfs/m/n‘ in size <29.86> not available size <24.88>

解决方法&#xff1a;mathrsfs 删除这个包 其他可以参考&#xff1a;koma script - Size substitution with fontsize14 - TeX - LaTeX Stack Exchange

【C语言】深入理解指针(一)(中)

2、指针变量和解引用操作符&#xff08;*&#xff09; &#xff08;1&#xff09;指针变量 我们通过取地址操作符&#xff08;&&#xff09;拿到的地址是一个数值&#xff0c;比如&#xff1a;0x006FFD70&#xff0c;这个数值有时候是需要存储起来&#xff0c;方便后期再…

详解最新版RabbitMQ 基于RPM 方式的安装

如何选择安装版本 已经不支持的发布系列 版本最后补丁版本首次发布时间停止更新时间3.73.7.282017年11月28日2020年09月30日3.63.6.162015年12月22日2018年05月31日3.53.5.82015年03月11日2016年10月31日3.43.4.42014年10月21日2015年10月31日3.33.3.52014年04月02日2015年03…

Mybatis Cache(一)MybatisCache+Redis

前面提到了&#xff0c;使用mybatis cache&#xff0c;一般是结合redis使用。 一、demo 1、数据表 create table demo.t_address (id int auto_incrementprimary key,address_name varchar(200) null,address_code varchar(20) null,address_type int n…

计算机毕业设计Python+Scrapy+Vue.js机器学习招聘推荐系统 招聘数据可视化 招聘爬虫 招聘数据分析 大数据毕业设计 大数据毕设

桂林学院 本科生毕业论文&#xff08;设计、创作&#xff09;开题报告 二级学院 理工学院 专业 数据科学与大数据技术&#xff08;专升本&#xff09; 年级 2022级 姓名 徐彬彬 学号 202213018222 指导教师 沈岚岚 职称/学位 高级实验师 第二 导师 职称/学…

【ZYNQ】AXI-Quad-SPI SDK 开发记录 测试

前人工作 如前人工作&#xff0c;在Navigate to BSP Settings中找到历例程 file:///F:/Xilinx/Vitis/2019.2/data/embeddedsw/XilinxProcessorIPLib/drivers/spi_v4_5/doc/html/api/example.html使用XSpi_LowLevelExample例子&#xff0c;源代码的AI解析 int XSpi_LowLeve…

Java使用apache.poi生成word

加油&#xff0c;打工人&#xff01; 工作需求&#xff0c;将现有的word模板有段落和表格&#xff0c;从数据库中查出数据并填充&#xff0c;word里面也有表格数据&#xff0c;需要将excel表格数据单独处理&#xff0c;然后插入到生成好的word文档中。 下面代码模拟从数据库查出…

Kubernetes——Kubectl详解

目录 前言 一、陈述式资源管理方法 二、Kubectl命令操作 1.查 1.1kubectl version——查看版本信息 1.2kubectl api-resources——查看资源对象简写 1.3kubectl cluster-info——查看集群信息 1.4配置Kubectl补全 1.5journalctl -u kubelet -f——查看日志 1.6kubec…