腾讯mini项目-【指标监控服务重构】2023-08-26

今日已办

Venus 的 Trace 无感化

定义 handler 函数

  1. fiber.Handler 的主要处理逻辑
  2. 返回处理中出现的 error
  3. 返回处理中响应 json 的函数
// handler
// @Description:
// @Author xzx 2023-08-26 18:00:03
// @Param c
// @Return error
// @Return func() error : function for response json
type handler func(c *fiber.Ctx) (error, func() error)

定义TraceWrapper函数

  1. otel-trace 的逻辑嵌入 handler 中

  2. 启动 span

  3. 执行 handler,记录 span 的 attributes

  4. 根据返回的 err, jsonRespFunc 分情况讨论

  5. 条件处理逻辑
    err != nil && jsonRespFunc == nil存在错误,将错误记录到Span中,结束Span,执行c.Next()
    err != nil && jsonRespFunc != nil存在错误,将错误记录到Span中,结束Span,响应JSON
    err == nil && jsonRespFunc == nil结束Span,执行c.Next()
    err == nil && jsonRespFunc != nil结束Span,响应JSON
  6. 代码实现

// TraceWrapper return fiber.Handler integrate report otel-trace
// @Description integrate otel-trace logic in handler
// @Author xzx 2023-08-26 18:00:03
// @Param f
// @Param opts
// @Return fiber.Handler
func TraceWrapper(f handler, opts ...trace.SpanStartOption) fiber.Handler {return func(c *fiber.Ctx) error {_, span := otelclient.Tracer.Start(c.UserContext(), runtime.GetFunctionName(f), opts...)// execute handler logicerr, jsonRespFunc := f(c)// todo: setSpanAttributesif err != nil {// record span errorspan.RecordError(err)span.SetStatus(codes.Error, err.Error())span.End()if jsonRespFunc != nil {// response error resultreturn jsonRespFunc()}// ignore error, continue handlersreturn c.Next()}span.End()if jsonRespFunc != nil {// response success resultreturn jsonRespFunc()}// err == nil, jsonRespFunc == nilreturn c.Next()}
}

具体的 handler 逻辑

// SplitAndValidate split multi-events and validate uploaded data
// @Description
// @Author xzx 2023-08-26 17:54:21
// @Param c
// @Return Err
// @Return f
func SplitAndValidate(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("split and validate", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "all_upload"),)))RequestIdBaggage, err := baggage.NewMember("request_id", c.GetRespHeader(fiber.HeaderXRequestID))if err != nil {log.Logger().Error("Create Baggage Member Failed", zap.Error(err))}Baggage, err := baggage.New(RequestIdBaggage)if err != nil {log.Logger().Error("Create Baggage Failed", zap.Error(err))}c.SetUserContext(baggage.ContextWithBaggage(c.UserContext(), Baggage))c.Accepts(fiber.MIMEMultipartForm)uploadedData, err := parseJSON(c)if err != nil {log.Logger().Error("failed to parse json", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))c.Status(fiber.StatusBadRequest)return err, func() error {return c.JSON(protocol.Response{Code: protocol.AllFail,Data: err.Error(),})}}if err = uploadedData.Meta.Validate(); err != nil {log.Logger().Error("failed to validate meta", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "wrong_meta"),)))c.Status(fiber.StatusBadRequest)return err, func() error {return c.JSON(protocol.Response{Code: protocol.AllFail,Data: err.Error(),})}}// must use pointer when using Locals of fiber if it's about to modifyevents := make([]*schema.Event, 0, len(uploadedData.Data))parts := make([]*protocol.Part, 0, len(uploadedData.Data))for idx, data := range uploadedData.Data {log.Logger().Debug("split event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))event := &schema.Event{}part := &protocol.Part{}if err = data.Validate(); err != nil {Err = errotelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "wrong_data"),)))part.Code = protocol.ValidationErrorpart.Data = err.Error()}part.ID = data.IDevent.Meta = uploadedData.Metaevent.Data = dataevents = append(events, event)parts = append(parts, part)}c.Locals("meta", uploadedData.Meta)c.Locals("events", events)c.Locals("parts", parts)return Err, nil
}// HandleEvent handle event
// @Description
// @Author xzx 2023-08-26 17:54:23
// @Param c
// @Return Err
// @Return f
func HandleEvent(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))events := c.Locals("events").([]*schema.Event)parts := c.Locals("parts").([]*protocol.Part)for idx, event := range events {log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))event.BackendID = uuid.NewString()event.UploadTime = time.Now().UnixMilli()part := parts[idx]part.BackendID = event.BackendIDcountry, region, city, err := ip.ParseIP(event.IP)if err != nil {Err = errlog.Logger().Warn("failed to parse ip", zap.Error(err), zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())))part.Code = protocol.IPParsingErrorpart.Data = err.Error()}log.Logger().Debug("parsed ip", zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())), zap.String("country", country), zap.String("region", region), zap.String("city", city))event.Country = countryevent.Region = regionevent.City = city}return Err, nil
}// WriteKafka write to kafka
// @Description
// @Author xzx 2023-08-26 17:54:26
// @Param c
// @Return Err
// @Return f
func WriteKafka(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("write kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))meta := c.Locals("meta").(schema.Meta)events := c.Locals("events").([]*schema.Event)parts := c.Locals("parts").([]*protocol.Part)// mark if all parts are succeed, which is response for codeisAllSuccess := true// topic is like to_analyzer__0.PERF_CRASHtopic := fmt.Sprintf("to_analyzer__0.%s", meta.Category)traceparent := c.Get(traceparentHeaderKey)if len(traceparent) == 55 {spanId := trace.SpanFromContext(c.UserContext()).SpanContext().SpanID().String()traceparent = traceparent[:36] + spanId + traceparent[52:]}messages := make([]kafka.Message, 0, len(events))for idx, event := range events {// skip if event was failedif parts[idx].Code != 0 {isAllSuccess = falsecontinue}bytes, err := sonic.Marshal(event)if err != nil {Err = errlog.Logger().Error("failed to marshal event", zap.Error(err), zap.Any("event", event), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))parts[idx].Code = protocol.SerializationErrorparts[idx].Data = err.Error()}messages = append(messages, kafka.Message{Topic: topic,Value: bytes,Headers: []kafka.Header{{Key: traceparentHeaderKey, Value: []byte(traceparent)},},})}if len(messages) == 0 { // would not write to kafka since every part were failed for some reasonlog.Logger().Warn("every data were failed to handle, would not write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))c.Status(fiber.StatusBadRequest)return errors.New("every data were failed to handle, check their code and data"), func() error {return c.JSON(protocol.Response{Code:  protocol.AllFail,Data:  "every data were failed to handle, check their code and data",Parts: parts,})}}log.Logger().Info("would write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))kafkaProducer := connector.GetEventKafka()if err := kafkaProducer.WriteMessages(context.Background(), messages...); err != nil {log.Logger().Error("failed to write to kafka", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Any("messages", messages))c.Status(fiber.StatusInternalServerError)return err, func() error {return c.JSON(protocol.Response{Code:  protocol.AllFail,Data:  fmt.Sprintf("failed to write to kafka: %s", err.Error()),Parts: parts,})}}if isAllSuccess {c.Status(fiber.StatusOK)otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "success_upload"))))return nil, func() error {return c.JSON(protocol.Response{Code:  protocol.AllSuccess,Parts: parts,})}} else {c.Status(fiber.StatusPartialContent)return Err, func() error {return c.JSON(protocol.Response{Code:  protocol.PartialSuccess,Data:  "some data were failed to handle, check their code and data",Parts: parts,})}}
}

同步会议

进度

  1. 完成了 venus、profile 逻辑的 otel-trace 接入 handler 无感化,提高代码可扩展性
  2. otel上报的开关,已经 review 完合并了
  3. 部署了jaeger的整套方案
  4. 关于 watermill 和 baserunner 的 banchmark

Review和测试方案

  1. 代码 review
    1. fiber.Handler 接入 otel 无感化,代码可扩展性
    2. 修复部分命名规范、注释规范和代码质量检查中指出的问题
    3. 移除 profile-compose 的 grafana 容器
    4. log 的初始化
    5. 移除 venus-compose 的无用配置
  2. 测试方案
    1. 接入 ck 集群,benchmark 进行 otel-sdk 上报,压测 otel-collector 和 ck 集群
      1. 【卡点】对 collector 压测暂时对 ck 没有造成很大压力
    2. 部署了 jaeger 的整套方案,使用 es 存储 trace、Prometheus 存储 metrics
      1. 【差异】jaeger 的 trace 可视化出来,使用 span_references - followsform 会展示为父子关系的 span
      2. es 已购买集群,等待接入
      3. 【方案】对 jaeger 进行压测,找到 jaeger 出现问题的 qps,要该 qps 来测试 a、b两组的方案
      4. 【方法】在压测的过程用 pprof 来抓取 venus 和 profile 的 cpu、内存的使用情况
    3. 对于 ck 集群的指标
      1. 【插入】ck 的 写入耗时,1/5分钟的写入成功率
      2. 【查询】ck 的 查询耗时,查询成功率
      3. 【资源利用率】ck 的 cpu、内存利用率
    4. 【问题】如何证明我们在监控服务差异、优劣方面的断言,具体的测试流程、测试对象、测试指标对比等

总结

  1. 对比上报相同的 metrics 测试,收集器(jaeger、otel)collector(容器)的差异,展示otel的优势,cpu、内存的,监控 collector 的差异,cadvisor
  2. 相同上报比如 trace,对比 es、ck 的存储大小对比,kibana
  3. 简单拓展说明:otel-log,故障的trace的地方,具体问题的原因
  4. 查询性能方面:性能测试-响应耗时,用 signoz,jaeger 的 web-url 来压测-主要使用Web,控制变量(同一个trace和span,清空缓存,主要 tcp 压测),
  5. 扩展:watermill/baserunner 的对比,高并发场景下比较优秀
  6. 扩展:benchmark 的 hyperscan 和官方正则处理的对比

数据记录、控制变量

cadvisor

image-20230826231631007

services:cadvisor:image: gcr.io/cadvisor/cadvisor:latestcontainer_name: cadvisornetworks:- backendcommand: --url_base_prefix=/cadvisorvolumes:- /:/rootfs:ro- /var/run:/var/run:rw- /sys:/sys:ro- /var/lib/docker/:/var/lib/docker:ro- /dev/disk/:/dev/disk:ro

jaeger-all-in-one 分开部署

为了收集 jaeger-collector 的指标,把 jaeger-all-in-one 分开部署

services:jaeger-collector:image: jaegertracing/jaeger-collectorcontainer_name: jaeger-collectornetworks:- backendcommand: ["--es.server-urls=http://elasticsearch:9200","--es.num-shards=1","--es.num-replicas=0","--log-level=info"]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090depends_on:- elasticsearchjaeger-query:image: jaegertracing/jaeger-querycontainer_name: jaeger-querynetworks:- backendcommand: ["--es.server-urls=http://elasticsearch:9200","--span-storage.type=elasticsearch","--log-level=info"]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090- no_proxy=localhost- QUERY_BASE_PATH=/jaegerdepends_on:- jaeger-agentjaeger-agent:image: jaegertracing/jaeger-agentcontainer_name: jaeger-agentnetworks:- backendcommand: [ "--reporter.grpc.host-port=jaeger-collector:14250" ]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090depends_on:- jaeger-collector

明日待办

  1. 压测

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/87739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

和 Node.js 说拜拜,Deno零配置解决方案

不知道大家注意没有,在我们启动各种类型的 Node repo 时,root 目录很快就会被配置文件塞满。例如,在最新版本的 Next.js 中,我们就有 next.config.js、eslintrc.json、tsconfig.json 和 package.json。而在样式那边,还…

Spring面试题9:Spring的BeanFactory和FactoryBean的区别和联系

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说Spring的BeanFactory和FactoryBean的区别和联系 区别:BeanFactory是一个工厂接口,主要负责管理和创建Bean实例。它是Spring提供的最底层的…

优维产品最佳实践:主机合规性检查

我们常常会感到这样的困惑,为什么这么多的无效主机记录,为什么这些主机很多信息空白,当许多人一起维护主机信息时,常常会出现信息错漏的情况。主机是运维最重要最基本的CMDB信息,而「合规性检查」为我们提供了更高效便…

Spring Cloud Alibaba Gateway 简单使用

文章目录 Spring Cloud Alibaba Gateway1.Gateway简介2. 流量网关和服务网关的区别3. Spring Cloud Gateway 网关的搭建3.1 Spring Cloud Gateway 配置项的说明3.2 依赖导入3.3 配置文件 Spring Cloud Alibaba Gateway 1.Gateway简介 Spring Cloud Gateway是一个基于Spring F…

混合Rollup:探秘 Metis、Fraxchain、Aztec、Miden和Ola

1. 引言 混合Rollup为新的以太坊L2扩容方案,其分为2大类: 将乐观与ZK技术结合的混合Rollup同时支持公开智能合约 和 私人智能合约 的混合Rollup 本文将重点关注Metis、Fraxchain、Aztec、Miden和Ola这五大项目。 2. 何为混合Rollup? 混合…

MySQL-树型结构数据查询

表结构 进行树形结构查询,利用特殊语法进行操作 with recursive t as(select parent_id , business_namefrom business_line where id 21union allselect a.parent_id, a.business_namefrom business_line a join t on a.id t.parent_id) select business_name f…

[AI Agent学习] MetaGPT源码浅析

前言 工作上,需要使用AI Agent,所以需要深入学习一下AI Agent,光阅读各种文章,总觉无法深入细节,所以开看各类AI Agent相关的开源项目,此为第一篇,学习一下MetaGPT的源码。 基本目标 MetaGPT是一…

【深度学习实验】前馈神经网络(八):模型评价(自定义支持分批进行评价的Accuracy类)

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. __init__(构造函数) 2. update函数(更新评价指标) 5. accumulate(计算准确率) 4. reset(重置评价指标) 5. 构造数据进行测试 6. 代码整合 一、实验介绍 本文将实…

进行 XSS 攻击 和 如何防御

跨站脚本攻击(XSS 攻击)是 Web 开发中最危险的攻击之一。以下是它们的工作原理以及防御方法。 XSS 攻击 跨站脚本攻击就是在另一个用户的计算机上运行带有恶意的 JS 代码。假如我们的程序没有对这些恶意的脚本进行防御的话,他们就会由我们的…

李宏毅hw-10 ——adversarial attack

一、查漏补缺: 1.关于glob.glob的用法,返回一个文件路径的 列表: 当然,再套用1个sort,就是将所有的文件路径按照字母进行排序了 2.relpath relative_path返回相对于基准路径的相对路径的函数 二、代码剖析&#xff…

STM32单片机入门学习(四)-蜂鸣器

蜂鸣器接线 低平蜂鸣器,低电平发声,高电平不发声, 三个排针,VCC接3.3v,GND接地,I/O接A0口,如图: 蜂鸣器代码:响一秒停半秒 #include "stm32f10x.h" #includ…

LRU、LFU 内存淘汰算法的设计与实现

1、背景介绍 LRU、LFU都是内存管理淘汰算法,内存管理是计算机技术中重要的一环,也是多数操作系统中必备的模块。应用场景:假设 给定你一定内存空间,需要你维护一些缓存数据,LRU、LFU就是在内存已经满了的情况下&#…

2023-2024年最新大数据学习路线

文章目录 2023-2024年最新大数据学习路线大数据开发入门*01*阶段案例实战 大数据核心基础*02*阶段案例实战 千亿级数仓技术*03*阶段项目实战 PB级内存计算04阶段项目实战 亚秒级实时计算*05*阶段项目实战 大厂面试*06* 2023-2024年最新大数据学习路线 新路线图在Spark一章不再…

Android跨进程通信:Binder机制原理

目录 1. Binder到底是什么? 2. 知识储备 2.1 进程空间划分 2.2 进程隔离 & 跨进程通信( IPC ) 2.3 内存映射 2.3.1 作用 2.3.2 实现过程 2.3.3 特点 2.3.4 应用场景 2.3.5 实例讲解 ① 文件读 / 写操作 ② 跨进程通信 3. Bi…

Cron表达式_用于定时调度任务

一、Cron表达式简介 Cron表达式是一个用于设置计划任务的字符串,该字符串以5或6个空格分隔,分为6或7个域,每一个域代表任务在相应时间、日期或时间间隔执行的规则【Cron表达式最初是在类Unix操作中系统中使用的,但现在已经广泛应用…

人机融合需要在事实与价值之间构建新型的拓扑关系

人机融合,这是指将人类智慧(含艺术)与计算机科技相结合,共同解决复杂问题的一种新方式。在人机融合中,我们需要构建事实与价值之间的新型拓扑关系,以实现更有效的知识管理和决策支持。 事实是指客观存在的、…

Python爬虫爬取豆瓣电影短评(爬虫入门,Scrapy框架,Xpath解析网站,jieba分词)

声明:以下内容仅供学习参考,禁止用于任何商业用途 很久之前就想学爬虫了,但是一直没机会,这次终于有机会了 主要参考了《疯狂python讲义》的最后一章 首先安装Scrapy: pip install scrapy 然后创建爬虫项目&#…

力扣26:删除有序数组中的重复项

26. 删除有序数组中的重复项 - 力扣(LeetCode) 题目: 给你一个 非严格递增排列 的数组 nums ,请你 原地 删除重复出现的元素,使每个元素 只出现一次 ,返回删除后数组的新长度。元素的 相对顺序 应该保持 …

Leetcode 95. 不同的二叉搜索树 II

文章目录 题目代码&#xff08;9.21 首刷看解析&#xff09; 题目 Leetcode 95. 不同的二叉搜索树 II 代码&#xff08;9.21 首刷看解析&#xff09; class Solution { public:vector<TreeNode*> generateTrees(int n) {return build(1,n);}vector<TreeNode*> bu…

将本地前端工程中的npm依赖上传到Nexus

【问题背景】 用Nexus搭建了内网的依赖仓库&#xff0c;需要将前端工程中node_modules中的依赖上传到Nexus上&#xff0c;但是node_modules中的依赖已经是解压后的状态&#xff0c;如果直接机械地将其简单地打包上传到Nexus&#xff0c;那么无法通过npm install下载使用。故有…