Kafka 入门到起飞系列 - 生产者发送消息流程解析

在这里插入图片描述

  • 生产者通过producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等

  • 生产者通过send()方法发送消息,send()方法会经过如下几步
    1. 首先将消息交给拦截器(Interceptor)处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的效果,拦截器一般将一些通用的功能加进来,通常在消息发送前,producer回调逻辑前对消息做一些定制化需求,消息头部添加消息的属性等
    2. 接下来交给序列化器(Serializer),Key的序列化器和value的序列化器,对消息的key和value进行序列化,序列化为字节数组,
    3. 然后将序列化的结果交给分区器(Partitioner),分区器有3种策略来计算消息应该属于哪个分区,

    • 在producerRecord中直接指定分区,分区器会直接将消息放到指定分区

    • 如果没有指定分区器,但是消息有key,分区器会根据消息的key计算hash值,根据主题分区数量取模,来决定将消息放到哪个分区

    • 如果没有指定分区、也没有指定key,分区器会以轮询(Round Robin)的方式给消息分配分区

      在这里插入图片描述

  • 消息经过以上拦截器->序列化器->分区器 进行加工后,会将消息放到RecordAccumulator缓冲区,对每个分区都会有一个单独的缓冲区,经过分区器计算出分区号之后,不同的消息就会分配给不同的缓冲区,缓冲区里面消息也是有序的,我们可以指定对缓冲区里的消息进行分批次,也可以指定缓冲区大小

  • 在这里插入图片描述

  • 当缓冲区中消息达到条件会按批次发送到broker对应分区上

  • broker将接收到的消息进行刷盘持久化

  • 一个消息发出去之后,服务器(broker)会返回给producer响应,producer再来判断消息是否发送成功,

  • broker返回元数据信息 - > 落盘成功 ->生产者继续发送后面消息

  • broker返回元数据信息 - >落盘失败 - 生产者设置了重试次数 -> producer 会将消息重新放入缓冲区进行排队,等待再次发送,当一个消息发送失败重试需要重发,消息是放到缓冲区队尾,

  • 生产者去缓冲区重试发送


生产者在重试消息时,消息的顺序就错了,那怎么保证消息的有序性呢?

在这里插入图片描述

针对这种情况,可以做一个配置,
参数:max.in.flight.requests.per.connection表示producer 在收到broker响应之前可以发送多少批消息,默认5,
设置此值是1,表示broker在响应之前producer不能再向同一个broker发送请求,就是我确认一批你再发下一批,这样可以保证消息有序性,对消息顺序要求不高情况可以不考虑


补充:

  • Producer 创建时,会创建一个Sender线程(IO线程)设置为守护线程

  • Producer 创建时,会创建缓冲区

  • Producer 生产消息,内部是一个异步流程,Sender线程不断轮询RecordAccumulator,满足条件后进行真正的网络IO发送消息

  • 在这里插入图片描述

  • RecordAccumulator(缓冲区) 对每一个分区都有一个缓冲区

    • 每个分区的缓冲区中消息也是有序的
    • 可以指定缓冲区中的消息按批次发送
      • 缓冲区大小达到batch.size,默认16KB
      • 在缓冲区等待时间 lingger.ms 达到上限
      • 以上两个条件满足一个即发送一批
    • 可以指定整个缓冲区的大小

批次的概念很好理解,缓冲区就像一辆公交车,有两种发车方式,一是人满了就发车,一是等5分钟就发车,不管是人满了还是到5分钟了,发车,go~
分批发送可以减少网络IO,节省带宽使用,减少网络传输的压力,提升吞吐量

  • 一个批次消息发送后,通过网络,发往Kafka指定分区,然后刷盘到broker
  • 如果Producer设置了retries参数值>0,那么允许消息发送失败进行重试,重试机制由客户端Producer内部实现
  • Broker端消息落盘成功,会返回元数据给生产者
    • 通过阻塞直接返回 (同步发送)
    • 通过回调函数返回(异步发送)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/3903.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

静态数码管——FPGA

文章目录 前言一、数码管1、数码管简介2、共阴极数码管or共阳极数码管3、共阴极与共阳极的真值表 二、系统设计1、模块框图2、RTL视图 三、源码1、seg_led_static模块2、time_count模块3、top_seg_led_static(顶层文件) 四、效果五、总结六、参考资料 前言 环境: 1、…

数字化时代,智能文件工具让办公升级

无论是在办公室还是在学校,文件管理是我们日常工作中不可或缺的一环。传统的文件整理方式可能需要花费大量的时间和精力,而且常常容易出现混乱和遗漏。然而,随着科技的不断进步,我们现在有幸生活在一个数字化时代,因此…

如何正确有效的学习java前端(合集)

大量阅读 我是一个劲头十足的读者。所以,我的第一个关于学习JavaScript的技巧就是关于阅读,这绝不是巧合。书籍和其他的资源(如文章)可以在很大程度上帮助你学习JavaScript。通过实践学习,书籍是我学习新学科最喜欢的方式。在学习JavaScript的…

测试用例(2)

项目管理工具 主要用tapd,jira少用 acp 敏捷项目管理证书 task:故事,一个故事有开始也有结束,那么在项目管理里面,会把每个任务按照一个task来看,那么这个task也可以叫story,具体指的就是任务有开始有结…

ChatGPT火热之下的冷思考

作为一款基于人工智能的自然语言处理(NLP)​​聊天机器人​​程序,ChatGPT通过大量来自互联网的文本进行训练,并使用深度学习和机器学习算法来理解用户的问题并提供准确的回答。并且,ChatGPT还内置了情感分析、关键字提取和实体识别等功能&am…

Beyond Compare 代码比较工具

一、下载 官网下载地址: https://www.scootersoftware.com/download.php 选择 Windows 系统,简体中文版本,点击下载。 下载完成 二、安装 步骤1:双击安装包 步骤2:进入安装向导,点击下一步 步骤3&a…

在LLM的支持下使游戏NPC具有记忆化的方法

问题 使用GPT这样的LLM去处理游戏中的NPC和玩家的对话是个很好的点子,那么如何处理记忆化的问题呢。 因为LLM的输入tokens是有限制的,所以伴随着问题的记忆context是有窗口大小限制的,将所有的记忆输入LLM并不现实。 所以这里看到了stanfo…

Zookeeper

作为分布式中间件,zookeeper有以下几个重要功能 服务注册服务监听 :观察者模式,有服务上线或下线可以感知,并进行响应回调处理服务拉取配置中心CP特性数据存储方式为标准的文件结构 安装zk需要java环境,可参考 linux…

面试中关于自动化测试的认识

目录 一、什么是自动化测试,自动化测试的优势是什么? 二、什么样的项目比较适合做自动化测试,什么样的不适合做自动化测试? 三、在制定自动化测试计划的时候一般要考虑哪些点? 四、编写自动化脚本时的一些规范&…

怎么给pdf文件加密?pdf文档如何加密

在数字化时代,保护个人和机密信息的重要性越来越受到关注。PDF(Portable Document Format)是一种广泛使用的文件格式,用于共享和存储各种类型的文档。然而,由于其易于编辑和复制的特性,保护PDF文件中的敏感…

xss跨站脚本攻击总结

XSS(跨站脚本攻击) 跨站脚本攻击(Cross Site Scripting),为了不和层叠样式表(Cascading Style Sheets )CSS的缩写混淆,故将跨站脚本攻击缩写为XSS。恶意攻击者往Web页面里插入恶意Script代码,当…

css基本样式的使用

1、高度和宽度 .c1{height: 300px;width: 500px; }注意事项: 宽度,支持百分比行内标签,默认无效块级标签,默认有效(即使右侧空白,也不给你占用) 块级和行内标签 css样式 标签: di…

Android JNI线程的同步 (十三)

🔥 Android Studio 版本 🔥 🔥 了解线程同步的两个变量 🔥 pthread_mutex_t 互斥锁 线程的互斥: 目前存在两个线程 , 线程A和线程B, 只允许只有一个资源对临界资源进程操作 (大概意思就是 : A线程 进入操作临界资源的时候 , 那么 B线程 就要进行等待 . 等到 A线程…

离线安装docker

目录 1、下载docker 安装包 2、上传docker 到服务器目录/opt/ 3、解压docker-19.03.9.tgz 4、解压的docker文件夹全部移动至/usr/bin目录 5、将docker注册为系统服务 6、重启生效 6.1、重新加载配置文件 6.2、启动Docker服务 6.3、查看启动状态 6.4、 设置docker为开…

CS 144 Lab Two -- TCPReceiver

CS 144 Lab Two -- TCPReceiver TCPReceiver 简述索引转换TCPReceiver 实现 测试 对应课程视频: 【计算机网络】 斯坦福大学CS144课程 Lab Two 对应的PDF: Lab Checkpoint 2: the TCP receiver TCPReceiver 简述 在 Lab2,我们将实现一个 TCPReceiver,用…

【深度学习笔记】训练 / 验证 / 测试集

本专栏是网易云课堂人工智能课程《神经网络与深度学习》的学习笔记,视频由网易云课堂与 deeplearning.ai 联合出品,主讲人是吴恩达 Andrew Ng 教授。感兴趣的网友可以观看网易云课堂的视频进行深入学习,视频的链接如下: 神经网络和…

Bootstrap编写一个兼容主流浏览器的受众巨幕式风格页面

Bootstrap编写一个兼容主流浏览器的受众巨幕式风格页面 虽然说IE6除了部分要求苛刻的需求以外已经被可以不考虑了,但是WIN7自带的浏览器IE8还是需要支持的。 本文这个方法主要的优点,个人觉得就是准备少,不需要上网寻找大量的图片做素材&…

CSS ::file-selector-button伪元素修改input上传文件按钮的样式

默认样式 修改后的样式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdev…

【MySQL】查询进阶

查询进阶 数据库约束约束类型NULL , DEFAULT , UNIQUE 约束主键约束外键约束 聚合查询聚合函数group by子句HAVING 联合查询内连接外连接自连接子查询单行子查询多行子查询 数据库约束 约束类型 NOT NULL #表示某行不能储存空值 UNIQUE #保证每一行必须有唯一的值 DEFAULT #规…

CSS科技感四角边框

实现效果:使用before和after就可以实现,代码量不多,长度颜色都可以自己调整 <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>Title</title><style>*{margin:0;padding:0;}html,body{…