kafka消费堆积问题探索

背景

我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目中消费kafka中的日志,并打印到控制台,最后,统一使用阿里sls抓取日志。我们kafka的分区有12个,go程序部署在k8s集群中,开启了弹性扩缩容,最多开启了8个pod进行消费,每秒产生的日志数量高峰在1500条左右,在这种情况下,依然产生了消息的堆积。

消费中执行的逻辑只有对象的映射和日志写控制台,所以,这种情况下产生了消息堆积,令我倍感困惑。

探索之路

第一步,确认一下每一步的执行时间。

func (s KafkaLogService) ReaderCreateLog(ctx context.Context, msg *customerkafka.CustomKafkaMsg) error {now := time.Now().UnixNano()var logEntry LogEntrydata, ok := msg.Data.(string)if !ok {global.GIN_LOG.Error(ctx, "消息数据类型错误", "data", msg.Data)return fmt.Errorf("消息数据类型错误: %v", msg.Data)}// 解析 JSON 数据if err := json.Unmarshal([]byte(data), &logEntry); err != nil {global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", data)return fmt.Errorf("解析消息失败: %w", err)}now2 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************Unmarshal logEntry 耗时:%d 纳秒\n", now2-now)var logMessage LogMessageif err := json.Unmarshal([]byte(logEntry.Message), &logMessage); err != nil {global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", logEntry.Message)return fmt.Errorf("解析消息失败: %w", err)}now3 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************Unmarshal LogMessage 耗时:%d 纳秒\n", now3-now2)// 日志等级判断switch logEntry.Level {case "ERROR":global.GIN_LOG.Error(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))case "WARN":global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))case "INFO":global.GIN_LOG.Info(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))default:global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))}now4 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************log 耗时:%d 纳秒\n", now4-now3)return nil
}

 

 json的Unmarshal的耗时,倒是符合我的认知,在预期之中。

但是,打印日志尽然需要耗时1.5毫秒,这个有点超出我的意料之外。这个时间似乎有点夸张啊。

但是,即便如此,一个消费者每秒也可以消费670条左右的消息,在起了8个实例的情况下,也不应该造成kafka消息的阻塞。

继续我们的探索之路

下面这一段是我对于kakfa消费者的封装。大概的逻辑就是每一个ActionType起一个协程进行消费。在这篇《基于kafka-go写的生产者和消费者》文章中写过这个封装背后的设计逻辑,有兴趣的可以移步过去一探究竟。

// Start 方法启动消费者并开始读取消息,根据actionType调用不同的处理函数
func (c *ConsumerClient) Start(ctx context.Context, handlers map[string]ActionHandler) error {for {select {case <-ctx.Done():return nil // 上下文取消,直接返回default:msg, err := c.reader.ReadMessage(ctx)if err != nil {c.logger.Error(ctx, "Failed to read message from Kafka", "error", err)continue}c.logger.Info(ctx, fmt.Sprintf("Message on topic: %s value: %s partion:%d offset:%d", msg.Topic, string(msg.Value), msg.Partition, msg.Offset))var kafkaMsg CustomKafkaMsgif err := json.Unmarshal(msg.Value, &kafkaMsg); err != nil {c.logger.Error(ctx, "Failed to unmarshal Kafka message", "error", err)continue}channel := make(chan *CustomKafkaMsg)// 使用 sync.Map 来管理 workerworker, loaded := c.workerMap.LoadOrStore(kafkaMsg.ActionType, channel)if !loaded {c.wg.Add(1) // 增加 WaitGroup 计数if ch, ok := worker.(chan *CustomKafkaMsg); ok {go c.startWorker(ctx, kafkaMsg.ActionType, handlers, ch)}}// 发送消息到对应的通道,避免阻塞其他消息消费// 只有在 handlers 中存在对应的 actionType 时才发送消息到对应的通道if _, ok := handlers[kafkaMsg.ActionType]; ok {if ch, ok := worker.(chan *CustomKafkaMsg); ok {ch <- &kafkaMsg}}}}
}

 由于我的这个写法,让我产生了一点担忧,虽然,我想的是每个ActionType只起一个协程进行消费,难道,实际情况并不是如我预期一样运行,而是,一条kafka消息就起了一个协程进行消费,如果是这种情况的话,那么,会导致大量的垃圾回收,程序的性能就会下降,那么,消息阻塞的问题也就可以解释了。

为了,验证我的这一想法,基于pprof工具看一下实际情况。

实际验证,排除我的担忧,符合我的预期,不是有一条kafka消息就开一个协程进行消费,而是,一个ActionType就只有一个协程进行消费。 

模拟生产环境测试

上述的探索,依然不能够完美解释文章开头提到的现象,起了8个消费者,依然导致消息堆积的现象。为了进一步探究其背后的原因,我模拟生产环境的状态,每秒钟往kafka中丢了1000条消息,再观察,我发现,在这种情况下,有时json.Unmarshal也有比较长的耗时,会出现1.5毫秒的耗时,另外,而写日志需要5毫秒左右,如此,每秒只能消费140条消息,消息堆积的现象也就能够解释了。

结论 

消息堆积的主要原因是日志打印操作耗时较长,最差时每秒只能消费140条消息。此外,有时JSON解析的时间也较长,这也是一个需要关注的问题。

接下来的目标是找出JSON解析耗时较长和日志打印慢的具体原因,并进行优化。通过解决这些问题,我们有望提高日志处理的效率,从而解决消息堆积的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/67207.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【opencv】第7章 图像变换

7.1 基 于OpenCV 的 边 缘 检 测 本节中&#xff0c;我们将一起学习OpenCV 中边缘检测的各种算子和滤波器——Canny 算子、Sobel 算 子 、Laplacian 算子以及Scharr 滤波器。 7.1.1 边缘检测的一般步骤 在具体介绍之前&#xff0c;先来一起看看边缘检测的一般步骤。 1.【第…

[Qt]常用控件介绍-多元素控件-QListWidget、QTableWidget、QQTreeWidget

目录 1.多元素控件介绍 2.ListWidget控件 属性 核心方法 核心信号 细节 Demo&#xff1a;编辑日程 3.TableWidget控件 核心方法 QTableWidgetItem核心信号 QTableWidgetItem核心方法 细节 Demo&#xff1a;编辑学生信息 4.TreeWidget控件 核心方法 核心信号…

JavaScript系列(26)--安全编程实践详解

JavaScript安全编程实践详解 &#x1f512; 今天&#xff0c;让我们深入探讨JavaScript的安全编程实践。在当今的网络环境中&#xff0c;安全性已经成为开发者必须重点关注的领域。 安全编程基础 &#x1f31f; &#x1f4a1; 小知识&#xff1a;JavaScript安全编程涉及多个方…

OpenGL中Shader LOD失效

1&#xff09;OpenGL中Shader LOD失效 2&#xff09;DoTween的GC优化 3&#xff09;开发微信小程序游戏有没有类似Debug真机图形的方法 4&#xff09;射线和Mesh三角面碰撞检测的算法 这是第418篇UWA技术知识分享的推送&#xff0c;精选了UWA社区的热门话题&#xff0c;涵盖了U…

Zookeeper 数据迁移实战:基础环境搭建与高效迁移方案全览

文章目录 一、Zookeeper数据迁移简介二、迁移zookeeper数据基础环境三、利用快照迁移zookeeper数据1、Node1最新的zk快照文件和日志文件2、将被迁移方node2的zookeeper的集群全部stop3、将源node1集群数据和日志拷贝到指定目录下4、验证优先启动拷贝的数据、日志的zookeeper节点…

什么是数据仓库?

什么是数据仓库&#xff1f; 数据仓库&#xff08;Data Warehouse&#xff0c;简称DW&#xff09;是一种面向分析和决策的数据存储系统&#xff0c;它将企业中分散的、异构的数据按照一定的主题和模型进行集成和存储&#xff0c;为数据分析、报表生成以及商业智能&#xff08;…

ubuntu支持中文的字体

在 Ubuntu 系统中&#xff0c;支持中文的字体可以通过安装或启用适配中文字符的字体包来实现。以下是 Ubuntu 上常用的中文字体以及安装方法&#xff1a; 常见支持中文的字体 思源字体系列&#xff08;推荐&#xff09;&#xff1a; 思源黑体&#xff08;Noto Sans CJK / Sourc…

java 迪米特法则,原理、思想、工作流程、实现细节、稳定性、优缺点、应用场景等

迪米特法则&#xff08;Law of Demeter&#xff0c;LoD&#xff09;&#xff0c;也被称为“最少知识原则”&#xff0c;是一种指导面向对象设计的原则&#xff0c;旨在减少对象之间的耦合度。以下是对迪米特法则的详细解析。 1. 定义 迪米特法则指出&#xff1a;一个对象应该…

[Linux]从零开始的STM32MP157交叉编译环境配置

一、前言 最近该忙的事情也是都忙完了&#xff0c;也是可以开始好好的学习一下Linux了。之前九月份的时候就想入手一块Linux的开发板用来学习Linux底层开发。之前在NXP和STM32MP系列之间犹豫&#xff0c;思来想去还是入手了一块STM32MP157。当然不是单纯因为MP157的性能在NXP之…

小程序如何引入腾讯位置服务

小程序如何引入腾讯位置服务 1.添加服务 登录 微信公众平台 注意&#xff1a;小程序要企业版的 第三方服务 -> 服务 -> 开发者资源 -> 开通腾讯位置服务 在设置 -> 第三方设置 中可以看到开通的服务&#xff0c;如果没有就在插件管理中添加插件 2.腾讯位置服务…

添加计算机到AD域中

添加计算机到AD域中 一、确定计算机的DNS指向域中的DNS二、打开系统设置三、加域成功后 一、确定计算机的DNS指向域中的DNS 二、打开系统设置 输入域管理员的账密 三、加域成功后 这里有显示&#xff0c;就成功了。

你喜欢用什么编辑器?

电脑工作者和程序员所使用的文本编辑器通常需要具备高效率、易用性以及对代码友好等特点&#xff0c;包括语法高亮、自动完成、多文件同时编辑、查找替换、版本控制集成等功能。以下是几个广受开发者欢迎且实用性较强的文本编辑器&#xff1a; Visual Studio Code&#xff08;V…

32单片机综合应用案例——智能家居灯光控制系统(二)(内附详细代码讲解!!!)

"即使世界看似残酷&#xff0c;也要坚持自己的梦想&#xff0c;因为只有这样&#xff0c;你才能创造属于自己的奇迹。”“不要害怕失败&#xff0c;因为失败是成功的垫脚石。”“即使跌倒了一百次&#xff0c;也要勇敢地爬起来一百零一次。”“永远不要低估自己的潜力&…

从epoll事件的视角探讨TCP:三次握手、四次挥手、应用层与传输层之间的联系

目录 一、应用层与TCP之间的联系 二、 当通信双方中的一方如客户端主动断开连接时&#xff0c;仅是在客户端的视角下连接已经断开&#xff0c;在服务端的眼中&#xff0c;连接依然存在&#xff0c;为什么&#xff1f;——触发EPOLLRDHUP事件&#xff1a;对端关闭连接或停止写…

使用RSyslog将Nginx Access Log写入Kafka

个人博客地址&#xff1a;使用RSyslog将Nginx Access Log写入Kafka | 一张假钞的真实世界 环境说明 CentOS Linux release 7.3.1611kafka_2.12-0.10.2.2nginx/1.12.2rsyslog-8.24.0-34.el7.x86_64.rpm 创建测试Topic $ ./kafka-topics.sh --zookeeper 192.168.72.25:2181/k…

设计模式02:结构型设计模式之适配器模式使用情景及其基础Demo

1.适配器模式 用途&#xff1a;接口兼容评价&#xff1a;复杂、冗余、难以调试&#xff0c;个人认为直接在旧系统那里封装一个新实现调用旧实现就好了场景&#xff1a;系统A、B、C想调用同一个功能接口&#xff0c;但是实现细节存在差异时&#xff08;其实就是入参和出参转化处…

使用 Docker 部署 Java 项目(通俗易懂)

目录 1、下载与配置 Docker 1.1 docker下载&#xff08;这里使用的是Ubuntu&#xff0c;Centos命令可能有不同&#xff09; 1.2 配置 Docker 代理对象 2、打包当前 Java 项目 3、进行编写 DockerFile&#xff0c;并将对应文件传输到 Linux 中 3.1 编写 dockerfile 文件 …

大数据学习(35)- spark- action算子

&&大数据学习&& &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 承认自己的无知&#xff0c;乃是开启智慧的大门 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4dd;支持一下博主哦&#x1f91…

fisco bcosV3 Table智能合约开发

环境 &#xff1a; fisco bcos 3.11.0 webase-front : 3.1.1 console 3.8.0 table合约【3.2.0版本后的】 前言 最近在做毕设&#xff0c;数据的存储方式考虑使用fisco-bcos的table表存储&#xff0c;经过这几天的研究&#xff0c;发现对于fisco2和 fisco3版本的table表合约功能…

推荐几本UML语言的经典书籍与常用软件

推荐几本 UML(统一建模语言)的经典书籍: 《UML用户指南》 作者:Grady Booch、James Rumbaugh、Ivar Jacobson介绍:这本书由 UML 的主要设计者撰写,是学习 UML 的经典入门书籍。书中详细介绍了 UML 的基本概念、模型图以及使用场景,适合初学者和进阶用户。《UML精粹》(U…