记一次Kafka重复消费解决过程

        起因:车联网项目开发,车辆发生故障需要给三个系统推送消息,故障上报较为频繁,所以为了不阻塞主流程,采用了使用kafka。消费方负责推送并保存推送记录,但在一次压测中发现,实际只发生了10次故障,但是推送记录却有30多条。

        问题排查,发现是因为其中一个系统宕机,导致往这个系统推送消息时,一直连接超时,导致每条消息的推送时长被拉长。而且kafka消息拉取参数max-poll-records设置了500,意味着一次会批量拉取500条消息到本地处理,而max.poll.interval.ms参数默认是5分钟,当500条消息处理时长超过5分钟后,就会认为消费者死掉了,触发再均衡,导致同一个消息被重复消费。

解决:

        主要是提高消费者的处理速度,避免不必要的Rebalance。主要采用2种措施:

  1. 减少每次拉去消息数max-poll-records,从500,降到20
  2. 拉取到消息之后异步处理(创建线程池,对推送消息的部分利用多线程处理)

常见配置

fetch.min.byte:配置Consumer一次拉取请求中能从Kafka中拉取的最小数据量,默认为1B,如果小于这个参数配置的值,就需要进行等待,直到数据量满足这个参数的配置大小。调大可以提交吞吐量,但也会造成延迟

fetch.max.bytes,一次拉取数据的最大数据量,默认为52428800B,也就是50M,但是如果设置的值过小,甚至小于每条消息的值,实际上也是能消费成功的

fetch.wait.max.ms,若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间,默认是500ms

max.poll.records,单次poll调用返回的最大消息记录数,如果处理逻辑很轻量,可以适当提高该值。一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完,默认值为500

consumer.poll(100) ,100 毫秒是一个超时时间,一旦拿到足够多的数据(fetch.min.bytes 参数设置),consumer.poll(100)会立即返回 ConsumerRecords<String, String> records。如果没有拿到足够多的数据,会阻塞100ms,但不会超过100ms就会返回

max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作(将分区分配给组内其他消费者成员)

若超过这个时间则报如下异常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has alreadyrebalanced and assigned the partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either by increasing the session timeout or byreducing the maximum size of batches returned in poll() with max.poll.records. 

  即:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多的时间来处理消息。

可以通过增加max.poll.interval.ms来解决这个问题,也可以通过减少在poll()中使用max.poll.records返回的批的最大大小来解决这个问题。

max.partition.fetch.bytes:该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

session.timeout.ms:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,将触发再均衡操作。

对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

  1. 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

  2. 协调Group成员的行为。

poll机制

  •    每次poll的消息处理完成之后再进行下一次poll,是同步操作
  •    每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移
  •    每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息
  •    poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/38022.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

“深入探究JVM内部机制:理解Java虚拟机的工作原理“

标题&#xff1a;深入探究JVM内部机制&#xff1a;理解Java虚拟机的工作原理 摘要&#xff1a;本文将深入探究Java虚拟机&#xff08;JVM&#xff09;的内部机制&#xff0c;帮助读者理解JVM的工作原理。我们将介绍JVM的组成部分、类加载过程、内存管理和垃圾回收机制&#xf…

带你了解ChatGPT

目录 什么是ChatGPT 从ChatGPT角度看聊天机器人的历史 聊天机器人的早期历史 ChatGPT的出现 ChatGPT和其他聊天机器人的比较 总结 ChatGPT相比其他聊天机器人的优势在哪里 1. 自然语言处理能力更强 2. 编程能力高&#xff0c;应用领域广泛 3. 可以满足个性化需求 4.…

Golang实现完整聊天室(内附源码)

项目github地址&#xff1a; 由于我们项目的需要&#xff0c;我就研究了一下关于websocket的相关内容&#xff0c;去实现一个聊天室的功能。 经过几天的探索&#xff0c;现在使用Gin框架实现了一个完整的聊天室消息实时通知系统。有什么不完善的地方还请大佬指正。 用到的技术…

使用自己的数据利用pytorch搭建全连接神经网络进行回归预测

使用自己的数据利用pytorch搭建全连接神经网络进行回归预测 1、导入库2、数据准备3、数据拆分4、数据标准化5、数据转换6、模型搭建7、模型训练8、模型预测9、完整代码 1、导入库 引入必要的库&#xff0c;包括PyTorch、Pandas等。 import numpy as np import pandas as pd f…

tp6 RabbitMQ

1、composer 安装 AMQP 扩展 composer require php-amqplib/php-amqplib 2、RabbitMQ 配置 在 config 目录下创建 rabbitmq.php 文件 <?php return [host>,port>5672,user>,password>,vhost>,exchange_name > ,queue_name > ,route_key > ,cons…

中国生产了5.07亿台,库存高达近4亿台?国产手机彻底卖不动了?

统计数据显示今年上半年中国的手机产量达到5.07亿台&#xff0c;国内市场手机出货量仅有1.24亿台&#xff0c;都出现了下滑&#xff0c;那么中国手机的产量比销量多出了3.83亿台&#xff0c;这些手机都成为了库存&#xff1f; 中国手机市场确实不如早年那么辉煌&#xff0c;201…

【FAQ】安防监控视频EasyCVR平台分发的FLV视频流在VLC中无法播放

众所周知&#xff0c;TSINGSEE青犀视频汇聚平台EasyCVR可支持多协议方式接入&#xff0c;包括主流标准协议国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。在视频流的处理与分发上&#xff0c;视频监控…

P12-Retentive NetWork-RetNet挑战Transformer

论文地址:https://arxiv.org/abs/2307.08621 目录 Abstract 一.Introduction 二.Retentive Networks 2.1Retention 2.2Gated Multi-Scale Retention 2.3Overall Architecture of Retention Networks 2.4Relation to and Differences from Previous Methods 三.Experime…

Codeforces Round 892 (Div. 2)(VP)

A //b里放最小值&#xff0c;其他值放c。如果最大值最小值&#xff0c;则无解。 void solve() {int n; cin >> n;vi a(n); liter(x, a) cin >> x; sort(all(a));if (a[0] a[n - 1]){print(-1); return;}vi b, c;for (int i 0; i < sz(a); i){if (a[i] a[0])…

小米基于 Flink 的实时计算资源治理实践

摘要&#xff1a;本文整理自小米高级软件工程师张蛟&#xff0c;在 Flink Forward Asia 2022 生产实践专场的分享。本篇内容主要分为四个部分&#xff1a; 发展现状与规模框架层治理实践平台层治理实践未来规划与展望 点击查看原文视频 & 演讲PPT 一、发展现状与规模 如上图…

【03】基础知识:typescript中的函数

一、typescript 中定义函数的方法 函数声明法 function test1(): string {return 返回类型为string }function test2(): void {console.log(没有返回值的方法) }函数表达式/匿名函数 const test3 function(): number {return 1 }二、typescript 中 函数参数写法 1、typesc…

helm安装harbor + nerdctl 制作push 镜像

参考 文章&#xff1a;Helm部署Harbor_helm harbor_风向决定发型丶的博客-CSDN博客 安装好后使用 nerd containerd对接harbor_containerd 容器 insecure-registries 配置_柠是柠檬的檬的博客-CSDN博客 推送镜像 Containerd 对接私有镜像仓库 Harbor - 知乎 接下来我们来…

麒麟系统相关

创建虚拟机 镜像下载地址 选择合适的镜像&#xff0c;进入引导后注意不要选择默认的第一条&#xff0c;选择第二条进入安装程序。 root密码修改 使用命令 sudo passwd root 开启ssh 配置好网络后发现能ping通&#xff0c;但无法ssh连接&#xff0c;ps -ef | grep ssh 得…

01 qt快速入门

一 qt介绍 1.基本概念 1991年由Qt Company(奇趣)开发的跨平台C++图形用户界面应用程序开发框架,GUI程序和非GUI程序。优点:一套源码在不同的平台通过不同的编译器进行编译,就可以运行到该平台上目标机。面向对象的封装机制来对其接口封装。 GUI —图形用户界面(Graphic…

软件测试面试题【2023整理版(含答案)】

01、您所熟悉的测试用例设计方法都有哪些&#xff1f;请分别以具体的例子来说明这些方法在测试用例设计工作中的应用。 答&#xff1a;有黑盒和白盒两种测试种类&#xff0c;黑盒有等价类划分方法 边界值分析方法 错误推测方法 因果图方法 判定表驱动分析方法 正交实验设…

Vue组件之间的传值汇总

组件之间的传值 1、父传子 props 2、父传子 slot 3、父传子 不建议用 attrs 4、 子传父 ref 5、子传父 emit 6、povide/inject只能在setup的时候用。 7、利用vuex和pinia去实现数据的交互 1、实现代码App.vue <script setup>import TestProps from ./components/T…

stable-diffusion 模型效果+prompt

摘自个人印象笔记&#xff0c;图不完整可查看原笔记&#xff1a;https://app.yinxiang.com/fx/55cda0c6-2af5-4d66-bd86-85da79c5574ePrompt运用规则及技巧 &#xff1a; 1. https://publicprompts.art/&#xff08;最适用于OpenArt 线上模型 https://openart.ai/&#xff09;…

【Vue-Router】别名

后台返回来的路径名不合理&#xff0c;但多个项目在使用中了&#xff0c;不方便改时可以使用别名。可以有多个或一个。 First.vue <template><h1>First Seciton</h1> </template>Second.vue&#xff0c;Third.vue代码同理 UserSettings.vue <tem…

R语言生存分析(机器学习)(2)——Enet(弹性网络)

弹性网络&#xff08;Elastic Net&#xff09;:是一种用于回归分析的统计方法&#xff0c;它是岭回归&#xff08;Ridge Regression&#xff09;和lasso回归&#xff08;Lasso Regression&#xff09;的结合&#xff0c;旨在克服它们各自的一些限制。弹性网络能够同时考虑L1正则…

mysql 索引 区分字符大小写

mysql 建立索引&#xff0c;特别是unique索引&#xff0c;是跟字符集、字符排序规则有关的。 对于utf8mb4_0900_ai_ci来说&#xff0c;0900代表Unicode 9.0的规范&#xff0c;ai表示accent insensitivity&#xff0c;也就是“不区分音调”&#xff0c;而ci表示case insensitiv…