一、创建操作
1、Python版本
from pymilvus import Collection, FieldSchema, DataType, CollectionSchema, connections
from pymilvus.orm import utility, db
from knowledge_brain.milvus_sink import milvus_sink
from study.connect import Connect


class MilvusOperatC:
    """Helper for bootstrapping and managing one Milvus collection.

    Constructing an instance connects to the server right away (see ``con``),
    creating the database and the collection if they do not exist yet, and
    stores the resulting ``Collection`` on ``self.collection``.

    Args:
        host: server host / IP.
        port: server port.
        user: user name.
        password: password.
        db_name: database to use; created if missing.
        alias: connection alias passed to ``connections.connect``. Every later
            pymilvus call made by this class is routed through this alias.
        collection_name: collection to open; created if missing.
        schema: ``CollectionSchema`` used only when the collection is created.
        num_shards: shard count used only when the collection is created.
    """

    def __init__(self, host, port, user, password, db_name, alias,
                 collection_name, schema, num_shards):
        print("加载milvus依赖")
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db_name = db_name
        self.alias = alias
        self.collection_name = collection_name
        self.schema = schema
        self.num_shards = num_shards
        self.collection = self.con()

    def con(self):
        """Connect, make sure the database and collection exist, and return the collection.

        Every pymilvus call below passes ``using=self.alias``. Without it they
        fall back to the ``"default"`` connection, which fails (or talks to a
        different server) whenever ``alias`` is not ``"default"``.

        Returns:
            Collection: handle to ``self.collection_name`` on this connection.
        """
        connections.connect(host=self.host, user=self.user, password=self.password,
                            port=self.port, alias=self.alias)
        print("建立数据库连接~~~~")
        # Create the database on first use.
        if self.db_name not in db.list_database(using=self.alias):
            print("没有%s数据库,进行新建~~~~" % self.db_name)
            db.create_database(self.db_name, using=self.alias)
            print("新建%s数据库完成!" % self.db_name)
        # Point this connection at the target database.
        db.using_database(self.db_name, using=self.alias)
        # Create the collection on first use; an existing collection is opened as-is.
        if utility.has_collection(self.collection_name, using=self.alias):
            print("集合已存在")
        else:
            print("没有%s集合,进行新建~~~~" % self.collection_name)
            self.col_create()
            print("新建%s集合完成!" % self.collection_name)
        collection = Collection(self.collection_name, using=self.alias)
        print("数据库连接完成!")
        return collection

    def col_create(self):
        """Create ``self.collection_name`` from ``self.schema`` with ``self.num_shards`` shards."""
        collection = Collection(
            name=self.collection_name,
            schema=self.schema,
            using=self.alias,
            num_shards=self.num_shards,
        )
        print(collection)

    def index_create(self, index_params: dict, vec_field: str):
        """Create an index on ``vec_field``.

        Args:
            index_params: index definition, e.g.
                ``{'metric_type': 'L2', 'index_type': 'IVF_FLAT', 'params': {'nlist': 150}}``.
            vec_field: name of the (vector) field to index.
        """
        self.collection.create_index(field_name=vec_field, index_params=index_params)

    def partition_create(self, partition_name: str):
        """Create a partition named ``partition_name`` in the collection."""
        self.collection.create_partition(partition_name)

    def load(self):
        """Load the collection (required before searching/querying it)."""
        self.collection.load()
        print("数据load成功")

    def unload(self):
        """Release the loaded collection."""
        self.collection.release()


if __name__ == '__main__':
    host = 'XX.17.38'
    port = '31639'
    user = 'Milvus'
    password = 'Milvus'
    db_name = 'knowledge_test'
    alias = 'default'
    # ``description`` (not ``descrition``) is the FieldSchema keyword that sets a
    # field's description. ``max_length`` only applies to VARCHAR fields, so it is
    # omitted on the INT64 primary key.
    fields = [
        FieldSchema(name='pk', dtype=DataType.INT64, description='主键',
                    is_primary=True, auto_id=True),
        FieldSchema(name='car_model', dtype=DataType.VARCHAR, description='car_model',
                    max_length=65535, is_primary=False, auto_id=False),
        FieldSchema(name='text', dtype=DataType.VARCHAR, description='page_content',
                    max_length=65535, is_primary=False, auto_id=False),
        FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, description='embedding vectors',
                    dim=1024),
    ]
    schema = CollectionSchema(fields=fields, description='集合描述')
    # To create the collection through this helper instead:
    # MilvusOperatC(host, port, user, password, db_name, alias, 'test', schema, 2)
    # NOTE(review): the meaning of milvus_sink's trailing positional arguments
    # (1, 1, 1) is defined in knowledge_brain.milvus_sink — confirm there.
    milvus_sink(db_name, 'test', 1, 1, 1)
Python调用示例
from pymilvus import FieldSchema, DataType, CollectionSchema
from study.MilvusOperatC import MilvusOperatC

if __name__ == '__main__':
    host = 'XXX.17.38'
    # Milvus server port: must contain digits only.
    port = '31639'
    user = 'Milvus'
    password = 'Milvus'
    db_name = 'knowledge_test'
    alias = 'default'
    # ``description`` (not ``descrition``) is the FieldSchema keyword that sets a
    # field's description. ``max_length`` only applies to VARCHAR fields, so it is
    # omitted on the INT64 primary key.
    fields = [
        FieldSchema(name='pk', dtype=DataType.INT64, description='主键',
                    is_primary=True, auto_id=True),
        FieldSchema(name='car_model', dtype=DataType.VARCHAR, description='car_model',
                    max_length=65535, is_primary=False, auto_id=False),
        FieldSchema(name='text', dtype=DataType.VARCHAR, description='page_content',
                    max_length=65535, is_primary=False, auto_id=False),
        FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, description='embedding vectors',
                    dim=1024),
    ]
    schema = CollectionSchema(fields=fields, description='集合描述')
    index_params = {
        'metric_type': 'L2',  # alternatives: COSINE, IP
        'index_type': "IVF_FLAT",
        'params': {"nlist": 150},
    }
    # Name of the embedding field to index.
    vec_field = "vector"
    partition_name = '2024032103'

    mc = MilvusOperatC(host, port, user, password, db_name, alias, 'test', schema, 2)
    mc.index_create(index_params, vec_field)
    # Only create the partition if it is missing, so the script can be re-run.
    if not mc.collection.has_partition(partition_name):
        mc.partition_create(partition_name)
    mc.load()
    mc.unload()
2、Java版本
package com.gwm.milvus;import com.google.gson.internal.$Gson$Preconditions;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DataType;
import io.milvus.grpc.GetLoadStateResponse;
import io.milvus.grpc.GetLoadingProgressResponse;
import io.milvus.param.*;
import io.milvus.param.collection.*;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.param.partition.CreatePartitionParam;/*** @author yangyingchun* @version 1.0* @date 2024/3/20 16:02*/
public class MilvusOperateC {/*** 获取连接* @param host* @param port* @param username* @param password* @param database* @return*/public static MilvusServiceClient getConn(String host,Integer port,String username,String password,String database){MilvusServiceClient milvusServiceClient= new MilvusServiceClient(ConnectParam.newBuilder().withHost(host).withPort(port).withAuthorization(username, password).withDatabaseName(database).build());return milvusServiceClient;}/*** 创建集合* @param milvusServiceClient* @param collectionName* @param databaseName* @param shardsNum* @param description* @return*/public static Integer col_create(MilvusServiceClient milvusServiceClient,String collectionName,String databaseName,Integer shardsNum,String description){FieldType fieldType1 = FieldType.newBuilder().withName("id").withDataType(DataType.Int64).withPrimaryKey(true).withAutoID(true).build();FieldType fieldType2 = FieldType.newBuilder().withName("text").withDataType(DataType.VarChar).withMaxLength(65535) //varchar类型必填.build();FieldType fieldType3 = FieldType.newBuilder().withName("vector").withDataType(DataType.FloatVector).withDimension(1024).build();CreateCollectionParam createCollectionParam= CreateCollectionParam.newBuilder().withCollectionName(collectionName).withDatabaseName(databaseName).withShardsNum(shardsNum).withDescription(description).addFieldType(fieldType1).addFieldType(fieldType2).addFieldType(fieldType3).build();R<RpcStatus> collection = milvusServiceClient.createCollection(createCollectionParam);Integer status = collection.getStatus();return status;}/*** 创建 索引* @param milvusServiceClient* @param metricType* @param collectionName* @param fieldName* @param INDEX_TYPE* @param INDEX_PARAM* @return*/public static Integer index_create(MilvusServiceClient milvusServiceClient,MetricType metricType, String collectionName, String fieldName, IndexType INDEX_TYPE,String INDEX_PARAM){R<RpcStatus> index = 
milvusServiceClient.createIndex(CreateIndexParam.newBuilder().withCollectionName(collectionName).withFieldName(fieldName).withIndexType(INDEX_TYPE).withMetricType(metricType).withExtraParam(INDEX_PARAM).withSyncMode(Boolean.FALSE).build());Integer status = index.getStatus();return status;}/*** 创建分区* @param milvusServiceClient* @param collectionName* @param partitionName* @return*/public static Integer partition_create(MilvusServiceClient milvusServiceClient,String collectionName,String partitionName){R<RpcStatus> partition = milvusServiceClient.createPartition(CreatePartitionParam.newBuilder().withCollectionName(collectionName).withPartitionName(partitionName).build());Integer status = partition.getStatus();return status;}/*** load数据到内存中,load进展情况* @param milvusServiceClient* @param collectionName* @param database* @return*/public static Integer load(MilvusServiceClient milvusServiceClient,String collectionName,String database){// You can check the loading statusInteger loadstatus = -1;GetLoadStateParam loadparam = GetLoadStateParam.newBuilder().withCollectionName(collectionName).build();R<GetLoadStateResponse> responseLoad = milvusServiceClient.getLoadState(loadparam);System.out.println("当前状态:"+responseLoad.getData().getState());System.out.println("当前状态:"+responseLoad.getStatus() + R.Status.Success.getCode());if (responseLoad.getStatus() != R.Status.Success.getCode()) {System.out.println(responseLoad.getMessage());}
// System.out.println(responseLoad.getStatus());
//
// // and loading progress as well
//
// GetLoadingProgressParam loadingparam = GetLoadingProgressParam.newBuilder()
// .withCollectionName(collectionName)
// .build();
// R<GetLoadingProgressResponse> responseLoading = milvusServiceClient.getLoadingProgress(loadingparam);
// if (responseLoading.getStatus() != R.Status.Success.getCode()) {
// System.out.println(responseLoading.getMessage());
// }
// /**
// * load进展
// */
// System.out.println(responseLoading.getData().getProgress());LoadCollectionParam loadCollectionParam = LoadCollectionParam.newBuilder().withDatabaseName(database).withCollectionName(collectionName).build();R<RpcStatus> rpcStatusR = milvusServiceClient.loadCollection(loadCollectionParam);loadstatus = rpcStatusR.getStatus();return loadstatus;}/*** 释放内存数据* @param milvusServiceClient* @param collectionName* @return*/public static Integer release(MilvusServiceClient milvusServiceClient,String collectionName){R<RpcStatus> rpcStatusR = milvusServiceClient.releaseCollection(ReleaseCollectionParam.newBuilder().withCollectionName(collectionName).build());Integer status = rpcStatusR.getStatus();return status;}
}
Java调用示例
package com.gwm.milvus;import io.milvus.client.MilvusServiceClient;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;/*** @author yangyingchun* @version 1.0* @date 2024/3/20 16:50*/
public class MilvusTest {
    /**
     * End-to-end demo: create a collection, index it, add a partition, then load and release it.
     * Each step prints the RPC status code returned by {@code MilvusOperateC}.
     */
    public static void main(String[] args) {
        String collectionName = "repositoryId_test1";
        String databaseName = "knowledge_cosine_test";
        Integer shards = 2;
        String description = "测试java创建collection";
        // The index, partition and release calls below do not pass a database name, so they
        // act on the client's database. That database must therefore be the one the
        // collection is created in.
        // NOTE(review): port 31630 differs from the 31639 used in the Python examples — confirm.
        MilvusServiceClient milvusClient = MilvusOperateC.getConn("XXX.17.38", 31630, "Milvus", "Milvus", databaseName);
        try {
            Integer integer = MilvusOperateC.col_create(milvusClient, collectionName, databaseName, shards, description);
            System.out.println("collection创建状态:" + integer);

            String INDEX_PARAM = "{\"nlist\":1024}";
            Integer vector = MilvusOperateC.index_create(milvusClient, MetricType.COSINE, collectionName, "vector", IndexType.IVF_FLAT, INDEX_PARAM);
            System.out.println("索引创建状态:" + vector);

            Integer partitionCreate = MilvusOperateC.partition_create(milvusClient, collectionName, "20240320");
            System.out.println("分区创建状态:" + partitionCreate);

            Integer load = MilvusOperateC.load(milvusClient, collectionName, databaseName);
            System.out.println("集合load状态:" + load);

            Integer release = MilvusOperateC.release(milvusClient, collectionName);
            System.out.println("集合释放状态" + release);
        } finally {
            // Always shut the client down, even if a step above throws.
            milvusClient.close();
        }
    }
}