腾讯mini项目-【指标监控服务重构】2023-08-26

今日已办

Venus 的 Trace 无感化

定义 handler 函数

  1. fiber.Handler 的主要处理逻辑
  2. 返回处理中出现的 error
  3. 返回处理中响应 json 的函数
// handler
// @Description:
// @Author xzx 2023-08-26 18:00:03
// @Param c
// @Return error
// @Return func() error : function for response json
type handler func(c *fiber.Ctx) (error, func() error)

定义TraceWrapper函数

  1. otel-trace 的逻辑嵌入 handler 中

  2. 启动 span

  3. 执行 handler,记录 span 的 attributes

  4. 根据返回的 err, jsonRespFunc 分情况讨论

  5. 条件处理逻辑
    err != nil && jsonRespFunc == nil存在错误,将错误记录到Span中,结束Span,执行c.Next()
    err != nil && jsonRespFunc != nil存在错误,将错误记录到Span中,结束Span,响应JSON
    err == nil && jsonRespFunc == nil结束Span,执行c.Next()
    err == nil && jsonRespFunc != nil结束Span,响应JSON
  6. 代码实现

// TraceWrapper return fiber.Handler integrate report otel-trace
// @Description integrate otel-trace logic in handler
// @Author xzx 2023-08-26 18:00:03
// @Param f
// @Param opts
// @Return fiber.Handler
func TraceWrapper(f handler, opts ...trace.SpanStartOption) fiber.Handler {return func(c *fiber.Ctx) error {_, span := otelclient.Tracer.Start(c.UserContext(), runtime.GetFunctionName(f), opts...)// execute handler logicerr, jsonRespFunc := f(c)// todo: setSpanAttributesif err != nil {// record span errorspan.RecordError(err)span.SetStatus(codes.Error, err.Error())span.End()if jsonRespFunc != nil {// response error resultreturn jsonRespFunc()}// ignore error, continue handlersreturn c.Next()}span.End()if jsonRespFunc != nil {// response success resultreturn jsonRespFunc()}// err == nil, jsonRespFunc == nilreturn c.Next()}
}

具体的 handler 逻辑

// SplitAndValidate split multi-events and validate uploaded data
// @Description
// @Author xzx 2023-08-26 17:54:21
// @Param c
// @Return Err
// @Return f
func SplitAndValidate(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("split and validate", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "all_upload"),)))RequestIdBaggage, err := baggage.NewMember("request_id", c.GetRespHeader(fiber.HeaderXRequestID))if err != nil {log.Logger().Error("Create Baggage Member Failed", zap.Error(err))}Baggage, err := baggage.New(RequestIdBaggage)if err != nil {log.Logger().Error("Create Baggage Failed", zap.Error(err))}c.SetUserContext(baggage.ContextWithBaggage(c.UserContext(), Baggage))c.Accepts(fiber.MIMEMultipartForm)uploadedData, err := parseJSON(c)if err != nil {log.Logger().Error("failed to parse json", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))c.Status(fiber.StatusBadRequest)return err, func() error {return c.JSON(protocol.Response{Code: protocol.AllFail,Data: err.Error(),})}}if err = uploadedData.Meta.Validate(); err != nil {log.Logger().Error("failed to validate meta", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "wrong_meta"),)))c.Status(fiber.StatusBadRequest)return err, func() error {return c.JSON(protocol.Response{Code: protocol.AllFail,Data: err.Error(),})}}// must use pointer when using Locals of fiber if it's about to modifyevents := make([]*schema.Event, 0, len(uploadedData.Data))parts := make([]*protocol.Part, 0, len(uploadedData.Data))for idx, data := range uploadedData.Data {log.Logger().Debug("split event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))event := &schema.Event{}part := &protocol.Part{}if err = data.Validate(); err != nil {Err = errotelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "wrong_data"),)))part.Code = protocol.ValidationErrorpart.Data = err.Error()}part.ID = data.IDevent.Meta = uploadedData.Metaevent.Data = dataevents = append(events, event)parts = append(parts, part)}c.Locals("meta", uploadedData.Meta)c.Locals("events", events)c.Locals("parts", parts)return Err, nil
}// HandleEvent handle event
// @Description
// @Author xzx 2023-08-26 17:54:23
// @Param c
// @Return Err
// @Return f
func HandleEvent(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))events := c.Locals("events").([]*schema.Event)parts := c.Locals("parts").([]*protocol.Part)for idx, event := range events {log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))event.BackendID = uuid.NewString()event.UploadTime = time.Now().UnixMilli()part := parts[idx]part.BackendID = event.BackendIDcountry, region, city, err := ip.ParseIP(event.IP)if err != nil {Err = errlog.Logger().Warn("failed to parse ip", zap.Error(err), zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())))part.Code = protocol.IPParsingErrorpart.Data = err.Error()}log.Logger().Debug("parsed ip", zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())), zap.String("country", country), zap.String("region", region), zap.String("city", city))event.Country = countryevent.Region = regionevent.City = city}return Err, nil
}// WriteKafka write to kafka
// @Description
// @Author xzx 2023-08-26 17:54:26
// @Param c
// @Return Err
// @Return f
func WriteKafka(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("write kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))meta := c.Locals("meta").(schema.Meta)events := c.Locals("events").([]*schema.Event)parts := c.Locals("parts").([]*protocol.Part)// mark if all parts are succeed, which is response for codeisAllSuccess := true// topic is like to_analyzer__0.PERF_CRASHtopic := fmt.Sprintf("to_analyzer__0.%s", meta.Category)traceparent := c.Get(traceparentHeaderKey)if len(traceparent) == 55 {spanId := trace.SpanFromContext(c.UserContext()).SpanContext().SpanID().String()traceparent = traceparent[:36] + spanId + traceparent[52:]}messages := make([]kafka.Message, 0, len(events))for idx, event := range events {// skip if event was failedif parts[idx].Code != 0 {isAllSuccess = falsecontinue}bytes, err := sonic.Marshal(event)if err != nil {Err = errlog.Logger().Error("failed to marshal event", zap.Error(err), zap.Any("event", event), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))parts[idx].Code = protocol.SerializationErrorparts[idx].Data = err.Error()}messages = append(messages, kafka.Message{Topic: topic,Value: bytes,Headers: []kafka.Header{{Key: traceparentHeaderKey, Value: []byte(traceparent)},},})}if len(messages) == 0 { // would not write to kafka since every part were failed for some reasonlog.Logger().Warn("every data were failed to handle, would not write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))c.Status(fiber.StatusBadRequest)return errors.New("every data were failed to handle, check their code and data"), func() error {return c.JSON(protocol.Response{Code:  protocol.AllFail,Data:  "every data were failed to handle, check their code and data",Parts: parts,})}}log.Logger().Info("would write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))kafkaProducer := connector.GetEventKafka()if err := kafkaProducer.WriteMessages(context.Background(), messages...); err != nil {log.Logger().Error("failed to write to kafka", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Any("messages", messages))c.Status(fiber.StatusInternalServerError)return err, func() error {return c.JSON(protocol.Response{Code:  protocol.AllFail,Data:  fmt.Sprintf("failed to write to kafka: %s", err.Error()),Parts: parts,})}}if isAllSuccess {c.Status(fiber.StatusOK)otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "success_upload"))))return nil, func() error {return c.JSON(protocol.Response{Code:  protocol.AllSuccess,Parts: parts,})}} else {c.Status(fiber.StatusPartialContent)return Err, func() error {return c.JSON(protocol.Response{Code:  protocol.PartialSuccess,Data:  "some data were failed to handle, check their code and data",Parts: parts,})}}
}

同步会议

进度

  1. 完成了 venus、profile 逻辑的 otel-trace 接入 handler 无感化,提高代码可扩展性
  2. otel上报的开关,已经 review 完合并了
  3. 部署了jaeger的整套方案
  4. 关于 watermill 和 baserunner 的 banchmark

Review和测试方案

  1. 代码 review
    1. fiber.Handler 接入 otel 无感化,代码可扩展性
    2. 修复部分命名规范、注释规范和代码质量检查中指出的问题
    3. 移除 profile-compose 的 grafana 容器
    4. log 的初始化
    5. 移除 venus-compose 的无用配置
  2. 测试方案
    1. 接入 ck 集群,benchmark 进行 otel-sdk 上报,压测 otel-collector 和 ck 集群
      1. 【卡点】对 collector 压测暂时对 ck 没有造成很大压力
    2. 部署了 jaeger 的整套方案,使用 es 存储 trace、Prometheus 存储 metrics
      1. 【差异】jaeger 的 trace 可视化出来,使用 span_references - followsform 会展示为父子关系的 span
      2. es 已购买集群,等待接入
      3. 【方案】对 jaeger 进行压测,找到 jaeger 出现问题的 qps,要该 qps 来测试 a、b两组的方案
      4. 【方法】在压测的过程用 pprof 来抓取 venus 和 profile 的 cpu、内存的使用情况
    3. 对于 ck 集群的指标
      1. 【插入】ck 的 写入耗时,1/5分钟的写入成功率
      2. 【查询】ck 的 查询耗时,查询成功率
      3. 【资源利用率】ck 的 cpu、内存利用率
    4. 【问题】如何证明我们在监控服务差异、优劣方面的断言,具体的测试流程、测试对象、测试指标对比等

总结

  1. 对比上报相同的 metrics 测试,收集器(jaeger、otel)collector(容器)的差异,展示otel的优势,cpu、内存的,监控 collector 的差异,cadvisor
  2. 相同上报比如 trace,对比 es、ck 的存储大小对比,kibana
  3. 简单拓展说明:otel-log,故障的trace的地方,具体问题的原因
  4. 查询性能方面:性能测试-响应耗时,用 signoz,jaeger 的 web-url 来压测-主要使用Web,控制变量(同一个trace和span,清空缓存,主要 tcp 压测),
  5. 扩展:watermill/baserunner 的对比,高并发场景下比较优秀
  6. 扩展:benchmark 的 hyperscan 和官方正则处理的对比

数据记录、控制变量

cadvisor

image-20230826231631007

services:cadvisor:image: gcr.io/cadvisor/cadvisor:latestcontainer_name: cadvisornetworks:- backendcommand: --url_base_prefix=/cadvisorvolumes:- /:/rootfs:ro- /var/run:/var/run:rw- /sys:/sys:ro- /var/lib/docker/:/var/lib/docker:ro- /dev/disk/:/dev/disk:ro

jaeger-all-in-one 分开部署

为了收集 jaeger-collector 的指标,把 jaeger-all-in-one 分开部署

services:jaeger-collector:image: jaegertracing/jaeger-collectorcontainer_name: jaeger-collectornetworks:- backendcommand: ["--es.server-urls=http://elasticsearch:9200","--es.num-shards=1","--es.num-replicas=0","--log-level=info"]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090depends_on:- elasticsearchjaeger-query:image: jaegertracing/jaeger-querycontainer_name: jaeger-querynetworks:- backendcommand: ["--es.server-urls=http://elasticsearch:9200","--span-storage.type=elasticsearch","--log-level=info"]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090- no_proxy=localhost- QUERY_BASE_PATH=/jaegerdepends_on:- jaeger-agentjaeger-agent:image: jaegertracing/jaeger-agentcontainer_name: jaeger-agentnetworks:- backendcommand: [ "--reporter.grpc.host-port=jaeger-collector:14250" ]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090depends_on:- jaeger-collector

明日待办

  1. 压测

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/87739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

和 Node.js 说拜拜,Deno零配置解决方案

不知道大家注意没有,在我们启动各种类型的 Node repo 时,root 目录很快就会被配置文件塞满。例如,在最新版本的 Next.js 中,我们就有 next.config.js、eslintrc.json、tsconfig.json 和 package.json。而在样式那边,还…

Spring面试题9:Spring的BeanFactory和FactoryBean的区别和联系

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说Spring的BeanFactory和FactoryBean的区别和联系 区别:BeanFactory是一个工厂接口,主要负责管理和创建Bean实例。它是Spring提供的最底层的…

优维产品最佳实践:主机合规性检查

我们常常会感到这样的困惑,为什么这么多的无效主机记录,为什么这些主机很多信息空白,当许多人一起维护主机信息时,常常会出现信息错漏的情况。主机是运维最重要最基本的CMDB信息,而「合规性检查」为我们提供了更高效便…

Spring Cloud Alibaba Gateway 简单使用

文章目录 Spring Cloud Alibaba Gateway1.Gateway简介2. 流量网关和服务网关的区别3. Spring Cloud Gateway 网关的搭建3.1 Spring Cloud Gateway 配置项的说明3.2 依赖导入3.3 配置文件 Spring Cloud Alibaba Gateway 1.Gateway简介 Spring Cloud Gateway是一个基于Spring F…

linux之df命令 查看分区大小

linux命令df linux命令:检查文件系统的磁盘空间占用情况。 格式:df [选项] 说明:df命令可显示所有文件系统对I节点和磁盘块的使用情况。 命令中各个选项的含义: df -a:显示所有文件系统的磁盘使用情况,包括…

混合Rollup:探秘 Metis、Fraxchain、Aztec、Miden和Ola

1. 引言 混合Rollup为新的以太坊L2扩容方案,其分为2大类: 将乐观与ZK技术结合的混合Rollup同时支持公开智能合约 和 私人智能合约 的混合Rollup 本文将重点关注Metis、Fraxchain、Aztec、Miden和Ola这五大项目。 2. 何为混合Rollup? 混合…

MySQL-树型结构数据查询

表结构 进行树形结构查询,利用特殊语法进行操作 with recursive t as(select parent_id , business_namefrom business_line where id 21union allselect a.parent_id, a.business_namefrom business_line a join t on a.id t.parent_id) select business_name f…

[AI Agent学习] MetaGPT源码浅析

前言 工作上,需要使用AI Agent,所以需要深入学习一下AI Agent,光阅读各种文章,总觉无法深入细节,所以开看各类AI Agent相关的开源项目,此为第一篇,学习一下MetaGPT的源码。 基本目标 MetaGPT是一…

【深度学习实验】前馈神经网络(八):模型评价(自定义支持分批进行评价的Accuracy类)

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. __init__(构造函数) 2. update函数(更新评价指标) 5. accumulate(计算准确率) 4. reset(重置评价指标) 5. 构造数据进行测试 6. 代码整合 一、实验介绍 本文将实…

进行 XSS 攻击 和 如何防御

跨站脚本攻击(XSS 攻击)是 Web 开发中最危险的攻击之一。以下是它们的工作原理以及防御方法。 XSS 攻击 跨站脚本攻击就是在另一个用户的计算机上运行带有恶意的 JS 代码。假如我们的程序没有对这些恶意的脚本进行防御的话,他们就会由我们的…

【Java】泛型 之 super通配符

我们前面已经讲到了泛型的继承关系&#xff1a;Pair<Integer>不是Pair<Number>的子类。 考察下面的set方法&#xff1a; void set(Pair<Integer> p, Integer first, Integer last) {p.setFirst(first);p.setLast(last); }传入Pair<Integer>是允许的&…

李宏毅hw-10 ——adversarial attack

一、查漏补缺&#xff1a; 1.关于glob.glob的用法&#xff0c;返回一个文件路径的 列表&#xff1a; 当然&#xff0c;再套用1个sort&#xff0c;就是将所有的文件路径按照字母进行排序了 2.relpath relative_path返回相对于基准路径的相对路径的函数 二、代码剖析&#xff…

Mybatis连接DB2数据库时,FETCH FIRST {n} ROWS ONLY不能参数化解决

Mybatis连接DB2数据为时 ......WHERE ROW_NUM_HAHA > #{start,jdbcTypeNUMERIC} FETCH FIRST #{pageSize,jbdcTypeNUMERIC} ROWS ONLY...... 如果像上面这样写是不行的。查过资料后&#xff0c;才发现FETCH FIRST后面的值是不能参数化的&#xff0c;只能写死。而Mybatis中…

STM32单片机入门学习(四)-蜂鸣器

蜂鸣器接线 低平蜂鸣器&#xff0c;低电平发声&#xff0c;高电平不发声&#xff0c; 三个排针&#xff0c;VCC接3.3v&#xff0c;GND接地&#xff0c;I/O接A0口&#xff0c;如图&#xff1a; 蜂鸣器代码&#xff1a;响一秒停半秒 #include "stm32f10x.h" #includ…

MySQL 排序规则

文章目录 1.简介2.支持的排序规则3.设置排序规则4.中文排序规则参考文献 1.简介 字符集是一组符号和编码。排序规则是一组用于比较字符集中的字符的规则。 每个 MySQL 字符集可以支持一个或者多个排序规则&#xff0c;用于定义每个字符的比较规则&#xff0c;包括是否区分大小…

软考高级系统架构设计师系列论文真题八:论企业集成平台的技术与应用

软考高级系统架构设计师系列论文真题八:论企业集成平台的技术与应用 一、论企业集成平台的技术与应用二、找准核心论点三、理论素材准备四、精品范文赏析1.摘要2.正文3.总结软考高级系统架构设计师系列论文之:百篇软考高级架构设计师论文范文软考高级系统架构设计师系列之:论…

LRU、LFU 内存淘汰算法的设计与实现

1、背景介绍 LRU、LFU都是内存管理淘汰算法&#xff0c;内存管理是计算机技术中重要的一环&#xff0c;也是多数操作系统中必备的模块。应用场景&#xff1a;假设 给定你一定内存空间&#xff0c;需要你维护一些缓存数据&#xff0c;LRU、LFU就是在内存已经满了的情况下&#…

以容器方式运行 windows 图形化界面系统,附docker详细配置步骤和yaml完整执行文件

以容器方式运行 windows 图形化界面系统,附docker详细配置步骤和yaml完整执行文件。 常规普通的docker中运行windows系统,只能运行无界面化的系统,例如: 要在Docker中运行Windows应用程序,需要使用Windows容器。以下是一些步骤: 确认您的操作系统支持Docker桌面应用程…

2023-2024年最新大数据学习路线

文章目录 2023-2024年最新大数据学习路线大数据开发入门*01*阶段案例实战 大数据核心基础*02*阶段案例实战 千亿级数仓技术*03*阶段项目实战 PB级内存计算04阶段项目实战 亚秒级实时计算*05*阶段项目实战 大厂面试*06* 2023-2024年最新大数据学习路线 新路线图在Spark一章不再…

Android跨进程通信:Binder机制原理

目录 1. Binder到底是什么&#xff1f; 2. 知识储备 2.1 进程空间划分 2.2 进程隔离 & 跨进程通信&#xff08; IPC &#xff09; 2.3 内存映射 2.3.1 作用 2.3.2 实现过程 2.3.3 特点 2.3.4 应用场景 2.3.5 实例讲解 ① 文件读 / 写操作 ② 跨进程通信 3. Bi…