开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)

 一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回


二、术语

2.1.Tool

    Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。

2.2.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:

2.3.LangChain支持流式输出的方法

  • stream:基本的流式传输方式,能逐步给出代理的动作和观察结果。
  • astream:异步的流式传输,用于异步处理需求的情况。
  • astream_events:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。

2.4.langchainhub

    是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。

    它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。

2.5.asyncio

    是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search以及langchainhub的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet  langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub

3.2. 注册Google Search API账号

参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

3.3. 生成Google Search API的KEY


四、技术实现

4.1. 使用Tool&流式输出

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]template='''
Respond to the human as helpfully and accurately as possible. You have access to the following tools:{tools}Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).Valid "action" values: "Final Answer" or {tool_names}Provide only ONE action per $JSON_BLOB, as shown:```{{"action": $TOOL_NAME,"action_input": $INPUT}}```Follow this format:Question: input question to answerThought: consider previous and subsequent stepsAction:```$JSON_BLOB```Observation: action result... (repeat Thought/Action/Observation N times)Thought: I know what to respondAction:```{{"action": "Final Answer","action_input": "Final response to human"}}Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
'''
system_message_prompt = SystemMessagePromptTemplate.from_template(template)
human_template='''
{input}{agent_scratchpad}(reminder to respond in a JSON blob no matter what)
'''
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk =  data['chunk']content =  chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

说明:

流式输出的数据结构为:

{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
type: on_chat_model_stream
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}

4.2. 通过langchainhub使用公共prompt

   在4.1使用Tool&流式输出的代码基础上进行调整

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"from langchain import hubllm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]prompt = hub.pull("hwchase17/structured-chat-agent")print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk =  data['chunk']content =  chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

4.3. 整合代码

在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整

import uvicorn
import osfrom typing import Annotated
from fastapi import (Depends,FastAPI,WebSocket,WebSocketException,WebSocketDisconnect,status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapperfrom langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"class ConnectionManager:def __init__(self):self.active_connections: list[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def send_personal_message(self, message: str, websocket: WebSocket):await websocket.send_text(message)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()app = FastAPI()async def authenticate(websocket: WebSocket,userid: str,secret: str,
):if userid is None or secret is None:raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)print(f'userid: {userid},secret: {secret}')if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:return 'pass'else:return 'fail'@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resultdef get_prompt():prompt = hub.pull("hwchase17/structured-chat-agent")return promptasync def chat(query):global llm,toolsagent = create_structured_chat_agent(llm, tools, get_prompt())agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)events = agent_executor.astream_events({"input": query}, version="v1")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)yield content@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):await manager.connect(websocket)try:while True:text = await websocket.receive_text()if 'fail' == permission:await manager.send_personal_message(f"authentication failed", websocket)else:if text is not None and len(text) > 0:async for msg in chat(text):await manager.send_personal_message(msg, websocket)except WebSocketDisconnect:manager.disconnect(websocket)print(f"Client #{userid} left the chat")await manager.broadcast(f"Client #{userid} left the chat")if __name__ == '__main__':tools = [search]llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

<!DOCTYPE html>
<html><head><title>Chat</title></head><body><h1>WebSocket Chat</h1><form action="" onsubmit="sendMessage(event)"><label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label><label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label><br/><button onclick="connect(event)">Connect</button><hr><label>Message: <input type="text" id="messageText" autocomplete="off"/></label><button>Send</button></form><ul id='messages'></ul><script>var ws = null;function connect(event) {var userid = document.getElementById("userid")var secret = document.getElementById("secret")ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);ws.onmessage = function(event) {var messages = document.getElementById('messages')var message = document.createElement('li')var content = document.createTextNode(event.data)message.appendChild(content)messages.appendChild(message)};event.preventDefault()}function sendMessage(event) {var input = document.getElementById("messageText")ws.send(input.value)input.value = ''event.preventDefault()}</script></body>
</html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:

用户输入:广州现在天气如何?

需要调用工具

模型输出:

```
Action:
```
{"action": "Final Answer","action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```

PS:

1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/865134.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

美术馆预约小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;展品信息管理&#xff0c;管理员管理&#xff0c;用户管理&#xff0c;美术馆管理&#xff0c;基础数据管理&#xff0c;论坛管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;美术馆&#xff…

[C/C++] -- gdb调试与coredump

1.gdb调试 GDB&#xff08;GNU 调试器&#xff09;是一个强大的工具&#xff0c;用于调试程序。 安装 1. wget http://ftp.gnu.org/gnu/gdb/gdb-8.1.tar.gz 2. tar -zxvf gdb-8.1.1.tar.gz 3. cd gdb-8.1.1 4. ./configure 5. make 6. make install 基础用法 …

vue3.0(十六)axios详解以及完整封装方法

文章目录 axios简介1. promise2. axios特性3. 安装4. 请求方法5. 请求方法别名6. 浏览器支持情况7. 并发请求 Axios的config的配置信息1.浏览器控制台相关的请求信息&#xff1a;2.配置方法3.默认配置4.配置的优先级5.axios请求响应结果 Axios的拦截器1.请求拦截2.响应拦截3.移…

回流焊常见缺陷

不润湿(Nonwetting)/润湿不良(Poor Wetting) 通常润湿不良是指焊点焊锡合金没有很好的铺展开来,从而无法得到良好的焊点并直接影响到焊点的可靠性。 产生原因: 1. 焊盘或引脚表面的镀层被氧化,氧化层的存在阻挡了焊锡与镀层之间的接触; 2. 镀层厚度不够或是加工不良,很…

常见测试测量接口的比较:PXI、PXIe、PCI、VXI、GPIB、USB

详细比较了六种常见的测试测量接口&#xff1a;PXI、PXIe、PCI、VXI、GPIB、USB。每种接口都有其独特的特点和应用场景。通过比较它们的性能、带宽、模块化程度和应用领域&#xff0c;帮助工程师选择最适合其测试需求的接口类型。 1. PXI&#xff08;PCI eXtensions for Instru…

uboot 编译时传递参数实现条件编译

KCFLAGS make ARCHarm KCFLAGS-DENV_DEBUG CROSS_COMPILEaarch64-linux-gnu-env/sf.c env_sf_save 加入调试信息 # saveenv Saving Environment to SPI Flash... env_sf_save (1) spi_flash_erase (2) spi_flash_write is40000 Erasing SPI flash...Writing to SPI flash.…

试用笔记之-Delphi xe 微信/支付宝支付源代码

首先delphi xe 微信/支付宝支付源代码下载&#xff1a; http://www.htsoft.com.cn/download/DelphiXEWeiXin_ZhiFuBao_ZhiFu.rar 解压后可以看到源代码 直接执行可执行文件&#xff1a;

大模型技术在辅助学习中的应用

大模型技术在辅助学习中的应用场景非常广泛&#xff0c;以下是一些典型示例。大模型技术在辅助学习中具有广阔的应用前景&#xff0c;可以为学生提供更加个性化、智能化和高效的学习体验。随着大模型技术的不断发展&#xff0c;我们可以期待在未来看到更多创新应用。北京木奇移…

【Python】已解决:ERROR: No matching distribution found for JPype1

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决&#xff1a;ERROR: No matching distribution found for JPype1 一、分析问题背景 在安装Python的第三方库时&#xff0c;有时会遇到“ERROR: No matching distribution fo…

前端技术(说明篇)

Introduction ##编写内容&#xff1a;1.前端概念梳理 2.前端技术种类 3.前端学习方式 ##编写人&#xff1a;贾雯爽 ##最后更新时间&#xff1a;2024/07/01 Overview 最近在广州粤嵌进行实习&#xff0c;项目名称是”基于Node实现多人聊天室“&#xff0c;主要内容是对前端界…

springboot图书馆座位预约系统-计算机毕业设计源码85670

目 录 摘要 1 绪论 1.1 选题背景与意义 1.2开发现状 1.3论文结构与章节安排 2 开发环境及相关技术介绍 2.1 MySQL数据库 2.2 Tomcat服务器 2.3 Java语言 2.4 SpringBoot框架介绍 3 图书馆座位预约系统系统分析 3.1 可行性分析 3.1.1 技术可行性分析 3.1.2 经济可…

【Qt】初识QtQt Creator

一.简述Qt 1.什么是Qt Qt 是⼀个 跨平台的 C 图形⽤⼾界⾯应⽤程序框架 。它为应⽤程序开发者提供了建⽴艺术级图形界⾯所需的所有功能。它是完全⾯向对象的&#xff0c;很容易扩展。Qt 为开发者提供了⼀种基于组件的开发模式&#xff0c;开发者可以通过简单的拖拽和组合来实现…

基于ESP32 IDF的WebServer实现以及OTA固件升级实现记录(三)

经过前面两篇的前序铺垫&#xff0c;对webserver以及restful api架构有了大体了解后本篇描述下最终的ota实现的代码以及调试中遇到的诡异bug。 eps32的实际ota实现过程其实esp32官方都已经基本实现好了&#xff0c;我们要做到无非就是把要升级的固件搬运到对应ota flash分区里面…

2.3 主程序和外部IO交互 (文件映射方式)----IO Server实现

2.3 主程序和外部IO交互 &#xff08;文件映射方式&#xff09;----IO Server C实现 效果显示 1 内存共享概念 基本原理&#xff1a;以页面为单位&#xff0c;将一个普通文件映射到内存中&#xff0c;达到共享内存和节约内存的目的&#xff0c;通常在需要对文件进行频繁读写时…

手写一个类似@RequestParam的注解(用来接收请求体的参数)

一、本文解决的痛点 按照大众认为的开发规范&#xff0c;一般post类型的请求参数应该传在请求body里面。但是我们有些post接口只需要传入一个字段&#xff0c;我们接受这种参数就得像下面这样单独创建一个类&#xff0c;类中再添加要传入的基本类型字段&#xff0c;配合Reques…

LLM指令微调Prompt的最佳实践(二):Prompt迭代优化

文章目录 1. 前言2. Prompt定义3. 迭代优化——以产品说明书举例3.1 产品说明书3.2 初始Prompt3.3 优化1: 添加长度限制3.4 优化2: 细节纠错3.5 优化3: 添加表格 4. 总结5. 参考 1. 前言 前情提要&#xff1a; 《LLM指令微调Prompt的最佳实践&#xff08;一&#xff09;&#…

nexus未开启匿名访问Anonymous Access,访问maven元数据maven-metadata,报401未授权Unauthorized错误

一、背景 下午在调试nexus的时候&#xff0c;其他同事不小心把匿名访问停用了&#xff0c;导致客户端android打包的时候&#xff0c;报错&#xff1a; Received status code 401 from server: Unauthorized。 访问http://192.168.xx.xx:8081/repository/public/com/xxx/xxxcor…

【软件测试】单元测试、系统测试、集成测试详解

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、单元测试的概念 单元测试是对软件基本组成单元进行的测试&#xff0c;如函数或一个类的方法…