腾讯mini项目-【指标监控服务重构】2023-08-16

今日已办

v1

验证 StageHandler 在处理消息时是否为单例,【错误尝试】

type StageHandler struct {
}func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 1")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware2(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 2")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware3(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 3")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Handler1(msg *message.Message) error {log.Logger.Info("StageHandler Handler 1")fmt.Printf("%p\n", &s)return nil
}

image-20230816161140984

v2

  • 定义不同 Handler
type CrashHandler struct {Topic string
}func (s CrashHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": CrashHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": CrashHandler Handler 1 end")return nil
}type LagHandler struct {Topic string
}func (s LagHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": LagHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": LagHandler Handler 1 end")return nil
}
  • 添加到router中
	for _, topic := range topics {var category stringvar handlerFunc message.NoPublishHandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashHandler{Topic: category}.Handler1} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagHandler{Topic: category}.Handler1} else {continue}handler := router.AddNoPublisherHandler(topic+"test-handler", topic, subscriber, handlerFunc)}

image-20230816171659632

  • 结论
    • handler 实例会不断创建
    • 不同的 handler 可以并行处理不同主题的消息
    • 相同的 handler 在处理该主题的消息时是顺序的

官方文档: Message Router (watermill.io)

订阅者可以一次消费一条消息,也可以并行消费多条消息

  • Single stream of messages 是最简单的方法,这意味着在调用 msg.Ack() 之前,订阅者将不会收到任何新消息
  • Multiple message streams 仅部分订阅者支持。通过一次订阅多个主题分区,可以并行消费多条消息,甚至是之前未确认的消息(例如,Kafka 订阅者就是这样工作的) Router 通过运行并发 HandlerFuncs(每个分区一个)来处理此模型

v3

存在并发安全问题

  1. 公用一个上下文
  2. 频繁的修改上下文中的字段值
  3. 不同Handler和MiddleWare存在并发

解决思路

  • 将一次消息处理会使用到的数据集合定义为一个结构体
type ContextData struct {Status intEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}
  • 使用message的Context来传递这个数据

image-20230816222038683

  • 移除掉 ProfileCtx 的相关设计
  • 使用watermillzap.Logger来替换本身的 LoggerAdapter,更加直观且与原项目适配image-20230816225840032

完整代码

profile/internal/watermill/consumer/consumer_context.go

// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context"kc "github.com/Kevinello/kafka-client""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""github.com/garsue/watermillzap""github.com/qiulin/watermill-kafkago/pkg/kafkago""go.uber.org/zap""profile/internal/config""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""strings""time"
)// Consume
// @Description
// @Author xzx 2023-08-16 22:52:52
func Consume() {logger := watermillzap.NewLogger(log.Logger)publisher, subscriber := newPubSub(logger)router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Fatal("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,middleware.Recoverer,)getTopics := kc.GetTopicReMatch(strings.Split(config.Profile.GetString("kafka.topicRE"), ","))topics, err := getTopics(config.Profile.GetString("kafka.bootstrap"))if err != nil {log.Logger.Fatal("get topics failed", zap.Error(err))return}for _, topic := range topics {var category stringvar handlerFunc message.HandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashWriteKafka} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagWriteKafka} else {continue}router.AddHandler(category, topic, subscriber, connector.GetTopic(category), publisher, handlerFunc).AddMiddleware(UnpackKafkaMessage,InitPerformanceEvent,AnalyzeEvent)}if err = router.Run(context.Background()); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}// newPubSub
// @Description
// @Author xzx 2023-08-16 22:52:45
// @Param logger
// @Return message.Publisher
// @Return message.Subscriber
func newPubSub(logger watermill.LoggerAdapter) (message.Publisher, message.Subscriber) {marshaler := kafkago.DefaultMarshaler{}publisher := kafkago.NewPublisher(kafkago.PublisherConfig{Brokers:     []string{config.Profile.GetString("kafka.bootstrap")},Async:       false,Marshaler:   marshaler,OTELEnabled: false,Ipv4Only:    true,Timeout:     100 * time.Second,}, logger)subscriber, err := kafkago.NewSubscriber(kafkago.SubscriberConfig{Brokers:       []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:   marshaler,ConsumerGroup: config.Profile.GetString("kafka.group"),OTELEnabled:   false,}, logger)if err != nil {log.Logger.Fatal("Unable to create subscriber", zap.Error(err))}return publisher, subscriber
}

profile/internal/watermill/consumer/consumer_stage.go

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("context""encoding/json""github.com/ThreeDotsLabs/watermill/message""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema""profile/internal/schema/performance""profile/internal/state"
)type ContextData struct {Status intEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {var data ContextData// 反序列化,存入通用结构体if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}log.Logger.Info("[1-UnpackKafkaItem] unpack kafka item success", zap.Any("event", data.Event))msg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)if contextErr != nil {data.Status = state.StatusEventFactoryErrorreturn nil, contextErr}log.Logger.Info("[2-InitPerformanceEvent] Consume performance event success", zap.Any("event", data.Event))data.Event.ProfileData = eventmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)contextErr := data.Event.ProfileData.Analyze()if contextErr != nil {data.Status = state.StatusAnalyzeErrorreturn nil, contextErr}log.Logger.Info("[3-AnalyzeEvent] analyze event success", zap.Any("event", data.Event))// clear dimensions and valuesdata.Event.Dimensions = nildata.Event.Values = nilmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// CrashWriteKafka
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func CrashWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-CrashWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}func LagWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-LagWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}

测试

上报PERF_LAG Event可以并发处理 2 条消息,不必等待上一条消息处理完

image-20230816222712201

image-20230816222820505

多次测试发现是由于两条消息走了不同的 Handler

image-20230816230158777

暂未修复,明明是同一主题的两条消息却都走了两条不同的链路,而且 publisher 最后写回的主题也是写到了不同的主题上,并且上报另一个类型的事件,即另一个主题的消息却无法触发消费者消费!

暂定先写死两个主题名称测试是否正常

明日待办

  1. 开会讨论项目规划和任务分工
  2. 继续完成需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/81991.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

沈阳建筑大学《乡村振兴战略下传统村落文化旅游设计》 许少辉八一著作

沈阳建筑大学《乡村振兴战略下传统村落文化旅游设计》 许少辉八一著作

怎么推广自己抖店的商品?最适合0经验新手操作的办法,来看看

我是王路飞。 抖店开通后,想要把自己店铺的商品卖出去,就需要进行推广了。 但是怎么推广呢? 要么利用抖音的搜索和推荐流量,获取曝光,实现点击和转化。 不过这种玩法有个弊端,就是需要你有一定的电商经…

农民朋友有福利啦!建行江门市分行“裕农通+农资结算”平台正式上线

随着广东广圣农业发展有限公司办公室内的裕农通“智慧眼”结算机“叮”的一声到账提醒,标志着全国首个“裕农通农资结算“平台的成功上线,也标志着建行广东省江门市分行的裕农通业务又迈上了一个新的台阶。 广东广圣农业发展有限公司(以下简…

FTP这么“好用”和“便宜”,为什么企业还要替换掉?

FTP是一种历史悠久的网络协议,自1971年问世以来,它因其简易性、便捷性以及强大的跨平台兼容性而被广泛使用。在网站开发、软件更新和数据备份等多个场景中,FTP都发挥了重要作用。不过,随着互联网技术的不断发展和企业需求的多样化…

Linux线程

1.进程是资源管理的最小单位,线程是程序执行的最小单位。 2.每个进程有自己的数据段、代码段和堆栈段。线程通常叫做轻型的进程,它包含独立的栈和CPU寄存器状态,线程是进程的一条执行路径,每个线程共享其所附属进程的所有资源,包括…

蓝桥杯 题库 简单 每日十题 day4

01 津津上初中了。妈妈认为津津应该更加用功学习,所以津津除了上学之外,还要参加妈妈为她报名的各科复习班。另外每周妈妈还会送她去学习朗诵、舞蹈和钢琴。但是津津如果一天上课超过八个小时就会不高兴,而且上得越久就会越不高兴。假设津津…

【笔试强训选择题】Day43.习题(错题)解析

作者简介:大家好,我是未央; 博客首页:未央.303 系列专栏:笔试强训选择题 每日一句:人的一生,可以有所作为的时机只有一次,那就是现在!!!&#xff…

【QT5-解决不同分辨率屏幕-进行匹配大小-适应屏幕大小-基础样例】

【QT5-解决不同分辨率屏幕-进行匹配大小-适应屏幕大小】 1、前言2、实验环境3-1、问题说明-屏幕视频3-2、解决方式-个人总结解决思路:我们在软件启动的时候,先获取屏幕大小,然后根据长宽,按照一定比例,重新设置大小。并…

VMware17 不可恢复错误mks解决方案

用的虚拟机VMware17版本,然后运行带HDR的unity程序,结果报错 网上找了很多解决方案,都没用。毕竟需要在不放弃虚拟机3D加速的情况下运行。 最终皇天不负有心人,亲测有效的方法: 在虚拟机名字.vmx文件里添加以下2行&a…

URL 管理器

基本介绍 对外接口 对外提供两个接口:一个可以提取URL,一个可以增加URL,分别对应图上的1和2。 当要爬取某个网页时,则可以从1接口提取出该网页的URL进行爬取。 有时候爬取的网页内容中会包含别的网页链接,即包含有U…

网工内推 | 国企、上市公司,IA/IP认证即可,有年终、绩效

01 上海市机械设备成套(集团)有限公司 招聘岗位:网络工程师 职责描述: 1、 负责公司电脑、网络设备、电器设备、办公设备等硬件的管理、维护和使用,做好计算机硬件及办公设备台帐; 2、 负责公司办公软件的…

回溯算法 解题思路

文章目录 算法介绍回溯算法能解决的问题解题模板1. 组合问题2. N皇后问题 算法介绍 回溯法(Back Tracking Method)(探索与回溯法)是一种选优搜索法,又称为试探法,按选优条件向前搜索,以达到目标…

zookeeper —— 分布式服务协调框架

zookeeper —— 分布式服务协调框架 一、Zookeeper概述1、Zookeeper的基本概念2、Zookeeper的特点3、Zookeeper的数据结构 二、Zookeeper的安装部署1、Zookeeper的下载2、Zookeeper的安装本地模式(单机模式standalone)安装部署分布式(集群模式…

1978-2021年全国各省城镇与农村恩格尔系数数据

1978-2021年全国各省城镇与农村恩格尔系数数据 1、时间:1978-2021年 2、指标:城镇恩格尔系数、农村恩格尔系数 3、范围:31省市 4、来源:各省年鉴 5、用途:反应居民生活质量 6、指标解释: 恩格尔系数…

【SpringSecurity】三更草堂项目案例分析3 - 鉴权操作

目录 鉴权RBAC 模型前置准备redis 实现角色权限获取 鉴权 RBAC 模型 参考 CSDN 文章 RBAC(Role-Based Access Control),基于角色的访问控制,现在主流的权限管理系统的权限设计都是 RBAC 模型 所谓的 RBAC 模型,可以理…

Java精品项目源码第61期垃圾分类科普平台(代号V061)

Java精品项目源码第61期垃圾分类科普平台(代号V061) 大家好,小辰今天给大家介绍一个垃圾分类科普平台,演示视频公众号(小辰哥的Java)对号查询观看即可 文章目录 Java精品项目源码第61期垃圾分类科普平台(代号V061)难度指数&…

【TCP】滑动窗口、流量控制 以及拥塞控制

滑动窗口、流量控制 以及拥塞控制 1. 滑动窗口(效率机制)2. 流量控制(安全机制)3. 拥塞控制(安全机制) 1. 滑动窗口(效率机制) TCP 使用 确认应答 策略,对每一个发送的数…

MySQL的常用术语

目录 1.关系 2.元组 3.属性 MySQL从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129334507?spm1001.2014.3001.5502 1.关系 前面的博客有说到,MySQL是一款关系型数据库管理软件,一个关系就是 一张二维表(表) 我想大家都知道表格怎么…

【100天精通Python】Day61:Python 数据分析_Pandas可视化功能:绘制饼图,箱线图,散点图,散点图矩阵,热力图,面积图等(示例+代码)

目录 1 Pandas 可视化功能 2 Pandas绘图实例 2.1 绘制线图 2.2 绘制柱状图 2.3 绘制随机散点图 2.4 绘制饼图 2.5 绘制箱线图A 2.6 绘制箱线图B 2.7 绘制散点图矩阵 2.8 绘制面积图 2.9 绘制热力图 2.10 绘制核密度估计图 1 Pandas 可视化功能 pandas是一个强大的数…

Autojs 小游戏实践-神农百草园

概述 最近一直再写刷视频软件脚本,比如手机视频软件太多,每天都需要手动提现羊毛,太累,使用Autojs来帮助我提现,签到,扯远了,因为做刷视频脚本感觉有点无聊,所以试着做小游戏找图脚…