kafka(七)——消息偏移(消费者)

概念

消费者消费完消息后,向_consumer_offset主题发送消息,用来保存每个分区的偏移量。

在这里插入图片描述

流程说明

  1. consumer发送JoinGroup请求;
  2. coordinator选出一个consumer作为leader,并将topics发送给leader消费者;
  3. leader consumer负责制定消费方案;
  4. leader consumer将消费方案发送给coordinator;
  5. coordinator将消费方案发送给CG中的每个consumer;
  6. 每个consumer与coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该consumer被移除,触发再平衡,或者消费者处理消息过长(max.poll.interval.ms=300s),也会触发再平衡;

适用场景

消费者数量发生变化、消费者订阅主题发生变化或者分区数量发生变化时,会触发kafka的再平衡(Rebalance),再平衡后,消费者可能被分到新的分区,为保证高可用和伸缩性,消费者需要读取每个分区最后一次偏移量。

注意:再平衡期间,群组不可用,消费者无法读取消息。

再平衡(Rebalance)

再平衡(Rebalance),是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。

触发场景

  • 消费者个数发生变化,有新的消费者或分组中的消费者停止消费;
  • 订阅的主题(topic)个数发生变化;
  • 订阅的主题分区发生变化(partition);

影响

  • 再平衡时,消费者组下的所有消费者都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配;
  • 再平衡过程会对消费者组产生非常严重的影响,所有的消费者都将停止工作,直到再平衡执行完成;

分区分配策略

Range范围分配策略

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
算法

n = 分区数量 / 消费者数量

m = 分区数量 % 消费者数量

前m个消费者消费n+1个,剩余消费者消费n个

图解

n = 2 = 8/3

m = 2 = 8%3

前2个消费者消费(2+1)个,剩余消费者消费2个。

在这里插入图片描述

RoundRobin轮询策略

将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
图解

在这里插入图片描述

Stricky粘性分配策略

在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.StickyAssignor
图解
  • 故障前

在这里插入图片描述

  • 故障后

在这里插入图片描述

代码示例

// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
{
public:// 消费者组再平衡回调void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition *> &partitions) {if (RdKafka::ERR__ASSIGN_PARTITIONS == err)  // 分区分配成功{// 消费者订阅这些分区consumer->assign(partitions);// 获取消费者组本次订阅的分区数量,可以属于不同的topicm_partitionCount = (int)partitions.size();} else   // 分区分配失败{// 消费者取消订阅所有的分区consumer->unassign();// 消费者订阅分区的数量为0m_partitionCount = 0;}}private:int m_partitionCount;    // 消费者组本次订阅的分区数量
};RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{printf("create conf failed\n");return;
}std::string errorStr = ""; 
RdKafka::RebalanceCb* rebalance_cb = new ConsumerRebalanceCb;
RdKafka::Conf::ConfResult errorCode = t_config->set("rebalance_cb", rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{printf("set conf(rebalance_cb) failed, err:%s\n", errorStr.c_str());delete t_config;return;
}

提交方式

自动提交

参数配置

# 默认自动提交,消费者close时也会自动提交
enable.auto.comnit=true# 自动提交周期,默认5s
auto.commit.interval.ms=5000

代码示例

RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{// 消费消息ConsumeMsg_(msg);// 消息消费完后无需手动处理,kafka自动提交偏移delete msg;
}

存在的问题

如果在周期5s内发生再平衡,导致偏移量未提交,未提交的消息会被重复消费。

手动提交

参数配置

RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{printf("create conf failed\n");return;
}RdKafka::Conf* topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (NULL == topicConfig) 
{printf("create topic conf failed\n");delete t_config;return;
}std::string errorStr = ""; 
RdKafka::Conf::ConfResult errorCode = topicConfig->set("enable.auto.commit", " false", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{printf("set topic conf(enable.auto.commit) failed, err:%s\n", errorStr.c_str());delete topicConfig;delete t_config;return;
}// 设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
errorCode = topicConfig->set("auto.offset.commit", " earliest", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{printf("set topic conf(auto.offset.commit) failed, err:%s\n", errorStr.c_str());delete topicConfig;delete t_config;return;
}// 默认 topic 配置,用于自动订阅 topics
errorCode = t_config->set("default_topic_conf", topicConfig, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{printf("set conf(default_topic_conf) failed, err:%s\n", errorStr.c_str());delete topicConfig;delete t_config;return;
}

同步提交

  • 消息消费完,手动调用commitSync;
  • 在同步提交未完成的情况下发生再平衡,消息会被重复消费;
  • commitSync会阻塞直到偏移提交成功;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{// 消费消息ConsumeMsg_(msg, NULL);// 开启手动提交m_consumer->commitSync(); delete msg;
}

异步提交

  • 消息消费完,手动调用commitAsync;
  • commitAsync不会重试提交偏移量;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{// 消费消息ConsumeMsg_(msg, NULL);// 开启手动提交m_consumer->commitAsync(); delete msg;
}

存在的问题

重复消费(同步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为earliest;
  • 上次提交的偏移量为1;
  • 由于网络故障、超时等原因,2~7已消费完的情况下,8未提交成功,由于设置了参数auto.offset.commit=earliest,分区再平衡后会继续从2开始消费,会导致消息重复消费的问题;
消息丢失(异步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为latest;
  • 上次提交的偏移量为1;
  • 本次消费的偏移量范围为27,消费者立马提交了偏移量8,由于网络故障、超时等原因,27未消费完,由于设置了参数auto.offset.commit=latest,再平衡后会继续从8开始消费,会导致消息重复丢失的问题;

解决方案

根据实际场景选择同步提交还是异步提交。如果对消息可靠性要求比较高,不允许数据丢失,建议选择同步提交+“auto.offset.commit=earliest”,性能略差。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全网最详细的Python自动化测试(unittest框架)

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

ssm105基于JAVAEE技术校园车辆管理系统+jsp

校园车辆管理系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本校园车辆管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短…

gtest的编译与使用

文章目录 gtest的编译与使用概述笔记CMake参数官方文档测试程序测试效果END gtest的编译与使用 概述 gTest是 googletest的缩写&#xff0c;如果直接找gTest项目&#xff0c;是找不到的。 库地址 https://github.com/google/googletest.git 迁出到本地后&#xff0c;切到最新…

景源畅信数字:抖音小店的入住门槛大不大?

近年来&#xff0c;随着短视频平台的崛起&#xff0c;抖音小店逐渐成为了众多商家和创业者关注的焦点。那么&#xff0c;抖音小店的入住门槛究竟大不大呢?本文将从四个方面对这一问题进行详细阐述。 一、注册流程 抖音小店的注册流程相对简单&#xff0c;只需按照官方指引完成…

HackBar 新手使用教程(入门)

啥是Hackbar&#xff1f; Hackbar是一个Firefox 的插件,它的功能类似于地址栏,但是它里面的数据不受服务器的相应触发的重定向等其它变化的影响。 有网址的载入于访问,联合查询,各种编码,数据加密功能。 这个Hackbar可以帮助你在测试SQL注入,XSS漏洞和网站的安全性,主要是帮助…

揭秘丨文字游侠AI工具:一键生成高质量爆文赚米,提升20倍写作效率,附上渠道和实操教程!

在这个信息泛滥的时代&#xff0c;内容创作者们不断寻求更高效、更创新的方法&#xff0c;以便在众多竞争者中脱颖而出。虽然平台如今日头条为他们提供了展示才华和获取收益的舞台&#xff0c;但如何在激烈的竞争中站稳脚跟&#xff0c;仍是他们需要面对的挑战。然而&#xff0…

​Inf-DiT:Upsampling Any-Resolution Image、Vidu、MVDiff、Trio-ViT

本文首发于公众号&#xff1a;机器感知 ​Inf-DiT&#xff1a;Upsampling Any-Resolution Image、Vidu、MVDiff、Trio-ViT Inf-DiT: Upsampling Any-Resolution Image with Memory-Efficient Diffusion Transformer Diffusion models have shown remarkable performance in im…

js原生手写一个拖拽小功能

先上效果图 附上代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><meta name"viewport" content"widthd…

Python自动化测试五大框架(测试员收藏夹必备)

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

Java 语法 (杂七杂八的知识)

面向对象三大特性 封装, 多态, 继承 基本数据类型 一字节 (Byte) 占八位 (bit) JDK, JRE, JVM JDK (Java Development Kit) : Java 开发工具包, 包括了 JRE, 编译器 javac, 和调试工具 Jconsole, jstack 等 JRE (Java Runtime Environment) : Java 运行时环境, 包括了 JVM , …

基于Vue3与ElementUI Plus的酷企秀场景可视化DIY设计器:前端技术引领下的数字化展示新篇章

一、引言 在当今信息化高速发展的时代&#xff0c;企业对于展示自身形象、提升用户体验以及增强品牌知名度的需求日益迫切。针对这一市场需求&#xff0c;我们推出了基于Vue3与ElementUI Plus的酷企秀场景可视化DIY设计器。该产品不仅具备电子画册、VR全景、地图秀三大核心功能…

如何在PPT中插入网页?这样操作,免费还高效!

融合课、跨学科课&#xff0c;已经是近两年来教育界的热门词。 在公开课、微课比赛中&#xff0c;不添融合一些较为先进的信息技术&#xff0c;都不好意思拿出手了。 最近&#xff0c;由不坑老师开发制作的Office插件——不坑盒子&#xff0c;实现了在PPT中插入网页&#xff…

ARM(4)缓存一致性

目录 一、缓存一致性问题 二、一致性实现方案 2.1 目录一致性协议 2.2 嗅探一致性协议 三、CHI协议 3.1 cache state 3.2 snoop维护一致性 四、其他一致性协议 4.1 MSI协议 4.2 MESI 协议 4.3 MOESI协议 本文介绍以下内容&#xff1a; 缓存一致性问题一致性实现方案…

设计模式之前端控制器模式

想象一下&#xff0c;你的Java Web应用是个交响乐团&#xff0c;每个功能模块是乐手&#xff0c;而用户请求就像是一首首待演绎的曲目。在这场音乐盛宴中&#xff0c;谁来保证演出的流畅与协调&#xff1f;答案就是——前端控制器模式&#xff01;它如同乐队的指挥&#xff0c;…

java中如何判断一个数是不是素数(质数)

相关概念 质数就是大于1的自然数字中&#xff0c;只能被1和它自己整除的数。 题目 求101~200之间的质素的个数 代码实现 判断一个数是不是质数 for (int j 2; j < i; j) {if(i % j 0){flag false;break;}}if(flag){System.out.println("当前数字是质数");…

【动态规划】:路径问题_地下城游戏

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本专栏是关于各种算法的解析&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数据结构专栏&…

Python的Web框架Flask+Vue生成漂亮的词云图

生成效果图 输入待生成词云图的文本&#xff0c;点击生成词云即可&#xff0c;在词云图生成之后&#xff0c;可以点击下载图片保存词云图。 运行步骤 分别用前端和后端编译器&#xff0c;打开backend和frontend文件夹。前端运行 npm install &#xff0c;安装相应的包。后端…

电脑缺失opencl.dll怎么办,轻松解决opencl.dll的多种方法分享

当我们在操作电脑过程中遇到系统提示“由于找不到opencl.dll&#xff0c;无法继续执行代码”&#xff0c;这个错误会导致软件应用无法正常运行。OpenCL.dll作为一个与Open Computing Language&#xff08;开放计算语言&#xff09;相关的动态链接库文件&#xff0c;它在执行需要…

Baidu Comate——基于AI的智能代码生成让你的编码更快、更好、更简单!

目录 Baidu Comate智能编码助手介绍 支持的编程语言 支持的 IDE 支持的操作系统 System 安装 Baidu Comate 核心场景 智能推荐 单行推荐 多行推荐 智能生成 注释生成代码 增强生成代码 生成单元测试 代码生成注释 生成文档注释 生成行间注释 代码解释 长函…

因表别名引用错误导致查询SQL执行时间长未出结果

问题描述&#xff1a; 项目组人员反馈在执行一条提取数据SQL时执行很慢&#xff0c;每次执行一段时间就报超时&#xff0c;要求帮忙提取下。 解决过程&#xff1a; 项目组人员发来SQL后&#xff0c;看了下SQL&#xff0c;没什么问题&#xff0c;就在客户端上执行了下&#xff0…