今日已办
watermill
将 key 设置到 message 中
修改 watermill-kafka
源码 将 key 设置到 message.metadata中
接入 otel-sdk
- 添加 middleware
resolveUpstreamCtx
解析上游上下文,开启根Span - 添加 middleware
middleware.InstantAck
- 马上ACK,使得多条消息可以并行处理(走middleware 和 handler 的逻辑)
// Package pubsub
// @Author xzx 2023/8/12 10:01:00
package pubsub

import (
	"context"
	"encoding/json"

	"github.com/ThreeDotsLabs/watermill/message"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/otelclient"
	"profile/internal/schema"
	"profile/internal/schema/performance"
	"profile/internal/state"
	"profile/internal/watermill/watermillkafka"
)// consumeCtxData
// @Description: Data collection of a message processing context
// @Author xzx 2023-08-17 13:36:12
type consumeCtxData struct {Status intEvent schema.EventRootSpan trace.SpanRootSpanCtx context.ContextAppID string // API 上报FetchScenario string // API 上报
}// resolveUpstreamCtx
// @Description
// @Author xzx 2023-08-18 11:15:09
// @Param h
// @Return message.HandlerFunc
func resolveUpstreamCtx(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {var data consumeCtxData// get upstream producer's W3C trace context via propagationheaderCarrier := make(propagation.HeaderCarrier)headerCarrier.Set("Traceparent", msg.Metadata.Get("Traceparent"))upstreamProducerCtx := otel.GetTextMapPropagator().Extract(msg.Context(), headerCarrier)// set traceID to consumer contextconsumerCtx := trace.ContextWithRemoteSpanContext(msg.Context(),trace.NewSpanContext(trace.SpanContextConfig{TraceID: trace.SpanContextFromContext(upstreamProducerCtx).TraceID(),}))//start tracingdata.RootSpanCtx, data.RootSpan = otelclient.ConsumerTracer.Start(consumerCtx, "Profile-Consumer",trace.WithSpanKind(trace.SpanKindConsumer),trace.WithLinks(trace.LinkFromContext(upstreamProducerCtx, semconv.OpentracingRefTypeFollowsFrom)))msg.SetContext(context.WithValue(msg.Context(), "data", &data))return h(msg)}
}// unpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param hmsg.SetContext(context.WithValue(msg.Context(), "data", data))
// @Return message.HandlerFunc
func unpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)unpackKafkaMessageCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "unpackKafkaMessage",trace.WithSpanKind(trace.SpanKindClient))// 反序列化,存入通用结构体if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {data.Status = state.StatusUnmarshalErrorhandlerErr(unpackKafkaMessageCtx, "unmarshal error", contextErr)span.End()return nil, contextErr}log.Logger.InfoContext(unpackKafkaMessageCtx, "[UnpackKafkaItem] unpack kafka item success",zap.Any("event", data.Event),zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))setSpanAttributes(span, data)msg.SetContext(context.WithValue(msg.Context(), "data", data))span.End()return h(msg)}
}// initPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func initPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)initPerformanceEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "initPerformanceEvent",trace.WithSpanKind(trace.SpanKindInternal))event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)if contextErr != nil {data.Status = state.StatusEventFactoryErrorhandlerErr(initPerformanceEventCtx, "event factory error", contextErr)span.End()return nil, contextErr}log.Logger.InfoContext(initPerformanceEventCtx, "[initPerformanceEvent] init performance event success",zap.Any("event", event),zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))data.Event.ProfileData = eventsetSpanAttributes(span, data)msg.SetContext(context.WithValue(msg.Context(), "data", data))span.End()return h(msg)}
}// analyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func analyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)analyzeEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "analyzeEvent",trace.WithSpanKind(trace.SpanKindInternal))contextErr := data.Event.ProfileData.Analyze()if contextErr != nil {data.Status = state.StatusAnalyzeErrorhandlerErr(analyzeEventCtx, "analyze event error", contextErr)span.End()return nil, contextErr}log.Logger.InfoContext(analyzeEventCtx, "[analyzeEvent] analyze performance event success",zap.Any("event", data.Event),zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))// clear dimensions and valuesdata.Event.Dimensions = nildata.Event.Values = nilsetSpanAttributes(span, data)msg.SetContext(context.WithValue(msg.Context(), "data", data))span.End()return h(msg)}
}// crashHandler
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func crashHandler(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",trace.WithSpanKind(trace.SpanKindProducer))toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorhandlerErr(writeKafkaCtx, "marshal error", contextErr)span.End()return nil, contextErr}msg = message.NewMessage(data.Event.BackendID, toWriteBytes)msg.Metadata.Set(watermillkafka.HeaderKey, data.Event.ID)log.Logger.Info("[4-crashHandler] write kafka ...", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))setSpanAttributes(span, data)span.End()data.RootSpan.End()return message.Messages{msg}, nil
}// lagHandler
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func lagHandler(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(*consumeCtxData)writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",trace.WithSpanKind(trace.SpanKindProducer))toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorhandlerErr(writeKafkaCtx, "marshal error", contextErr)span.End()return nil, contextErr}msg = message.NewMessage(data.Event.BackendID, toWriteBytes)log.Logger.Info("[4-lagHandler] write kafka ...", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))setSpanAttributes(span, data)span.End()data.RootSpan.End()return message.Messages{msg}, nil
}// setSpanAttributes
// @Description setSpanAttributes
// @Author xzx 2023-08-03 23:19:17
// @Param span
// @Param profileCtx
func setSpanAttributes(span trace.Span, data *consumeCtxData) {if span.IsRecording() {span.SetAttributes(attribute.String("event.category", data.Event.Category),attribute.String("event.backend_id", data.Event.BackendID),)}
}// handlerErr
// @Description
// @Author xzx 2023-07-20 15:36:46
// @Param span
// @Param ctx
// @Param msg
// @Param err
func handlerErr(ctx context.Context, msg string, err error) {log.Logger.ErrorContext(ctx, msg, zap.Error(err))
}
会议纪要
进度
- venus 的 metrics 独立分支开发
- venus 的 trace 修复了一些bug
- 返回 error 主动调用 span.end()
- profile 的 watermill pub/sub 和 trace 上报还原原本功能
- profile 的 hyperscan 的继续调研中
待办
- 调研如何关闭otel,设置开关配置
- 找到了 sdktrace.WithSampler(sdktrace.NeverSample()) - 可以不上报 trace
- 性能benchmark,单topic、多topic的场景下,对比原profile代码和现有watermill代码性能
- 调研 watermill 的 otelEnabled 的功能,其他集成 otel 的第三方库等
- hyperscan 的 benchmark
明日待办
- 性能benchmark,单topic、多topic的场景下,对比原profile代码和现有watermill代码性能
- 在
watermill-kafka
的源码基础上对 publisher 的 写回 kafka 添加otel - trace & log
的逻辑