知识库搭建
- 1 介绍
- 词向量
- 向量数据库
- 2 使用embedding API
- 3 数据处理
- 数据加载
- 数据清洗
- 文档分割
- 4 搭建并使用向量数据库
- 4.1 自定义embedding封装
- 4.2 chroma数据库
- 4.3 向量检索
详细代码参考:https://github.com/lin902/llm-application
1 介绍
词向量
词向量就是把词、句、文段通过向量的形式表示出语义信息。比如猫和狗在向量相似度上就比猫和苹果要近。词向量的优势是可以通过向量存储、相似度语义上的查询(而非关键词搜索);同时可以将多模态的信息映射成统一的向量形式。
向量数据库
存储词向量的数据库。比如图数据库neo4j,本次要用的langchain数据库chroma.
2 使用embedding API
智谱
from dotenv import load_dotenv, find_dotenv
from zhipuai import ZhipuAI
import os_ = load_dotenv(find_dotenv())def zhipu_embedding(text: str):api_key = os.environ['ZHIPUAI_API_KEY']client = ZhipuAI(api_key=api_key)response = client.embeddings.create(model="embedding-2",input=text,)return responsetext = '要生成 embedding 的输入文本,字符串形式。'
response = zhipu_embedding(text=text)
print(f'response类型为:{type(response)}')
print(f'embedding类型为:{response.object}')
print(f'生成embedding的model为:{response.model}')
print(f'生成的embedding长度为:{len(response.data[0].embedding)}')
print(f'embedding(前10)为: {response.data[0].embedding[:10]}')
3 数据处理
数据加载
# 先看一下路径
# 计算上一级目录的路径
current_path = os.getcwd()
parent_path = os.path.dirname(current_path)# 获取上一级目录下的所有文件和文件夹
file_list = os.listdir(parent_path)# 输出上一级目录下的全部文件名
print("上一级目录下的文件名:")
for file in file_list:print(file)
pdf的文档提取
from langchain.document_loaders.pdf import PyMuPDFLoader
# print(os.getcwd())
# print(os.path.dirname(os.getcwd()))
# 创建一个 PyMuPDFLoader Class 实例,输入为待加载的 pdf 文档路径
loader = PyMuPDFLoader("../llm-universe/data_base/knowledge_db/pumkin_book/pumpkin_book.pdf") # 路径一定要对# 调用 PyMuPDFLoader Class 的函数 load 对 pdf 文件进行加载
pdf_pages = loader.load()
markdown文档提取
from langchain.document_loaders.markdown import UnstructuredMarkdownLoaderloader = UnstructuredMarkdownLoader("../llm-universe/data_base/knowledge_db/prompt_engineering/1. 简介 Introduction.md")
md_pages = loader.load()
数据清洗
可以看到,加载后的内容有句间的换行\n,还有标点符号·,把这些符号替换成空格。注意\n在文字与符号之间是要保留的,所以我们只删除文字与文字之间、符号与符号之间的换行\n.
# 处理掉字符之间换行/n,^\u4e00-\u9fff指非文字
import re
# print(pdf_page)
pattern = re.compile(r'[^\u4e00-\u9fff](\n)[^\u4e00-\u9fff]', re.DOTALL)
# 换行符(\n)替换为空字符串,但仅当这些换行符位于非汉字字符之间时。
pdf_page.page_content = re.sub(pattern, lambda match: match.group(0).replace('\n', ''), pdf_page.page_content)
print(pdf_page.page_content)
#
# 句间换行没有了
# 处理句中字之间换行符号
import re# 编译一个正则表达式模式,匹配汉字之间的换行符
pattern = re.compile(r'[\u4e00-\u9fff]+(\n)+[\u4e00-\u9fff]+', re.DOTALL)# 使用re.sub()函数替换匹配到的模式,将换行符替换为空字符串
pdf_page.page_content = re.sub(pattern, lambda match: match.group().replace('\n', ''), pdf_page.page_content)print(pdf_page.page_content)pdf_page.page_content = pdf_page.page_content.replace('•', '')
pdf_page.page_content = pdf_page.page_content.replace(' ', '')
# pdf_page.page_content = pdf_page.page_content.replace('\n', '') 不能直接全换,字与符号之间的换行有必要留
print(pdf_page.page_content)
# 处理 · 和空格
#print(pdf_page)# 段与段之间还有换行符
md_page.page_content = md_page.page_content.replace('\n\n', '\n')
print(md_page.page_content)
文档分割
模型上下文长度受限,因此在构建知识库的时候,需要对文档进行分割,分割成若干个chunk,转化成词向量,保存在数据库中。
如何对文档进行分割,其实是数据处理中最核心的一步,其往往决定了检索系统的下限。但是,如何选择分割方式,往往具有很强的业务相关性——针对不同的业务、不同的源数据,往往需要设定个性化的文档分割方式。
递归字符文本分割器: 通过递归的方式进行,意味着它会重复地将文本分割,直到满足特定的条件
'''
* RecursiveCharacterTextSplitter 递归字符文本分割
RecursiveCharacterTextSplitter 将按不同的字符递归地分割(按照这个优先级["\n\n", "\n", " ", ""]),这样就能尽量把所有和语义相关的内容尽可能长时间地保留在同一位置
RecursiveCharacterTextSplitter需要关注的是4个参数:* separators - 分隔符字符串数组
* chunk_size - 每个文档的字符数量限制
* chunk_overlap - 两份文档重叠区域的长度
* length_function - 长度计算函数
'''
#导入文本分割器
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 知识库中单段文本长度
CHUNK_SIZE = 500# 知识库中相邻文本重合长度
OVERLAP_SIZE = 50
# 使用递归字符文本分割器 通过递归的方式进行,意味着它会重复地将文本分割,直到满足特定的条件
text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE,chunk_overlap=OVERLAP_SIZE
)
text_splitter.split_text(pdf_page.page_content[0:1000])
# 使用递归字符文本分割器 通过递归的方式进行,意味着它会重复地将文本分割,直到满足特定的条件
text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE,chunk_overlap=OVERLAP_SIZE
)
text_splitter.split_text(pdf_page.page_content[0:1000])
print(f"切分后的字符数(可以用来大致评估 token 数):{sum([len(doc.page_content) for doc in split_docs])}")
4 搭建并使用向量数据库
4.1 自定义embedding封装
langchain内置了llama 千帆等emdedding但并没有包含所有大模型。要实现自定义 Embeddings,需要定义一个自定义类继承自 LangChain 的 Embeddings 基类,然后定义两个函数:① embed_query 方法,用于对单个字符串(query)进行 embedding;②embed_documents 方法,用于对字符串列表(documents)进行 embedding。
封装在.py文件中与调用文件保存在同一目录
from __future__ import annotationsimport logging
from typing import Dict, List, Anyfrom langchain.embeddings.base import Embeddings
from langchain.pydantic_v1 import BaseModel, root_validatorlogger = logging.getLogger(__name__)class ZhipuAIEmbeddings(BaseModel, Embeddings):"""`Zhipuai Embeddings` embedding models."""client: Any"""`zhipuai.ZhipuAI"""@root_validator()def validate_environment(cls, values: Dict) -> Dict:"""实例化ZhipuAI为values["client"]Args:values (Dict): 包含配置信息的字典,必须包含 client 的字段.Returns:values (Dict): 包含配置信息的字典。如果环境中有zhipuai库,则将返回实例化的ZhipuAI类;否则将报错 'ModuleNotFoundError: No module named 'zhipuai''."""from zhipuai import ZhipuAIvalues["client"] = ZhipuAI()return valuesdef embed_query(self, text: str) -> List[float]:"""生成输入文本的 embedding.Args:texts (str): 要生成 embedding 的文本.Return:embeddings (List[float]): 输入文本的 embedding,一个浮点数值列表."""embeddings = self.client.embeddings.create(model="embedding-2",input=text)return embeddings.data[0].embeddingdef embed_documents(self, texts: List[str]) -> List[List[float]]:"""生成输入文本列表的 embedding.Args:texts (List[str]): 要生成 embedding 的文本列表.Returns:List[List[float]]: 输入列表中每个文档的 embedding 列表。每个 embedding 都表示为一个浮点值列表。"""return [self.embed_query(text) for text in texts]async def aembed_documents(self, texts: List[str]) -> List[List[float]]:"""Asynchronous Embed search docs."""raise NotImplementedError("Please use `embed_documents`. Official does not support asynchronous requests")async def aembed_query(self, text: str) -> List[float]:"""Asynchronous Embed query text."""raise NotImplementedError("Please use `aembed_query`. Official does not support asynchronous requests")
其中,
Dict:用于注解字典类型。
List:用于注解列表类型。
Any:用于注解时表示可以接受任何类型。
BaseModel是pydantic库中的一个类,用于创建带有数据验证和设置的模型类。root_validator是一个装饰器,用于在模型实例化后立即对所有根级别数据进行验证。
embed_query 是对单个文本(str)计算 embedding 的方法,这里我们重写该方法,调用验证环境时实例化的ZhipuAI来 调用远程 API 并返回 embedding 结果。
embed_documents 是对字符串列表(List[str])计算embedding 的方法,对于这种类型输入我们采取循环方式挨个计算列表内子字符串的 embedding 并返回。
4.2 chroma数据库
加载文档
import os
from dotenv import load_dotenv, find_dotenv# 读取本地/项目的环境变量。
# find_dotenv()寻找并定位.env文件的路径
# load_dotenv()读取该.env文件,并将其中的环境变量加载到当前的运行环境中
# 如果你设置的是全局的环境变量,这行代码则没有任何作用。
_ = load_dotenv(find_dotenv())# 如果你需要通过代理端口访问,你需要如下配置
# os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'
# os.environ["HTTP_PROXY"] = 'http://127.0.0.1:7890'# 获取folder_path下所有文件路径,储存在file_paths里
file_paths = []
folder_path = '../llm-universe/data_base/knowledge_db'
for root, dirs, files in os.walk(folder_path): # os.walk()生成一个三元组(root, dirs, files)for file in files:file_path = os.path.join(root, file)file_paths.append(file_path)
print(file_paths[:3])
print(file_paths)
读取文档
from langchain.document_loaders.pdf import PyMuPDFLoader
from langchain.document_loaders.markdown import UnstructuredMarkdownLoader# 遍历文件路径并把实例化的loader存放在loaders里
loaders = []for file_path in file_paths:file_type = file_path.split('.')[-1]if file_type == 'pdf':loaders.append(PyMuPDFLoader(file_path))elif file_type == 'md':loaders.append(UnstructuredMarkdownLoader(file_path))
# 下载文件并存储到text
texts = []for loader in loaders: texts.extend(loader.load())
text = texts[1]
print(f"每一个元素的类型:{type(text)}.", f"该文档的描述性数据:{text.metadata}", f"查看该文档的内容:\n{text.page_content[0:]}", sep="\n------\n")
文档分割
from langchain.text_splitter import RecursiveCharacterTextSplitter# 切分文档
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)split_docs = text_splitter.split_documents(texts)
构建数据库
# 使用 OpenAI Embedding
# from langchain.embeddings.openai import OpenAIEmbeddings
# 使用百度千帆 Embedding
# from langchain.embeddings.baidu_qianfan_endpoint import QianfanEmbeddingsEndpoint
# 使用我们自己封装的智谱 Embedding,需要将封装代码下载到本地使用
from zhipuai_embedding import ZhipuAIEmbeddings# 定义 Embeddings
# embedding = OpenAIEmbeddings()
embedding = ZhipuAIEmbeddings()
# embedding = QianfanEmbeddingsEndpoint()# 定义持久化路径
persist_directory = '../llm-universe/data_base/vector_db/chroma'
!rm -rf '../llm-universe/data_base/vector_db/chroma' # 删除旧的数据库文件(如果文件夹中有文件的话),windows电脑请手动删除
from langchain.vectorstores.chroma import Chromavectordb = Chroma.from_documents(documents=split_docs[:5], # 为了速度,只选择前 20 个切分的 doc 进行生成;使用千帆时因QPS限制,建议选择前 5 个docembedding=embedding,persist_directory=persist_directory # 允许我们将persist_directory目录保存到磁盘上
)
# 保存
vectordb.persist()
print(f"向量库中存储的数量:{vectordb._collection.count()}")
4.3 向量检索
- 余弦相似度
question="什么是大语言模型"
sim_docs = vectordb.similarity_search(question,k=3)
print(f"检索到的内容数:{len(sim_docs)}")
for i, sim_doc in enumerate(sim_docs):print(f"检索到的第{i}个内容: \n{sim_doc.page_content[:200]}", end="\n--------------\n")
可以看到是相关,但没啥有用的信息。如果只考虑检索出内容的相关性会导致内容过于单一,可能丢失重要信息。所以下面,最大边际相关性 (MMR, Maximum marginal relevance) 可以帮助我们在保持相关性的同时,增加内容的丰富度。核心思想是选择一个相关性高的文章+相关性低但内容丰富的文章。(互补嘛)
2. MMR检索
mmr_docs = vectordb.max_marginal_relevance_search(question,k=5)
for i, sim_doc in enumerate(mmr_docs):print(f"MMR 检索到的第{i}个内容: \n{sim_doc.page_content[:200]}", end="\n--------------\n")
还是感觉不咋有内容