深入Kafka client

分区分配策略

客户端可以自定义分区分配策略, 当然也需要考虑分区消费之后的offset提交, 是否有冲突。

消费者协调器和组协调器

a. 消费者的不同分区策略, 消费者之间的负载均衡(新消费者加入或者存量消费者退出), 需要broker做必要的协调。
b. Kafka按照消费组管理消费者, 鉴于offset提交最终都是在某个broker节点上完成。该broker扮演GroupCoordinator角色, 具体的选择则是通过hash快速定位。
c. client端存在一个ClientCoordinator与目标的GroupCoordinator进行通信实现最终协调;
d. 具体过程如下

ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer) 1. Find_Coordinator request Find_Coordinator response 2. Join_Group request 3.1 calculate brokerId 3.2 Elect leader consumer 3.3 Elect partition strategy . Join_Group response, isLeader 4. Sync_Group Request Sync_Group Response 5. Poll offset/message, HeartBeat response offset/heartbeat/message ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer)

关于__consumer_offset

__consumer_offset是一个特殊的topic, 用于存储每个topic中partition中client提交的offset。其中的数据保留时间通过offset.retention.minutes配置。如果consumer消费消息的间隔超过了配置时间, 则offset会丢失, consumer再次获取offset时会因为没有存量的offset而自动重置(auto.offset.reset)。该topic下的消息清理采用压缩策略(仅保留最新消息)。Kafka中会有定时清理任务清理过期的消费位移。

消息发送QoS

  1. at-least-once, 至少一次, 消息不会丢失, 但消息会重复;
  2. at-most-once, 至多一次, 消息不会重复, 但可能会丢失;
  3. exact-once, 恰好一次, 消息肯定被传输且只传输一次;(如果开发即时消息系统, 那么这个语义就是我们的目标)
    默认情况下, Kafka producer在发送时, 如果消息发送失败会自动进行重试, 重试过程可能会导致消息重复。而一旦发送成功, Kafka通过多副本机制保证消息一定会被保存。因此从consumer角度观察, producer发送的结果, 其QoS是at-least-once。如果需要exact-once, 则需要启用Kafka的幂等特性。

幂等

  1. 配置参数
    enable.idompotence=true
    retrics > 0
    max.in.flight.requests.per.connection <=5
    ack=-1

  2. 实现细节
    首先幂等是partition级别, broker端自动为producer分配一个PID, 并维护PID->分区(序列号 lastSeq) 的状态。当producer发送消息时, 必须携带该序列号newSeq。broker端收到消息时做校验:
    a. newSeq = lastSeq+1, broker接收;
    b. newSeq > lastSeq+1, 中间存在消息丢失, 抛出OutOfOrderException;
    c. newSeq < lastSeq+1, 消息存在重复, 直接丢弃即可.

事务消息

如果要实现跨parition的exact-once语义, 则需要基于事务消息。一般来说事务有ACID的特性, 但这个是数据库事务的通用场景。Kafka下消息需要考虑生产和消费, 这里的事务消息更多是生产端的事务消息。消费端可能会因为某些原因无法以事务的形式消费。比如:

  1. 对于采用日志压缩策略的主题而言, 事务中的消息被清理(对相同key的消息后写入的消息会覆盖之前写入的消息);
  2. 事务涉及的分区多个日志段, 如果老的日志分段被删除, 对应的消息也会消失;
  3. 消费者通过seek消费消息, 造成消息遗漏;
  4. 消费者在消费时没有消费到事务涉及的所有分区, 因此不能读取事务中的所有消息;
    总的来说, 事务保证了生产者可以以事务的方式实现消息发送的exact
    -once语义, 但消息清理和消费并未引入事务约束。

实现原理

  1. 开启幂等;
  2. 设置事务ID, transactional.id;
  3. 生产者通过事务ID得到PID和producer epoch, 进而实现跨生产者会话的消息发送和事务恢复。前者保证相同transactionId的生产者仅有1个可以有效发送消息, 后者保证如果事务消息发送后宕机新恢复出来的生产者可以继续提交或者终止事务。其中包含2个方面, 生产者的唯一性, 其关联的在途事务的可见性和可操作性。
  4. broker端为支持事务消息引入了事务协调器, 与组协调器类似, 用于处理事务的提交和终止。
  5. 具体交互流程如下
    发送事务消息交互细节

事务存储

  1. 日志存储按Topic, Partition和LogSegment层级存储, 事务消息也不例外;
  2. 与普通消息的区别是, 事务消息更多适用于发送一组消息的场景, 具体到LogSegment就是有一组连续的消息, 因此Kafka引入了ControlBatch消息来标志消息结束。
  3. 事务消息的开始在哪里呢? 严格来说, producer跨分区发送成功后, consumer是无法恢复出原有的顺序, 在分区级别仅可以做到与某个事务关联的一组消息(通过消息的属性标志是否为事务消息), 结束通过ControlBatch标志一组消息结束。

小结

本文讨论了Kafka发送消息的三种语义at-least-once, at-most-once, exact-once,并针对exact-once的单分区实现(幂等控制)和跨分区实现(事务消息)做简要介绍, 希望能帮助你梳理出Kafka broker端对消息发送QoS实现的基本脉络, 为进一步学习打基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/716963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VUE3:省市区联级选择器

一、实现效果 二、代码展示 <template><div class"page"><select v-model"property.province"><option v-for"item in provinces" :key"item">{{ item }}</option></select><select v-model&…

今日学习总结2024.3.2

最近的学习状态比较好&#xff0c;感觉非常享受知识进入脑子的过程&#xff0c;有点上头。 实验室一个星期唯一一天的假期周六&#xff0c;也就是今天&#xff0c;也完全不想放假出去玩啊&#xff0c;在实验室泡了一天。 很后悔之前胆小&#xff0c;没有提前投简历找实习&…

YOLOv9有效提点|加入MobileViT 、SK 、Double Attention Networks、CoTAttention等几十种注意力机制(五)

专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;主力高效涨点&#xff01;&#xff01;&#xff01; 一、本文介绍 本文只有代码及注意力模块简介&#xff0c;YOLOv9中的添加教程&#xff1a;可以看这篇文章。 YOLOv9有效提点|加入SE、CBAM、ECA、SimA…

ETH网络中的区块链

回顾BTC网络的区块链系统 什么是区块链&#xff1f;BTC网络是如何运行的&#xff1f;BTC交易模式 - UXTO ETH网络中的区块链 ETH网络的基石依旧是 区块链。上面 什么是区块链&#xff1f; 的文章依旧适用。 相比BTC网络&#xff0c;ETH网络的账户系统就相对复杂&#xff0c;所…

ZJGSU 1199 表达式计算

题目描述 在数据结构课上&#xff0c;老师给大家布置了一个表达式计算的问题 3*21*5. Its so easy!!! csw同学做了很不过瘾&#xff0c;他想求解更复杂的表达式: 比如(123456)/789. 但一时之间他想不出好的办法&#xff0c;诸位就帮帮他吧. 输入 输入包括多组数据, 每组测试…

实用工具:实时监控服务器CPU负载状态并邮件通知并启用开机自启

作用&#xff1a;在服务器CPU高负载时发送邮件通知 目录 一、功能代码 二、配置开机自启动该监控脚本 1&#xff0c;配置自启脚本 2&#xff0c;启动 三、功能测试 一、功能代码 功能&#xff1a;在CPU负载超过预设置的90%阈值时就发送邮件通知&#xff01;邮件内容显示…

【Spring连载】使用Spring Data访问 MongoDB----对象映射之属性转换器

【Spring连载】使用Spring Data访问 MongoDB----对象映射之属性转换器 一、声明式值转换器二、编程式值转换器注册三、MongoCustomConversions配置 虽然基于类型的转换已经提供了影响目标存储中某些类型的转换和表示的方法&#xff0c;但当仅考虑特定类型的某些值或属性进行转换…

js中Generator函数详解

定义&#xff1a; promise是为了解决回调地狱的难题出现的&#xff0c;那么 Generator 就是为了解决异步问题而出现的。 普通函数&#xff0c;如果调用它会立即执行完毕&#xff1b;Generator 函数&#xff0c;它可以暂停&#xff0c;不一定马上把函数体中的所有代码执行完毕…

Linux基本指令(下)

目录 1. less指令 2. head与tail指令 3. find指令 示例 4. grep指令 示例 ​编辑 5. zip/unzip 打包与压缩 示例 ​编辑 6. tar指令 7. find指令&#xff1a; -name 8. echo指令 9. 时间相关的指令 1.在显示方面&#xff0c;使用者可以设定欲显示的格式&#xff…

分布式ID(6):Redis实现分布式ID生成

Redis是一个高性能的键值数据库,它可以用于生成分布式唯一标识符。需要注意的是Redis实现ID可以用,这也是很多公司的选择。但是在redis服务器宕机的情况下,他也可能会出现重复生成ID的情况。 1 实现原理 利用Redis的原子操作:Redis提供了原子性的INCR和INCRBY命令,可用于…

使用python或AI自动分析数据关联(简介)

有一些Python库可以帮助用户自动发现数据集中的关联关系。通常这类方法被称为关联分析或关联规则挖掘&#xff0c;其中最著名的算法是Apriori和FP-Growth。 两个算法 Apriori算法&#xff1a; 这是一个用于频繁项集挖掘和关联规则学习的经典算法。Python中的mlxtend库提供了一…

【机器学习】有监督学习算法之:K最近邻

K最近邻 1、引言2、决策树2.1 定义2.2 原理2.3 实现方式2.3.1 距离度量2.3.2 K值的选择 2.4 算法公式2.5 代码示例 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c; 这么长时间没更新了&#xff0c;是不是得抓紧时间了。 小鱼&#xff1a;最近可都是在忙的呢&#xff0c;…

已解决ResponseEntityException的Spring MVC异常响应实体异常的正确解决方法,亲测有效!!!

由于ResponseEntityException并非Spring框架中明确定义的异常类&#xff0c;我推断这里可能指的是在使用ResponseEntity时遇到的常见异常或错误。因此&#xff0c;我将根据这个假设&#xff0c;提供一个解决Spring MVC中与ResponseEntity相关异常的通用方法指南。 目录 问题分…

线上历史馆藏系统 Java+SpringBoot+Vue+MySQL

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

day09_商品管理订单管理SpringTaskEcharts

文章目录 1 商品管理1.1 添加功能1.1.1 需求说明1.1.2 核心概念SPUSKU 1.1.3 加载品牌数据CategoryBrandControllerCategoryBrandServiceCategoryBrandMapperCategoryBrandMapper.xml 1.1.4 加载商品单元数据ProductUnitProductUnitControllerProductUnitServiceProductUnitMap…

详解java中的Lambda表达式

Lambda表达式的前世今生&#xff08;来历与概述&#xff09; Lambda表达式的前世------匿名类 以往&#xff0c;使用单一抽象方法的接口被用作函数类型。 它们的实例表示函数&#xff08;functions&#xff09;或行动&#xff08;actions&#xff09;。 自从 JDK 1.1 于 1997…

【MySQL】超详细-基础操作

数据库定义 数据库是一类软件&#xff0c;用来管理数据&#xff0c;组织数据&#xff1b; 关系型数据库MySQL&#xff08;Oracle,SQL Server,SQLite&#xff09;以表格形式组织数据&#xff0c;数据格式要求严格&#xff1b;非关系型数据库Redis&#xff08;MongoDB,HBase&…

数据结构与算法-冒泡排序

引言 在数据结构与算法的世界里&#xff0c;冒泡排序作为基础排序算法之一&#xff0c;以其直观易懂的原理和实现方式&#xff0c;为理解更复杂的数据处理逻辑提供了坚实的入门阶梯。尽管在实际应用中由于其效率问题不常被用于大规模数据的排序任务&#xff0c;但它对于每一位初…

【C++】set、multiset与map、multimap的使用

目录 一、关联式容器二、键值对三、树形结构的关联式容器3.1 set3.1.1 模板参数列表3.1.2 构造3.1.3 迭代器3.1.4 容量3.1.5 修改操作 3.2 multiset3.3 map3.3.1 模板参数列表3.3.2 构造3.3.3 迭代器3.3.4 容量3.3.5 修改操作3.3.6 operator[] 3.4 multimap 一、关联式容器 谈…

Hololens 2应用开发系列(1)——使用MRTK在Unity中设置混合现实场景并进行程序模拟

Hololens 2应用开发系列&#xff08;1&#xff09;——使用MRTK在Unity中进行程序模拟 一、前言二、创建和设置MR场景三、MRTK输入模拟的开启 一、前言 在前面的文章中&#xff0c;我介绍了Hololens 2开发环境搭建和项目生成部署等相关内容&#xff0c;使我们能生成一个简单Ho…