将表格数据(CSV 或 Excel 文件)加载到向量数据库(ChromaDB)中。这里定义的类 PrepareVectorDBFromTabularData
,它的主要功能是读取表格数据文件到DataFrame中、生成嵌入向量、并将这些数据存储在向量数据库的集合中,同时对注入的数据进行验证。
代码结构与功能分析
1. 类的简介
- 目标:将 CSV 或 Excel 文件的数据转换为向量并存储到 ChromaDB 中。
- 主要方法:
run_pipeline
:运行整个数据处理管道。_load_dataframe
:加载数据文件为 Pandas DataFrame。_prepare_data_for_injection
:根据文件内容生成向量和相关元数据。_inject_data_into_chromadb
:将数据注入到 ChromaDB 中。_validate_db
:验证向量数据库的集合内容。
2. 初始化 (__init__
)
def __init__(self, file_directory:str) -> None:self.APPCFG = LoadConfig()self.file_directory = file_directory
- 参数:
file_directory
:待处理文件的路径。
- 功能:
- 加载配置对象
self.APPCFG
,其中包含数据库和嵌入生成器的实例。 - 初始化文件路径
file_directory
。
- 加载配置对象
3. 运行数据处理管道 (run_pipeline
)
def run_pipeline(self):self.df, self.file_name = self._load_dataframe(file_directory=self.file_directory)self.docs, self.metadatas, self.ids, self.embeddings = self._prepare_data_for_injection(df=self.df, file_name=self.file_name)self._inject_data_into_chromadb()self._validate_db()
- 功能:
- 加载数据文件(调用
_load_dataframe
)。 - 准备向量和相关元数据(调用
_prepare_data_for_injection
)。 - 将数据注入到 ChromaDB 集合中(调用
_inject_data_into_chromadb
)。 - 验证注入是否成功(调用
_validate_db
)。
- 加载数据文件(调用
4. 加载数据文件 (_load_dataframe
)
def _load_dataframe(self, file_directory: str):file_names_with_extensions = os.path.basename(file_directory)file_name, file_extension = os.path.splitext(file_names_with_extensions)if file_extension == ".csv":df = pd.read_csv(file_directory)return df, file_nameelif file_extension == ".xlsx":df = pd.read_excel(file_directory)return df, file_nameelse:raise ValueError("The selected file type is not supported")
- 参数:
file_directory
:待加载的文件路径。
- 功能:
- 根据文件扩展名(
.csv
或.xlsx
),将文件加载为 Pandas DataFrame。 - 返回加载的数据和文件名(不带扩展名)。
- 根据文件扩展名(
- 异常:
- 如果文件类型不被支持,则抛出
ValueError
。
- 如果文件类型不被支持,则抛出
5. 准备数据 (_prepare_data_for_injection
)
def _prepare_data_for_injection(self, df:pd.DataFrame, file_name:str):docs = []metadatas = []ids = []embeddings = []for index, row in df.iterrows():output_str = ""for col in df.columns:output_str += f"{col}: {row[col]},\n"response = self.APPCFG.OpenAIEmbeddings.embed_documents(output_str)[0]embeddings.append(response)docs.append(output_str)metadatas.append({"source": file_name})ids.append(f"id{index}")return docs, metadatas, ids, embeddings
- 参数:
df
:待处理的 Pandas DataFrame。file_name
:文件名,用于生成元数据。
- 功能:
- 遍历 DataFrame 的每一行,将行数据格式化为字符串
output_str
。 - 使用
OpenAIEmbeddings.embed_documents
为字符串生成向量。 - 保存生成的文档、元数据、唯一 ID 和向量。
- 遍历 DataFrame 的每一行,将行数据格式化为字符串
- 返回值:
- 文档列表、元数据列表、ID 列表和向量列表。
6. 注入数据到 ChromaDB (_inject_data_into_chromadb
)
def _inject_data_into_chromadb(self):chroma_client = self.APPCFG.chroma_clientexisting_collections = chroma_client.list_collections()collection_name = self.APPCFG.collection_nameexisting_collection_names = [collection.name for collection in existing_collections]if collection_name in existing_collection_names:collection = chroma_client.get_collection(name=collection_name)print(f"Retrieved existing collection: {collection_name}")else:collection = chroma_client.create_collection(name=collection_name)print(f"Created new collection: {collection_name}")collection.add(documents=self.docs,metadatas=self.metadatas,embeddings=self.embeddings,ids=self.ids)print("Data is stored in ChromaDB.")
- 功能:
- 检查集合是否已存在。如果存在,则获取;否则,创建新集合。
- 将文档、元数据、嵌入向量和 ID 添加到集合中。
- 异常处理:
- 避免重复创建集合。
7. 验证数据库内容 (_validate_db
)
def _validate_db(self):vectordb = self.APPCFG.chroma_client.get_collection(name=self.APPCFG.collection_name)print("Number of vectors in vectordb:", vectordb.count())
- 功能:
- 获取集合并打印其中向量的数量,确认数据是否注入成功。
代码运行结果:
总结
这段代码的整体流程如下:
- 加载 CSV 或 Excel 文件,转换为 Pandas DataFrame。
- 遍历 DataFrame 的每一行,生成文档、元数据和嵌入向量。
- 将生成的数据注入到 ChromaDB 的集合中。
- 验证数据库集合中的向量数量,确保注入成功。
需要注意文件格式支持、嵌入生成器和 ChromaDB 客户端的兼容性问题。
完整代码:
import os
import pandas as pd
from utils.load_config import LoadConfig
import pandas as pdclass PrepareVectorDBFromTabularData:"""This class is designed to prepare a vector database from a CSV and XLSX file.It then loads the data into a ChromaDB collection. The process involvesreading the CSV file, generating embeddings for the content, and storing the data in the specified collection.Attributes:APPCFG: Configuration object containing settings and client instances for database and embedding generation.file_directory: Path to the CSV file that contains data to be uploaded."""def __init__(self, file_directory:str) -> None:"""Initialize the instance with the file directory and load the app config.Args:file_directory (str): The directory path of the file to be processed."""self.APPCFG = LoadConfig()self.file_directory = file_directorydef run_pipeline(self):"""Execute the entire pipeline for preparing the database from the CSV.This includes loading the data, preparing the data for injection, injectingthe data into ChromaDB, and validating the existence of the injected data."""self.df, self.file_name = self._load_dataframe(file_directory=self.file_directory)self.docs, self.metadatas, self.ids, self.embeddings = self._prepare_data_for_injection(df=self.df, file_name=self.file_name)self._inject_data_into_chromadb()self._validate_db()def _load_dataframe(self, file_directory: str):"""Load a DataFrame from the specified CSV or Excel file.Args:file_directory (str): The directory path of the file to be loaded.Returns:DataFrame, str: The loaded DataFrame and the file's base name without the extension.Raises:ValueError: If the file extension is neither CSV nor Excel."""file_names_with_extensions = os.path.basename(file_directory)print(file_names_with_extensions)file_name, file_extension = os.path.splitext(file_names_with_extensions)if file_extension == ".csv":df = pd.read_csv(file_directory)return df, file_nameelif file_extension == ".xlsx":df = pd.read_excel(file_directory)return df, file_nameelse:raise ValueError("The selected file type is not supported")def _prepare_data_for_injection(self, df:pd.DataFrame, file_name:str):"""Generate embeddings and prepare documents for data injection.Args:df (pd.DataFrame): The DataFrame containing the data to be processed.file_name (str): The base name of the file for use in metadata.Returns:list, list, list, list: Lists containing documents, metadatas, ids, and embeddings respectively."""docs = []metadatas = []ids = []embeddings = []for index, row in df.iterrows():output_str = ""# Treat each row as a separate chunkfor col in df.columns:output_str += f"{col}: {row[col]},\n"response = self.APPCFG.OpenAIEmbeddings.embed_documents(output_str)[0]embeddings.append(response)docs.append(output_str)metadatas.append({"source": file_name})ids.append(f"id{index}")return docs, metadatas, ids, embeddingsdef _inject_data_into_chromadb(self):"""Inject the prepared data into ChromaDB.Raises an error if the collection_name already exists in ChromaDB.The method prints a confirmation message upon successful data injection."""chroma_client = self.APPCFG.chroma_client# 列出所有集合的名称existing_collections = chroma_client.list_collections()collection_name = self.APPCFG.collection_name #"titanic_small"# 获取所有集合existing_collections = chroma_client.list_collections()# 提取集合名称existing_collection_names = [collection.name for collection in existing_collections]if collection_name in existing_collection_names:# 如果集合存在,获取它collection = chroma_client.get_collection(name=collection_name)print(f"Retrieved existing collection: {collection_name}")else:# 如果集合不存在,创建它collection = chroma_client.create_collection(name=collection_name)print(f"Created new collection: {collection_name}")collection.add(documents=self.docs,metadatas=self.metadatas,embeddings=self.embeddings,ids=self.ids)print("==============================")print("Data is stored in ChromaDB.") def _validate_db(self):"""Validate the contents of the database to ensure that the data injection has been successful.Prints the number of vectors in the ChromaDB collection for confirmation."""vectordb = self.APPCFG.chroma_client.get_collection(name=self.APPCFG.collection_name)print("==============================")print("Number of vectors in vectordb:", vectordb.count())print("==============================")