milvus分批写入测试数据

milvus分批写入测试数据

场景:

假如现在需要向milvus写入500W数据,调用milvus的insert api一次性写入500W数据是不现实的,内存也吃不消。

这时候需要分批写入,例如每次写入1000行,直至写完500W数据。

实现代码如下:

package mainimport ("context""fmt""github.com/milvus-io/milvus-sdk-go/v2/client""github.com/milvus-io/milvus-sdk-go/v2/entity""log""math/rand""strconv"
)const (milvusAddr     = `192.168.230.71:19530`dim            = 512collectionName = "hello_iterator"msgFmt                                     = "==== %s ====\n"idCol, randomCol, addressCol, embeddingCol = "ID", "random", "address", "embeddings"totalRows                                  = 100005batchSize                                  = 1000
)func main() {ctx := context.Background()log.Printf(msgFmt, "start connecting to Milvus")c, err := client.NewClient(ctx, client.Config{Address: milvusAddr,})if err != nil {log.Fatal("failed to connect to milvus, err: ", err.Error())}defer c.Close()// delete collection if existshas, err := c.HasCollection(ctx, collectionName)if err != nil {log.Fatalf("failed to check collection exists, err: %v", err)}if has {//c.DropCollection(ctx, collectionName)log.Println("collection exists")return}// create collectionlog.Printf(msgFmt, fmt.Sprintf("create collection, `%s`", collectionName))schema := entity.NewSchema().WithName(collectionName).WithDescription("hello_milvus is the simplest demo to introduce the APIs").WithField(entity.NewField().WithName(idCol).WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true).WithIsAutoID(false)).WithField(entity.NewField().WithName(randomCol).WithDataType(entity.FieldTypeDouble)).WithField(entity.NewField().WithName(addressCol).WithDataType(entity.FieldTypeVarChar).WithTypeParams(entity.TypeParamMaxLength, "50")).WithField(entity.NewField().WithName(embeddingCol).WithDataType(entity.FieldTypeFloatVector).WithDim(dim))if err := c.CreateCollection(ctx, schema, 1); err != nil {log.Fatalf("create collection failed, err: %v", err)}// 分批数groupNum := totalRows / batchSizeif totalRows%batchSize > 0 {groupNum++ // 如果有余数,则需要多写一批}// 分批写入数据for i := 0; i < groupNum; i++ {// 计算当前批次的起始和结束索引start := i * batchSizeend := start + batchSizeif end > totalRows {end = totalRows // 确保不会超出总数}// insert dataidList := make([]int64, 0, batchSize)randomList := make([]float64, 0, batchSize)addressList := make([]string, 0, batchSize)embeddingList := make([][]float32, 0, batchSize)// generate datafor i := start; i < end; i++ {idList = append(idList, int64(i))}for i := start; i < end; i++ {randomList = append(randomList, rand.Float64())}for i := start; i < end; i++ {addressList = append(addressList, "wuhan"+strconv.Itoa(i))}for i := start; i < end; i++ {vec := make([]float32, 0, dim)for j := 0; j < dim; j++ {vec = append(vec, rand.Float32())}embeddingList = append(embeddingList, vec)}idColData := entity.NewColumnInt64(idCol, idList)randomColData := entity.NewColumnDouble(randomCol, randomList)addressColData := entity.NewColumnVarChar(addressCol, addressList)embeddingColData := entity.NewColumnFloatVector(embeddingCol, dim, embeddingList)if _, err := c.Insert(ctx, collectionName, "", idColData, randomColData, addressColData, embeddingColData); err != nil {log.Fatalf("failed to insert random data into `hello_milvus, err: %v", err)}log.Printf("inserted:%d\n", end)}if err := c.Flush(ctx, collectionName, false); err != nil {log.Fatalf("failed to flush data, err: %v", err)}// build indexlog.Printf(msgFmt, "start creating index HNSW")idx, err := entity.NewIndexHNSW(entity.COSINE, 15, 50)if err != nil {log.Fatalf("failed to create ivf flat index, err: %v", err)}if err := c.CreateIndex(ctx, collectionName, embeddingCol, idx, false); err != nil {log.Fatalf("failed to create index, err: %v", err)}log.Printf(msgFmt, "start loading collection")err = c.LoadCollection(ctx, collectionName, false)if err != nil {log.Fatalf("failed to load collection, err: %v", err)}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/47509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostgreSQL 慢 SQL 排查

作者&#xff1a;文若 前言 所谓慢SQL 是指在数据库中执行时间超过指定阈值的语句。慢查询太多&#xff0c;对于业务而言&#xff0c;是有很大风险的&#xff0c;可能随时都会因为某种原因而被触发&#xff0c;并且根据我们的经验&#xff0c;数据库最常出现的问题&#xff0…

《大数据基础》相关知识点及考点,例题

1.6大数据计算模式 1、MapReduce可以并行执行大规模数据处理任务&#xff0c;用于大规模数据集&#xff08;大于1TB&#xff09;的并行运算。MapReduce 极大地方便了分布式编程工作&#xff0c;它将复杂的、运行于大规模集群上的并行计算过程高度地抽象为两个函数一一Map和Redu…

[MySQL][复核查询][多表查询][自连接][自查询]详细讲解

目录 1.铺垫&基本查询回顾1.多表查询1.何为笛卡尔积&#xff1f;2.示例 2.自连接1.何为自连接&#xff1f;2.示例 3.子查询1.何为子查询&#xff1f;2.单行子查询3.多行子查询4.多列子查询5.在from子句中使用子查询6.合并查询 1.铺垫&基本查询回顾 前面讲解的MYSQL表的…

OPPO 2024届校招正式批笔试题-后端(C卷)

小欧的括号嵌套 题目描述 小欧想要构造一个合法的括号序列满足以下条件&#xff1a; 括号序列长度恰好为 2 n 2n 2n。括号序列的嵌套层数最大值为 r r r。 括号嵌套层数是指在一个字符串中&#xff0c;以左括号 “(” 和右括号 “)” 形成的括号对的最大嵌套深度。 输入…

获取欧洲时报中国板块前新闻数据-scrapy

这里写目录标题 1.创建项目文件二.爬虫文件编写三.管道存储四.settings文件 1.创建项目文件 创建scrapy项目的命令&#xff1a;scrapy startproject <项目名字> 示例&#xff1a; scrapy startproject myspiderscrapy genspider <爬虫名字> <允许爬取的域名>…

联邦学习(Federated learning)—— 去中心化联邦中心化联邦

提出联邦学习的目的&#xff1a;解决数据孤立问题和安全隐私问题。 联邦学习的主要思想&#xff1a;基于分布在多个设备上的数据集构建机器学习模型&#xff0c;同时防止数据泄露。&#xff08;是一种分布式机器学习方法&#xff09; 联邦学习架构 联邦学习的架构分为两种&…

修改了mybatis的xml中的sql不重启服务器如何动态加载更新

目录 一、背景 二、注意 三、代码 四、使用示例 五、其他参考博客 一、背景 开发一个报表功能&#xff0c;好几百行sql&#xff0c;每次修改完想自测下都要重启服务器&#xff0c;启动一次服务器就要3分钟&#xff0c;重启10次就要半小时&#xff0c;耗不起时间呀。于是在…

windows docker nvidia wsl2

下载驱动(GeForce Experience里的也可以)https://www.nvidia.cn/Download/index.aspx 安装wsl2https://blog.csdn.net/qq_39942341/article/details/121512900?ops_request_misc%257B%2522request%255Fid%2522%253A%2522172122816816800227436617%2522%252C%2522scm%2522%253A…

Docker构建LNMP环境并运行Wordpress平台

1.准备Nginx 上传文件 Dockerfile FROM centos:7 as firstADD nginx-1.24.0.tar.gz /opt/ COPY CentOS-Base.repo /etc/yum.repos.d/RUN yum -y install pcre-devel zlib-devel openssl-devel gcc gcc-c make && \useradd -M -s /sbin/nologin nginx && \cd /o…

沙尘传输模拟教程(基于wrf-chem)

沙尘传输模拟教程(基于wrf-chem) 文章目录 沙尘传输模拟教程(基于wrf-chem)简介实验目的wrf-chem简介 软件准备wps、wrf-chem安装conda安装ncl安装ncap安装 数据准备气象数据准备下垫面数据准备 WPS数据预处理namelist.wps的设置geogrid.exe下垫面处理ungrib.exe气象数据预处理…

docker替换主程序排错

docker替换主程序排错 背景&#xff1a;经常会遇到主程序启动错误&#xff0c;导致无法进入到容器内部排错。 替换命令 docker run --rm -it --entrypoint/bin/sh 镜像名

石油与化工行业的工业互联网平台革新之路

石油和化工工业互联网平台的变革是近年来工业发展的重要趋势之一&#xff0c;它基于工业互联网技术&#xff0c;通过数字化、网络化和智能化手段&#xff0c;推动石油和化工行业的转型升级。以下是关于石油和化工工业互联网平台变革的详细分析&#xff1a; 一、革新背景 行业…

@Valid校验前端参数

1、导入依赖。&#xff08;springmvc的stater-web和json依赖也需要添加&#xff0c;此处先不列举&#xff09; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependen…

批量下载网易云音乐歌单的Python脚本

在日常的音乐收藏和整理中,有时候我们希望能够快速地备份或下载网易云音乐中的歌曲,以便在没有网络连接的情况下也能随时听到自己喜欢的音乐。这时候,Python可以提供一种便捷的解决方案,让我们能够轻松地实现这一目标。 技术背景 本文介绍的Python脚本利用了Requests库和…

SpringSecurity + JWT 实现登录认证

目录 1. JWT 的组成和优势 2. JWT 的工作原理 2.1 生成 JWT 2.2 传输JWT 3. SpringSecurity JWT 实现登录认证 3.1 配置 Spring Security 安全过滤链 3.2 自定义登录认证过滤器 3.3 实现SpringSecurity用户对象 3.4 获取当前登录用户 1. JWT 的组成和优势 JWT 由三…

SSE(Server Sent Event)实战(3)- Spring Web Flux 实现

上篇博客 SSE&#xff08;Server Sent Event&#xff09;实战&#xff08;2&#xff09;- Spring MVC 实现&#xff0c;我们用 Spring MVC 实现了简单的消息推送&#xff0c;并且留下了两个问题&#xff0c;这篇博客&#xff0c;我们用 Spring Web Flux 实现&#xff0c;并且看…

STM32(六):STM32指南者-定时器实验

目录 一、基本概念1、常规定时器2、内核定时器 二、基本定时器实验1、实验说明2、编程过程&#xff08;1&#xff09;配置LED&#xff08;2&#xff09;配置定时器&#xff08;3&#xff09;设定中断事件&#xff08;4&#xff09;主函数计数 3、工程代码 三、通用定时器实验实…

【Neural signal processing and analysis zero to hero】- 2

Nonstationarities and effects of the FT course from youtube: 传送地址 why we need extinguish stationary and non-stationary signal, because most of neural signal is non-stationary. Welch’s method for smooth spectral decomposition Full FFT method y…

【TDA4板端部署】基于 Pytorch 训练并部署 ONNX 模型在 TDA4

1 将torch模型转onnx模型 Ti转换工具只支持以下格式&#xff1a; Caffe - 0.17 (caffe-jacinto in gitHub) Tensorflow - 1.12 ONNX - 1.3.0 (opset 9 and 11) TFLite - Tensorflow 2.0-Alpha 基于 Tensorflow、Pytorch、Caffe 等训练框架&#xff0c;训练模型&#xff1a;选择…

数据结构与算法(2):顺序表与链表

1.前言 哈喽大家好喔&#xff0c;今天博主继续进行数据结构的分享与学习&#xff0c;今天的主要内容是顺序表与链表&#xff0c;是最简单但又相当重要的数据结构&#xff0c;为以后的学习有重要的铺垫&#xff0c;希望大家一起交流学习&#xff0c;互相进步&#xff0c;让我们…