milvus upsert流程源码分析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Upsert 的数据流向:

在这里插入图片描述

1.客户端sdk发出Upsert API请求。

import numpy as np
from pymilvus import (connections,Collection,
)num_entities, dim = 4, 3print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [[0,1,2,4000],[10,11,12,4000],rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)

2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {......// request封装为upsertTaskit := &upsertTask{baseMsg: msgstream.BaseMsg{HashValues: request.HashKeys,},ctx:       ctx,Condition: NewTaskCondition(ctx),req:       request,result: &milvuspb.MutationResult{Status: merr.Success(),IDs: &schemapb.IDs{IdField: nil,},},idAllocator:   node.rowIDAllocator,segIDAssigner: node.segAssigner,chMgr:         node.chMgr,chTicker:      node.chTicker,}......// 将task压入dmQueue队列if err := node.sched.dmQueue.Enqueue(it); err != nil {......}......// 等待任务执行完if err := it.WaitToFinish(); err != nil {......}......
}

3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_upsert.go

func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs:   it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}

msgPack变量:

在这里插入图片描述

msgPack包含了insertRequest和deleteRequest。

在这里插入图片描述

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

在这里插入图片描述

deleteRequest包含主键值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/707326.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

11.题目:编号3272 小蓝的漆房

题目&#xff1a; ###本题主要考察暴力&#xff0c;枚举&#xff0c;模拟 #include<bits/stdc.h> using namespace std; const int N1e410; int a[N],b[N]; int main(){ios::sync_with_stdio(0),cin.tie(0),cout.tie(0);int num;cin>>num;//样例个数循环for(int i…

socket套接字

前言 两个应用程序如果需要进行通讯最基本的一个前提就是能够唯一的标示一个进程&#xff0c;我们知道IP层的ip地址可以唯一标示主机&#xff0c;而TCP层协议和端口号可以唯一标示主机的一个进程&#xff0c;这样我们可以利用ip地址&#xff0b;协议&#xff0b;端口号唯一标示…

许战海矩阵|佐香园:“熟酱”的爆品战略

佐香园是辽宁帝华味精食品有限公司的主导品牌&#xff0c;是一个拥有自主研发与生产经营能力的民营企业&#xff0c;主要生产和销售香料、调味料以及食品添加剂等产品。该品牌自创办以来&#xff0c;一直坚持以市场为导向&#xff0c;走专业化发展之路&#xff0c;打造全方位的…

LACP——链路聚合控制协议

LACP——链路聚合控制协议 什么是LACP&#xff1f; LACP&#xff08;Link Aggregation Control Protocol&#xff0c;链路聚合控制协议&#xff09;是一种基于IEEE802.3ad标准的实现链路动态聚合与解聚合的协议&#xff0c;它是链路聚合中常用的一种协议。 链路聚合组中启用了…

TikTok矩阵系统的功能展示:深入解析与源代码分享!

今天我来和大家说说TikTok矩阵系统&#xff0c;在当今数字化时代&#xff0c;社交媒体平台已成为人们获取信息、交流思想和娱乐放松的重要渠道&#xff0c;其中&#xff0c;TikTok作为一款全球知名的短视频社交平台&#xff0c;凭借其独特的创意内容和强大的算法推荐系统&#…

【MQ05】异常消息处理

异常消息处理 上节课我们已经学习到了消息的持久化和确认相关的内容。但是&#xff0c;光有这些还不行&#xff0c;如果我们的消费者出现问题了&#xff0c;无法确认&#xff0c;或者直接报错产生异常了&#xff0c;这些消息要怎么处理呢&#xff1f;直接丢弃&#xff1f;这就是…

带大家做一个,易上手的家常蒜香菠菜

一捆 菠菜 四瓣蒜 蒜去皮切末 菠菜切段 多清洗几次 因为菠菜上面的土真的是太多了 菠菜下锅 加水煮一分钟左右 因为菠菜内的草酸成分非常高 所以这一步肯定是要的 然后将菠菜捞出来 干和叶子分开 锅中水倒掉 清洗一下 然后起锅烧油 下蒜末炒香 然后 下菠菜干 因为干熟的…

Python + Selenium —— 网页元素定位之标签名和链接文本定位

tag name tag name 为标签名定位&#xff0c;使用网页元素的标签名如a, div, input, span 等。 但是有一个问题&#xff0c;常见的标签名比如 在同一个页面上有非常多。会不会觉得 tag name 没什么用呢&#xff1f; 当然普通的模拟操作是不大有用&#xff0c;这个重复性实在…

笔记:GO1.19 带来的优化(重新编译juicefs)

## 背景 go编写的应用程序&#xff08;juicefs&#xff09;在k8s&#xff08;docker&#xff09;中运行&#xff0c;时不时出现 OOM Killed。 ## 分析 发现某些应用使用juicefs会导致内存使用飙升&#xff1b; k8s的pod给的内存资源&#xff1a;request 2G&#xff0c;limit…

基于springboot实现线上阅读系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现线上阅读系统演示 摘要 随着社会发展速度的愈来愈快&#xff0c;以及社会压力变化的越来越快速&#xff0c;致使很多人采取各种不同的方法进行解压。大多数人的稀释压力的方法&#xff0c;是捧一本书籍&#xff0c;心情地让自己沉浸在情节里面&#xff0c;以…

基于沁恒微 ch643q 多通道采集 adc 驱动层实现

一、代码 #include "main.h"/********************************************************************** fn ADC_Function_Init** brief Initializes ADC collection.** return none*/ void ADC_Function_Init(void) {ADC_InitTypeDef ADC_InitStructure …

pdffactory pro 8中文破解版

详细介绍 PdfFactory&#xff0c;PDF文档虚拟打印机&#xff0c;无须Acrobat即可创建Adobe PDF文件&#xff0c;创建PDF文件的方法比其他方法更方便和高效。支持将多个文档整合到一个PDF文件、增加字体和便签、PDF加密、去水印、压缩优化。 FinePrint&#xff0c;Windows虚拟…

【踩坑】修复xrdp无法关闭Authentication Required验证窗口

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 问题如下&#xff0c;时不时出现&#xff0c;有时还怎么都关不掉&#xff0c;很烦&#xff1a; 解决方法一&#xff1a;命令行输入 dbus-send --typemethod_call --destorg.gnome.Shell /org/gnome/Shell org.gn…

js 面试题--事件循环event loop--宏任务和微任务

1 事件循环event loop概念&#xff1a; js 是非阻塞单线程语言&#xff0c;js在执行过程中会产生执行环境&#xff0c;执行环境会按顺序添加到执行栈中&#xff0c;先执行同步栈中的任务&#xff0c;当遇到异步任务时会添加到task队列中&#xff0c;同步栈执行完后&#xff0c…

一文读懂什么是 OCR 识别

在数字化时代&#xff0c;信息处理和数据管理是企业运营的重要环节。然而&#xff0c;手工输入信息存在效率低和准确性低的问题&#xff0c;严重影响了企业的工作流程和决策过程。因此&#xff0c;OCR&#xff08;Optical Character Recognition&#xff09;识别技术的应用变得…

【Unity】导入IAP插件后依赖冲突问题 com.android.billingclient冲突

【Unity】Attribute meta-data#com.google.android.play.billingclient.version 多版本库冲突_unity billingclient-CSDN博客 打开mainTemplate.gradle 找到dependencies { } 在里面末尾加上如下&#xff1a; configurations.all {exclude group: com.android.billingclien…

uni-app 实现拍照后给照片加水印功能

遇到个需求需要实现&#xff0c;研究了一下后写了个demo 本质上就是把拍完照后的照片放到canvas里&#xff0c;然后加上水印样式然后再重新生成一张图片 代码如下&#xff0c;看注释即可~使用的话记得还是得优化下代码 <template><view class"content"&g…

单词倒排——c语言解法

以下是题目&#xff1a; 这个题中有三个点&#xff0c; 一个是将非字母的字符转换为空格&#xff0c; 第二是如果有两个连续的空格&#xff0c; 那么就可以将这两个连续的空格变成一个空格。 第三个点就是让单词倒排。 那么我们就可以将这三个点分别封装成三个函数。 还有就是…

特征融合篇 | YOLOv8 引入通用高效层聚合网络 GELAN | YOLOv9 新模块

今天的深度学习方法专注于如何设计最合适的目标函数,以使模型的预测结果最接近真实情况。同时,必须设计一个合适的架构,以便为预测提供足够的信息。现有方法忽视了一个事实,即当输入数据经过逐层特征提取和空间转换时,会丢失大量信息。本文将深入探讨数据通过深度网络传输…

linux系统Jenkins的安装

Jenkins安装 安装上传安装包解压包首次登录要去服务器查看密码&#xff0c;更改密码选择需要安装的插件设置Admin用户和密码安装完成 安装 上传安装包 上传 jdk17 tomcat jenkins.war的安装包 . 上传 tomcat安装包解压包 解压jdk tar xf jdk-11.0.18_linux-x64_bin.tar.gz解…