文章目录
- 本文介绍
- 向量和向量数据库
- 向量
- 向量数据库
- 索引
- 开始动手实现rag
- 加载文档数据并建立索引
- 将向量存放到向量数据库中
- 检索生成
- 构成一条链
本文介绍
从本节开始,有了上一节的langchain基础学习,接下来使用langchain实现一个rag应用,并稍微深入的讲解一下流程
向量和向量数据库
向量
在rag中,不得不提向量和向量数据库,在ai检索中多数采用特征,在文本中多数采用向量来表示文本。采用向量后,“语义”的匹配程度就转换成了向量之间的相似程度。计算向量相似度的算法有很多,比如,余弦相似度、内积、欧氏距离等等。
有了向量,当用户提出问题时,处理过程就变成了将问题转换为向量,然后计算向量之间的距离,找到与问题向量最接近的文档向量,从而实现“语义”的匹配。
OpenAI 提供了一个专门负责将文本转换成向量的 API——Embeddings。我们可以根据需要,选择自己部署模型,或是选择别人提供的服务。不同的 Embedding 模型之间的差异主要取决于训练样本,比如有的模型会在中文处理上表现得比较好。
向量数据库
在 RAG 系统中,我们要把数据存放到哪里呢?我们需要一个数据库,只不过,我们需要的既不是 Oracle、MySQL 这样的关系数据库,也不是 MongoDB、Redis 这样的 NoSQL 数据库。因为我们后续处理的都是向量,所以,我们需要的是向量数据库。
向量数据库与传统数据库有很大的差别,在使用方式上,传统数据库搜索信息倾向于精确匹配,而向量数据库的匹配则是语义上的接近。
在实现上二者也存在不小的差别,比如,由于向量本身通常维度会很多,如果按照传统数据库的方式直接进行存储,将会带来很多问题。向量数据库需要把向量数据作为一个完整的单元处理,底层存储结构也需要根据这个特点进行规划。另外,向量数据格式也相对单一,每个维度的数据往往都是固定的数据格式(浮点数、二进制整数等)。
索引
下面是一个常见的索引过程:
在这个过程里面,我们会先对信息源做一次信息提取。信息源可能是各种文档,比如 Word 文档、PDF 文件,Web 页面,甚至是一些图片。从这些信息源中,我们把内容提取出来,也就是其中的文本。
接下来,我们会把这些文本进行拆分,将其拆分成更小的文本块。之所以要拆分,主要是原始的文本可能会比较大,这并不利于检索,还有一点重要原因是,我们前面说过,要把检索到的信息拼装到提示词里,过大的文本可能会造成提示词超过模型有限的上下文窗口。
再来,就是把文本块转换成向量,也就是得到 Embedding 的过程。前面我们说过,这个过程往往是通过训练好的模型来完成的。到这里,我们就把信息转换成了向量。最后一步,就是把得到的向量存储到向量数据库中,供后续的检索使用。
至此,我们对常见的 RAG 流程已经有了基本了解。但实际上,RAG 领域正处于一个快速发展的过程中,有很多相关技术也在不断地涌现:
- 虽然采用向量搜索对于语义理解很有帮助,但一些人名、缩写、特定 ID 之类的信息,却是传统搜索的强项,有人提出混合搜索的概念,将二者结合起来;
- 通过各种搜索方式,我们会得到很多的候选内容,但到底哪个与我们的问题更相关,有人引入了重排序(Rerank)模型,以此决定候选内容与查询问题的相关程度;
- 除了在已有方向的努力,甚至还有人提出了 RAG 的新方向。我们前面讨论的流程前提条件是把原始信息转换成了向量,但这本质上还是基于文本的,更适合回答一些事实性问题。它无法理解更复杂的关系,比如,我的朋友里谁在 AI 领域里工作。所以,有人提出了基于知识图谱的 RAG,知识图谱是一种结构化的语义知识库,特别适合找出信息之间的关联。
开始动手实现rag
加载文档数据并建立索引
建索引,使用TextLoader从txt中加载数据
from langchain_community.document_loaders import TextLoaderloader = TextLoader("introduction.txt")
docs = loader.load()text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma(collection_name="ai_learning",embedding_function=OpenAIEmbeddings(),persist_directory="vectordb"
)
vectorstore.add_documents(splits)
这里的 TextLoader 属于 DocumentLoader。在 LangChain 中,有一个很重要的概念叫文档(Document),它包括文档的内容(page_content)以及相关的元数据(metadata)。所有原始信息都是文档,索引信息的第一步就是把这些文档加载进来,这就是 DocumentLoader 的作用。
除了这里用到的 TextLoader,LangChain 社区里已经实现了大量的 DocumentLoader,比如,从数据库里加载数据的 SQLDatabaseLoader,从亚马逊 S3 加载文件的 S3FileLoader。基本上,大部分我们需要的文档加载器都可以找到直接的实现。
拆分加载进来的文档是 TextSplitter 的主要职责。
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
虽然都是文本,但怎样拆分还是有讲究的,拆分源代码和拆分普通文本,处理方法就是不一样的。LangChain 社区里同样实现了大量的 TextSplitter,我们可以根据自己的业务特点进行选择。我们这里使用了 RecursiveCharacterTextSplitter,它会根据常见的分隔符(比如换行符)递归地分割文档,直到把每个块拆分成适当的大小。
将向量存放到向量数据库中
做好基础的准备之后,就要把拆分的文档存放到向量数据库里了:
vectorstore = Chroma(collection_name="ai_learning",embedding_function=OpenAIEmbeddings(),persist_directory="vectordb"
)
vectorstore.add_documents(splits)
LangChain 支持了很多的向量数据库,它们都有一个统一的接口:VectorStore,在这个接口中包含了向量数据库的统一操作,比如添加、查询之类的。这个接口屏蔽了向量数据库的差异,在向量数据库并不为所有程序员熟知的情况下,给尝试不同的向量数据库留下了空间。各个具体实现负责实现这些接口,我们这里采用的实现是 Chroma。
在 Chroma 初始化的过程中,我们指定了 Embedding 函数,它负责把文本变成向量。这里我们采用了 OpenAI 的 Embeddings 实现,你完全可以根据自己的需要选择相应的实现,LangChain 社区同样提供了大量的实现,比如,你可以指定 Hugging Face 这个模型社区中的特定模型来做 Embedding。
到这里,我们就完成了索引的过程,看上去还是比较简单的。为了验证我们索引的结果,我们可以调用 similarity_search 检索向量数据库的数据:
vectorstore = Chroma(collection_name="ai_learning",embedding_function=OpenAIEmbeddings(),persist_directory="vectordb"
)
documents = vectorstore.similarity_search("专栏的作者是谁?")
print(documents)
这里用的 similarity_search 表示的是根据相似度进行搜索,还可以使用 max_marginal_relevance_search,它会采用 MMR(Maximal Marginal Relevance,最大边际相关性)算法。这个算法可以在保持结果相关性的同时,尽量选择与已选结果不相似的内容,以增加结果的多样性。
检索生成
现在,我们已经为我们 RAG 应用准备好了数据。接下来,就该正式地构建我们的 RAG 应用了。我在之前的聊天机器上做了一些修改,让它能够支持 RAG,代码如下:
from operator import itemgetter
from typing import List
import tiktoken
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage, trim_messages
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import OpenAIEmbeddings
from langchain_openai.chat_models import ChatOpenAI
from langchain_chroma import Chromavectorstore = Chroma(collection_name="ai_learning",embedding_function=OpenAIEmbeddings(),persist_directory="vectordb"
)retriever = vectorstore.as_retriever(search_type="similarity")# Token 计算
def str_token_counter(text: str) -> int:enc = tiktoken.get_encoding("o200k_base")return len(enc.encode(text))def tiktoken_counter(messages: List[BaseMessage]) -> int:num_tokens = 3tokens_per_message = 3tokens_per_name = 1for msg in messages:if isinstance(msg, HumanMessage):role = "user"elif isinstance(msg, AIMessage):role = "assistant"elif isinstance(msg, ToolMessage):role = "tool"elif isinstance(msg, SystemMessage):role = "system"else:raise ValueError(f"Unsupported messages type {msg.__class__}")# 确保 msg.content 不是 Nonecontent = msg.content or "" num_tokens += (tokens_per_message+ str_token_counter(role)+ str_token_counter(content))if msg.name:num_tokens += tokens_per_name + str_token_counter(msg.name)return num_tokens# 限制 token 长度
trimmer = trim_messages(max_tokens=4096,strategy="last",token_counter=tiktoken_counter,include_system=True,
)store = {}def get_session_history(session_id: str) -> BaseChatMessageHistory:if session_id not in store:store[session_id] = InMemoryChatMessageHistory()return store[session_id]model = ChatOpenAI()prompt = ChatPromptTemplate.from_messages([("system","""You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.Context: {context}""",),MessagesPlaceholder(variable_name="history"),("human", "{question}"),]
)def format_docs(docs):return "\n\n".join(doc.page_content for doc in docs)context = itemgetter("question") | retriever | format_docs
# first_step的内容有可能是这样的:
# {
# "question": "什么是LangChain?",
# "context": "LangChain 是一个用于构建基于 LLM 的应用的框架。\n它支持多种数据检索方式。"
# }
first_step = RunnablePassthrough.assign(context=context)from langchain_core.prompt_values import ChatPromptValue
def format_prompt_output(input):"""确保 `prompt` 生成的内容是 `List[BaseMessage]`"""# print("\n[DEBUG] Prompt 输出类型:", type(input))# print("[DEBUG] Prompt 输出内容:", input)# 如果 input 是 `ChatPromptValue`,提取 `.messages`if isinstance(input, ChatPromptValue):return input.messages # 直接返回 `List[BaseMessage]`# 如果 input 已经是 `List[BaseMessage]`,直接返回if isinstance(input, list) and all(isinstance(msg, BaseMessage) for msg in input):return input# 其他情况报错raise TypeError(f"❌ format_prompt_output 传入了不正确的格式: {type(input)}")chain = first_step | prompt | format_prompt_output | trimmer | modelwith_message_history = RunnableWithMessageHistory(chain,get_session_history=get_session_history,input_messages_key="question",history_messages_key="history",
)config = {"configurable": {"session_id": "dreamhead"}}while True:user_input = input("You:> ")if user_input.lower() == 'exit':breakif user_input.strip() == "":continue# 修正 stream 传参stream = with_message_history.stream({"question": user_input}, # 直接传入字符串config=config)# print(prompt)for chunk in stream:print(chunk.content, end='', flush=True)print()
示例代码运行如下:
为了进行检索,我们需要指定数据源,这里就是我们的向量数据库,其中存放着我们前面已经索引过的数据:
vectorstore = Chroma(collection_name="ai_learning",embedding_function=OpenAIEmbeddings(),persist_directory="vectordb"
)retriever = vectorstore.as_retriever(search_type="similarity")
为什么不直接使用向量数据库呢?因为 Retriever 并不只有向量数据库一种实现,比如,WikipediaRetriever 可以从 Wikipedia 上进行搜索。所以,一个 Retriever 接口就把具体的实现隔离开来。
回到向量数据库上,当我们调用 as_retriever 创建 Retriever 时,还传入了搜索类型(search_type),这里的搜索类型和前面讲到向量数据库的检索方式是一致的,这里我们传入的是 similarity,当然也可以传入 mmr。
文档检索出来,并不能直接就和我们的问题拼装到一起。这时,就轮到提示词登场了。下面是我们在代码里用到的提示词
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
Context: {context}
在这段提示词里,我们告诉大模型,根据提供的上下文回答问题,不知道就说不知道。这是一个提示词模板,在提示词的最后是我们给出的上下文(Context)。这里上下文是根据问题检索出来的内容。
有了这个提示词,再加上聊天历史和我们的问题,就构成了一个完整的提示词模板:
prompt = ChatPromptTemplate.from_messages([("system","""You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.Context: {context}""",),MessagesPlaceholder(variable_name="history"),("human", "{question}"),]
)
构成一条链
接下来,就是把各个组件组装到一起,构成一条完整的链:
context = itemgetter("question") | retriever | format_docs
first_step = RunnablePassthrough.assign(context=context)
chain = first_step | prompt | trimmer | modelwith_message_history = RunnableWithMessageHistory(chain,get_session_history=get_session_history,input_messages_key="question",history_messages_key="history",
)
在这段代码里,我们首先构建了一个 context 变量,它也一条链。第一步是从传入参数中获取到 question 属性,也就是我们的问题,然后把它传给 retriever。retriever 会根据问题去做检索,对应到我们这里的实现,就是到向量数据库中检索,检索的结果是一个文档列表。
文档是 LangChain 应用内部的表示,要传给大模型,我们需要把它转成文本,这就是 format_docs 做的事情,它主要是把文档内容取出来拼接到一起:
def format_docs(docs):return "\n\n".join(doc.page_content for doc in docs)
这里补充几句实现细节。在 LangChain 代码里, | 运算符被用作不同组件之间的连接,其实现的关键就是大部分组件都实现了 Runnable 接口,在这个接口里实现了 or 和 ror。or 表示这个对象出现在| 左边时的处理,相应的 ror 表示这个对象出现在右边时的处理。
Python 在处理 a | b 这个表达式时,它会先尝试找 a 的 or,如果找不到,它会尝试找 b 的 ror。所以,在 context 的处理中, 来自标准库的 itemgetter 虽然没有实现__or__,但 retriever 因为实现了 Runnable 接口,所以,它也实现了 ror。所以,这段代码才能组装出我们所需的链。
有了 context 变量,我们可以用它构建了另一个变量 first_step:
first_step = RunnablePassthrough.assign(context=context)
RunnablePassthrough.assign 这个函数就是在不改变链当前状态值的前提下,添加新的状态值。前面我们说了,这里赋给 context 变量的值是一个链,我们可以把它理解成一个函数,它会在运行期执行,其参数就是我们当前的状态值。现在你可以理解 itemgetter(“question”) 的参数是从哪来的了。这个函数的返回值会用来在当前的状态里添加一个叫 context 的变量,以便在后续使用。