深入Kafka client

分区分配策略

客户端可以自定义分区分配策略, 当然也需要考虑分区消费之后的offset提交, 是否有冲突。

消费者协调器和组协调器

a. 消费者的不同分区策略, 消费者之间的负载均衡(新消费者加入或者存量消费者退出), 需要broker做必要的协调。
b. Kafka按照消费组管理消费者, 鉴于offset提交最终都是在某个broker节点上完成。该broker扮演GroupCoordinator角色, 具体的选择则是通过hash快速定位。
c. client端存在一个ClientCoordinator与目标的GroupCoordinator进行通信实现最终协调;
d. 具体过程如下

ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer) 1. Find_Coordinator request Find_Coordinator response 2. Join_Group request 3.1 calculate brokerId 3.2 Elect leader consumer 3.3 Elect partition strategy . Join_Group response, isLeader 4. Sync_Group Request Sync_Group Response 5. Poll offset/message, HeartBeat response offset/heartbeat/message ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer)

关于__consumer_offset

__consumer_offset是一个特殊的topic, 用于存储每个topic中partition中client提交的offset。其中的数据保留时间通过offset.retention.minutes配置。如果consumer消费消息的间隔超过了配置时间, 则offset会丢失, consumer再次获取offset时会因为没有存量的offset而自动重置(auto.offset.reset)。该topic下的消息清理采用压缩策略(仅保留最新消息)。Kafka中会有定时清理任务清理过期的消费位移。

消息发送QoS

  1. at-least-once, 至少一次, 消息不会丢失, 但消息会重复;
  2. at-most-once, 至多一次, 消息不会重复, 但可能会丢失;
  3. exact-once, 恰好一次, 消息肯定被传输且只传输一次;(如果开发即时消息系统, 那么这个语义就是我们的目标)
    默认情况下, Kafka producer在发送时, 如果消息发送失败会自动进行重试, 重试过程可能会导致消息重复。而一旦发送成功, Kafka通过多副本机制保证消息一定会被保存。因此从consumer角度观察, producer发送的结果, 其QoS是at-least-once。如果需要exact-once, 则需要启用Kafka的幂等特性。

幂等

  1. 配置参数
    enable.idompotence=true
    retrics > 0
    max.in.flight.requests.per.connection <=5
    ack=-1

  2. 实现细节
    首先幂等是partition级别, broker端自动为producer分配一个PID, 并维护PID->分区(序列号 lastSeq) 的状态。当producer发送消息时, 必须携带该序列号newSeq。broker端收到消息时做校验:
    a. newSeq = lastSeq+1, broker接收;
    b. newSeq > lastSeq+1, 中间存在消息丢失, 抛出OutOfOrderException;
    c. newSeq < lastSeq+1, 消息存在重复, 直接丢弃即可.

事务消息

如果要实现跨parition的exact-once语义, 则需要基于事务消息。一般来说事务有ACID的特性, 但这个是数据库事务的通用场景。Kafka下消息需要考虑生产和消费, 这里的事务消息更多是生产端的事务消息。消费端可能会因为某些原因无法以事务的形式消费。比如:

  1. 对于采用日志压缩策略的主题而言, 事务中的消息被清理(对相同key的消息后写入的消息会覆盖之前写入的消息);
  2. 事务涉及的分区多个日志段, 如果老的日志分段被删除, 对应的消息也会消失;
  3. 消费者通过seek消费消息, 造成消息遗漏;
  4. 消费者在消费时没有消费到事务涉及的所有分区, 因此不能读取事务中的所有消息;
    总的来说, 事务保证了生产者可以以事务的方式实现消息发送的exact
    -once语义, 但消息清理和消费并未引入事务约束。

实现原理

  1. 开启幂等;
  2. 设置事务ID, transactional.id;
  3. 生产者通过事务ID得到PID和producer epoch, 进而实现跨生产者会话的消息发送和事务恢复。前者保证相同transactionId的生产者仅有1个可以有效发送消息, 后者保证如果事务消息发送后宕机新恢复出来的生产者可以继续提交或者终止事务。其中包含2个方面, 生产者的唯一性, 其关联的在途事务的可见性和可操作性。
  4. broker端为支持事务消息引入了事务协调器, 与组协调器类似, 用于处理事务的提交和终止。
  5. 具体交互流程如下
    发送事务消息交互细节

事务存储

  1. 日志存储按Topic, Partition和LogSegment层级存储, 事务消息也不例外;
  2. 与普通消息的区别是, 事务消息更多适用于发送一组消息的场景, 具体到LogSegment就是有一组连续的消息, 因此Kafka引入了ControlBatch消息来标志消息结束。
  3. 事务消息的开始在哪里呢? 严格来说, producer跨分区发送成功后, consumer是无法恢复出原有的顺序, 在分区级别仅可以做到与某个事务关联的一组消息(通过消息的属性标志是否为事务消息), 结束通过ControlBatch标志一组消息结束。

小结

本文讨论了Kafka发送消息的三种语义at-least-once, at-most-once, exact-once,并针对exact-once的单分区实现(幂等控制)和跨分区实现(事务消息)做简要介绍, 希望能帮助你梳理出Kafka broker端对消息发送QoS实现的基本脉络, 为进一步学习打基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/716963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VUE3:省市区联级选择器

一、实现效果 二、代码展示 <template><div class"page"><select v-model"property.province"><option v-for"item in provinces" :key"item">{{ item }}</option></select><select v-model&…

今日学习总结2024.3.2

最近的学习状态比较好&#xff0c;感觉非常享受知识进入脑子的过程&#xff0c;有点上头。 实验室一个星期唯一一天的假期周六&#xff0c;也就是今天&#xff0c;也完全不想放假出去玩啊&#xff0c;在实验室泡了一天。 很后悔之前胆小&#xff0c;没有提前投简历找实习&…

YOLOv9有效提点|加入MobileViT 、SK 、Double Attention Networks、CoTAttention等几十种注意力机制(五)

专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;主力高效涨点&#xff01;&#xff01;&#xff01; 一、本文介绍 本文只有代码及注意力模块简介&#xff0c;YOLOv9中的添加教程&#xff1a;可以看这篇文章。 YOLOv9有效提点|加入SE、CBAM、ECA、SimA…

ETH网络中的区块链

回顾BTC网络的区块链系统 什么是区块链&#xff1f;BTC网络是如何运行的&#xff1f;BTC交易模式 - UXTO ETH网络中的区块链 ETH网络的基石依旧是 区块链。上面 什么是区块链&#xff1f; 的文章依旧适用。 相比BTC网络&#xff0c;ETH网络的账户系统就相对复杂&#xff0c;所…

实用工具:实时监控服务器CPU负载状态并邮件通知并启用开机自启

作用&#xff1a;在服务器CPU高负载时发送邮件通知 目录 一、功能代码 二、配置开机自启动该监控脚本 1&#xff0c;配置自启脚本 2&#xff0c;启动 三、功能测试 一、功能代码 功能&#xff1a;在CPU负载超过预设置的90%阈值时就发送邮件通知&#xff01;邮件内容显示…

js中Generator函数详解

定义&#xff1a; promise是为了解决回调地狱的难题出现的&#xff0c;那么 Generator 就是为了解决异步问题而出现的。 普通函数&#xff0c;如果调用它会立即执行完毕&#xff1b;Generator 函数&#xff0c;它可以暂停&#xff0c;不一定马上把函数体中的所有代码执行完毕…

Linux基本指令(下)

目录 1. less指令 2. head与tail指令 3. find指令 示例 4. grep指令 示例 ​编辑 5. zip/unzip 打包与压缩 示例 ​编辑 6. tar指令 7. find指令&#xff1a; -name 8. echo指令 9. 时间相关的指令 1.在显示方面&#xff0c;使用者可以设定欲显示的格式&#xff…

【机器学习】有监督学习算法之:K最近邻

K最近邻 1、引言2、决策树2.1 定义2.2 原理2.3 实现方式2.3.1 距离度量2.3.2 K值的选择 2.4 算法公式2.5 代码示例 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c; 这么长时间没更新了&#xff0c;是不是得抓紧时间了。 小鱼&#xff1a;最近可都是在忙的呢&#xff0c;…

线上历史馆藏系统 Java+SpringBoot+Vue+MySQL

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

day09_商品管理订单管理SpringTaskEcharts

文章目录 1 商品管理1.1 添加功能1.1.1 需求说明1.1.2 核心概念SPUSKU 1.1.3 加载品牌数据CategoryBrandControllerCategoryBrandServiceCategoryBrandMapperCategoryBrandMapper.xml 1.1.4 加载商品单元数据ProductUnitProductUnitControllerProductUnitServiceProductUnitMap…

数据结构与算法-冒泡排序

引言 在数据结构与算法的世界里&#xff0c;冒泡排序作为基础排序算法之一&#xff0c;以其直观易懂的原理和实现方式&#xff0c;为理解更复杂的数据处理逻辑提供了坚实的入门阶梯。尽管在实际应用中由于其效率问题不常被用于大规模数据的排序任务&#xff0c;但它对于每一位初…

【C++】set、multiset与map、multimap的使用

目录 一、关联式容器二、键值对三、树形结构的关联式容器3.1 set3.1.1 模板参数列表3.1.2 构造3.1.3 迭代器3.1.4 容量3.1.5 修改操作 3.2 multiset3.3 map3.3.1 模板参数列表3.3.2 构造3.3.3 迭代器3.3.4 容量3.3.5 修改操作3.3.6 operator[] 3.4 multimap 一、关联式容器 谈…

Hololens 2应用开发系列(1)——使用MRTK在Unity中设置混合现实场景并进行程序模拟

Hololens 2应用开发系列&#xff08;1&#xff09;——使用MRTK在Unity中进行程序模拟 一、前言二、创建和设置MR场景三、MRTK输入模拟的开启 一、前言 在前面的文章中&#xff0c;我介绍了Hololens 2开发环境搭建和项目生成部署等相关内容&#xff0c;使我们能生成一个简单Ho…

matlab 写入格式化文本文件

目录 一、save函数 二、fprintf函数 matlab 写入文本文件可以使用save和fprintf函数 save输出结果: fprintf输出结果: 1.23, 2.34, 3.45 4.56, 5.67, 6.78 7.89, 8.90, 9.01 可以看出fprintf输出结果更加人性化,符合要求,下面分别介绍。 一、save函数 …

MQL5-MT5连接上国内期货

主要原因是昨天在学习MACD时发现给的基础代码感觉不对&#xff0c;但无法证明&#xff0c;因为MT5接的都是外汇交易&#xff0c;数据和国内的文华啥的全对不上&#xff0c;便找了一些国内接CTP的&#xff0c;直接写代码有点麻烦&#xff0c;虽然之前对接过国内CTP的东西&#x…

AI入门笔记(三)

神经网络是如何工作的 神经网络又是如何工作的呢&#xff1f;我们用一个例子来解释。我们看下面这张图片&#xff0c;我们要识别出这些图片都是0并不难&#xff0c;要怎么交给计算机&#xff0c;让计算机和我们得出同样的结果&#xff1f;难点就在于模式识别的答案不标准&…

十二、Nacos源码系列:Nacos配置中心原理(四)- RefreshEvent 事件处理

前面文章&#xff0c;我们说到回调监听器的方法中&#xff0c;主要就是发布了一个RefreshEvent事件&#xff0c;这个事件主要由 SpringCloud 相关类来处理。今天我们继续分析后续的流程。 RefreshEvent 事件会由 RefreshEventListener 来处理&#xff0c;该 listener 含有一个 …

武器大师——操作符详解(下)

目录 六、单目操作符 七、逗号表达式 八、下标引用以及函数调用 8.1.下标引用 8.2.函数调用 九、结构体 9.1.结构体 9.1.1结构的声明 9.1.2结构体的定义和初始化 9.2.结构成员访问操作符 9.2.1直接访问 9.2.2间接访问 十、操作符的属性 10.1.优先性 10.2.结合性 …

sql基本语法+实验实践

sql语法 注释&#xff1a; 单行 --注释内容# 注释内容多行 /* 注释内容 */数据定义语言DDL 查询所有数据库 show databases;注意是databases而不是database。 查询当前数据库 select database();创建数据库 create database [if not exists] 数据库名 [default charset 字符…

备战蓝桥杯Day22 - 计数排序

计数排序问题描述 对列表进行排序&#xff0c;已知列表中的数范围都在0-100之间。设计时间复杂度为O(n)的算法。 比如列表中有一串数字&#xff0c;2 5 3 1 6 3 2 1 &#xff0c;需要将他们按照从小到大的次序排列&#xff0c;得到1 1 2 2 3 3 5 6 的结果。那么此时计数排序是…