import os
from typing import Anyfrom langchain. agents import AgentType, initialize_agent, Tool
from langchain_openai import ChatOpenAI
from langchain. tools import BaseTool
from langchain_experimental. tools. python. tool import PythonREPLTool
from langchain. memory import ConversationBufferMemory
from langchain_community. utilities import WikipediaAPIWrapper
from langgraph. prebuilt import create_react_agent
os. environ[ "OPENAI_API_KEY" ] = "sk-cRMC2m0GsE18vYaWdAMj"
os. environ[ "OPENAI_BASE_URL" ] = "https://aigptx.top/v1/"
def create_init_tool_agent ( ) : """创建基本的ReAct智能体""" wikipedia = WikipediaAPIWrapper( ) python_repl = PythonREPLTool( ) tools = [ Tool( name= "维基百科" , func= wikipedia. run, description= "用于查询维基百科文章的工具" ) , Tool( name= "Python解释器" , func= python_repl. run, description= "用于执行Python代码的工具,可以进行计算或数据分析" ) ] llm = ChatOpenAI( temperature= 1 , max_tokens= 2000 , model= 'gpt-3.5-turbo-0125' ) memory = ConversationBufferMemory( memory_key= "chat_history" , return_messages= True ) langgraph_agent_executor = initialize_agent( tools, llm, agent= AgentType. CHAT_CONVERSATIONAL_REACT_DESCRIPTION, verbose= True , memory= memory, handle_parsing_errors= True ) return langgraph_agent_executor
def create_openai_functions_agent ( ) : """创建基于OpenAI函数调用的智能体""" wikipedia = WikipediaAPIWrapper( ) python_repl = PythonREPLTool( ) tools = [ Tool( name= "Python执行器" , func= python_repl. run, description= "执行Python代码的工具,适合进行计算、数据处理" ) , Tool( name= "维基百科" , func= wikipedia. run, description= "搜索维基百科文章的工具,适合查询事实性信息" ) ] llm = ChatOpenAI( temperature= 0 ) agent = initialize_agent( tools, llm, agent= AgentType. OPENAI_FUNCTIONS, verbose= True ) return agent
class WeatherTool ( BaseTool) : name: str = "天气查询" description: str = "查询指定城市的天气情况" def _run ( self, city: str ) - > str : return f" { city} 的天气: 晴朗, 25°C, 湿度50%" async def _arun ( self, city: str ) - > str : return self. _run( city) class CalculatorTool ( BaseTool) : name: str = "计算器" description: str = "进行数学计算,输入应为数学表达式" def _run ( self, expression: str ) - > str : try : result = eval ( expression) return f"计算结果: { result} " except Exception as e: return f"计算错误: { str ( e) } " async def _arun ( self, expression: str ) - > str : return self. _run( expression) def create_custom_tool_agent ( ) : """创建带有自定义工具的智能体""" tools = [ WeatherTool( ) , CalculatorTool( ) , PythonREPLTool( ) ] llm = ChatOpenAI( temperature= 0 ) agents = initialize_agent( tools, llm, agent= AgentType. CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose= True ) return agents
if __name__ == "__main__" : print ( "=== LangChain 单智能体模式示例 ===" ) agent_type = "openai_functions" response: Any = '' if agent_type == "react" : agent = create_init_tool_agent( ) response = agent. invoke( { 'input' : '谁是阿尔伯特·爱因斯坦? 他出生于哪一年? 计算从他出生到现在过了多少年。回答的时候请使用中文输出' , 'chat_history' : [ ] } ) elif agent_type == "openai_functions" : agent = create_openai_functions_agent( ) response = agent. invoke( { 'input' : '计算 2345 + 5678 的结果,并解释这两个数字的数学特性。' , 'chat_history' : [ ] } ) elif agent_type == "custom" : agent = create_custom_tool_agent( ) response = agent. invoke( { 'input' : '北京今天的时间和今天的天气如何?然后计算25乘以4的结果。' , 'chat_history' : [ ] } ) print ( f"\n最终回答: { response} " )