LangGraph自适应RAG

LangGraph自适应RAG

    • 介绍
    • 索引
    • LLMs
    • web 搜索工具
    • graph
      • graph state
      • graph flow
      • build graph
        • 执行

介绍

自适应 RAG 是一种 RAG 策略,它将 (1) 查询分析 (2) 主动/自校正 RAG 结合起来。
在文章中,他们报告了查询分析到路由获取:

  • No Retrieval
  • Single-shot RAG
  • Iterative RAG

让我们使用 LangGraph 在此基础上进行构建。
在我们的实现中,我们将在以下之间进行路由:

  • 网络搜索:与最近事件相关的问题
  • 自校正 RAG:针对与我们的索引相关的问题
    在这里插入图片描述

索引

from typing import List
import requests
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Vearchfrom langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import (BaseModel
)
from langchain_text_splitters import RecursiveCharacterTextSplitterfrom common.constant import VEARCH_ROUTE_URL, BGE_M3_EMB_URLclass Bgem3Embeddings(BaseModel, Embeddings):def embed_documents(self, texts: List[str]) -> List[List[float]]:print(texts)return []def embed_query(self, text: str) -> List[float]:if not text:return []return cop_embeddings(text)"""
bg3m3转向量
"""def cop_embeddings(input: str) -> list:if not input.strip():return []headers = {"Content-Type": "application/json"}params = {"sentences": [input],"type": "dense"}response = requests.post(BGE_M3_EMB_URL, headers=headers, json=params)if response.status_code == 200:cop_embeddings_result = response.json()if not cop_embeddings_result or 'embeddings' not in cop_embeddings_result or not cop_embeddings_result['embeddings']:return []original_vector = cop_embeddings_result['embeddings'][0]original_size = len(original_vector)# 将1024的向量兼容为1536,适配openai向量接口adaptor_vector = [0.0] * 1536for i in range(min(original_size, 1536)):adaptor_vector[i] = original_vector[i]return adaptor_vectorelse:print(f"cop_embeddings error: {response.text}")return []# Docs to index
urls = ["https://lilianweng.github.io/posts/2023-06-23-agent/","https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/","https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]# 加载文档
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]# 文档分块
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=500, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)# 数据存储到向量库
embeddings_model = Bgem3Embeddings()
# embeddings_model, VEARCH_ROUTE_URL,"lanchain_autogpt","lanchain_autogpt_db", 3,
vectorstore = Vearch.from_documents(documents=doc_splits,embedding=embeddings_model,path_or_url=VEARCH_ROUTE_URL,table_name="lanchain_autogpt",db_name="lanchain_autogpt_db",flag=3
)
retriever = vectorstore.as_retriever()

LLMs

### Routerfrom typing import Literalfrom langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAIfrom common.common import PROXY_URL, API_KEY
from index1 import retriever# Data model
class RouteQuery(BaseModel):"""将用户查询路由到最相关的数据源。"""datasource: Literal["vectorstore", "web_search"] = Field(...,description="给定一个用户问题,选择将其发送到web_search或vectorstore。",)# LLM with function call
llm = ChatOpenAI(model_name="gpt-4o", api_key=API_KEY, base_url=PROXY_URL, temperature=0)
structured_llm_router = llm.with_structured_output(RouteQuery)# Prompt
system = """你是将用户问题传送到vectorstore或web_search的专家。
vectorstore包含与agents、prompt engineering和adversarial attacks相关的文档。
使用向量库回答有关这些主题的问题。否则,请使用web_search。"""
route_prompt = ChatPromptTemplate.from_messages([("system", system),("human", "{question}"),]
)question_router = route_prompt | structured_llm_router# 案例1:数据源选择
# print(
#     question_router.invoke(
#         {"question": "谁将成为NFL选秀的第一人?"}
#     )
# )
# print(question_router.invoke({"question": "Agent memory有哪些类型?"}))# Data model
class GradeDocuments(BaseModel):"""Binary score for relevance check on retrieved documents."""binary_score: str = Field(description="文档与问题相关, 'yes' or 'no'")structured_llm_grader = llm.with_structured_output(GradeDocuments)# Prompt
system = """你是一个评估检索到的文档和用户问题的相关性的分级员。 \n 如果文档包含与用户问题相关的关键字或语义,则将其评为相关。 \n它不需要是一个严格的测试。目标是过滤掉错误的检索。 \n给出二进制分数 'yes' or 'no' 表示文档是否与问题相关。"""
grade_prompt = ChatPromptTemplate.from_messages([("system", system),("human", "检索到的文档: \n\n {document} \n\n 用户问题: {question}"),]
)retrieval_grader = grade_prompt | structured_llm_grader
question = "agent memory"
docs = retriever.get_relevant_documents(question)
doc_txt = docs[1].page_content
# 案例2:检索评估
# print(retrieval_grader.invoke({"question": question, "document": doc_txt}))from langchain import hub
from langchain_core.output_parsers import StrOutputParser# Prompt
prompt = hub.pull("rlm/rag-prompt")# Post-processing
def format_docs(docs):return "\n\n".join(doc.page_content for doc in docs)# Chain
rag_chain = prompt | llm | StrOutputParser()# Run
# generation = rag_chain.invoke({"context": docs, "question": question})
# 案例3:prompt = hub.pull("rlm/rag-prompt") 生成
# print(generation)# Data model
class GradeHallucinations(BaseModel):"""Binary score for hallucination present in generation answer."""binary_score: str = Field(description="答案以事实为基础, 'yes' 或 'no'")structured_llm_grader = llm.with_structured_output(GradeHallucinations)# Prompt
system = """你是一名评估LLM生成是否以一组检索到的事实为基础/支持的分级员。 \n 给一个二进制分数 'yes' 或 'no'. 'Yes' 意味着答案以一系列事实为基础。"""
hallucination_prompt = ChatPromptTemplate.from_messages([("system", system),("human", "设置事实: \n\n {documents} \n\n 大模型生成: {generation}"),]
)hallucination_grader = hallucination_prompt | structured_llm_grader
# hgr = hallucination_grader.invoke({"documents": docs, "generation": generation})
# 案例4:幻觉评估
# print(hgr)### Answer Grader# Data model
class GradeAnswer(BaseModel):"""Binary score to assess answer addresses question."""binary_score: str = Field(description="答案是否解决了问题, 'yes' 或 'no'")structured_llm_grader = llm.with_structured_output(GradeAnswer)# Prompt你是一名评估答案是否能解决问题的评分员
system = """你是一名评估答案是否能解决问题的评分员 \n 给一个二进制分数 'yes' 或 'no'。 'Yes' 意味着答案解决了问题。"""
answer_prompt = ChatPromptTemplate.from_messages([("system", system),("human", "用户问题: \n\n {question} \n\n 大模型生成: {generation}"),]
)answer_grader = answer_prompt | structured_llm_grader
# agr = answer_grader.invoke({"question": question, "generation": generation})
# 案例5:答复评估
# print(agr)# Prompt
system = """你是一个问题重写器,可以将输入问题转换为更好的版本,该版本针对向量库检索进行了优化。
查看输入并尝试推理潜在的语义意图/含义。"""
re_write_prompt = ChatPromptTemplate.from_messages([("system", system),("human","下面是原始问题: \n\n {question} \n 提出一个改进的问题。",),]
)question_rewriter = re_write_prompt | llm | StrOutputParser()
# qrr = question_rewriter.invoke({"question": question})
# 案例6:问题重写
# print(qrr)

web 搜索工具

import osfrom langchain_community.tools.tavily_search import TavilySearchResultsfrom common.common import TAVILY_API_KEY# 提前通过 https://app.tavily.com/home 申请
os.environ["TAVILY_API_KEY"] = TAVILY_API_KEYtavily_tool = TavilySearchResults(k=3)

graph

将流程捕获为图表。

graph state

from typing import Listfrom typing_extensions import TypedDictclass GraphState(TypedDict):"""Represents the state of our graph.Attributes:question: questiongeneration: LLM generationdocuments: list of documents"""question: strgeneration: strdocuments: List[str]

graph flow

from langchain.schema import Document
from index1 import retriever
from llm2 import rag_chain, retrieval_grader, question_rewriter, question_router, hallucination_grader, answer_grader
from webstool3 import web_search_tool# 检索向量库中的doc
def retrieve(state):"""Retrieve documentsArgs:state (dict): The current graph stateReturns:state (dict): New key added to state, documents, that contains retrieved documents"""print("---RETRIEVE---")question = state["question"]# Retrievaldocuments = retriever.invoke(question)return {"documents": documents, "question": question}# 大模型生成
def generate(state):"""Generate answerArgs:state (dict): The current graph stateReturns:state (dict): New key added to state, generation, that contains LLM generation"""print("---GENERATE---")question = state["question"]documents = state["documents"]# RAG generationgeneration = rag_chain.invoke({"context": documents, "question": question})return {"documents": documents, "question": question, "generation": generation}# 文档评估
def grade_documents(state):"""Determines whether the retrieved documents are relevant to the question.Args:state (dict): The current graph stateReturns:state (dict): Updates documents key with only filtered relevant documents"""print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")question = state["question"]documents = state["documents"]# Score each docfiltered_docs = []for d in documents:score = retrieval_grader.invoke({"question": question, "document": d.page_content})grade = score.binary_scoreif grade == "yes":print("---GRADE: DOCUMENT RELEVANT---")filtered_docs.append(d)else:print("---GRADE: DOCUMENT NOT RELEVANT---")continuereturn {"documents": filtered_docs, "question": question}# 输入重写
def transform_query(state):"""Transform the query to produce a better question.Args:state (dict): The current graph stateReturns:state (dict): Updates question key with a re-phrased question"""print("---TRANSFORM QUERY---")question = state["question"]documents = state["documents"]# Re-write questionbetter_question = question_rewriter.invoke({"question": question})return {"documents": documents, "question": better_question}# web搜索
def web_search(state):"""Web search based on the re-phrased question.Args:state (dict): The current graph stateReturns:state (dict): Updates documents key with appended web results"""print("---WEB SEARCH---")question = state["question"]# Web searchdocs = web_search_tool.invoke({"query": question})web_results = "\n".join([d["content"] for d in docs])web_results = Document(page_content=web_results)return {"documents": web_results, "question": question}### Edges ###def route_question(state):"""Route question to web search or RAG.Args:state (dict): The current graph stateReturns:str: Next node to call"""print("---ROUTE QUESTION---")question = state["question"]source = question_router.invoke({"question": question})if source.datasource == "web_search":print("---ROUTE QUESTION TO WEB SEARCH---")return "web_search"elif source.datasource == "vectorstore":print("---ROUTE QUESTION TO RAG---")return "vectorstore"# 生成答案 还是 重新生成问题
def decide_to_generate(state):"""Determines whether to generate an answer, or re-generate a question.Args:state (dict): The current graph stateReturns:str: Binary decision for next node to call"""print("---ASSESS GRADED DOCUMENTS---")state["question"]filtered_documents = state["documents"]if not filtered_documents:# All documents have been filtered check_relevance# We will re-generate a new queryprint("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---")return "transform_query"else:# We have relevant documents, so generate answerprint("---DECISION: GENERATE---")return "generate"# 确定生成是否基于文档并回答问题。
def grade_generation_v_documents_and_question(state):"""Determines whether the generation is grounded in the document and answers question.Args:state (dict): The current graph stateReturns:str: Decision for next node to call"""print("---CHECK HALLUCINATIONS---")question = state["question"]documents = state["documents"]generation = state["generation"]score = hallucination_grader.invoke({"documents": documents, "generation": generation})grade = score.binary_score# Check hallucinationif grade == "yes":print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")# Check question-answeringprint("---GRADE GENERATION vs QUESTION---")score = answer_grader.invoke({"question": question, "generation": generation})grade = score.binary_scoreif grade == "yes":print("---DECISION: GENERATION ADDRESSES QUESTION---")return "useful"else:print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")return "not useful"else:print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")return "not supported"

build graph

from langgraph.graph import END, StateGraphfrom pprint import pprintfrom common.common import show_img
from gflow5 import web_search, retrieve, grade_documents, generate, transform_query, route_question, decide_to_generate, \grade_generation_v_documents_and_question
from gstate4 import GraphState# 定义工作流
workflow = StateGraph(GraphState)# Define the nodes
workflow.add_node("web_search", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generatae
workflow.add_node("transform_query", transform_query)  # transform_query# Build graph
workflow.set_conditional_entry_point(route_question,{"web_search": "web_search","vectorstore": "retrieve",},
)
workflow.add_edge("web_search", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges("grade_documents",decide_to_generate,{"transform_query": "transform_query","generate": "generate",},
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges("generate",grade_generation_v_documents_and_question,{"not supported": "generate","useful": END,"not useful": "transform_query",},
)# Compile
app = workflow.compile()

在这里插入图片描述

执行
# Run
inputs = {"question": "熊队的哪位球员有望在2024年的NFL选秀中获得第一名?"
}
for output in app.stream(inputs):for key, value in output.items():# Nodepprint(f"Node '{key}':")# Optional: print full state at each node# pprint.pprint(value["keys"], indent=2, width=80, depth=None)pprint("\n---\n")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

采用PHP语言(医院安全不良事件上报系统源码)医院不良事件 各类事件分析、分类、处理流程

医疗安全不容忽视! 医疗安全(不良)事件是指在临床诊疗活动中以及医院运行过程中,任何可能影响患者的诊疗结果、增加患者的痛苦和负担并可能引发医疗纠纷或医疗事故,以及影响医疗工作的正常运行和医务人员人身安全的因…

什么是隐马尔可夫模型?

文章目录 一、说明二、玩具HMM:5′拼接位点识别三、那么,隐藏了什么?四、查找最佳状态路径五、超越最佳得分对齐六、制作更逼真的模型七、收获 关键词:hidden markov model 一、说明 被称为隐马尔可夫模型的统计模型是计算生物学…

libdrm 2.4.107 needed because amdgpu has the highest requirement

libdrm 2.4.107 needed because amdgpu has the highest requirement 1.问题分析解决 1.问题 Message: libdrm 2.4.107 needed because amdgpu has the highest requirement Run-time dependency libdrm_intel found: YES 2.4.107 Run-time dependency libdrm_amdgpu found: Y…

Day 25:1807. 替换字符串中的括号内容

Leetcode 1807. 替换字符串中的括号内容 给你一个字符串 s ,它包含一些括号对,每个括号中包含一个 非空 的键。 比方说,字符串 “(name)is(age)yearsold” 中,有 两个 括号对,分别包含键 “name” 和 “age” 。 你知道…

Ansible介绍

一、Ansible概述 Ansible是一款开源的自动化运维工具,基于Python开发,主要用于批量系统配置、批量程序部署、批量运行命令等功能。它集合了众多运维工具的优点,并通过其高度模块化的特性,实现了灵活、可扩展的自动化运维管理。 …

ARDUINO NRF24L01

连线 5v 3.3皆可 gnd Optimized high speed nRF24L01 driver class documentation: Optimized High Speed Driver for nRF24L01() 2.4GHz Wireless Transceiver 同时下载同一个程序 案例默认引脚ce ces ,7,8 可以 修改为 9,10 安装库 第一个示例 两…

Java基础学习-方法

目录 方法基础概念 方法的格式: 案例:最简单方法的定义 案例:带参数的方法调用 案例:求圆的面积 带有返回值的方法: 方法注意点 方法的重载: ​编辑 案例:数组的遍历: 案例…

C++新特性复习1 版本11

参照来自于: cppreference.com 老实说,我是毕业不久就开始用C,原因就是VC,当时用来做界面。还好吧,不是觉得太难,起码对数学底子没有要求,后面偶尔也用用,但是整体还是C居多。现在项…

基于深度学习网络的USB摄像头实时视频采集与手势检测识别matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 系统架构 4.2 GoogLeNet网络简介 4.3 手势检测 5.算法完整程序工程 1.算法运行效果图预览 (完整程序运行后无水印) 训练过程如下: 将摄像头对准手势,然后进行…

【Kubernetes】k8s--安全机制

机制说明 Kubernetes 作为一个分布式集群的管理工具,保证集群的安全性是其一个重要的任务。API Server 是集群内部各个组件通信的中介, 也是外部控制的入口。所以 Kubernetes 的安全机制基本就是围绕保护 API Server 来设计的。 比如 kubectl 如果想向 …

Commons-Collections篇-CC2链分析

前言 3.1-3.2.1版本中TransformingComparator并没有去实现Serializable接口,是不可以被序列化的,所以我们重新搭建一个4.0的具有漏洞的CC环境 CC2链主要使用的和CC4一样,但是区别在于CC2避免了使用Transformer数组,没有使用Insta…

6.深度卷积神经网络

目录 1.深度卷积神经网络ALexNet 2012AlexNetAlexNet架构AlexNet与LeNet复杂度对比总结代码实现2.使用块的网络VGG 2014 image竞猜第二VGG架构进度总结代码实现3.网络中的网络NiN全连接层的问题NiN块NiN架构总结代码实现4.含并行连结的网络(GoogLeNet)2014 image竞猜第一最好…

总结CSS 实现新手引导效果的六种方式

前言 我们在平常做业务中,在功能变更,或者有大的改动时,经常会用到新手引导功能, 虽然有很多库可以使用, 但是有时候很简单的需求,没必要引入库, 本文用最简化代码,实现一下新手引导…

Java面试题之MySQL事务详解

事务是什么 MySQL中的事务(Transaction)是数据库管理系统执行的一个逻辑操作单元,它是由一系列数据库操作组成的逻辑工作单元。事务是并发控制的单位,也是用户定义的一个操作序列。事务的主要目的是确保数据的完整性和一致性&…

JAVA小知识23:set与HashSet

一、Set 1.1、Set的基本知识 set也是单列集合的一种,用于存储一组不重复的元素。它是一种集合数据类型,常用于需要确保元素唯一性和快速查找的场景。他有如下特点: 无序性:Set 中的元素是无序的,没有特定的顺序。唯…

代码签名证书申请指南

申请代码签名证书的具体流程可以归纳为以下几个步骤: 1、确定证书类型: 根据您的需求选择合适的代码签名证书类型。常见的有OV(Organization Validation,组织验证)代码签名证书和EV(Extended Validation&am…

【Java】已解决java.lang.IllegalAccessException异常

文章目录 一、问题分析背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决java.lang.IllegalAccessException异常 一、问题分析背景 在Java开发中,java.lang.IllegalAccessException是一个常见的运行时异常,它通常发生在尝试…

ARM32开发--FreeRTOS-事件组

系列文章目录 知不足而奋进 望远山而前行 目录 系列文章目录 文章目录 前言 目标 内容 概念 事件标志位 开发流程 功能介绍 创建事件组 触发事件 等待事件触发 同步 清理事件 案例 总结 前言 在嵌入式系统开发中,任务之间的同步和通信是至关重要的…

智慧矿山项目建设整体解决方案(938页 )

智慧矿山,究竟是什么? 在深入探讨之前,让我们先来提出一个深刻的问题:我们能否借助科技的力量,让矿山作业不仅安全、高效,还能做到环保可持续?答案是肯定的。智慧矿山,正是这一理念…

支撑每秒 600 万订单无压力,SpringBoot + Disruptor 太猛了!

一、背景 工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录. 二、Disruptor介绍 Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存…