1 RAG 介绍
RAG是一种先进的自然语言处理方法,它结合了信息检索和文本生成技术,用于提高问答系统、聊天机器人等应用的性能。
2 RAG 的工作流程
-
文档加载(Document Loading)
- 从各种来源加载大量文档数据。
- 这些文档将作为知识库,用于后续的信息检索。
-
文档分割(Document Splitting)
- 将加载的文档分割成更小的段落或部分。
- 这有助于提高检索的准确性和效率。
-
嵌入向量生成(Embedding Generation)
- 对每个文档或文档的部分生成嵌入向量。
- 这些嵌入向量捕捉文档的语义信息,方便后续的相似度比较。
-
写入向量数据库(Writing to Vector Database)
- 将生成的嵌入向量存储在一个向量数据库中。
- 数据库支持高效的相似度搜索操作。
-
查询生成(Query Generation)
- 用户提出一个问题或输入一个提示。
- RAG模型根据输入生成一个或多个相关的查询。
-
文档检索(Document Retrieval)
- 使用生成的查询在向量数据库中检索相关文档。
- 选择与查询最相关的文档作为信息源。
-
上下文融合(Context Integration)
- 将检索到的文档内容与原始问题或提示融合,构成扩展的上下文。
-
答案生成(Answer Generation)
- 基于融合后的上下文,RAG生成模型产生最终的回答或文本。
- 这一步骤旨在综合原始输入和检索到的信息。
3 准备环境
3.1 向量数据库环境
已经通过百度向量数据库测试申请的才能访问创建,地址:VectorDB 向量数据库官网页-百度智能云
1 创建百度向量数据库实例,注意需要地域,可用区需要和 BCC 保持在同一个 VPC 内。 地址:百度智能云-向量数据库
2 创建成功后,通过实例详情页查看访问的地址信息和账号信息,用于访问操作向量数据库。如例子截图,访问信息如下:
# 访问地址格式:http://${IP}:${PORT}
访问地址:http://192.168.20.4:5287
账号:root
密钥:xxxx
3.2 开通千帆 Embedding 模型
千帆模型开通付费之后才能使用,开通不会产生费用,且有代金券赠送
1 开通千帆 Embedding 模型的收费,地址: 百度智能云千帆大模型平台
2 右上角个人中心的安全认证里面提取用于鉴权调用 Embedding 模型的 Access Key 和 Secret Key
3.3 客户端环境
3.3.1 数据准备和写入
本例子使用的是 bcc 计算型 c5 2c4g 实例基于 Centos 系统作为例子,但不仅限于 bcc,只要是同 vpc 内的服务器产品都可以。已经有 BCC 客户端的用户忽略步骤 1。
1 创建 BCC 客户端。 地址:百度智能云
2 登录创建的实例进行环境准备,部署安装 python 环境和搭建知识库所必须的依赖包,
# 安装 python 3.9
yum install -y python39
# langchain 依赖包,用于把文本数据转化为向量数据。
pip3.9 install langchain
# pymochow 依赖包,用于访问和操作百度向量数据库。
pip3.9 install pymochow
# qianfan 依赖包,用于访问千帆大模型。
pip3.9 install qianfan
# pdfplumber 依赖包,加载除了 pdf 文档。
pip3.9 install pdfplumber
# 创建项目目录
mkdir -p knowledge/example_data && cd knowledge
3 上传一个 PDF 文件到 knowledge/example_data 目录下
4 创建访问的配置文件
# config.py
import os
from pymochow.auth.bce_credentials import BceCredentials# 定义配置信息
account = 'root'
api_key = '修改为你的密钥'
endpoint = '修改为之前记录的访问地址,如 http://192.168.20.4:5287'# 初始化BceCredentials对象
credentials = BceCredentials(account, api_key)# 设置千帆AI平台的安全认证信息(AK/SK),通过环境变量
# 注意替换以下参数为您的Access Key和Secret Key
os.environ["QIANFAN_ACCESS_KEY"] = "your_iam_ak"
os.environ["QIANFAN_SECRET_KEY"] = "your_iam_sk"
5 创建 document 数据库
import pymochow
from pymochow.configuration import Configuration
import config # 导入配置文件config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)try:db = client.create_database("document")
except Exception as e: # 捕获所有类型的异常print(f"Error: {e}") # 打印异常信息
db_list = client.list_databases()
for db_name in db_list:print(db_name.database_name)
client.close()
6 创建 chunks 数据表
import time
import pymochow # 导入pymochow库,用于操作数据库
from pymochow.configuration import Configuration # 用于配置客户端
import config # 导入配置文件,包含身份验证和终端信息# 导入pymochow模型相关的类和枚举类型
from pymochow.model.schema import Schema, Field, VectorIndex, SecondaryIndex, HNSWParams
from pymochow.model.enum import FieldType, IndexType, MetricType, TableState
from pymochow.model.table import Partition# 使用配置文件中的信息初始化客户端
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)# 选择或创建数据库
db = client.database("document")# 定义数据表的字段
fields = [Field("id", FieldType.UINT64, primary_key=True, partition_key=True, auto_increment=False, not_null=True),Field("source", FieldType.STRING),Field("author", FieldType.STRING, not_null=True),Field("vector", FieldType.FLOAT_VECTOR, dimension=384)
]# 定义数据表的索引
indexes = [VectorIndex(index_name="vector_idx", field="vector", index_type=IndexType.HNSW, metric_type=MetricType.L2, params=HNSWParams(m=32, efconstruction=200)),SecondaryIndex(index_name="author_idx", field="author")
]# 尝试创建数据表,捕获并打印可能出现的异常
try:table = db.create_table(table_name="chunks", replication=3, partition=Partition(partition_num=1), schema=Schema(fields=fields, indexes=indexes))
except Exception as e: # 捕获所有类型的异常print(f"Error: {e}") # 打印异常信息# 轮询数据表状态,直到表状态为NORMAL,表示表已准备好
while True:time.sleep(2) # 每次检查前暂停2秒,减少对服务器的压力table = db.describe_table("chunks")if table.state == TableState.NORMAL: # 表状态为NORMAL,跳出循环breaktime.sleep(10) # 如果状态不是NORMAL,等待更长时间再次检查# 打印数据表的详细信息
print("table: {}".format(table.to_dict()))client.close() # 关闭客户端连接
7 从PDF文档中加载数据、将文档内容分割为更小的文本块以及利用千帆AI平台的接口来对文本进行向量化表示,并且写到 chunks 表,本例子会用小的文档作为例子,用户可以根据实际情况加载。
# 导入必要的库
from langchain_community.document_loaders import PDFPlumberLoader # 用于加载PDF文档
from langchain.text_splitter import RecursiveCharacterTextSplitter # 用于文本分割
import os # 用于操作系统功能,如设置环境变量
import qianfan # 千帆AI平台SDK
import time # 用于暂停执行,避免请求频率过高
import pymochow
import config # 导入配置文件
from pymochow.model.table import Row # 用于写入向量数据
from pymochow.configuration import Configuration# 加载PDF文档
loader = PDFPlumberLoader("./example_data/ai-paper.pdf") # 指定PDF文件路径
documents = loader.load() # 加载文档
print(documents[0]) # 打印加载的第一个文档内容# 设置文本分割器,指定分割的参数
# chunk_size定义了每个分割块的字符数,chunk_overlap定义了块之间的重叠字符数
# separators列表定义了用于分割的分隔符
text_splitter = RecursiveCharacterTextSplitter(chunk_size=384, chunk_overlap=0, separators=["\n\n", "\n", " ", "", "。", ","]
)
all_splits = text_splitter.split_documents(documents) # 对文档进行分割
print(all_splits[0]) # 打印分割后的第一个块内容emb = qianfan.Embedding() # 初始化嵌入模型对象embeddings = [] # 用于存储每个文本块的嵌入向量
for chunk in all_splits: # 遍历所有分割的文本块# 获取文本块的嵌入向量,使用默认模型Embedding-V1resp = emb.do(texts=[chunk.page_content])embeddings.append(resp['data'][0]['embedding']) # 将嵌入向量添加到列表中time.sleep(1) # 暂停1秒,避免请求过于频繁
print(embeddings[0]) # 打印第一个文本块的嵌入向量# 逐行写入向量化数据
rows = []
for index, chunk in enumerate(all_splits):row = Row(id=index,source=chunk.metadata["source"],author=chunk.metadata["Author"],vector=embeddings[index])rows.append(row)
# 打印第一个Row对象转换成的字典格式,以验证数据结构
print(rows[0].to_dict())# 读取数据库配置文件,并且初始化连接
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)# 选择或创建数据库
db = client.database("document")try:table = db.describe_table("chunks")table.upsert(rows=rows) # 批量写入向量数据,一次最多支持写入1000条table.rebuild_index("vector_idx") # 创建向量索引,必要步骤
except Exception as e: # 捕获所有类型的异常print(f"Error: {e}") # 打印异常信息
client.close()
当打印到如下的数据证明你写入成功了。
3.3.2 文档检索
1 基于标量的检索
import pymochow
from pymochow.configuration import Configuration
import config # 导入配置文件config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)# 选择或创建数据库
db = client.database("document")try:table = db.describe_table("chunks")primary_key = {'id': 0}projections = ["id", "source", "author"]res = table.query(primary_key=primary_key, projections=projections, retrieve_vector=True)
except Exception as e: # 捕获所有类型的异常print(f"Error: {e}") # 打印异常信息print(res)
client.close()
结果显示如下:
2 基于向量的检索
import os
import config
import pymochow
import qianfan
from pymochow.configuration import Configuration
from pymochow.model.table import AnnSearch, HNSWSearchParams# 初始化千帆AI平台的嵌入模型对象
emb = qianfan.Embedding()# 定义待查询的问题文本
question = "讲解下大模型的发展趋势"# 获取问题文本的嵌入向量
resp = emb.do(texts=[question])
question_embedding = resp['data'][0]['embedding']# 使用配置信息初始化向量数据库客户端
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)# 选择数据库
db = client.database("document")try:# 获取指定表的描述信息table = db.describe_table("chunks")# 构建近似最近邻搜索对象anns = AnnSearch(vector_field="vector", # 指定用于搜索的向量字段名vector_floats=question_embedding, # 提供查询向量params=HNSWSearchParams(ef=200, limit=1) # 设置HNSW算法参数和返回结果的限制数量)# 执行搜索操作res = table.search(anns=anns)# 打印搜索结果print(res)
except Exception as e: # 捕获并打印所有异常信息print(f"Error: {e}")# 关闭客户端连接
client.close()
3 基于标量和向量的混合检索
import os
import config
import pymochow
import qianfan
from pymochow.configuration import Configuration
from pymochow.model.table import AnnSearch, HNSWSearchParams# 初始化千帆AI平台的嵌入模型对象
emb = qianfan.Embedding()# 定义待查询的问题文本
question = "讲解下大模型的发展趋势"# 获取问题文本的嵌入向量
resp = emb.do(texts=[question])
question_embedding = resp['data'][0]['embedding']# 使用配置信息初始化向量数据库客户端
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)# 选择数据库
db = client.database("document")try:# 获取指定表的描述信息table = db.describe_table("chunks")# 构建近似最近邻搜索对象anns = AnnSearch(vector_field="vector", # 指定用于搜索的向量字段名vector_floats=question_embedding, # 提供查询向量params=HNSWSearchParams(ef=200, limit=1), # 设置HNSW算法参数和返回结果的限制数量filter="author='CNKI'" # 提供标量的过来条件)# 执行搜索操作res = table.search(anns=anns)# 打印搜索结果print(res)
except Exception as e: # 捕获并打印所有异常信息print(f"Error: {e}")# 关闭客户端连接
client.close()
当然后续还需要上下文融合和答案生成,可以基于百度文心大模型的能力实现,本文篇幅有限就不详细展开了。
原文链接:千帆+Langchain+VectorDB 建立简单的 RAG 示例 - 向量数据库 - 设计架构