Done Today
Non-intrusive Tracing for Venus
Define the handler function type
A handler carries the main processing logic of a fiber.Handler and returns two values:
- the error, if any, raised during processing
- a function that writes the JSON response
```go
// handler
// @Description:
// @Author xzx 2023-08-26 18:00:03
// @Param c
// @Return error
// @Return func() error : function for response json
type handler func(c *fiber.Ctx) (error, func() error)
```
Define the TraceWrapper function
- Embed the otel-trace logic into the handler:
  - start a span
  - execute the handler and record the span's attributes
  - branch on the returned err and jsonRespFunc:

| Condition | Handling |
| --- | --- |
| `err != nil && jsonRespFunc == nil` | An error occurred: record it on the span, end the span, call `c.Next()` |
| `err != nil && jsonRespFunc != nil` | An error occurred: record it on the span, end the span, respond with JSON |
| `err == nil && jsonRespFunc == nil` | End the span, call `c.Next()` |
| `err == nil && jsonRespFunc != nil` | End the span, respond with JSON |

- Code implementation
```go
// TraceWrapper returns a fiber.Handler that integrates otel-trace reporting
// @Description integrate otel-trace logic in handler
// @Author xzx 2023-08-26 18:00:03
// @Param f
// @Param opts
// @Return fiber.Handler
func TraceWrapper(f handler, opts ...trace.SpanStartOption) fiber.Handler {
	return func(c *fiber.Ctx) error {
		_, span := otelclient.Tracer.Start(c.UserContext(), runtime.GetFunctionName(f), opts...)
		// execute handler logic
		err, jsonRespFunc := f(c)
		// todo: setSpanAttributes
		if err != nil {
			// record span error
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			span.End()
			if jsonRespFunc != nil {
				// response error result
				return jsonRespFunc()
			}
			// ignore error, continue handlers
			return c.Next()
		}
		span.End()
		if jsonRespFunc != nil {
			// response success result
			return jsonRespFunc()
		}
		// err == nil, jsonRespFunc == nil
		return c.Next()
	}
}
```
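`runtime.GetFunctionName` here is a project-local helper, not the standard library; a minimal sketch of how such a helper is typically written (the package name is an assumption), using `reflect` and `runtime.FuncForPC`:

```go
package runtime // project-local helper package, distinct from the stdlib "runtime"

import (
	"reflect"
	goruntime "runtime"
)

// GetFunctionName resolves the fully qualified name of the function f,
// e.g. "venus/handler.SplitAndValidate", for use as the span name.
func GetFunctionName(f interface{}) string {
	return goruntime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}
```

This keeps span names stable and unique per handler without passing explicit name strings.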
The concrete handler logic
```go
// SplitAndValidate split multi-events and validate uploaded data
// @Description
// @Author xzx 2023-08-26 17:54:21
// @Param c
// @Return Err
// @Return f
func SplitAndValidate(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("split and validate", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
		attribute.String("type", "all_upload"),
	)))
	RequestIdBaggage, err := baggage.NewMember("request_id", c.GetRespHeader(fiber.HeaderXRequestID))
	if err != nil {
		log.Logger().Error("Create Baggage Member Failed", zap.Error(err))
	}
	Baggage, err := baggage.New(RequestIdBaggage)
	if err != nil {
		log.Logger().Error("Create Baggage Failed", zap.Error(err))
	}
	c.SetUserContext(baggage.ContextWithBaggage(c.UserContext(), Baggage))
	c.Accepts(fiber.MIMEMultipartForm)
	uploadedData, err := parseJSON(c)
	if err != nil {
		log.Logger().Error("failed to parse json", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		c.Status(fiber.StatusBadRequest)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code: protocol.AllFail,
				Data: err.Error(),
			})
		}
	}
	if err = uploadedData.Meta.Validate(); err != nil {
		log.Logger().Error("failed to validate meta", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
			attribute.String("type", "wrong_meta"),
		)))
		c.Status(fiber.StatusBadRequest)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code: protocol.AllFail,
				Data: err.Error(),
			})
		}
	}
	// must use pointers with fiber's Locals if the values are to be modified later
	events := make([]*schema.Event, 0, len(uploadedData.Data))
	parts := make([]*protocol.Part, 0, len(uploadedData.Data))
	for idx, data := range uploadedData.Data {
		log.Logger().Debug("split event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
		event := &schema.Event{}
		part := &protocol.Part{}
		if err = data.Validate(); err != nil {
			Err = err
			otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
				attribute.String("type", "wrong_data"),
			)))
			part.Code = protocol.ValidationError
			part.Data = err.Error()
		}
		part.ID = data.ID
		event.Meta = uploadedData.Meta
		event.Data = data
		events = append(events, event)
		parts = append(parts, part)
	}
	c.Locals("meta", uploadedData.Meta)
	c.Locals("events", events)
	c.Locals("parts", parts)
	return Err, nil
}
```

```go
// HandleEvent handle event
// @Description
// @Author xzx 2023-08-26 17:54:23
// @Param c
// @Return Err
// @Return f
func HandleEvent(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	events := c.Locals("events").([]*schema.Event)
	parts := c.Locals("parts").([]*protocol.Part)
	for idx, event := range events {
		log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
		event.BackendID = uuid.NewString()
		event.UploadTime = time.Now().UnixMilli()
		part := parts[idx]
		part.BackendID = event.BackendID
		country, region, city, err := ip.ParseIP(event.IP)
		if err != nil {
			Err = err
			log.Logger().Warn("failed to parse ip", zap.Error(err), zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())))
			part.Code = protocol.IPParsingError
			part.Data = err.Error()
		}
		log.Logger().Debug("parsed ip", zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())), zap.String("country", country), zap.String("region", region), zap.String("city", city))
		event.Country = country
		event.Region = region
		event.City = city
	}
	return Err, nil
}
```

```go
// WriteKafka write to kafka
// @Description
// @Author xzx 2023-08-26 17:54:26
// @Param c
// @Return Err
// @Return f
func WriteKafka(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("write kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	meta := c.Locals("meta").(schema.Meta)
	events := c.Locals("events").([]*schema.Event)
	parts := c.Locals("parts").([]*protocol.Part)
	// marks whether all parts succeeded, which determines the response code
	isAllSuccess := true
	// topic is like to_analyzer__0.PERF_CRASH
	topic := fmt.Sprintf("to_analyzer__0.%s", meta.Category)
	traceparent := c.Get(traceparentHeaderKey)
	// a W3C traceparent is 55 chars: "00-" + 32-char trace-id + "-" + 16-char span-id + "-" + 2-char flags;
	// swap the span-id field (chars 36..51) for the current span's id so downstream consumers link to this span
	if len(traceparent) == 55 {
		spanId := trace.SpanFromContext(c.UserContext()).SpanContext().SpanID().String()
		traceparent = traceparent[:36] + spanId + traceparent[52:]
	}
	messages := make([]kafka.Message, 0, len(events))
	for idx, event := range events {
		// skip if event has already failed
		if parts[idx].Code != 0 {
			isAllSuccess = false
			continue
		}
		bytes, err := sonic.Marshal(event)
		if err != nil {
			Err = err
			log.Logger().Error("failed to marshal event", zap.Error(err), zap.Any("event", event), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
			parts[idx].Code = protocol.SerializationError
			parts[idx].Data = err.Error()
			// don't enqueue an event whose serialization failed
			isAllSuccess = false
			continue
		}
		messages = append(messages, kafka.Message{
			Topic: topic,
			Value: bytes,
			Headers: []kafka.Header{
				{Key: traceparentHeaderKey, Value: []byte(traceparent)},
			},
		})
	}
	if len(messages) == 0 { // would not write to kafka since every part failed for some reason
		log.Logger().Warn("every data were failed to handle, would not write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		c.Status(fiber.StatusBadRequest)
		return errors.New("every data were failed to handle, check their code and data"), func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllFail,
				Data:  "every data were failed to handle, check their code and data",
				Parts: parts,
			})
		}
	}
	log.Logger().Info("would write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	kafkaProducer := connector.GetEventKafka()
	if err := kafkaProducer.WriteMessages(context.Background(), messages...); err != nil {
		log.Logger().Error("failed to write to kafka", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Any("messages", messages))
		c.Status(fiber.StatusInternalServerError)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllFail,
				Data:  fmt.Sprintf("failed to write to kafka: %s", err.Error()),
				Parts: parts,
			})
		}
	}
	if isAllSuccess {
		c.Status(fiber.StatusOK)
		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
			attribute.String("type", "success_upload"),
		)))
		return nil, func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllSuccess,
				Parts: parts,
			})
		}
	}
	c.Status(fiber.StatusPartialContent)
	return Err, func() error {
		return c.JSON(protocol.Response{
			Code:  protocol.PartialSuccess,
			Data:  "some data were failed to handle, check their code and data",
			Parts: parts,
		})
	}
}
```
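For context, a minimal sketch of how these handlers could be wired up through TraceWrapper (the route path, port, and app setup are assumptions, not the project's actual wiring; it assumes TraceWrapper and the handlers are in scope):

```go
package main

import "github.com/gofiber/fiber/v2"

func main() {
	app := fiber.New()
	// each stage runs as a traced middleware; TraceWrapper calls c.Next()
	// to advance the chain whenever a stage returns no JSON-response function
	app.Post("/v1/events", // hypothetical path
		TraceWrapper(SplitAndValidate),
		TraceWrapper(HandleEvent),
		TraceWrapper(WriteKafka),
	)
	_ = app.Listen(":8080") // hypothetical port
}
```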
Sync Meeting
Progress
- Finished making the otel-trace integration transparent to handlers in the venus and profile logic, improving code extensibility
- The switch for otel reporting has been reviewed and merged
- Deployed the full jaeger stack
- Benchmark of watermill vs. baserunner
Review and Test Plan
- Code review
  - Transparent otel integration for fiber.Handler, improving code extensibility
  - Fixed issues flagged by the naming-convention, comment-convention, and code-quality checks
  - Removed the grafana container from profile-compose
  - Log initialization
  - Removed unused configuration from venus-compose
- Test plan
  - Connect to the ck cluster, benchmark otel-sdk reporting, and load-test the otel-collector and the ck cluster
  - [Blocker] Load-testing the collector has not yet put significant pressure on ck
  - Deployed the full jaeger stack, with es storing traces and Prometheus storing metrics
  - [Difference] In jaeger's trace visualization, spans linked via span_references - FollowsFrom are displayed as parent-child spans
  - The es cluster has been purchased and is awaiting integration
  - [Plan] Load-test jaeger to find the qps at which it starts to fail, then use that qps to test the two schemes, group a and group b
  - [Method] During load testing, use pprof to capture cpu and memory usage of venus and profile (see the sketch after this list)
  - Metrics for the ck cluster:
    - [Insert] ck write latency; write success rate over 1-minute/5-minute windows
    - [Query] ck query latency and query success rate
    - [Resource utilization] ck cpu and memory utilization
  - [Question] How to substantiate our claims about the differences and relative strengths of the monitoring services: the concrete test procedure, test subjects, metric comparisons, etc.
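A minimal sketch of how pprof could be exposed in venus/profile for this (the side port is an assumption; the services' real setup may differ):

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
	// serve profiling endpoints on a side port; during the load test, grab
	// profiles with e.g.:
	//   go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30  (cpu)
	//   go tool pprof http://localhost:6060/debug/pprof/heap                (memory)
	log.Println(http.ListenAndServe("localhost:6060", nil))
}
```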
Summary
- Report identical metrics and compare the collectors (jaeger vs. otel, as containers): monitor the collectors' cpu and memory differences via cadvisor to demonstrate otel's advantages
- For identical reported data such as traces, compare the storage size in es vs. ck, using kibana
- Brief extension: otel-log, for locating the failing point in a trace and the concrete cause of the problem
- Query performance: performance-test response latency by load-testing the signoz and jaeger web URLs, mainly through the Web, with controlled variables (the same trace and span, cleared caches, mainly tcp load testing)
- Extension: comparison of watermill vs. baserunner, which performs well under high concurrency
- Extension: benchmark hyperscan against the standard library's regexp handling (see the sketch after this list)
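For the regex comparison, a minimal benchmark skeleton for the stdlib side (the pattern and input are assumptions, for illustration only); the hyperscan side would run the same input through a Go binding such as github.com/flier/gohs, not shown here:

```go
package regexbench

import (
	"regexp"
	"testing"
)

var (
	// sample pattern and input, assumptions for illustration only
	pattern = regexp.MustCompile(`(?i)error|timeout|panic`)
	input   = []byte("2023-08-26 18:00:03 upstream timeout while reading response")
)

// BenchmarkStdlibRegexp measures Go's built-in regexp engine; run with
// `go test -bench=. -benchmem` and compare against the hyperscan variant.
func BenchmarkStdlibRegexp(b *testing.B) {
	for i := 0; i < b.N; i++ {
		if !pattern.Match(input) {
			b.Fatal("expected a match")
		}
	}
}
```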
Data Recording and Controlled Variables
cadvisor
```yaml
services:
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    container_name: cadvisor
    networks:
      - backend
    command: --url_base_prefix=/cadvisor
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:rw
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro
      - /dev/disk/:/dev/disk:ro
```
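For Prometheus to pick up the container metrics cadvisor exposes, a scrape job along these lines would go into prometheus.yml (the job name and target are assumptions; depending on how --url_base_prefix is applied, the metrics path may be /metrics or /cadvisor/metrics):

```yaml
scrape_configs:
  - job_name: cadvisor               # assumed job name
    metrics_path: /cadvisor/metrics  # adjust if the prefix does not apply to /metrics
    static_configs:
      - targets: ["cadvisor:8080"]   # cadvisor's default port on the shared backend network
```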
Deploying jaeger-all-in-one as separate components
To collect the jaeger-collector's metrics, jaeger-all-in-one is split into separately deployed components.
```yaml
services:
  jaeger-collector:
    image: jaegertracing/jaeger-collector
    container_name: jaeger-collector
    networks:
      - backend
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--es.num-shards=1",
      "--es.num-replicas=0",
      "--log-level=info"
    ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
    depends_on:
      - elasticsearch
  jaeger-query:
    image: jaegertracing/jaeger-query
    container_name: jaeger-query
    networks:
      - backend
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--span-storage.type=elasticsearch",
      "--log-level=info"
    ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
      - no_proxy=localhost
      - QUERY_BASE_PATH=/jaeger
    depends_on:
      - jaeger-agent
  jaeger-agent:
    image: jaegertracing/jaeger-agent
    container_name: jaeger-agent
    networks:
      - backend
    command: ["--reporter.grpc.host-port=jaeger-collector:14250"]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
    depends_on:
      - jaeger-collector
```
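The compose file above references elasticsearch and prometheus services that are not shown; a minimal sketch of what they might look like on the same backend network (image versions and settings are assumptions):

```yaml
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.10  # assumed version
    container_name: elasticsearch
    networks:
      - backend
    environment:
      - discovery.type=single-node  # single-node cluster for testing
  prometheus:
    image: prom/prometheus
    container_name: prometheus
    networks:
      - backend
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
```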
Tomorrow's TODO
- Load testing