Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UML-顺序图

提示:用例图从参与者的角度出发,描述了系统的需求(用例图);静态图定义系统中的类和对象间的静态关系(类图、对象图和包图);状态机模型描述系统元素的行为和状态变化流程(…

QT上位机开发(不同场景下界面的设计模板)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 qt由于其优秀的跨平台属性,几乎成了嵌入式开发界面开发的标配。同时呢,由于它在windows平台开发出来的效果也是非常的好&am…

微信小程序+前后端开发学习材料2-(视图+基本内容+表单组件)

学习来源 视图 1.swiper 滑块视图容器。其中只可放置swiper-item组件,否则会导致未定义的行为。 显示面板指示点indicator-dots 基础内容 1.icon 图标组件 实例演示 2.progress 进度条。组件属性的长度单位默认为px,咱用rpx。 实例演示 这…

【运维】WSL1如何升级到WSL2

升级WSL1到WSL2:简便快捷版 在这篇博客中,我们将研究如何通过一种更简便的方式,将WSL1迅速升级到WSL2,避免官方文档的繁冗步骤。如果你觉得官方方法太过冗长,那么这里提供的步骤可能更适合你。 官网的办法是&#xf…

如何解决分支机构无法连入总部采购管理系统的难题

案例背景: 某企业业务规模不断壮大,内部采购流程越发复杂,供应商资质情况各异难以管理,为提高内部采购效率和采购品质,优化供应链管理,确保采购环节公正透明可溯,该企业集中化部署了采购管理系…

AJAX入门到实战,学习前端框架前必会的(ajax+node.js+webpack+git)(八)

16.什么是webpack? 压缩:把代码文件的体积缩小 整合:把多个CSS文件、JS文件整合成一个,减少用户浏览器的http请求次数,从而让用户更快访问我们的网页 转译:less、sass转换成css,高版本js降级处理等 时间…

Tomcat启动后无法访问主页

1、确认JDK和Tomcat环境变量配置没有问题后&#xff0c;startup启动Tomcat 2、输入localhost:8080显示无法访问 3、找到Tomcat安装目录下的conf目录 4、修改下面两个地方&#xff0c;将port改成8081 <Connector port"8081" protocol"HTTP/1.1"connect…

vue3 实现简单计数器示例——一个html文件展示vue3的效果

目的&#xff1a;作为一个新手开发&#xff0c;我想使用 Vue 3 将代码封装在 HTML 文件中时&#xff0c;进行界面打开展示。 一、vue计数示例 学了一个简单计数器界面展示&#xff0c;代码如下&#xff1a; <!DOCTYPE html> <html lang"en"><head&…

PXE和kickstart无人值守安装

PXE高效批量网络装机 引言 1.系统装机的引导方式 启动 操作 系统 1.硬盘 2.光驱&#xff08;u盘&#xff09; 3.网络启动 pxe 重装系统&#xff1f; 在已有操作系统 新到货了一台服务器&#xff0c; 装操作系统 系统镜像 u盘 光盘 pe&#xff1a; 小型的 操作系统 在操…

rust跟我学七:获取外网IP地址

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎么获取到本机的外网IP地址。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[…

渗透测试(13)- 费解的三次握手和四次挥手

上一篇文章通过wireshark抓包工具简单了解了TCP/IP协议的五层工作模型&#xff0c;其中传输层我们已经知道主要是用来增加端口信息的。传输层主要有两种协议&#xff0c;一种是面向连接的TCP协议&#xff0c;一种是无连接的UDP协议&#xff0c;无连接的UDP协议比较好理解&#…

QT上位机开发(MySql访问)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 网上介绍的关于QT和mysql部分的内容&#xff0c;都是利用Qt自带的mysql库来实现数据读写的。但是事实上来说&#xff0c;即使不用qt带的库&#xf…

利用 ChatGPT 高效搜索:举一反三的思考方式,高效查找解决方案

文章目录 基础思路举一反三Go 语言 Web 框架延伸思考思考结论 本文只是我的一些尝试&#xff0c;基于 ChatGPT 实现系统化快速搜索某编程语言的特定领域相关包或者基于其他语言类推荐落地方案的尝试。 这篇文章中描述的方式不一定是好方式&#xff0c;但应该会有一定的启示作用…

深入剖析 Git 对象底层原理

一、引言 在我们日常使用 Git 时&#xff0c;通常的操作是&#xff1a; 在写完一段代码后&#xff0c;执行 git add命令&#xff0c;将这段代码添加到暂存区中然后再执行 git commit和 git push 命令&#xff0c;将 本地 Git 版本库中的提交同步到服务器中的版本库中 Git 在…

Three.js 学习笔记之模型(学习中1.17更新)

文章目录 模型 几何体 材质模型点模型Points - 用于显示点线模型Line | LineLoop | LineSegments网格模型mesh - 三角形 几何体BufferGeometry缓冲类型几何体BufferGeometry - 没有任何形状的空几何体创建几何体的方式BufferAttribute Types定义顶点法线 geometry.attributes…

Maven工程 — 继承与聚合 相关知识点详解

简介&#xff1a;这篇帖子主要讲解Maven工程中的继承与聚合的相关知识点&#xff0c;用简洁的语言和小编自己的理解&#xff0c;深入浅出的说明Maven工程的继承与聚合。 目录 1、继承 1.1 继承关系的实现 1.2 版本锁定 2、聚合 2.1 聚合方法 3、总结 1、继承 图 1-1 继承…

2018年认证杯SPSSPRO杯数学建模C题(第一阶段)机械零件加工过程中的位置识别全过程文档及程序

2018年认证杯SPSSPRO杯数学建模 基于轮廓特征的机械零件位置识别研究 C题 机械零件加工过程中的位置识别 原题再现&#xff1a; 在工业制造自动生产线中&#xff0c;在装夹、包装等工序中需要根据图像处理利用计算机自动智能识别零件位置&#xff0c;并由机械手将零件自动搬…

[go语言]输入输出

目录 知识结构 输入 1.Scan ​编辑 2.Scanf 3.Scanln 4.os.Stdin --标准输入&#xff0c;从键盘输入 输出 1.Print 2.Printf 3.Println 知识结构 输入 为了展示集中输入的区别&#xff0c;将直接进行代码演示。 三者区别的结论&#xff1a;Scanf格式化输入&#x…

ElasticSearch(1):Elastic Stack简介

1 简介 ELK是一个免费开源的日志分析架构技术栈总称&#xff0c;官网https://www.elastic.co/cn。包含三大基础组件&#xff0c;分别是Elasticsearch、Logstash、Kibana。但实际上ELK不仅仅适用于日志分析&#xff0c;它还可以支持其它任何数据搜索、分析和收集的场景&#xf…

MySQL中根据出生日期计算年龄

创建student表 mysql> create table student( -> sid int primary key comment 学生号, -> sname varchar(20) comm…