消息消费过程

前言

本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。

消费者与消费组

Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式。从同一个消费组中的消费者来看, Kafka是单播(P2P)模式。

开发流程

  1. 配置consumer参数并创建consumer实例;
  2. 订阅主题;
  3. 拉取消息并消费;
  4. 提交消费偏移量;
  5. 关闭consumer;
class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();// bootstrap serverprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");// group.id, 如果当前consumer需要加入到某个group中, 否则自成一个groupprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");// 自动创建topic, 开发中可能consumer端的小伙伴先开始, 等不及生产端。props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "");// 自动提交offset设置, 样例中为手动提交// props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");// 自动提交offset的时间间隔// props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");// offset reset配置props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");// key和value的deserializer配置props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);boolean running = true;while(running) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records) {// 消费消息}// 消费成功提交offsetconsumer.commitSync();}consumer.close();}
}

主题与分区

每个Topic中的消息由若干个分区存储, 每个分区存储了整个Topic下消息的一部分。在消息消费阶段, 同一个partition会被分配给消费组中的某一个consumer。因此partion的数量决定了一个consumer group中consumer的上限。

例如, Topic test 有 3 个partition, 对应的consumer group test-group中有4个consumer(consumer-1, consumer-2, consumer-3, consumer-4), 那么其中的某个consumer会处于空闲状态, 因为没有partition可以被分配, 进而也就无消息可消费。

反序列化

consumer作为消息的消费方, 必须使用与producer serializer相兼容的deserializer, 这样才能正确解析出对应的消息, 进而做消息消费。可以配置消息的key和message的deserialzer。Kafka内置了基本数据类型的Deserializer, IntegerDeserializer。

interface Deserializer {T deserialize(String topic, byte[] data);void close();
}

主题订阅

  1. 订阅通过subscribe方法完成。如果订阅方法反复调用, 仅最后一次的调用生效。
  2. 订阅多个特定主题, subscribe(collection);
  3. 订阅某种模式的主题, subscribe(pattern);
  4. 订阅某个主题的特定partition, assign(partition);
  5. 无论是哪种订阅方式, 一个consumer只能使用其中的一种, 都可以通过unsubscribe来取消订阅;

消息获取

  1. 消息消费的前提是topic中的消息投递给consumer。总体来说消息投递有2种模式, 推模式和拉模式。
  2. 推模式: client建立到server的长链接, 当server中有消息产生时, 第一时间通过该长链接推送到client;
  3. 拉模式: client主动发起消息请求, 从server端拉取消息;
  4. 从代码来看, Kafka是拉模式。由于consumer无法预知, topic中是否有新消息, 因此无效请求是存在的。Kafka设计者也注意到了这点, 提供了如下方法, 加入了一个等待窗口。如果窗口内有新消息到达, 则立刻返回; 如果始终无消息到达, 则超时后返回。平衡消息消费的及时性, 无效请求数量, 和server端实现复杂性。
kafkaConsumer.poll(timeout, timeunit)

内部涉及消费者位移, 消费者协调器, 组协调器, 消费者选择具, 分区分配的分发和再分配, 消费者心跳等内容。

位移提交

位移是消息在存储中的位置说明。通常来说, 消费者继续消费尚未消费的消息。消息存储和消费的逻辑模型如下:
请添加图片描述

  1. 消息是按照partition存储的;
  2. 消息写入partition时, offset单调递增;
  3. 从partition消费时, 每个消费者维护自己的offset; 消费中断后恢复时, 从上次保存的offset位置开始继续消费;

因此消息是否已经被消费由offset决定, offset及其之前的消息是已消费的消息, offset之后是待消费消息。因此, 消费者完成某个分区的消费之后, 需要提交该offset给Kafka Server。提交方式有两种自动提交和手动提交;

提交方式说明优缺点
自动提交(默认方式) Kafka Client周期性地提交偏移量优点是简单, 确定是重复消费和丢失风险
手动提交由用户主动提交偏移量优点是可细粒度管理, 缺点是相对复杂

自动提交

自动提交是按照时间间隔提交, 如果在消息拉取和位移提交之间client崩溃, 对下一次消费的影响分三种场景讨论(如下图所示)。
在这里插入图片描述

  1. consumer thread中poll和消费是串行的, 但consumer thread和commit thread是并行的;
  2. 在poll和crash之间发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  3. 在crash之后发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  4. 全程没有发生commit, 那么当client恢复后从x+1开始拉取消息, [x+1, x+3]的消息重复消费;

手动提交

也有两种模式, 同步提交和异步提交。前者在得到server确认之前所在线程会阻塞, 后者线程继续运行。是需要结合场景来选择。
| 提交方式 | 说明 | 优缺点 |
| 同步提交 | 针对当前拉取的一批消息, 统一提交 | 简单, 无法做细粒度控制 |
| 异步提交 | 基于回调通知结果 | 可以按分区提交, 指定offset参数提交 |

指定offset

消费消息需要从某个offset开始, 如果是首次消费又该从哪个位置开始呢?

  1. 由参数auto.offset.reset设定默认行为
    | 参数值 | 行为 |
    |----|----|
    | earliest | 从分区第一条消息的offset开始 |
    | latest | 从上次保存的offset开始, 首次消费时和earliest行为一致 |
    | none | 程序逻辑自定义, 如果未设置则抛出异常 |

  2. 程序通过seek方法指定offset位置, 如果指定offset越界也会触发auto.offset.reset行为; 由于offset是partition级别的概念, 因此seek的使用是面向partition, 这就意味着对同一个topic的多个partition来说, 可以seek不同的offset。此外seek方法也支持基于timestamp定位消息。站在更高的视角来看, seek提供了parttion级别的消息搜索能力。

  3. 由于seek的存在, 我们可以把offset存储在DB或者其他Kafka之外的地方, 并基于seek进行恢复。

再平衡

再平衡是把分区所有权从1个消费者转移到另一个消费者的行为, 它保障消费组的可用性和伸缩性。从可用性而言, 消费故障可以恢复。就伸缩性而言, 消费组内的消费者可以扩缩容。再平衡期间, 所有的消费者暂停消费, 直到再平衡结束。由于再平衡期间, 消费者的消费状态会丢失, 再平衡之后每个partition的offset以Kafka已持久保存的offset为准, 因此可能存在重复消费情况。

Kafka提供ConsumerRebalanceListener接口, 使得该过程可以被Consumer感知, 至于怎么处理则是应用需要解决的问题, Kafka也只能帮我们到这里。

消费者拦截器

Kafka提供了ConsumerInterceptor接口, 允许我们在poll方法返回前和commit方法调用后触发, 允许我们做一些定制化的工作, 比如消息过滤, 日志输出, 消息追踪等操作。从网络应用开发的角度来说, 这种是一种常见的实现方式, 比如Tomcat中的Filter。

拦截器通过interceptor.classes配置生效, 多个拦截器可以组合成为"拦截器Pipeline"。如果其中一个拦截器异常, 后续的拦截器从最近一次成功的拦截器继续执行, 因此需要提防副作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/150478.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

42、element表格内容溢出自动往上滚动,鼠标移入停止滚动,溢出继续滚动

vue模块&#xff0c;添加ref属性 <v-table ref"rollTable" style"margin: 0!important;" :loading"TBloading" :isIndex"true" :tableData"tableData":tableHead"tableHead":paginationShow"false"…

香港身份和内地身份可以共存吗?

香港身份和内地身份可以共存吗&#xff1f; 很多朋友在刚开始了解香港身份的时候&#xff0c;都会对香港永居身份、居民身份、内地户口等等名词产生疑惑&#xff0c;搞不清它们之间的区别。 还有一部分人误以为拿到香港身份就意味着要放弃内地户口。 今天银河君就来具体解释…

电影:从微缩模型到AI纹理

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 自胶片问世以来&#xff0c;电影制作人必须以模仿现实的方式使用纹理&#xff0c;让观众相信他…

【Linux】环境变量--PATH环境变量/环境变量的操作/命令行参数

文章目录 一、PATH环境变量1.什么是PATH环境变量2.如何添加PATH环境变量3.系统中的其他环境变量4.环境变量的来源 二、环境变量的操作1.设置环境变量2.通过getenv获取环境变量3.环境变量的意义 三、命令行参数 一、PATH环境变量 1.什么是PATH环境变量 这里我们先提出一个问题…

【阿里云】图像识别

一、阿里云官网资料及配置本地 二、配置环境变量 三、C语言调用阿里云Python接口 一、阿里云官网资料及配置本地 阿里云官网 垃圾识别分类 sudo apt install python3-pip pip3 install alibabacloud_imagerecog20190930可能出现的网络问题 二、配置环境变量 配置环境变量A…

Apache POI(Java)

一、Apache POI介绍 Apache POI是Apache组织提供的开源的工具包&#xff08;jar包&#xff09;。大多数中小规模的应用程序开发主要依赖于Apache POI&#xff08;HSSF XSSF&#xff09;。它支持Excel 库的所有基本功能; 文本的导入和导出是它的主要特点。 我们可以使用 POI 在…

mfc140.dll是什么文件?如何修复mfc140.dll丢失的方法分享

​mfc140.dll丢失的原因 未正确安装Microsoft Visual C Redistributable&#xff1a;mfc140.dll是Visual C库的一部分&#xff0c;如果没有正确安装Visual C Redistributable&#xff0c;可能导致mfc140.dll丢失。 系统文件损坏&#xff1a;由于病毒感染、系统错误或其他原因…

济南数字孪生赋能工业制造,加速推进制造业数字化转型

济南数字孪生赋能工业制造&#xff0c;加速推进制造业数字化转型。数字孪生是指通过数字模型对现实世界进行模拟和描述&#xff0c;从而实现数字化转型的技术。数字孪生技术通过利用先进传感与测量技术、实时数据融合及分析技术、虚拟现实技术和仿真技术&#xff0c;在数字空间…

Vite - 配置 - 自动修改 index.html 中的title

需求描述 在Vue3项目的开发过程中&#xff0c;我们为了能区分正式环境和测试环境&#xff0c; 通常会进行环境配置文件的区分&#xff0c; 例如&#xff0c;开发环境一个配置文件、生产环境一个配置文件。因此&#xff0c;我们就希望 在项目的index.html 的 title 标签中&…

element el-date-picker报错Prop being mutated:“placement“快速解决方式

报错信息 Avoid mutating a prop directly since the value will be overwritten whenever the parent component re-renders. Instead, use a data or computed property based on the prop’s value. Prop being mutated: “placement” 报错版本 element-ui 2.15.6 和 2.15…

单链表相关面试题--4.输入一个链表,输出该链表中倒数第k个结点

/* 解题思路&#xff1a; 快慢指针法 fast, slow, 首先让fast先走k步&#xff0c;然后fast,slow同时走&#xff0c;fast走到末尾时&#xff0c;slow走到倒数第k个节点。 */ class Solution { public:ListNode* FindKthToTail(ListNode* pListHead, unsigned int k) {struct Lis…

【windows 清理redis 缓存】

redis-cli.exe flushall flushdb

深度学习领域中的耦合与解耦

在阅读论文的时候应该会看到两个操作&#xff0c;一个是耦合&#xff0c;一个是解耦&#xff0c;经常搭配着出现的就是两个词语&#xff0c;耦合头&#xff08;Coupled head&#xff09;以及Decoupled head&#xff08;解耦合头&#xff09;&#xff0c;那为什么要耦合&#xf…

【docker】iptables实现NAT

iptables是一个Linux内核中的防火墙工具&#xff0c;可以被用来执行各种网络相关的任务&#xff0c;如过滤、NAT和端口转发等&#xff0c;可以监控、过滤和重定向网络流量。 iptables可以用于以下应用场景&#xff1a; 网络安全&#xff1a;iptables可以过滤网络流量&#xf…

Workplace Search 的演变:使用 Elasticsearch 搜索你的私人数据

作者&#xff1a;Dana Juratoni, Aditya Tripathi Workplace Search 功能将来将与 Elastic Search 合并。 这是你需要了解的内容。 生成式人工智能技术的最新进展为搜索带来了一系列可能性。 随着开发人员构建新的体验&#xff0c;用户正在采用新的搜索使用方式 —— 从用自然…

CSS滚动捕获 scroll-snap-align

CSS滚动捕获 scroll-snap-align 看到 align, 就条件反射想到对齐方式, 嗯猜对了. 不过要先看一下若干名词介绍 scroll-snap-align 指定了盒子的 snap position, 即盒子 snap area 和滚动容器的 snapport 的对齐方式. 这个属性是定义在滚动元素上, 而不是滚动容器上 语法 这个…

Python与ArcGIS系列(八)通过python执行地理处理工具

目录 0 简述1 脚本执行地理处理工具2 在地理处理工具间建立联系0 简述 arcgis包含数百种可以通过python脚本执行的地理处理工具,这样就通过python可以处理复杂的工作和批处理。本篇将介绍如何利用arcpy实现执行地理处理工具以及在地理处理工具间建立联系。 1 脚本执行地理处理…

《Fine-Grained Image Analysis with Deep Learning: A Survey》阅读笔记

论文标题 《Fine-Grained Image Analysis with Deep Learning: A Survey》 作者 魏秀参&#xff0c;南京理工大学 初读 摘要 与上篇综述相同&#xff1a; 细粒度图像分析&#xff08;FGIA&#xff09;的任务是分析从属类别的视觉对象。 细粒度性质引起的类间小变化和类内…

python之代理ip的配置与调试

目录 前言 一、代理IP的配置 二、代理IP的调试 2.1 使用curl命令测试代理IP 2.2 使用requests库调试代理IP 三、代理IP的获取 3.1 使用代理IP池 3.2 使用付费代理IP服务 总结 前言 代理IP是网络爬虫中常用的技术手段。通过使用代理服务器&#xff0c;可以实现对特定网…

某60区块链安全之不安全的随机数实战一

区块链安全 文章目录 区块链安全不安全的随机数实战一实验目的实验环境实验工具实验原理实验内容攻击过程分析合约源代码漏洞EXP利用 不安全的随机数实战一 实验目的 学会使用python3的web3模块 学会以太坊不安全的随机数漏洞分析及利用 实验环境 Ubuntu18.04操作机 实验工…