Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

美易官方:欧央行夏季降息预期市场反应

近日,欧洲央行行长拉加德在接受媒体采访时表示,欧洲央行官员可能已经达成在夏季降息的共识。这一消息在金融市场引起了广泛关注,投资者纷纷解读其对未来货币政策的影响。然而,拉加德同时指出激进降息押注无助于抗击通胀&#xff0…

目标检测YOLO实战应用案例100讲-橘子自动采摘机视觉识别

目录 前言 卷积神经网络相关理论基础 2.1传统神经网络模型 2.2卷积神经网络结构

UML-顺序图

提示:用例图从参与者的角度出发,描述了系统的需求(用例图);静态图定义系统中的类和对象间的静态关系(类图、对象图和包图);状态机模型描述系统元素的行为和状态变化流程(…

QT上位机开发(不同场景下界面的设计模板)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 qt由于其优秀的跨平台属性,几乎成了嵌入式开发界面开发的标配。同时呢,由于它在windows平台开发出来的效果也是非常的好&am…

K3S+Rancher

1.查看系统版本,架构等情况便于后续采用对应的安装包以及命令 查看系统版本 uname -a 查看系统命令集 uname -m 这是我系统配置情况 服务器清单 名称IP配置系统主-服务192.168.23.1714Cpu8GUbuntu 20.04.6副主-服务192.168.23.1034Cpu8GUbuntu 20.04.6代理-服务…

微信小程序+前后端开发学习材料2-(视图+基本内容+表单组件)

学习来源 视图 1.swiper 滑块视图容器。其中只可放置swiper-item组件,否则会导致未定义的行为。 显示面板指示点indicator-dots 基础内容 1.icon 图标组件 实例演示 2.progress 进度条。组件属性的长度单位默认为px,咱用rpx。 实例演示 这…

【运维】WSL1如何升级到WSL2

升级WSL1到WSL2:简便快捷版 在这篇博客中,我们将研究如何通过一种更简便的方式,将WSL1迅速升级到WSL2,避免官方文档的繁冗步骤。如果你觉得官方方法太过冗长,那么这里提供的步骤可能更适合你。 官网的办法是&#xf…

oracle 19c容器数据库data dump数据泵传输数据(4)---网络传输

Transporting a Database Over the Network: Example 这个的方式导入可以不需要传输dmp文件,我原本是想从11g导入到pdb2的,但是因为版本的原因,就直接实验从pdb1导入到pdb2吧。 这种方式和前面完全传输的方式类似,不需要事先在目…

如何解决分支机构无法连入总部采购管理系统的难题

案例背景: 某企业业务规模不断壮大,内部采购流程越发复杂,供应商资质情况各异难以管理,为提高内部采购效率和采购品质,优化供应链管理,确保采购环节公正透明可溯,该企业集中化部署了采购管理系…

k8s 集群搭建的一些坑

k8s集群部署的时候会遇到很多的坑,即使看网上的文档也可能遇到各种的坑。 安装准备 1、虚拟机两台(ip按自己的网络环境相应配置)(master/node) 192.168.100.215 k8s-master 192.168.100.216 k8s-node1 2、关闭防火墙(master/node) system…

AJAX入门到实战,学习前端框架前必会的(ajax+node.js+webpack+git)(八)

16.什么是webpack? 压缩:把代码文件的体积缩小 整合:把多个CSS文件、JS文件整合成一个,减少用户浏览器的http请求次数,从而让用户更快访问我们的网页 转译:less、sass转换成css,高版本js降级处理等 时间…

LUA 对象转excel

1. 首先把LUA 转成JSON 对象 因为是excel, 所以第一层要是数组,否则没有什么意义,即lua对象要是一个数组比较合理。这里使用开源的json.lua, 但是开源的,对于数字作下标的,或者是一个数组里,不同类型的key…

Tomcat启动后无法访问主页

1、确认JDK和Tomcat环境变量配置没有问题后&#xff0c;startup启动Tomcat 2、输入localhost:8080显示无法访问 3、找到Tomcat安装目录下的conf目录 4、修改下面两个地方&#xff0c;将port改成8081 <Connector port"8081" protocol"HTTP/1.1"connect…

vue3 实现简单计数器示例——一个html文件展示vue3的效果

目的&#xff1a;作为一个新手开发&#xff0c;我想使用 Vue 3 将代码封装在 HTML 文件中时&#xff0c;进行界面打开展示。 一、vue计数示例 学了一个简单计数器界面展示&#xff0c;代码如下&#xff1a; <!DOCTYPE html> <html lang"en"><head&…

会话层协议

在OSI模型中&#xff0c;会话层&#xff08;Session Layer&#xff09;主要负责建立、管理和终止会话&#xff0c;提供数据交换的服务。然而&#xff0c;相对于物理层、数据链路层、网络层、运输层等层&#xff0c;会话层的协议并没有像其他层次那样具有明确的、广泛应用的协议…

PXE和kickstart无人值守安装

PXE高效批量网络装机 引言 1.系统装机的引导方式 启动 操作 系统 1.硬盘 2.光驱&#xff08;u盘&#xff09; 3.网络启动 pxe 重装系统&#xff1f; 在已有操作系统 新到货了一台服务器&#xff0c; 装操作系统 系统镜像 u盘 光盘 pe&#xff1a; 小型的 操作系统 在操…

SeaTunnel 、DataX 、Sqoop、Flume、Flink CDC 对比

对比 对比项Apache SeaTunnelDataXApache SqoopApache FlumeFlink CDC部署难度容易容易中等,依赖于 Hadoop 生态系统容易中等,依赖于 Hadoop 生态系统运行模式分布式,也支持单机单机本身不是分布式框架,依赖 Hadoop MR 实现分布式分布式,也支持单机分布式,也支持单机健壮…

rust跟我学七:获取外网IP地址

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎么获取到本机的外网IP地址。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[…

渗透测试(13)- 费解的三次握手和四次挥手

上一篇文章通过wireshark抓包工具简单了解了TCP/IP协议的五层工作模型&#xff0c;其中传输层我们已经知道主要是用来增加端口信息的。传输层主要有两种协议&#xff0c;一种是面向连接的TCP协议&#xff0c;一种是无连接的UDP协议&#xff0c;无连接的UDP协议比较好理解&#…

智能小程序相关名词解释(汇总)

小程序 ID 小程序 ID 是智能小程序分配给开发者的应用 ID&#xff0c;是应用的唯一标示&#xff0c;只有应用创建后才可以获取。创建小程序应用后&#xff0c;您可获得小程序应用的小程序 ID。 小程序框架 小程序提供一套简单高效的开发框架&#xff0c;帮助您开发具有原生 …