milvus upsert流程源码分析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Upsert 的数据流向:

在这里插入图片描述

1.客户端sdk发出Upsert API请求。

import numpy as np
from pymilvus import (connections,Collection,
)num_entities, dim = 4, 3print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [[0,1,2,4000],[10,11,12,4000],rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)

2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {......// request封装为upsertTaskit := &upsertTask{baseMsg: msgstream.BaseMsg{HashValues: request.HashKeys,},ctx:       ctx,Condition: NewTaskCondition(ctx),req:       request,result: &milvuspb.MutationResult{Status: merr.Success(),IDs: &schemapb.IDs{IdField: nil,},},idAllocator:   node.rowIDAllocator,segIDAssigner: node.segAssigner,chMgr:         node.chMgr,chTicker:      node.chTicker,}......// 将task压入dmQueue队列if err := node.sched.dmQueue.Enqueue(it); err != nil {......}......// 等待任务执行完if err := it.WaitToFinish(); err != nil {......}......
}

3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_upsert.go

func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs:   it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}

msgPack变量:

在这里插入图片描述

msgPack包含了insertRequest和deleteRequest。

在这里插入图片描述

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

在这里插入图片描述

deleteRequest包含主键值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/707326.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

11.题目:编号3272 小蓝的漆房

题目&#xff1a; ###本题主要考察暴力&#xff0c;枚举&#xff0c;模拟 #include<bits/stdc.h> using namespace std; const int N1e410; int a[N],b[N]; int main(){ios::sync_with_stdio(0),cin.tie(0),cout.tie(0);int num;cin>>num;//样例个数循环for(int i…

socket套接字

前言 两个应用程序如果需要进行通讯最基本的一个前提就是能够唯一的标示一个进程&#xff0c;我们知道IP层的ip地址可以唯一标示主机&#xff0c;而TCP层协议和端口号可以唯一标示主机的一个进程&#xff0c;这样我们可以利用ip地址&#xff0b;协议&#xff0b;端口号唯一标示…

许战海矩阵|佐香园:“熟酱”的爆品战略

佐香园是辽宁帝华味精食品有限公司的主导品牌&#xff0c;是一个拥有自主研发与生产经营能力的民营企业&#xff0c;主要生产和销售香料、调味料以及食品添加剂等产品。该品牌自创办以来&#xff0c;一直坚持以市场为导向&#xff0c;走专业化发展之路&#xff0c;打造全方位的…

LACP——链路聚合控制协议

LACP——链路聚合控制协议 什么是LACP&#xff1f; LACP&#xff08;Link Aggregation Control Protocol&#xff0c;链路聚合控制协议&#xff09;是一种基于IEEE802.3ad标准的实现链路动态聚合与解聚合的协议&#xff0c;它是链路聚合中常用的一种协议。 链路聚合组中启用了…

Java中的==和equals()方法的区别是?hashCode()和equals()的关系是什么?

目录 解释Java中的和equals()方法。 hashCode()和equals()的关系是什么&#xff1f; 解释Java中的和equals()方法。 在Java中&#xff0c;和equals()方法都用于比较两个对象&#xff0c;但它们在比较时的侧重点和行为上有所不同。 1. **运算符&#xff1a;** - 是Java中的…

spring-cloud-openfeign oauth2拦截器默认配置

版本 spring-cloud:4.1.0 spring-security:6.2.1 依赖 添加oauth2客户端依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-oauth2-client</artifactId> </dependency>配置 spring:clou…

掌榕网融合通信产品:无代码开发,实现营销短信、客户咨询处理

{无代码开发&#xff0c;轻松连接电商平台} 在现今的快节奏电商行业中&#xff0c;掌榕网为企业带来了一种无需依赖传统编程技术即可实现电商平台的快速连接和集成的解决方案。通过使用掌榕网提供的融合通信服务&#xff0c;电商企业可以轻松地整合全球700运营商的通信资源&a…

Python 使用 仿真框架cocotb 实现FPGA板卡仿真验证

要使用 Python 结合仿真框架 Cocotb&#xff08;Co-simulation COrner TestBench&#xff09;实现 FPGA 板卡的仿真验证&#xff0c;您可以利用 Cocotb 提供的功能来编写测试台和仿真环境&#xff0c;与 Verilog/VHDL 设计进行交互并进行仿真验证。下面我将为您介绍一个简单的示…

TikTok矩阵系统的功能展示:深入解析与源代码分享!

今天我来和大家说说TikTok矩阵系统&#xff0c;在当今数字化时代&#xff0c;社交媒体平台已成为人们获取信息、交流思想和娱乐放松的重要渠道&#xff0c;其中&#xff0c;TikTok作为一款全球知名的短视频社交平台&#xff0c;凭借其独特的创意内容和强大的算法推荐系统&#…

Ubuntu20.04升级openssh9.4(源码升级)

目录 一、前景提要 二、先备份默认配置 三、卸载apt已安装OpenSSH 四、在线安装源码编译依赖环境 五、源码安装 六、还原之前的配置(如果权限安装跳过此步骤) 七、错误处理 一、前景提要 当前Linux Server上的OpenSSH版本太低&#xff0c;有漏洞而且不安全&#xff0c;…

编译opencv gpu版的条件

一、具备以下条件即可编译opencv gpu&#xff1a; 1、 终端设备必须有独立显卡。cmd窗口&#xff1a;nvidia-smi查看显卡信息 2、下载并安装CUDA Toolkit&#xff08;根据显卡下载对应的CUDA Toolkit软件&#xff09;、cuDNN&#xff08;根据CUDA版本下载对应的cuDNN&#xff0…

Vue2:编程式路由页面跳转

一、情景说明 在前面&#xff0c;我们学习了通过router-link标签&#xff0c;实现路由页面跳转 但是&#xff0c;它有局限性 就是router-link最终会被替换成<a>标签 如果&#xff0c;我们的跳转按钮是button实现的了&#xff1f; 该如何实现路由页面跳转了&#xff1f;…

【MQ05】异常消息处理

异常消息处理 上节课我们已经学习到了消息的持久化和确认相关的内容。但是&#xff0c;光有这些还不行&#xff0c;如果我们的消费者出现问题了&#xff0c;无法确认&#xff0c;或者直接报错产生异常了&#xff0c;这些消息要怎么处理呢&#xff1f;直接丢弃&#xff1f;这就是…

带大家做一个,易上手的家常蒜香菠菜

一捆 菠菜 四瓣蒜 蒜去皮切末 菠菜切段 多清洗几次 因为菠菜上面的土真的是太多了 菠菜下锅 加水煮一分钟左右 因为菠菜内的草酸成分非常高 所以这一步肯定是要的 然后将菠菜捞出来 干和叶子分开 锅中水倒掉 清洗一下 然后起锅烧油 下蒜末炒香 然后 下菠菜干 因为干熟的…

Python + Selenium —— 网页元素定位之标签名和链接文本定位

tag name tag name 为标签名定位&#xff0c;使用网页元素的标签名如a, div, input, span 等。 但是有一个问题&#xff0c;常见的标签名比如 在同一个页面上有非常多。会不会觉得 tag name 没什么用呢&#xff1f; 当然普通的模拟操作是不大有用&#xff0c;这个重复性实在…

笔记:GO1.19 带来的优化(重新编译juicefs)

## 背景 go编写的应用程序&#xff08;juicefs&#xff09;在k8s&#xff08;docker&#xff09;中运行&#xff0c;时不时出现 OOM Killed。 ## 分析 发现某些应用使用juicefs会导致内存使用飙升&#xff1b; k8s的pod给的内存资源&#xff1a;request 2G&#xff0c;limit…

Java面试题之mysql

Mysql 1. MySQL的索引原理是什么?什么是索引&#xff1f;以及索引的优缺点&#xff1f;2. 解释一下B树和B树的区别及各自定义?3. MyISAM索引和Innodb索引的区别&#xff1f;4. 什么是聚簇索引&#xff1f;辅助索引&#xff1f;5.非聚簇索引一定会回表查询么&#xff1f;6. 什…

树形数组对象扁平化与反向转换

将树形数组对象扁平化是将具有层级结构的对象数组转换成一个简单的、没有嵌套的对象数组的过程。这通常涉及到递归遍历树结构&#xff0c;并将每个节点添加到一个新的数组中。下面是一个使用JavaScript实现这一过程的例子&#xff1a; function flattenTree(tree) { let resul…

基于springboot实现线上阅读系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现线上阅读系统演示 摘要 随着社会发展速度的愈来愈快&#xff0c;以及社会压力变化的越来越快速&#xff0c;致使很多人采取各种不同的方法进行解压。大多数人的稀释压力的方法&#xff0c;是捧一本书籍&#xff0c;心情地让自己沉浸在情节里面&#xff0c;以…

基于沁恒微 ch643q 多通道采集 adc 驱动层实现

一、代码 #include "main.h"/********************************************************************** fn ADC_Function_Init** brief Initializes ADC collection.** return none*/ void ADC_Function_Init(void) {ADC_InitTypeDef ADC_InitStructure …