Kafka-消费者-KafkaConsumer分析总结

在这里插入图片描述

KafkaConsumer依赖SubscriptionState管理订阅的Topic集合和Partition的消费状态,通过ConsumerCoordinator与服务端的GroupCoordinator交互,完成Rebalance操作并请求最近提交的offset。

Fetcher负责从Kafka中拉取消息并进行解析,同时参与position的重置操作,提供获取指定Topic的集群元数据的操作。上述操作的所有请求都是通过ConsumerNetworkClient缓存并发送的,在ConsumerNetworkClient中还维护了定时任务队列,用来完成HearbeatTask任务和AutoCommitTask任务。NetworkClient在接收到上述请求的响应时会调用相应回调,最终交给其对应的*Handler以及RequestFuture的监听器进行处理。

KafkaConsumer并不是一个线程安全的类。为了防止多线程并发操作,KafkaConsumer提供了多线程并发的检测机制,涉及的方法是acquire和release。这两个方法的代码如下:

在这里插入图片描述
在这里插入图片描述
我们可以看出,这并不是一种锁的实现,仅实现了检测多线程并发操作的检测。这里使用CAS操作可以保证线程之间的可见性。CAS操作、可见性等相关概念请参考Java并发专栏。

面我们来分析KafkaConsumer.poll方法进行消息消费的整个流程以及相关代码:
在这里插入图片描述
注意,在消费完消息之后,客户端还需要commit offset,手动同步commit offset使用commitSync(),手动异步commit offset使用commitAsync(),自动commit offset使用定时任务AutoCommitTask。

在这里插入图片描述
在pollOnce方法中会先通过ConsumerCoordinator与GroupCoordinator交互完成Rebalance操作,之后从GroupCoordinator获取最近一次提交的offset(或重置position),最后才是使用Fetcher,从Kafka获取消息进行消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/647489.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于本地缓存制作一个分库分表的分布式ID生成器

引言: 代码在 https://gitee.com/lbmb/mb-live-app 中 【mb-live-id-generate-provider】 模块里面 如果喜欢 希望大家给给star 项目还在持续更新中。 背景介绍 项目整体架构是 基于springboot 3.0 开发 rpc 调用采用 dubbo 注册配置中心 使用 nacos 采用shardin…

vue中数据状态轮询

vue中数据状态轮询 1、数据接口和状态接口是分开的 首先在页面挂在后请求数据,然后判断数据中状态是否有需要轮询的,有的话就轮询: async getTableDataList() {this.tableLoading true;try {let params {page: this.dataPage,page_size:…

[git] windows系统安装git教程和配置

一、何为Git Git(读音为/gɪt/)是一个开源的分布式版本控制系统,可以有效、高速地处理从很小到非常大的项目版本管理。 二、git安装包 有2种版本,Git for Windows Setup和Git for Windows Portable(便携版)两个版本都可以。 三、Git for Windows Por…

jQuery Chaining —— W3school 详解 简单易懂(十)

通过 jQuery,您可以把动作/方法链接起来。 Chaining 允许我们在一条语句中允许多个 jQuery 方法(在相同的元素上)。 jQuery 方法链接 直到现在,我们都是一次写一条 jQuery 语句(一条接着另一条)。 不过…

手动搭建koa+ts项目框架(apidoc文档篇)

文章目录 一、安装apidoc工具二、使用1、项目根目录新建apidoc.json2、定义接口路由上方注解对应信息3、配置静态文件访问目录4、生成api文档如有启发,可点赞收藏哟~ 一、安装apidoc工具 全局安装 npm i apidoc -g查看是否安装成功 apidoc -v二、使用 1、项目根…

1. MySQL 数据库

本章内容 关系型数据库基础 安装 MySQL 管理数据库和表 用户和权限管理 函数,存储过程,触发器和事件 MySQL 架构 存储引擎 服务器选项,系统和状态变量 优化查询和索引管理 锁和事务管理 日志管理 备份还原 MySQL 集群 压力测试…

linux-centos服务器离线安装yapi(包含nodejs、mongodb、yapi、pm2离线安装)

yapi是使用vue框架开发的,借助nodejs 前端直接访问的mongodb数据库,离线安装yapi步骤如下 下载离线安装包 下载地址 https://download.csdn.net/download/qq445829096/88778418 离线安装包先复制到 dev/yapi目录(根据自己习惯自定义目录) node-v12.13.0-linux-x64.tar.xz …

【算法练习】leetcode算法题合集之排序篇

排序算法 快速排序 单路快排 获取随机一个元素,元素左边是小于它的,元素右边是大于它的。 partition:选择一个元素,交换left。比较nums[left]和nums[i],如果nums[i]<nums[left],找到一个小于标的元素的数据,交换到j的位置,j记录着最后一个小于标的元素的数据,切换…

P1065 [NOIP2006 提高组] 作业调度方案题目

题目 我们现在要利用m台机器加工n个工件&#xff0c;每个工件都有m道工序&#xff0c;每道工序都在不同的指定的机器上完成。每个工件的每道工序都有指定的加工时间。 每个工件的每个工序称为一个操作&#xff0c;我们用记号j-k表示一个操作&#xff0c;其中j为1到n中的某个数…

群辉NAS的远程访问

群辉NAS是私有云存储&#xff0c;局域网访问很容易【详见&#xff1a;网上邻居访问设置、其它设备的访问设置】&#xff0c;远程访问相对复杂&#xff0c;涉及很多关键因素&#xff0c;现将过程记录如下&#xff1a; 目录 1、互联网接入 2、绑定MAC与IP地址 3、路由器开启5…

算法随想录第四十三天打卡|1049. 最后一块石头的重量 II ,494. 目标和 ,474.一和零

1049. 最后一块石头的重量 II 本题就和 昨天的 416. 分割等和子集 很像了&#xff0c;可以尝试先自己思考做一做。 视频讲解&#xff1a;动态规划之背包问题&#xff0c;这个背包最多能装多少&#xff1f;LeetCode&#xff1a;1049.最后一块石头的重量II_哔哩哔哩_bilibili 代…

eclipse中导入运行MyEclipse项目...非常实用

eclipse中导入运行MyEclipse项目 进入项目目录&#xff0c;找到.project文件&#xff0c;打开找到<natures>...</natures>代码段。在第2步的代码段中加入如下标签内容并保存&#xff1a; <nature>org.eclipse.wst.common.project.facet.core.nature</na…

mysql中获取一段时间日期

SELECTdate_add(date_sub(20230201, interval 1 day), INTERVAL (cast(help_topic_id AS signed INTEGER) 1) day) as tttt FROM mysql.help_topic WHERE help_topic_id < DATEDIFF(20230201, date_sub(20230101, interval 1 day)) ORDER BY help_topic_id其中mysql.help_t…

SpringMVC-HttpMessageConverter 报文信息转化器

文章目录 HttpMessageConverter一、概念二、RequestBody三、RequestEntity四、 ResponseBody1.返回JSON格式的字符串 五、RestController六、ResponseEntity HttpMessageConverter 一、概念 报文信息转化器&#xff0c;将请求报文转化为Java对象&#xff0c;或将Java对象转化…

Java笔记(死锁、线程通信、单例模式)

一、死锁 1.概述 死锁 : 死锁是指两个或两个以上的进程在执行过程中&#xff0c;由于竞争资源或者由于彼此通信而造成的一种阻塞的现象&#xff0c;若无外力作用&#xff0c;它们都将无法往下执行。此时称系统处于死锁状态或系统产生了死锁&#xff0c;这些永远在互相等待的进…

【Digester解析XML文件的三种方式】

Digester解析XML文件的三种方式 1. Digester解析XML文件的三种方式1.1 作用及依赖jar包 2. 重点和难点3. XML文件4. 通过不同的方式解析这个xml文件4.1 通过java编码方式解析&#xff08;javabean存储&#xff09;4.2 通过java编码方式解析&#xff08;list和map存储&#xff0…

【ChatGPT 和文心一言哪个更好用?】

ChatGPT 和文心一言哪个更好用&#xff1f; 随着人工智能技术的飞速发展&#xff0c;AI 助手已经成为人们日常生活和工作中不可或缺的一部分。在众多 AI 助手中&#xff0c;ChatGPT 和文心一言备受关注。本文将从智能回复、语言准确性、知识库丰富度等方面对这两大 AI 助手进行…

Leetcode 俩数之和(哈希)

一、题目描述 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能重复出现。 你可以…

【LMDeploy 大模型量化部署实践】学习笔记

参考学习教程【LMDeploy 的量化和部署】 理论 作业 使用 LMDeploy 以本地对话、网页Gradio、API服务中的一种方式部署 InternLM-Chat-7B 模型&#xff0c;生成 300 字的小故事 本地对话 API服务 Client 命令 端口转发 网页Gradio

一文学习Thrift RPC

Thrift RPC引言 Thrift RPC的特点 Thrift 是一个RPC的框架&#xff0c;和Hessian RPC有什么区别&#xff0c;最重要的区别是Thrift可以做异构系统开发。 什么是异构系统&#xff0c;服务的提供者和服务的调用者是用不同语言开发的。 为什么会当前系统会有异构系统的调用&…