摘要
本篇文章以实现简单的第一范式 RAG-Naive RAG为目标,并最终创建并实现一个基于RAG的论文分析器的项目。
LangChain 文档加载
文档加载是 RAG 流程的起点,它负责从各种数据源读取原始文档,将其转化为程序可处理的格式。LangChain 支持多种文档类型的加载,具体支持的加载文档类型,可以参考我写的这篇文章:
langchain框架-文档加载器详解-CSDN博客
- 接下来,我将以ArxivLoader 论文加载器为示例,基于langchain实现一个论文文档加载器:
- 首先阅读项目源码说明得知,使用ArxivLoader 需要安装两个模块 arxiv 和 pyMuPDF:
pip install -U arxiv pymupdf
- 注意:pymupdy 依赖需要Visual Studio 工具包的支持,如果没有安装的话会报python 依赖加载异常。
- 继续阅读源码说明确定其参数:
- 示例代码
def load_arxiv_paper(arxiv_id):try:# 初始化ArxivLoaderloader = ArxivLoader(query=arxiv_id,load_max_docs=1, # 限制加载文档数量load_all_available_meta=True # 加载所有可用的元数据)# 加载论文文档documents = loader.load()logger.info(f"成功加载论文,标题: {documents[0].metadata.get('Title', 'Unknown')}")return documentsexcept Exception as e:logger.error(f"加载论文时发生错误: {str(e)}")return None
- 并将最终结果打印并保存到文件中缓存,整个文档加载的流程就结束了。
# 论文的arxiv IDarxiv_id = "2402.19473"# 加载论文documents = load_arxiv_paper(arxiv_id)if not documents:return# 存入 getParper.txt 文件中。with open("getPaper.txt", "w", encoding="utf-8") as f:for doc in documents:f.write(doc.page_content + "\n")exit()
- 最终获取展示:注意ArxivLoader最终获取的论文只是文本部分,不能包含图片部分的内容。
LangChain 文本分割
在完成文档加载之后,对加载的文档进行合理的文本分割显得尤为重要。因为在检索增强生成(RAG)流程中,优质的文本分块方式对最终结果有着显著影响。
考虑到本项目所涉及的论文为英文文档,并且为了确保论文内容的连贯性,我们决定采用语义分割的策略。具体而言,我们将借助 LangChain 框架中的 NLTKTextSplitter 工具。该工具依托自然语言工具包(NLTK),能够精准地按照句子对文本进行分割,从而为后续的 RAG 任务奠定坚实基础。
Langchain 框架中包含了很多文本分割方法,如果读者想系统学习langchain 的文本分割,可以参考我写的以下文章:
langchain框架-文档分割器详解(官方库)-CSDN博客
langchain框架-文档分割器详解(非官方库)-CSDN博客
langchain框架-文档分割器总结即补充-CSDN博客
NLTK 语料包下载
使用NLTKTextSplitter 需要先下载NLTK 语料工具包。但由于网络等原因,我们可能无法正常下载该工具包。我们可以采用网络代理的方式,如果不能用代理的小伙伴,可以下载我的上传资源进行练习。具体的代码如下:
def getNLTKData():import nltkos.environ['http_proxy'] = 'http://127.0.0.1:33210'os.environ['https_proxy'] = 'http://127.0.0.1:33210'# 指定自定义的数据目录custom_data_dir = 'E:\\nltk_data'nltk.data.path.append(custom_data_dir)# 下载资源if not os.path.exists(os.path.join(custom_data_dir, 'tokenizers', 'punkt')):nltk.download('punkt', download_dir=custom_data_dir)if not os.path.exists(os.path.join(custom_data_dir, 'tokenizers', 'punkt_tab')):nltk.download('punkt_tab', download_dir=custom_data_dir)
CSDN语料包下载地址
https://download.csdn.net/download/weixin_41645817/90627575
分割文本方法
def process_documents(documents):if not documents:return Nonetry:# 初始化文本分割器nltk_splitter = NLTKTextSplitter(chunk_size=1000,chunk_overlap=200,)# 分割文档texts = nltk_splitter.split_documents(documents)logger.info(f"文档已分割为 {len(texts)} 个片段")return textsexcept Exception as e:print(e)logger.error(f"处理文档时发生错误: {str(e)}")return None
执行并保存
# 处理文档split_documents = process_documents(documents)if not split_documents:return# 将分割后的文本存入 getPaperChunk2.json 文件中chunks_data = {"total_chunks": len(split_documents),"chunks": []}for index, text in enumerate(split_documents, 1):chunk_info = {"chunk_id": f"chunk_{index}","page_content": text.page_content}chunks_data["chunks"].append(chunk_info)# 将JSON数据写入文件with open("getPaperChunk2.json", "w", encoding="utf-8") as f:json.dump(chunks_data, f, ensure_ascii=False, indent=4)print(f"成功保存 {len(split_documents)} 个文本块到 getPaperChunk2.json")
LangChain 嵌入模型
实现文本切分后,我们需要将切分的文档进行向量化处理,在这里我将使用ZhipuAIEmbeddings 进行文档的向量化处理,主要是因为我买了智谱AI embedding-3 的流量包了哈。小伙伴们可以自行选择向量化模型,LangChain 框架也集成了众多embeddings类的实现,具体可以参考我的这篇文章:
LangChain框架-嵌入模型详解-CSDN博客
LangChain 向量存储
文本向量化后,我们需要将向量化后的数据存入到向量数据库中。LangChain 框架集成了众多向量数据库的实现,我们可以很方便的通过LangChain 将数据存入到向量数据库。在这里,我将使用一款基于内存的轻量级数据库Faiss 作为示例进行向量存储。如果您想系统性的学习LangChain 向量存储部分,可以参考我的这篇文章:
LangChain框架-向量存储详解-CSDN博客
向量化与向量存储代码示例
-
安装zhipuai,faiss-cpu 依赖
pip install zhipuai,faiss-cpu
-
向量化并存入Faiss库
def create_vector_store(documents):if not documents:logger.warning("没有文档需要处理")return Nonetry:# 使用绝对路径存储缓存cache_dir = os.path.join(os.path.dirname(__file__), "cache")os.makedirs(cache_dir, exist_ok=True)store = LocalFileStore(cache_dir)# 创建向量存储underlying_embeddings = ZhipuAIEmbeddings(model="embedding-3",api_key=os.getenv("ZHIPUAI_API_KEY"),)cached_embedder = CacheBackedEmbeddings.from_bytes_store(underlying_embeddings, store, namespace=underlying_embeddings.model)# 批量处理文档batch_size = 30vector_stores = []for i in range(0, len(documents), batch_size):batch_texts = documents[i:i + batch_size]try:batch_vector_store = FAISS.from_documents(documents=batch_texts,embedding=cached_embedder)vector_stores.append(batch_vector_store)logger.info(f"成功处理第 {i//batch_size + 1} 批文档")except Exception as batch_error:logger.error(f"处理第 {i//batch_size + 1} 批文档时出错: {str(batch_error)}")continueif not vector_stores:logger.error("没有成功创建任何向量存储")return None# 合并所有批次的向量存储merged_store = vector_stores[0]for store in vector_stores[1:]:merged_store.merge_from(store)logger.info(f"向量存储创建成功,共处理 {len(documents)} 个文档")return merged_storeexcept Exception as e:print(e)logger.error(f"创建向量存储时发生错误: {str(e)}")return None
这段代码实现了一个向量存储创建功能,主要包含以下几个关键部分:
-
使用LocalFileStore实现本地缓存存储,避免重复计算向量;
-
使用智谱AI的embedding模型将文本转换为向量,并通过CacheBackedEmbeddings进行缓存管理;
-
采用批处理方式,每批处理30个文档,(智谱AI 的最大批处理数量为64)
-
使用FAISS创建向量存储
-
将所有批次的向量存储合并成一个完整的向量库。
这种实现方式既保证了处理大量文档的效率,又通过缓存机制优化了性能。
cache中最终缓存的向量数据展示:
流程调用
# 创建向量存储vectorstore = create_vector_store(split_documents)if not vectorstore:returnprint("向量存储创建成功")
LangChain 检索器
通过langchain 框架,我们可以很容易的将向量存储转换为检索器对象。如果您想系统性的学习langchain 的检索器,可以参考我的这篇文章:
LangChain框架-检索器详解-CSDN博客
示例项目将向量存储转化为检索器,最后使用了invoke()方法,接收检索的对象文档。代码以及最终的检索结果如下:
处理代码
retriever = vectorstore.as_retriever()
doc =retriever.invoke("这篇论文讲述了什么内容?")
print(doc)
LLM内容整合
最后,我们可以通过调用DeepSeek,再对检索器检索的内容进行总结。在这里我使用了ChatPromptTemplate 对话提示词模板和 create_stuff_documents_chain 处理文档问答的工具链形成一个最终答案,代码如下:
system_prompt = ChatPromptTemplate.from_messages([("system", """根据提供的上下文: {context} \n\n 使用回答问题: {input}""")])# 初始化大模型llm = ChatOpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"),base_url="https://api.deepseek.com/v1",model="deepseek-chat")# 构建链 这个链将文档作为输入,并使用之前定义的提示模板和初始化的大模型来生成答案chain = create_stuff_documents_chain(llm,system_prompt)res = chain.invoke({"input": "这篇论文讲述了什么内容?", "context": all_context})print("最终答案:" +res)
最终结果展示
最终答案:这篇论文是一篇关于**检索增强生成(Retrieval-Augmented Generation, RAG)**的系统性综述,旨在全面梳理RAG的基础理论、优化方法、多模态应用、基准测试、当前局限及未来方向。以下是核心内容总结:
---
### **1. 研究背景与动机**
- **AIGC的挑战**:尽管生成式AI(如大模型)取得显著进展,但仍面临知识更新滞后、长尾数据处理、数据泄露风险、高训练/推理成本等问题。
- **RAG的作用**:通过结合信息检索(IR)与生成模型,RAG能够动态检索外部知识库中的相关内容,提升生成结果的准确性和鲁棒性,同时降低模型幻觉(hallucination)。---
### **2. 主要贡献**
- **全面性综述**:覆盖RAG的**基础框架**(检索器Retriever与生成器Generator的协同)、**优化方法**(输入增强、检索器改进、生成器调优等)、**跨模态应用**(文本、图像、视频等)、**评测基准**及未来挑战。
- **填补空白**:相比已有研究(如Zhao et al.仅关注多模态应用或忽略基础理论),本文首次系统整合RAG的全链条技术,强调其在多领域的适用性。---
### **3. 核心内容**
- **RAG基础**(Section II-III)
介绍检索器(如稠密检索Dense Retrieval)与生成器(如LLMs)的协同机制,以及如何通过检索动态增强生成过程。
- **优化方法**(Section III-B)
提出5类增强策略:
- **输入增强**:如查询转换(Query2Doc、HyDE生成伪文档扩展查询语义)。
- **检索器改进**:优化索引结构或相似度计算。
- **生成器调优**:利用检索结果指导生成。
- **结果后处理**:对输出进行过滤或重排序。
- **端到端优化**:联合训练检索与生成模块。- **多模态应用**(Section IV)
超越文本生成,探讨RAG在图像、视频、音频等模态中的独特应用(如跨模态检索增强生成)。- **评测与挑战**(Section V-VI)
分析现有基准(如检索质量、生成效果评估),指出RAG的局限性(如检索效率、多模态对齐)及未来方向(如轻量化部署、动态知识更新)。---
### **4. 论文结构**
1. **引言**:RAG的动机与综述范围。
2. **基础理论**:检索器与生成器的协同机制。
3. **优化方法**:输入、检索、生成等环节的增强策略。
4. **多模态应用**:跨领域案例(如医疗、金融、创作)。
5. **评测基准**:现有评估框架与指标。
6. **局限与展望**:技术瓶颈与潜在解决方案。
7. **结论**:总结RAG的价值与发展路径。---
### **5. 创新点**
- **多模态视角**:强调RAG在非文本领域的潜力(如视觉问答、视频摘要)。
- **技术整合**:首次系统化分类RAG优化方法(如输入端的查询转换策略)。
- **实践指导**:为研究者提供跨领域应用的设计思路(如如何选择检索源或生成模型)。---
### **总结**
该论文是RAG领域的里程碑式综述,不仅梳理了技术脉络,还指出了未来研究方向(如高效检索算法、多模态泛化能力),对学术界和工业界均有重要参考价值。
完整的代码脚本
import json
import logging
import osfrom dotenv import load_dotenv
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_community.chat_models import ChatOpenAI
from langchain_community.document_loaders import ArxivLoader
from langchain_community.embeddings import ZhipuAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import NLTKTextSplitterlogger = logging.getLogger('my_logger')
# 加载 .env 文件中的环境变量
load_dotenv()def getNLTKData():import nltk# 指定自定义的数据目录custom_data_dir = 'D:\\nltk_data'nltk.data.path.append(custom_data_dir)# 下载资源if not os.path.exists(os.path.join(custom_data_dir, 'tokenizers', 'punkt')):os.environ['http_proxy'] = 'http://127.0.0.1:33210'os.environ['https_proxy'] = 'http://127.0.0.1:33210'nltk.download('punkt', download_dir=custom_data_dir)nltk.download('punkt_tab', download_dir=custom_data_dir)def load_arxiv_paper(arxiv_id):try:# 初始化ArxivLoaderloader = ArxivLoader(query=arxiv_id,load_max_docs=1, # 限制加载文档数量load_all_available_meta=True # 加载所有可用的元数据)# 加载论文文档documents = loader.load()logger.info(f"成功加载论文,标题: {documents[0].metadata.get('Title', 'Unknown')}")return documentsexcept Exception as e:logger.error(f"加载论文时发生错误: {str(e)}")return Nonedef process_documents(documents):if not documents:return Nonetry:# 初始化文本分割器nltk_splitter = NLTKTextSplitter(chunk_size=1000,chunk_overlap=200,)# 分割文档texts = nltk_splitter.split_documents(documents)logger.info(f"文档已分割为 {len(texts)} 个片段")return textsexcept Exception as e:print(e)logger.error(f"处理文档时发生错误: {str(e)}")return Nonedef create_vector_store(documents):if not documents:logger.warning("没有文档需要处理")return Nonetry:# 使用绝对路径存储缓存cache_dir = os.path.join(os.path.dirname(__file__), "cache")os.makedirs(cache_dir, exist_ok=True)store = LocalFileStore(cache_dir)# 创建向量存储underlying_embeddings = ZhipuAIEmbeddings(model="embedding-3",api_key=os.getenv("ZHIPUAI_API_KEY"),)cached_embedder = CacheBackedEmbeddings.from_bytes_store(underlying_embeddings,store,namespace=underlying_embeddings.model)# 批量处理文档batch_size = 30vector_stores = []for i in range(0, len(documents), batch_size):batch_texts = documents[i:i + batch_size]try:batch_vector_store = FAISS.from_documents(documents=batch_texts,embedding=cached_embedder)vector_stores.append(batch_vector_store)logger.info(f"成功处理第 {i // batch_size + 1} 批文档")except Exception as batch_error:logger.error(f"处理第 {i // batch_size + 1} 批文档时出错: {str(batch_error)}")continueif not vector_stores:logger.error("没有成功创建任何向量存储")return None# 合并所有批次的向量存储merged_store = vector_stores[0]for store in vector_stores[1:]:merged_store.merge_from(store)logger.info(f"向量存储创建成功,共处理 {len(documents)} 个文档")return merged_storeexcept Exception as e:print(e)logger.error(f"创建向量存储时发生错误: {str(e)}")return Nonedef NaiveRAG_main(arxiv_id, input):getNLTKData()# 加载论文documents = load_arxiv_paper(arxiv_id)if not documents:return# 存入 getParper.txt 文件中。with open("getPaper.txt", "w", encoding="utf-8") as f:for doc in documents:f.write(doc.page_content + "\n")# 处理文档split_documents = process_documents(documents)if not split_documents:return# 将分割后的文本存入 getPaperChunk2.json 文件中chunks_data = {"total_chunks": len(split_documents),"chunks": []}for index, text in enumerate(split_documents, 1):chunk_info = {"chunk_id": f"chunk_{index}","page_content": text.page_content}chunks_data["chunks"].append(chunk_info)# 将JSON数据写入文件with open("getPaperChunk2.json", "w", encoding="utf-8") as f:json.dump(chunks_data, f, ensure_ascii=False, indent=4)print(f"成功保存 {len(split_documents)} 个文本块到 getPaperChunk2.json")# 创建向量存储vectorstore = create_vector_store(split_documents)if not vectorstore:returnprint("向量存储创建成功")retriever = vectorstore.as_retriever()doc = retriever.invoke(input)print(doc)system_prompt = ChatPromptTemplate.from_messages([("system", """根据提供的上下文: {context} \n\n 使用回答问题: {input}""")])# 初始化大模型llm = ChatOpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"),base_url="https://api.deepseek.com/v1",model="deepseek-chat")# 构建链 这个链将文档作为输入,并使用之前定义的提示模板和初始化的大模型来生成答案chain = create_stuff_documents_chain(llm, system_prompt)res = chain.invoke({"input": input, "context": doc})print("最终答案:" + res)if __name__ == '__main__':NaiveRAG_main(arxiv_id="2402.19473", input="这篇论文讲述了什么内容?")
requirements.txt依赖
logger~=1.4
langchain~=0.3.23
langchain-community~=0.3.21
langchain-text-splitters~=0.3.8
pandas~=2.2.3
arxiv~=2.2.0
pymupdf~=1.25.5
python-dotenv~=1.1.0
zhipuai~=2.1.5.20250415
总结
本篇文章以实现第一范式 RAG-Naive RAG为目标,使用langchain框架构造了一个基于Naive RAG的论文分析器的项目。然而随着研究的深入,人们发现 Naive RAG 在检索质量、响应生成质量以及增强过程中存在挑战。于是 Advanced RAG 范式被提出,它在数据索引、检索前和检索后都进行了额外处理,如通过更精细的数据清洗等方法提升文本的一致性、准确性和检索效率,在检索前对问题进行重写、路由和扩充等,检索后对文档库进行重排序、上下文筛选与压缩等。接下来小编会以LangChain 框架实现RAG 的第二范式Advanced RAG为目标,对Naive RAG项目进行重构。感兴趣的小伙伴请关注小编的RAG 专栏,后续小编还会对Modular RAG 和 Agentic RAG 进行学习和总结,敬请期待!