milvus版本:v2.3.2
整体架构:
Upsert 的数据流向:
1.客户端sdk发出Upsert API请求。
# Client-side example: connect to a Milvus server and upsert a few entities
# into the existing "hello_milvus" collection.
import numpy as np
from pymilvus import (
    connections,
    Collection,
)

# Number of entities to upsert and the size of the second axis of the random array.
num_entities, dim = 4, 3

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

# Bind to a collection by name.
# NOTE(review): presumably "hello_milvus" must already exist on the server — confirm.
hello_milvus = Collection("hello_milvus")

print("Start upsert entities")
# Fixed seed so the generated array is reproducible between runs.
rng = np.random.default_rng(seed=19530)
# NOTE(review): presumably one entry per schema field, each holding
# num_entities values — confirm against the hello_milvus schema.
entities = [
    [0, 1, 2, 4000],
    [10, 11, 12, 4000],
    rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)
2.服务端接收API请求,将request封装为upsertTask,并压入dmQueue队列。
注意这里压入的是dmQueue(DML类型请求的队列);DDL类型的请求则压入ddQueue。
代码路径:internal\proxy\impl.go
// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {......// request封装为upsertTaskit := &upsertTask{baseMsg: msgstream.BaseMsg{HashValues: request.HashKeys,},ctx: ctx,Condition: NewTaskCondition(ctx),req: request,result: &milvuspb.MutationResult{Status: merr.Success(),IDs: &schemapb.IDs{IdField: nil,},},idAllocator: node.rowIDAllocator,segIDAssigner: node.segAssigner,chMgr: node.chMgr,chTicker: node.chTicker,}......// 将task压入dmQueue队列if err := node.sched.dmQueue.Enqueue(it); err != nil {......}......// 等待任务执行完if err := it.WaitToFinish(); err != nil {......}......
}
3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。
PreExecute()一般为参数校验等工作。
Execute()为真正执行逻辑。
PostExecute()为执行完成后的收尾逻辑,这里什么都不做,直接返回nil。
代码路径:internal\proxy\task_upsert.go
func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs: it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}
msgPack变量:
msgPack包含了insertRequest和deleteRequest。
insertRequest包含了客户端的upsert数据,此外还带有rowid,用来唯一标识一行数据。
deleteRequest包含主键值。