腾讯mini项目-【指标监控服务重构】2023-08-18

今日已办

watermill

将 key 设置到 message 中

修改 watermill-kafka 源码 将 key 设置到 message.metadata中

image-20230818224812801

image-20230818224728773

接入 otel-sdk

  1. 添加 middleware resolveUpstreamCtx 解析上游上下文,开启根Span
  2. 添加 middleware middleware.InstantAck - 马上ACK,使得多条消息可以平行处理(走middleware 和 handler 的逻辑)
// Package pubsub
// @Author xzx 2023/8/12 10:01:00
package pubsubimport ("context""encoding/json""github.com/ThreeDotsLabs/watermill/message""go.opentelemetry.io/otel""go.opentelemetry.io/otel/attribute""go.opentelemetry.io/otel/propagation"semconv "go.opentelemetry.io/otel/semconv/v1.17.0""go.opentelemetry.io/otel/trace""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/otelclient""profile/internal/schema""profile/internal/schema/performance""profile/internal/state""profile/internal/watermill/watermillkafka"
)// consumeCtxData
// @Description: Data collection of a message processing context
// @Author xzx 2023-08-17 13:36:12
type consumeCtxData struct {Status        intEvent         schema.EventRootSpan      trace.SpanRootSpanCtx   context.ContextAppID         string // API 上报FetchScenario string // API 上报
}// resolveUpstreamCtx
// @Description
// @Author xzx 2023-08-18 11:15:09
// @Param h
// @Return message.HandlerFunc
func resolveUpstreamCtx(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {var data consumeCtxData// get upstream producer's W3C trace context via propagationheaderCarrier := make(propagation.HeaderCarrier)headerCarrier.Set("Traceparent", msg.Metadata.Get("Traceparent"))upstreamProducerCtx := otel.GetTextMapPropagator().Extract(msg.Context(), headerCarrier)// set traceID to consumer contextconsumerCtx := trace.ContextWithRemoteSpanContext(msg.Context(),trace.NewSpanContext(trace.SpanContextConfig{TraceID: trace.SpanContextFromContext(upstreamProducerCtx).TraceID(),}))//start tracingdata.RootSpanCtx, data.RootSpan = otelclient.ConsumerTracer.Start(consumerCtx, "Profile-Consumer",trace.WithSpanKind(trace.SpanKindConsumer),trace.WithLinks(trace.LinkFromContext(upstreamProducerCtx, semconv.OpentracingRefTypeFollowsFrom)))msg.SetContext(context.WithValue(msg.Context(), "data", &data))return h(msg)}
}// unpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param hmsg.SetContext(context.WithValue(msg.Context(), "data", data))
// @Return message.HandlerFunc
func unpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)unpackKafkaMessageCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "unpackKafkaMessage",trace.WithSpanKind(trace.SpanKindClient))// 反序列化,存入通用结构体if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {data.Status = state.StatusUnmarshalErrorhandlerErr(unpackKafkaMessageCtx, "unmarshal error", contextErr)span.End()return nil, contextErr}log.Logger.InfoContext(unpackKafkaMessageCtx, "[UnpackKafkaItem] unpack kafka item success",zap.Any("event", data.Event),zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))setSpanAttributes(span, data)msg.SetContext(context.WithValue(msg.Context(), "data", data))span.End()return h(msg)}
}// initPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func initPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)initPerformanceEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "initPerformanceEvent",trace.WithSpanKind(trace.SpanKindInternal))event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)if contextErr != nil {data.Status = state.StatusEventFactoryErrorhandlerErr(initPerformanceEventCtx, "event factory error", contextErr)span.End()return nil, contextErr}log.Logger.InfoContext(initPerformanceEventCtx, "[initPerformanceEvent] init performance event success",zap.Any("event", event),zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))data.Event.ProfileData = eventsetSpanAttributes(span, data)msg.SetContext(context.WithValue(msg.Context(), "data", data))span.End()return h(msg)}
}// analyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func analyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)analyzeEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "analyzeEvent",trace.WithSpanKind(trace.SpanKindInternal))contextErr := data.Event.ProfileData.Analyze()if contextErr != nil {data.Status = state.StatusAnalyzeErrorhandlerErr(analyzeEventCtx, "analyze event error", contextErr)span.End()return nil, contextErr}log.Logger.InfoContext(analyzeEventCtx, "[analyzeEvent] analyze performance event success",zap.Any("event", data.Event),zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))// clear dimensions and valuesdata.Event.Dimensions = nildata.Event.Values = nilsetSpanAttributes(span, data)msg.SetContext(context.WithValue(msg.Context(), "data", data))span.End()return h(msg)}
}// crashHandler
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func crashHandler(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",trace.WithSpanKind(trace.SpanKindProducer))toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorhandlerErr(writeKafkaCtx, "marshal error", contextErr)span.End()return nil, contextErr}msg = message.NewMessage(data.Event.BackendID, toWriteBytes)msg.Metadata.Set(watermillkafka.HeaderKey, data.Event.ID)log.Logger.Info("[4-crashHandler] write kafka ...", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))setSpanAttributes(span, data)span.End()data.RootSpan.End()return message.Messages{msg}, nil
}// lagHandler
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func lagHandler(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",trace.WithSpanKind(trace.SpanKindProducer))toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorhandlerErr(writeKafkaCtx, "marshal error", contextErr)span.End()return nil, contextErr}msg = message.NewMessage(data.Event.BackendID, toWriteBytes)log.Logger.Info("[4-lagHandler] write kafka ...", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))setSpanAttributes(span, data)span.End()data.RootSpan.End()return message.Messages{msg}, nil
}// setSpanAttributes
// @Description  setSpanAttributes
// @Author xzx 2023-08-03 23:19:17
// @Param span
// @Param profileCtx
func setSpanAttributes(span trace.Span, data *consumeCtxData) {if span.IsRecording() {span.SetAttributes(attribute.String("event.category", data.Event.Category),attribute.String("event.backend_id", data.Event.BackendID),)}
}// handlerErr
// @Description
// @Author xzx 2023-07-20 15:36:46
// @Param span
// @Param ctx
// @Param msg
// @Param err
func handlerErr(ctx context.Context, msg string, err error) {log.Logger.ErrorContext(ctx, msg, zap.Error(err))
}

会议纪要

进度

  1. venus 的 metrics 独立分支开发
  2. venus 的 trace 修复了一些bug
    1. 返回 error 主动调用 span.end()
  3. profile 的 watemill pub/sub 和 trace 上报还原原本功能
  4. profile 的 hyperscan 的继续调研中

待办

  1. 调研如何关闭otel,设置开关配置
    1. 找到了 sdktrace.WithSampler(sdktrace.NeverSample()) - 可以不上报 trace
  2. 性能benchmark,单topic、多topic的场景下,对比原profile代码和现有watermill代码性能
  3. 调研 watermill 的 otelEnabled 的功能,其他集成 otel 的第三方库等
  4. hyperscan 的 benchmark

明日待办

  1. 性能benchmark,单topic、多topic的场景下,对比原profile代码和现有watermill代码性能
  2. watermill-kafka 的源码基础上对 publisher 的 写回 kafka 添加 otel - trace & log 的逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/83822.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试Java后端

sql 五表联合查询 面试八股 JDK,JRE,JVM之间的区别 JDK,Java标准开发包,它提供了编译、运行Java程序所需的各种工具和资源,包括Java编译器、Java运行时环境,以及常用的Java类库等。 JRE(Java Runtime Environment)&…

Learn Prompt-Prompt 高级技巧:AI-town 虚拟小镇

AI-town可能是2023年最令人鼓舞的AI代理实验之一。我们经常讨论单个LLM的突现能力,但 Agents 突现在大规模下可能会更复杂和迷人。一个AI的种群可以展现出整个文明的演化。 🎉开始阅读前,如果你对其他文章感兴趣,可以到欢迎页关注…

Anaconda下载安装教程,新手详细

Anaconda的安装包下载分为官网下载和清华源下载, ①官网:Anaconda官网 (别的博主说官网较慢,有时候还进不去,我感觉还行,2分钟就下载好了。如果不顺利,请尝试使用清华源) ②清华源…

机器学习入门教学——损失函数(极大似然估计法)

1、前言 我们在训练神经网络时,最常用到的方法就是梯度下降法。在了解梯度下降法前,我们需要了解什么是损失(代价)函数。所谓求的梯度,就是损失函数的梯度。如果不知道什么是梯度下降的,可以看一下这篇文章:机器学习入…

性能测试 —— Tomcat监控与调优:status页监控

Tomcat服务器是一个免费的开放源代码的Web 应用服务器,Tomcat是Apache 软件基金会(Apache Software Foundation)Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。 Tomcat是一个轻量级应用服务器,在中小型系统…

DevExpress WinForms图表组件 - 直观的数据信息呈现新方式!(一)

凭借界面控件DevExpress WinForms全面的2D和3D图表类型的集合,DevExpress WinForms的图表控件设计大大简化了开发者直观地向最终用户呈现信息的方式。 DevExpress WinForms有180组件和UI库,能为Windows Forms平台创建具有影响力的业务解决方案。同时能完…

IDEA——工程项目的两种窗口开发模式

文章目录 引言一、多项目窗口模式的便利1.1 源码 debug 二、多项目窗口模式的弊端三、多项目窗口的版本管理四、单项目、多项目窗口模式转换 引言 idea编辑器有两种窗口模式,一种是单项目窗口,另一种是多项目窗口。 我个人使用较多的是单项目窗口&#…

Linux 多线程 | 线程安全、死锁、线程同步

在前面的文章中我们讲述了锁的理解、原理、用户级线程库的内容,以及对Linux中的锁和线程进行了封装,本文中将继续对多线程的内容进行讲解。 可重入与线程安全 概念 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见…

网络安全第一次作业

1、什么是防火墙 防火墙是一种网络安全系统,它根据预先确定的安全规则监视和控制传入和传出的网络流量。其主要目的是阻止对计算机或网络的未经授权的访问,同时允许合法通信通过。 防火墙可以在硬件、软件或两者的组合中实现,并且可以配置为根…

七绝 . 秋寒

题记 拜读署名“淡定人生D”近日发表在“ 今日头条 ”上的古体诗《七绝 . 凉》,本老朽在由衷赞叹该女子才貌双全之时,也对自己寄居养老的成都崇州街子古镇今日下午的秋寒突至天气,情怀涌动,思绪万千,亦作《七绝 . 秋寒…

JavaScript小案例-树形菜单(菜单数据为数组)

菜单层级理论上可以无限多&#xff0c;因为是递归渲染。 gif演示图&#xff1a; 代码&#xff1a; 树形菜单.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content&quo…

超越创意,从用户创造内容到AI生成内容的新时代

在这个信息爆炸的时代&#xff0c;内容创作正经历前所未有的变革&#xff0c;其频率和多样性令人瞠目结舌。曾经&#xff0c;我们主要依赖传统媒体&#xff0c;需要专业团队为人们打造内容&#xff0c;这被称为专业生成内容&#xff08;PGC&#xff0c;Professional-generated …

MD5加密算法

1、简介 MD5在90年代初由MIT的计算机科学实验室和RSA Data Security Inc发明&#xff0c;经MD2、MD3和MD4发展而来。 MD5将任意长度的“字节串”变换成一个128bit的大整数&#xff0c;并且它是一个不可逆的字符串变换算法&#xff0c;换句话说就是&#xff0c;即使你看到源程序…

IO口电路种类

文章目录 参考1.高速振荡电路&#xff08;时钟IO引脚&#xff09;2.与 GPIO 功能共享的低速振荡电路&#xff08;子时钟IO&#xff09;3.CMOS 滞后输入引脚4.电源输入保护电路5.A/D 转换器 ref (AVRH)带保护电路的电源输入端6.CMOS 电平输出7.CMOS 电平输出&#xff0c;带有模…

sublime text3 设置代码错误提示之php

第一步&#xff1a; ctrlshiftp 输入 install package 并选中 第二步&#xff1a; 输入 sublimelinter-php 第三步&#xff1a; Prefernces> package Settings >Sublimelinter > settings 这里不同按照版本可能不一样 有些可能是 settings User 第四步 完成

Python 变量

视频版教程 Python3零基础7天入门实战视频教程 变量 无论使用什么语言编程&#xff0c;总要处理数据&#xff0c;处理数据就需要使用变量来保存数据。变量就像一个个小容器&#xff0c;用于“盛装”程序中的数据。 再说说&#xff0c;Python的数据类型&#xff0c;有以下六种…

分享一个基于uniapp+springboot技术开发的校园失物招领小程序(源码、lw、调试)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

TPU-MLIR——实现Chatglm2-6B大模型移植部署

TPU-MLIR——实现Chatglm2-6B大模型移植部署 本项目实现BM1684X部署语言大模型ChatGLM2-6B。通过TPU-MLIR编译器将模型转换成bmodel&#xff0c;并采用c代码将其部署到BM1684X的PCIE环境&#xff0c;或者SoC环境。 编译chatglm2-6B模型 1. 下载‘Chat-GLM2-6B’ 2. 对该模型…

谷歌版ChatGPT与旗下邮箱、视频、地图等,实现全面集成!

9月20日&#xff0c;谷歌在官网宣布推出Bard Extensions。借助该扩展用户可在谷歌的Gmail、谷歌文档、网盘、Google 地图、视频等产品中使用Bard。 Bard是谷歌基于PaLM 2大模型&#xff0c;打造的一款类ChatGPT产品&#xff0c;可自动生成文本、代码、实时查询信息等。新的集成…

数据结构与算法(C语言版)P5---栈

1、栈 1.1、栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。__进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底。__栈中的数据元素遵守__后进先出&#xff08;先进后出&#xff09;__LIFO&#xf…