【LLM】MCP(Python):实现 stdio 通信的Client与Server

本文将详细介绍如何使用 Model Context Protocol (MCP) 在 Python 中实现基于 STDIO 通信的 Client 与 Server。MCP 是一个开放协议,它使 LLM 应用与外部数据源和工具之间的无缝集成成为可能。无论你是构建 AI 驱动的 IDE、改善 chat 交互,还是构建自定义的 AI 工作流,MCP 提供了一种标准化的方式,将 LLM 与它们所需的上下文连接起来。部分灵感来源:Se7en。

提示
在开始前,请确保你已经安装了必要的依赖包:

pip install openai mcp

本文中,我们将介绍如何配置环境、编写 MCP Server 以及实现 MCP Client。


环境配置

在使用 MCP 之前,需要先配置相关环境变量,以便 Client 与 Server 都能正确加载所需的参数。你可以在项目根目录下创建一个 .env 文件,并写入以下内容:

此外,创建一个 .env 文件来存储您的配置:

MODEL_NAME=deepseek-chat
BASE_URL=https://api.deepseek.com/v1
API_KEY=your_api_key_here

上述配置中,MODEL_NAME 表示使用的 OpenAI 模型名称(例如 “deepseek-chat”),BASE_URL 指向 OpenAI API 的基础地址,而 API_KEY 则为访问 API 所需的密钥。


书写 Server 的规范

构建 MCP Server(特别是基于 stdio 通信)时,推荐遵循统一规范,提升可读性、可维护性与复用性

  • 服务命名统一
    使用 MCP_SERVER_NAME 作为唯一名称,贯穿日志、初始化等环节。
  • 日志配置清晰
    统一使用 logging 模块,推荐 INFO 级别,便于调试和追踪。
  • 工具注册规范
    通过 @mcp.tool() 装饰器注册工具函数,要求:
    • 命名清晰
    • 参数有类型注解
    • 注释说明参数与返回值(推荐中文)
    • 加入边界检查或异常处理
  • 使用标准 stdio 启动方式
    通过 async with stdio_server() 获取输入输出流,统一调用 _mcp_server.run(...) 启动服务。
  • 初始化选项规范
    使用 InitializationOptions 设置服务名、版本及能力声明(通常由 FastMCP 提供)。
  • 通用模板

    import asyncio
    import logging
    from mcp.server.fastmcp import FastMCP
    from mcp.server import InitializationOptions, NotificationOptions
    from mcp.server.stdio import stdio_server  # STDIO 通信方式# 定义唯一服务名称
    MCP_SERVER_NAME = "your-stdio-server-name"# 配置日志输出
    logging.basicConfig(level=logging.INFO,format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    logger = logging.getLogger(MCP_SERVER_NAME)# 创建 FastMCP 实例
    mcp = FastMCP(MCP_SERVER_NAME)# 定义工具
    @mcp.tool()
    def your_tool_name(param1: type, param2: type) -> return_type:"""工具描述。参数:- param1 (type): 描述- param2 (type): 描述返回:- return_type: 描述"""# 工具实现pass# 启动 MCP Server 主函数
    async def main():# 创建 stdio 通信通道async with stdio_server() as (read_stream, write_stream):# 构建初始化选项init_options = InitializationOptions(server_name=MCP_SERVER_NAME,server_version="1.0.0",capabilities=mcp._mcp_server.get_capabilities(notification_options=NotificationOptions(),experimental_capabilities={}))logger.info("MCP Server 以 STDIO 模式启动中...")# 启动 Serverawait mcp._mcp_server.run(read_stream, write_stream, init_options)# 主程序入口
    if __name__ == "__main__":asyncio.run(main())
    

编写 MCP Server

MCP Server 的实现主要基于标准输入输出(STDIO)进行通信。服务端通过注册工具,向外界提供如加法、减法、乘法以及除法等计算功能。下面简述服务端的主要实现步骤:

  1. 初始化 FastMCP 实例
    服务端首先创建一个 FastMCP 实例,并为其命名(例如 “math-stdio-server”)。
  2. 工具注册
    使用装饰器的方式注册加法、减法、乘法、除法等工具,每个工具均包含详细的参数说明和返回值说明。
  3. 日志配置
    通过 Python 标准日志模块对服务端进行日志配置,以便记录服务运行状态和错误信息。
  4. 建立 STDIO 通信
    使用 stdio_server() 函数建立基于 STDIO 的通信,并构造初始化选项,包含服务器名称、版本以及能力说明。随后,调用 MCP 内部的服务启动函数开始监听和处理来自 Client 的请求。
import asyncio
import logging
from mcp.server.fastmcp import FastMCP
from mcp.server import InitializationOptions, NotificationOptions
from mcp.server.stdio import stdio_server  # 直接导入 stdio_server 函数# 定义服务器名称
MCP_SERVER_NAME = "math-stdio-server"# 配置日志
logging.basicConfig(level=logging.INFO,format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)# 注册加法工具
@mcp.tool()
def add(a: float, b: float) -> float:"""加法工具参数:- a (float): 第一个数字(必填)- b (float): 第二个数字(必填)返回:- float: a 与 b 的和"""return a + b# 注册减法工具
@mcp.tool()
def subtract(a: float, b: float) -> float:"""减法工具参数:- a (float): 被减数(必填)- b (float): 减数(必填)返回:- float: a 与 b 的差"""return a - b# 注册乘法工具
@mcp.tool()
def multiply(a: float, b: float) -> float:"""乘法工具参数:- a (float): 第一个数字(必填)- b (float): 第二个数字(必填)返回:- float: a 与 b 的积"""return a * b# 注册除法工具
@mcp.tool()
def divide(a: float, b: float) -> float:"""除法工具参数:- a (float): 分子(必填)- b (float): 分母(必填,且不能为零)返回:- float: a 与 b 的商"""if b == 0:raise ValueError("除数不能为零")return a / basync def main():# 使用 stdio_server 建立 STDIO 通信async with stdio_server() as (read_stream, write_stream):# 构造初始化选项init_options = InitializationOptions(server_name=MCP_SERVER_NAME,server_version="1.0.0",capabilities=mcp._mcp_server.get_capabilities(notification_options=NotificationOptions(),experimental_capabilities={}))logger.info("通过 STDIO 模式启动 MCP Server ...")# 使用内部的 _mcp_server 运行服务await mcp._mcp_server.run(read_stream, write_stream, init_options)if __name__ == "__main__":asyncio.run(main())

编写 MCP Client

MCP Client 主要实现与多个基于 STDIO 的服务器建立连接,并通过 OpenAI API 对用户的自然语言查询进行处理,调用相应工具获得最终结果。客户端的主要逻辑可以分为以下部分:

  1. 初始化客户端
    在 MCPClient 类的构造函数中,传入所需的模型名称、OpenAI API 基础地址、API 密钥以及包含服务端脚本路径的列表。客户端将使用这些参数初始化 OpenAI 异步客户端,同时准备一个 AsyncExitStack 来管理所有异步上下文。
  2. 建立多个 STDIO 连接
    通过遍历服务器脚本列表,为每个脚本生成唯一标识符(如 server0server1 等),然后依次调用 stdio_client 函数建立连接,并通过 ClientSession 完成初始化。在连接成功后,从每个服务器获取可用工具列表,并将工具名称加上前缀(例如 server0_add)保存到映射表中,避免工具名称冲突。
  3. 处理用户查询
    process_query 方法中,客户端首先根据用户的输入构造消息,然后汇总所有连接服务器提供的工具,传递给 OpenAI API 进行处理。当 API 返回调用工具的请求时,客户端根据工具名称找到对应服务器会话,并执行相应的工具调用,收集返回结果后再交由 API 生成后续回复,直至所有工具调用处理完成。
  4. 交互式对话循环
    客户端提供一个简单的命令行交互循环,用户输入查询后,调用 process_query 方法获取最终回复,并打印在终端上。如果用户输入 quit 或使用 Ctrl+C 中断,则客户端将平滑退出并释放所有资源。
  5. 资源清理
    最后,在退出前,通过 AsyncExitStack 统一关闭所有连接,确保资源不会泄露。
import asyncio
import json
import os
import sys
from typing import List
from contextlib import AsyncExitStackfrom mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAIclass MCPClient:def __init__(self, model_name: str, base_url: str, api_key: str, server_scripts: List[str]):"""初始化 MCP 客户端,支持多个 stdio 服务器。:param model_name: OpenAI 模型名称,例如 "deepseek-chat"。:param base_url: OpenAI API 基础地址,例如 "https://api.deepseek.com/v1"。:param api_key: OpenAI API 密钥。:param server_scripts: stdio 服务脚本路径列表。"""self.model_name = model_nameself.base_url = base_urlself.api_key = api_keyself.server_scripts = server_scriptsself.sessions = {}         # server_id -> (session, session_ctx, stdio_ctx)self.tool_mapping = {}     # 带前缀的工具名 -> (session, 原始工具名)self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)self.exit_stack = AsyncExitStack()async def initialize_sessions(self):"""初始化所有 stdio 服务器连接,并收集工具映射。"""for i, script in enumerate(self.server_scripts):if not (os.path.exists(script) and script.endswith(".py")):print(f"脚本 {script} 不存在或不是 .py 文件,跳过。")continueserver_id = f"server{i}"params = StdioServerParameters(command="python", args=[script], env=None)try:stdio_ctx = stdio_client(params)stdio = await self.exit_stack.enter_async_context(stdio_ctx)session_ctx = ClientSession(*stdio)session = await self.exit_stack.enter_async_context(session_ctx)await session.initialize()self.sessions[server_id] = (session, session_ctx, stdio_ctx)response = await session.list_tools()for tool in response.tools:self.tool_mapping[f"{server_id}_{tool.name}"] = (session, tool.name)print(f"已连接到 {script},工具:{[tool.name for tool in response.tools]}")except Exception as e:print(f"连接 {script} 失败:{e}")async def cleanup(self):"""释放所有资源。"""try:await self.exit_stack.aclose()print("所有连接资源已释放")except asyncio.CancelledError:passexcept Exception as e:print(f"清理资源时异常:{e}")async def _gather_available_tools(self):"""汇总所有服务器的工具列表。"""tools = []for server_id, (session, _, _) in self.sessions.items():response = await session.list_tools()for tool in response.tools:tools.append({"type": "function","function": {"name": f"{server_id}_{tool.name}","description": tool.description,"parameters": tool.inputSchema,}})return toolsasync def process_query(self, query: str) -> str:"""处理查询,调用 OpenAI API 和相应工具后返回结果。"""messages = [{"role": "user", "content": query}]available_tools = await self._gather_available_tools()try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:return f"调用 OpenAI API 失败:{e}"final_text = [response.choices[0].message.content or ""]message = response.choices[0].message# 当有工具调用时循环处理while message.tool_calls:for call in message.tool_calls:tool_name = call.function.nameif tool_name not in self.tool_mapping:final_text.append(f"未找到工具:{tool_name}")continuesession, original_tool = self.tool_mapping[tool_name]tool_args = json.loads(call.function.arguments)try:result = await session.call_tool(original_tool, tool_args)final_text.append(f"[调用 {tool_name} 参数: {tool_args}]")final_text.append(f"工具结果: {result.content}")except Exception as e:final_text.append(f"调用 {tool_name} 出错:{e}")continuemessages += [{"role": "assistant", "tool_calls": [{"id": call.id,"type": "function","function": {"name": tool_name, "arguments": json.dumps(tool_args)}}]},{"role": "tool", "tool_call_id": call.id, "content": str(result.content)}]try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:final_text.append(f"调用 OpenAI API 失败:{e}")breakmessage = response.choices[0].messageif message.content:final_text.append(message.content)return "\n".join(final_text)async def chat_loop(self):"""交互式对话循环,捕获中断平滑退出。"""print("MCP 客户端已启动,输入问题,输入 'quit' 退出。")while True:try:query = input("问题: ").strip()if query.lower() == "quit":breakresult = await self.process_query(query)print("\n" + result)except KeyboardInterrupt:print("\n检测到中断信号,退出。")breakexcept Exception as e:print(f"发生错误:{e}")async def main():model_name = os.getenv("MODEL_NAME", "deepseek-chat")base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")api_key = os.getenv("API_KEY")if not api_key:print("未设置 API_KEY 环境变量")sys.exit(1)# 示例:使用两个 stdio 脚本server_scripts = ["server.py"]client = MCPClient(model_name, base_url, api_key, server_scripts)try:await client.initialize_sessions()await client.chat_loop()except KeyboardInterrupt:print("\n收到中断信号")finally:await client.cleanup()if __name__ == "__main__":try:asyncio.run(main())except KeyboardInterrupt:print("程序已终止。")

总结

通过本文的介绍,我们了解了如何使用 MCP 协议在 Python 中构建基于 STDIO 通信的 Client 与 Server。服务端通过注册多个工具为外部应用提供计算能力,而客户端则利用 OpenAI API 和工具调用的方式,将自然语言查询转化为对具体工具的调用,最终将结果反馈给用户。

这种基于 STDIO 的通信方式不仅简化了服务端与客户端之间的连接,还能方便地支持多服务器同时运行,为构建灵活高效的 LLM 应用提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/74693.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker 安装 Elasticsearch 教程

目录 一、安装 Elasticsearch 二、安装 Kibana 三、安装 IK 分词器 四、Elasticsearch 常用配置 五、Elasticsearch 常用命令 一、安装 Elasticsearch (一)创建 Docker 网络 因为后续还需要部署 Kibana 容器,所以需要让 Elasticsearch…

Swagger @ApiOperation

ApiOperation 注解并非 Spring Boot 自带的注解,而是来自 Swagger 框架,Swagger 是一个规范且完整的框架,用于生成、描述、调用和可视化 RESTful 风格的 Web 服务,而 ApiOperation 主要用于为 API 接口的操作添加描述信息。以下为…

【奇点时刻】GPT4o新图像生成模型底层原理深度洞察报告(篇2)

由于上一篇解析深度不足,经过查看学习相关论文,以下是一份对 GPT-4o 最新的图像生成模型 的深度梳理与洞察,从模型原理到社区解读、对比传统扩散模型,再到对未来趋势的分析。为了便于阅读,整理成以下七个部分&#xff…

C# 窗体应用(.FET Framework ) 打开文件操作

一、 打开文件或文件夹加载数据 1. 定义一个列表用来接收路径 public List<string> paths new List<string>();2. 打开文件选择一个文件并将文件放入列表中 OpenFileDialog open new OpenFileDialog(); // 过滤 open.Filter "(*.jpg;*.jpge;*.bmp;*.png…

Scala 面向对象编程总结

​​​抽象属性和抽象方法 基本语法 定义抽象类&#xff1a;abstract class Person{} //通过 abstract 关键字标记抽象类定义抽象属性&#xff1a;val|var name:String //一个属性没有初始化&#xff0c;就是抽象属性定义抽象方法&#xff1a;def hello():String //只声明而没…

人工智能赋能工业制造:智能制造的未来之路

一、引言 随着人工智能技术的飞速发展&#xff0c;其应用场景不断拓展&#xff0c;从消费电子到医疗健康&#xff0c;从金融科技到交通运输&#xff0c;几乎涵盖了所有行业。而工业制造作为国民经济的支柱产业&#xff0c;也在人工智能的浪潮中迎来了深刻的变革。智能制造&…

元宇宙概念下,UI 设计如何打造沉浸式体验?

一、元宇宙时代UI设计的核心趋势 在元宇宙概念下&#xff0c;UI设计的核心目标是打造沉浸式体验&#xff0c;让用户在虚拟世界中感受到身临其境的交互效果。以下是元宇宙时代UI设计的几个核心趋势&#xff1a; 沉浸式体验设计 元宇宙的核心是提供沉浸式体验&#xff0c;UI设计…

AI 如何帮助我们提升自己,不被替代

在当今快速发展的时代&#xff0c;人工智能&#xff08;AI&#xff09;正逐渐渗透到生活的方方面面。许多人担心 AI 会取代人类的工作&#xff0c;然而&#xff0c;AI 更多的是作为一种强大的赋能工具&#xff0c;帮助我们提升自身能力&#xff0c;让我们在工作中更具竞争力。以…

基于SpringBoot+Vue实现的二手交易市场平台功能一

一、前言介绍&#xff1a; 1.1 项目摘要 随着社会的发展和人们生活水平的提高&#xff0c;消费者购买能力的提升导致产生了大量的闲置物品&#xff0c;这些闲置物品具有一定的经济价值。特别是在高校环境中&#xff0c;学生群体作为一个具有一定消费水平的群体&#xff0c;每…

k8s安装cri驱动创建storageclass动态类

部署nfs服务器 #所有k8s节点安装nfs客户端 yum install -y nfs-utils mkdir -p /nfs/share echo "/nfs/share *(rw,sync,no_root_squash)" >> /etc/exports systemctl enable --now nfs-serverhelm部署nfs的provisioner&sc 所有k8s节点安装客户端 yu…

SpringBoot + Netty + Vue + WebSocket实现在线聊天

最近想学学WebSocket做一个实时通讯的练手项目 主要用到的技术栈是WebSocket Netty Vue Pinia MySQL SpringBoot&#xff0c;实现一个持久化数据&#xff0c;单一群聊&#xff0c;支持多用户的聊天界面 下面是实现的过程 后端 SpringBoot启动的时候会占用一个端口&#xff…

大数据Spark(五十七):Spark运行架构与MapReduce区别

文章目录 Spark运行架构与MapReduce区别 一、Spark运行架构 二、Spark与MapReduce区别 Spark运行架构与MapReduce区别 一、Spark运行架构 Master:Spark集群中资源管理主节点&#xff0c;负责管理Worker节点。Worker:Spark集群中资源管理的从节点&#xff0c;负责任务的运行…

【爬虫】网页抓包工具--Fiddler

网页抓包工具对比&#xff1a;Fiddler与Sniff Master Fiddler基础知识 Fiddler是一款强大的抓包工具&#xff0c;它的工作原理是作为web代理服务器运行&#xff0c;默认代理地址是127.0.0.1&#xff0c;端口8888。代理服务器位于客户端和服务器之间&#xff0c;拦截所有HTTP/…

Redis:集群

为什么要有集群&#xff1f; Redis 集群&#xff08;Redis Cluster&#xff09;是 Redis 官方提供的分布式解决方案&#xff0c;用于解决单机 Redis 在数据容量、并发处理能力和高可用性上的局限。通过 Redis 集群&#xff0c;可以实现数据分片、故障转移和高可用性&#xff0…

【2012】【论文笔记】太赫兹波在非磁化等离子体——

前言 类型 太赫兹 + 等离子体 太赫兹 + 等离子体 太赫兹+等离子体 期刊 物理学报 物理学报 物理学报 作者

Linux字符驱动设备开发入门之框架搭建

声明 本博客所记录的关于正点原子i.MX6ULL开发板的学习笔记&#xff0c;&#xff08;内容参照正点原子I.MX6U嵌入式linux驱动开发指南&#xff0c;可在正点原子官方获取正点原子Linux开发板 — 正点原子资料下载中心 1.0.0 文档&#xff09;&#xff0c;旨在如实记录我在学校学…

小刚说C语言刷题——第15讲 多分支结构

1.多分支结构 所谓多分支结构是指在选择的时候有多种选择。根据条件满足哪个分支&#xff0c;就走对应分支的语句。 2.语法格式 if(条件1) 语句1; else if(条件2) 语句2; else if(条件3) 语句3; ....... else 语句n; 3.示例代码 从键盘输入三条边的长度&#xff0c;…

Apache httpclient okhttp(1)

学习链接 Apache httpclient & okhttp&#xff08;1&#xff09; Apache httpclient & okhttp&#xff08;2&#xff09; httpcomponents-client github apache httpclient文档 apache httpclient文档详细使用 log4j日志官方文档 【Java基础】- HttpURLConnection…

洛谷题单3-P1420 最长连号-python-流程图重构

题目描述 输入长度为 n n n 的一个正整数序列&#xff0c;要求输出序列中最长连号的长度。 连号指在序列中&#xff0c;从小到大的连续自然数。 输入格式 第一行&#xff0c;一个整数 n n n。 第二行&#xff0c; n n n 个整数 a i a_i ai​&#xff0c;之间用空格隔开…

使用binance-connector库获取Binance全市场的币种价格,然后选择一个币种进行下单

一个完整的示例,展示如何使用 api 获取Binance全市场的币种价格,然后选择一个最便宜的币种进行下单操作 代码经过修改,亲测可用,目前只可用于现货,合约的待开发 获取市场价格:使用client.ticker_price()获取所有交易对的当前价格 账户检查:获取账户余额,确保有足够的资…