golang kafka sarama 源码解析

  • 消费者组重平衡

github.com/!shopify/sarama@v1.27.2/consumer_group.go

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
  • 消费者拉取消息
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {bc := &brokerConsumer{consumer:         c,broker:           broker,input:            make(chan *partitionConsumer),newSubscriptions: make(chan []*partitionConsumer),wait:             make(chan none),subscriptions:    make(map[*partitionConsumer]none),refs:             0,}go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/764542.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

爱普生EPSON全新传感技术方案亮相高交会,创造新时代“精智生活”

2023年中国国际高新技术成果交易会在深圳福田会展中心盛大举行&#xff0c;是目前中国规模最大、最具影响力的科技类展会之一。爱普生作为始终坚持“科技本地化”战略的技术创新前沿企业参与此次展会&#xff0c;为中国用户带来爱普生电子元器件三款创新技术与四大成熟传感器解…

Python 将HTML转为PDF、图片、XML、XPS格式

网页内容是信息传播的主要形式之一。在Web开发中&#xff0c;有时候我们需要将HTML文件以不同的格式保存或分享&#xff0c;比如PDF、图片&#xff08;如PNG或JPEG&#xff09;、XML或XPS等。这些格式各有优势&#xff0c;适合不同的用途。在这篇文章中&#xff0c;我们将介绍如…

51单片机入门:定时器

定时器的介绍 定时器&#xff1a;51单片机的定时器属于单片机的内部资源&#xff0c;其电路的设计连接和运转均在单片机内部完成。根据单片机内部的时钟或者外部的脉冲信号对寄存器中的数据加1&#xff0c;定时器实质就是加1计数器。因为又可以定时又可以计数&#xff0c;又称…

Apipost智能Mock功能详解

在接口开发过程中&#xff0c;Mock功能可以帮助开发者快速测试和验证接口的正确性和稳定性&#xff0c;以便快速迭代和修复问题。Apipost推出智能Mock功能&#xff0c;可以在智能期望中填写一些触发条件&#xff0c;开启后&#xff0c;Apipost会根据已设置的触发条件&#xff0…

python食品安全信息管理系统flask-django-nodejs-php

。 食品安全信息管理系统是在安卓操作系统下的应用平台。为防止出现兼容性及稳定性问题&#xff0c;编辑器选择的是Hbuildex&#xff0c;安卓APP与后台服务端之间的数据存储主要通过MySQL。用户在使用应用时产生的数据通过 python等语言传递给数据库。通过此方式促进食品安全信…

Day43:WEB攻防-PHP应用SQL注入符号拼接请求方法HTTP头JSON编码类

目录 PHP-MYSQL-数据请求类型 PHP-MYSQL-数据请求方法 PHP-MYSQL-数据请求格式 知识点&#xff1a; 1、PHP-MYSQL-SQL注入-数据请求类型 2、PHP-MYSQL-SQL注入-数据请求方法 3、PHP-MYSQL-SQL注入-数据请求格式 PHP-MYSQL-数据请求类型 SQL语句由于在黑盒中是无法预知写法的…

大模型分布式推理ray

一、目录 1 框架 2. 入门 3. 安装教程 4. 相关文档、案例阅读 二、实现 1 框架&#xff1a;Ray&#xff1a;将一个模型拆分到多个显卡中&#xff0c;实现分布式预测、训练等功能。 2. 入门 &#xff1a; 案例&#xff1a;通过ray 实现分布式部署&#xff0c;分布式推理服务。…

python拍卖行系统的设计与实现flask-django-nodejs-php

此系统设计主要采用的是python语言来进行开发&#xff0c;采用django/flask框架技术&#xff0c;框架分为三层&#xff0c;分别是控制层Controller&#xff0c;业务处理层Service&#xff0c;持久层dao&#xff0c;能够采用多层次管理开发&#xff0c;对于各个模块设计制作有一…

排序算法记录(冒泡+快排+归并)

文章目录 前言冒泡排序快速排序归并排序 前言 冒泡 快排 归并&#xff0c;这三种排序算法太过经典&#xff0c;但又很容易忘了。虽然一开始接触雀氏这些算法雀氏有些头大&#xff0c;但时间长了也还好。主要是回忆这些算法干了啥很耗时间。 如果在笔试时要写一个o(nlogn)的…

408学习笔记-17-C-C/C++中程序内存区域划分

C/C中程序内存区域划分 C/C程序内存分配的几个区域&#xff1a; 1、栈区(stack)&#xff1a;在执行函数时&#xff0c;函数内局部变量的存储单元都可以在栈上创建&#xff0c;函数执行结束时这些存储单元自动被释放。栈内存分配运算内置于处理器的指令集中&#xff0c;效率很高…

基于python+vue拍卖行系统的设计与实现flask-django-nodejs-php

拍卖行系统的目的是让使用者可以更方便的将人、设备和场景更立体的连接在一起。能让用户以更科幻的方式使用产品&#xff0c;体验高科技时代带给人们的方便&#xff0c;同时也能让用户体会到与以往常规产品不同的体验风格。 与安卓&#xff0c;iOS相比较起来&#xff0c;拍卖行…

前端小白的学习之路(ES6 三)

提示&#xff1a;类&#xff1a;class, 模块化:modules(基本概念) 目录 一、类class 1.声明类 2.类的继承 3.类的静态成员 4.私有属性 二、模块化(基本) 1.定义组件与引入组件 2.书写方式 1) 按需导出导入 2) 全部导出导入 3) 设置模块的别名 一、类class 在 ECMAS…

Flink:维表 Join 难点和技术方案汇总

博主历时三年精心创作的《大数据平台架构与原型实现&#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行&#xff0c;点击《重磅推荐&#xff1a;建大数据平台太难了&#xff01;给我发个工程原型吧&#xff01;》了解图书详情&#xff0c;…

线性表的顺序表示(顺序表)

静态分配&#xff1a; #include <stdbool.h> #include <stdio.h>typedef int ElementType;#define MaxSize 50 typedef struct {ElementType data[MaxSize];int length; }SqList;//初始化 //SqList L; void InitList(SqList L) {L.length 0; }//插入 bool ListIn…

主机OS/390汇编程序入门篇

主机OS/390 汇编程序曾经风靡一时。它曾应用于很多商业应用系统&#xff0c;特别是联机应用系统。随着高级语言的普及&#xff0c;以及计算机硬件资源的成本日趋下降&#xff0c;汇编语言不再是人们开发的首选&#xff0c;但是汇编语言还是有它的优点&#xff0c;现在仍然有着它…

SQLiteC/C++接口详细介绍sqlite3_stmt类(十二)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;十一&#xff09; 下一篇&#xff1a; SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;十三&#xff09; 48、sqlite3_stmt_isexplain sqlite3_stmt_is…

2024蓝桥杯每日一题(单调队列)

备战2024年蓝桥杯 -- 每日一题 Python大学A组 试题一&#xff1a;单调栈 试题二&#xff1a;滑动窗口 试题三&#xff1a;子矩阵 试题四&#xff1a;最大子序和 试题一&#xff1a;单调栈 【题目描述】 给定一个长度为 N 的整数数列&#xff0c;输出每…

jvm提供的远程调试 简单使用

JVM自带远程调试功能 JVM远程调试&#xff0c;其实是两个虚拟机之间&#xff0c;通过socket通信&#xff0c;达到远程调试的目的&#xff1b; 前提 确保本地和远程的网络是开通的&#xff1b; 本地操作 远程操作 在启动命令参数中 把上面的内容复制进去

【3GPP】【核心网】【4G】4G手机接入过程,手机附着过程(超详细)

1. 4G手机接入过程&#xff0c;手机附着过程 附着&#xff08;Attach&#xff09;&#xff1a; 终端在PLMN中注册&#xff0c;从而建立自己的档案&#xff0c;即终端上下文 进行附着的三种情况&#xff1a; ①终端开机后的附着&#xff0c;初始附着 ②终端从覆盖盲区返回到…

解决用POI库生成的word文件中的表格在python-docx无法解析的问题

问题背景 用apache-poi生成word文件中表格&#xff0c;在使用python-docx库解析时报错&#xff1a; 问题分析 1. word文档本质上是一个rar压缩包&#xff0c;用winrar解析后如下&#xff1a; 2. 查看document.xml&#xff0c;可以看到table元素下面是没有<w:tblGrid>这…