Delete api写s3的流程
milvus版本:v2.3.2
整体架构:
Delete 的数据流向
delete相关配置
dataNode:segment:insertBufSize: 16777216 # Max buffer size to flush for a single segment.deleteBufBytes: 67108864 # Max buffer size to flush del for a single channelsyncPeriod: 600 # The period to sync segments if buffer is not empty.
当collection已经有flushed文件,如果后续有insert和delete操作,这个配置文件控制这个行为。
s3文件不支持进行文件内容的编辑。因此需要有一种机制能够进行insert和delete。
delete在内存中(buffer)的流程
堆栈:
start()(internal\util\flowgraph\node.go)|--go nodeCtx.work()(同上)|--n.Operate(input)(同上)|--dn.bufferDeleteMsg()(internal\datanode\flow_graph_delete_node.go)|--dn.delBufferManager.StoreNewDeletes()(同上)
func (m *DeltaBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,tss []Timestamp, tr TimeRange, startPos, endPos *msgpb.MsgPosition,
) {// 获取delDataBufbuffer, loaded := m.Load(segID)// 如果不存在则新建if !loaded {buffer = newDelDataBuf(segID)}// 将pks存入buffersize := buffer.Buffer(pks, tss, tr, startPos, endPos)m.pushOrFixHeap(segID, buffer)m.updateMeta(segID, buffer)m.usedMemory.Add(size)metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(len(pks)))
}
pks存储的是主键值。
这是对内存的操作。
delete写入s3的流程
Start()(internal\util\flowgraph\node.go)|--go nodeCtx.work()(同上)|--n.Operate(input)(同上)|--dn.flushManager.flushDelData()(internal\datanode\flow_graph_delete_node.go)|--m.handleDeleteTask()(internal\datanode\flush_manager.go)|--m.getFlushQueue(segmentID).enqueueDelFlush()(同上)|--q.getFlushTaskRunner(pos).runFlushDel()(同上)|--runFlushDel()(internal\datanode\flush_task.go)|--task.flushDeleteData()(同上)|--t.MultiWrite(ctx, t.data)(internal\datanode\flush_manager.go)
当达到syncPeriod或者buffer满或者执行flush操作,会触发写s3操作。
看看这个函数flushDelData():
// notify flush manager del buffer data
func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID,pos *msgpb.MsgPosition,
) error {// del signal with empty dataif data == nil || data.delData == nil {m.handleDeleteTask(segmentID, &flushBufferDeleteTask{}, nil, pos)return nil}collID, partID, err := m.getCollectionAndPartitionID(segmentID)if err != nil {return err}// 编码解码器,提供序列化,反序列化功能delCodec := storage.NewDeleteCodec()// 序列化blob, err := delCodec.Serialize(collID, partID, segmentID, data.delData)if err != nil {return err}logID, err := m.AllocOne()if err != nil {log.Error("failed to alloc ID", zap.Error(err))return err}blobKey := metautil.JoinIDPath(collID, partID, segmentID, logID)blobPath := path.Join(m.ChunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)// 合成kvskvs := map[string][]byte{blobPath: blob.Value[:]}data.LogSize = int64(len(blob.Value))data.LogPath = blobPathlog.Info("delete blob path", zap.String("path", blobPath))m.handleDeleteTask(segmentID, &flushBufferDeleteTask{ChunkManager: m.ChunkManager,data: kvs,}, data, pos)return nil
}
delCodec.Serialize()返回的变量blob类型为*Blob。
// Blob is a pack of key&value
type Blob struct {Key stringValue []byteSize int64RowNum int64
}
blobPath为s3的文件路径。
files/delta_log/447918772784340274/447918772784340275/447918772784340283/447918772783939606
delta_log存储的是insert和delete增量数据。
s3的截图:
总结
1.delete/insert增量数据写入buffer。
2.满足一定条件buffer刷入s3。