17|回调函数:在AI应用中引入异步通信机制

17|回调函数:在AI应用中引入异步通信机制

回调函数和异步编程

回调函数,你可能并不陌生。它是函数 A 作为参数传给另一个函数 B,然后在函数 B 内部执行函数 A。当函数 B 完成某些操作后,会调用(即“回调”)函数 A。这种编程模式常见于处理异步操作,如事件监听、定时任务或网络请求。

在编程中,异步通常是指代码不必等待某个操作完成(如 I/O 操作、网络请求、数据库查询等)就可以继续执行的能力。异步机制的实现涉及事件循环、任务队列和其他复杂的底层机制。这与同步编程形成对比,在同步编程中,操作必须按照它们出现的顺序完成。

下面是回调函数的一个简单示例。

def compute(x, y, callback):result = x + ycallback(result)def print_result(value):print(f"The result is: {value}")def square_result(value):print(f"The squared result is: {value**2}")# 使用print_result作为回调
compute(3, 4, print_result)  # 输出: The result is: 7# 使用square_result作为回调
compute(3, 4, square_result)  # 输出: The squared result is: 49

不过,上面这个程序中并没有体现出异步操作。虽然回调函数这种编程模式常见于处理异步操作,但回调函数本身并不代表异步。回调只是一种编程模式,允许你在某个操作完成时(无论是否异步)执行某些代码。

而下面的例子,就是在异步操作时使用回调函数的示例。

import asyncioasync def compute(x, y, callback):print("Starting compute...")await asyncio.sleep(0.5)  # 模拟异步操作result = x + y# callback(result)print("Finished compute...")def print_result(value):print(f"The result is: {value}")async def another_task():print("Starting another task...")await asyncio.sleep(1)print("Finished another task...")async def main():print("Main starts...")task1 = asyncio.create_task(compute(3, 4, print_result))task2 = asyncio.create_task(another_task())await task1await task2print("Main ends...")asyncio.run(main())

这个示例中,当我们调用 asyncio.create_task(compute(3, 4, print_result)),compute 函数开始执行。当它遇到 await asyncio.sleep(2) 时,它会暂停,并将控制权交还给事件循环。这时,事件循环可以选择开始执行 another_task,这是另一个异步任务。这样,你可以清晰地看到,尽管 compute 函数还没有完成,another_task 函数也得以开始执行并完成。这就是异步编程,允许你同时执行多个操作,而不需要等待一个完成后再开始另一个。

LangChain 中的 Callback 处理器

LangChain 的 Callback 机制允许你在应用程序的不同阶段进行自定义操作,如日志记录、监控和数据流处理,这个机制通过 CallbackHandler(回调处理器)来实现。

回调处理器是 LangChain 中实现 CallbackHandler 接口的对象,为每类可监控的事件提供一个方法。当该事件被触发时,CallbackManager 会在这些处理器上调用适当的方法。

BaseCallbackHandler 是最基本的回调处理器,你可以继承它来创建自己的回调处理器。它包含了多种方法,如 on_llm_start/on_chat(当 LLM 开始运行时调用)和 on_llm_error(当 LLM 出现错误时调用)等。

img

LangChain 也提供了一些内置的处理器,例如 StdOutCallbackHandler,它会将所有事件记录到标准输出。还有 FileCallbackHandler,会将所有的日志记录到一个指定的文件中。

在组件中使用回调处理器

在 LangChain 的各个组件,如 Chains、Models、Tools、Agents 等,都提供了两种类型的回调设置方法:构造函数回调和请求回调。你可以在初始化 LangChain 时将回调处理器传入,或者在单独的请求中使用回调。例如,当你想要在整个链的所有请求中进行日志记录时,可以在初始化时传入处理器;而当你只想在某个特定请求中使用回调时,可以在请求时传入。

这两者的区别,我给你整理了一下。

img

下面这段示例代码,使用 LangChain 执行了一个简单的任务,结合使用 LangChain 的回调机制与 loguru 日志库,将相关事件同时输出到标准输出和 "output.log" 文件中。

from loguru import loggerfrom langchain.callbacks import FileCallbackHandler
from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplatelogfile = "output.log"logger.add(logfile, colorize=True, enqueue=True)
handler = FileCallbackHandler(logfile)llm = OpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")# this chain will both print to stdout (because verbose=True) and write to 'output.log'
# if verbose=False, the FileCallbackHandler will still write to 'output.log'
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler], verbose=True)
answer = chain.run(number=2)
logger.info(answer)

其中,初始化 LLMChain 时指定的 verbose 参数,就等同于将一个输出到控制台的回调处理器添加到你的对象中。这个在你调试程序时非常有用,因为它会将所有事件的信息输出到控制台。

简而言之,LangChain 通过回调系统提供了一种灵活的方式,来监控和操作应用程序的不同阶段。

自定义回调函数

我们也可以通过 BaseCallbackHandler 和 AsyncCallbackHandler 来自定义回调函数。下面是一个示例。

import asyncio
from typing import Any, Dict, Listfrom langchain.chat_models import ChatOpenAI
from langchain.schema import LLMResult, HumanMessage
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler# 创建同步回调处理器
class MyFlowerShopSyncHandler(BaseCallbackHandler):def on_llm_new_token(self, token: str, **kwargs) -> None:print(f"获取花卉数据: token: {token}")# 创建异步回调处理器
class MyFlowerShopAsyncHandler(AsyncCallbackHandler):async def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None:print("正在获取花卉数据...")await asyncio.sleep(0.5)  # 模拟异步操作print("花卉数据获取完毕。提供建议...")async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:print("整理花卉建议...")await asyncio.sleep(0.5)  # 模拟异步操作print("祝你今天愉快!")# 主要的异步函数
async def main():flower_shop_chat = ChatOpenAI(max_tokens=100,streaming=True,callbacks=[MyFlowerShopSyncHandler(), MyFlowerShopAsyncHandler()],)# 异步生成聊天回复await flower_shop_chat.agenerate([[HumanMessage(content="哪种花卉最适合生日?只简单说3种,不超过50字")]])# 运行主异步函数
asyncio.run(main())

在这个鲜花店客服的程序中,当客户问及关于鲜花的建议时,我们使用了一个同步和一个异步回调。

MyFlowerShopSyncHandler 是一个同步回调,每当新的 Token 生成时,它就简单地打印出正在获取的鲜花数据。

而 MyFlowerShopAsyncHandler 则是异步的,当客服开始提供鲜花建议时,它会模拟数据的异步获取。在建议完成后,它还会模拟一个结束的操作,如向客户发出感谢。

这种结合了同步和异步操作的方法,使得程序能够更有效率地处理客户请求,同时提供实时反馈。

这里的异步体现在这样几个方面。

  1. 模拟延时操作:在 MyFlowerShopAsyncHandler 中,我们使用了 await asyncio.sleep(0.5) 来模拟其他请求异步获取花卉信息的过程。当执行到这个 await 语句时,当前的 on_llm_start 函数会“暂停”,释放控制权回到事件循环。这意味着,在这个 sleep 期间,其他异步任务(如其他客户的请求)可以被处理。
  2. 回调机制:当 ChatOpenAI 在处理每个新 Token 时,它会调用 on_llm_new_token 方法。因为这是一个同步回调,所以它会立即输出。但是,开始和结束的异步回调 on_llm_start 和 on_llm_end 在开始和结束时都有一个小的延时操作,这是通过 await asyncio.sleep(0.5) 模拟的。
  3. 事件循环:Python 的 syncio 库提供了一个事件循环,允许多个异步任务并发运行。在我们的例子中,虽然看起来所有的操作都是按顺序发生的,但由于我们使用了异步操作和回调,如果有其他并发任务,它们可以在 await 暂停期间运行。

为了更清晰地展示异步的优势,通常我们会在程序中同时运行多个异步任务,并观察它们如何“并发”执行。但在这个简单的例子中,我们主要是通过模拟延时来展示异步操作的基本机制。

因此说,回调函数为异步操作提供了一个机制,使你可以定义“当操作完成时要做什么”,而异步机制的真正实现涉及更深层次的底层工作,如事件循环和任务调度。

用 get_openai_callback 构造令牌计数器

下面,我带着你使用 LangChain 中的回调函数来构造一个令牌计数器。这个计数功能对于监控大模型的会话消耗以及成本控制十分重要。

在构造令牌计数器之前,我们来回忆一下第 10 课中的记忆机制。我们用下面的代码生成了 ConversationBufferMemory。

from langchain import OpenAI
from langchain.chains import ConversationChain
from langchain.chains.conversation.memory import ConversationBufferMemory# 初始化大语言模型
llm = OpenAI(temperature=0.5,model_name="text-davinci-003")# 初始化对话链
conversation = ConversationChain(llm=llm,memory=ConversationBufferMemory()
)# 第一天的对话
# 回合1
conversation("我姐姐明天要过生日,我需要一束生日花束。")
print("第一次对话后的记忆:", conversation.memory.buffer)# 回合2
conversation("她喜欢粉色玫瑰,颜色是粉色的。")
print("第二次对话后的记忆:", conversation.memory.buffer)# 回合3 (第二天的对话)
conversation("我又来了,还记得我昨天为什么要来买花吗?")
print("/n第三次对话后时提示:/n",conversation.prompt.template)
print("/n第三次对话后的记忆:/n", conversation.memory.buffer)

同时,我们也给出了各种记忆机制对 Token 的消耗数量的估算示意图。

img

当对话轮次逐渐增加时,各种记忆机制对 Token 的消耗数量估算

不过,这张图毕竟是估算,要真正地衡量出每种记忆机制到底耗费了多少个 Token,那就需要回调函数上场了。

下面,我们通过回调函数机制,重构这段程序。为了做到这一点,我们首先需要确保在与大语言模型进行交互时,使用了 get_openai_callback 上下文管理器。

在 Python 中,一个上下文管理器通常用于管理资源,如文件或网络连接,这些资源在使用前需要设置,在使用后需要清理。上下文管理器经常与 with 语句一起使用,以确保资源正确地设置和清理。

get_openai_callback 被设计用来监控与 OpenAI 交互的 Token 数量。当你进入该上下文时,它会通过监听器跟踪 Token 的使用。当你退出上下文时,它会清理监听器并提供一个 Token 的总数。通过这种方式,它充当了一个回调机制,允许你在特定事件发生时执行特定的操作或收集特定的信息。

具体代码如下:

from langchain import OpenAI
from langchain.chains import ConversationChain
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.callbacks import get_openai_callback# 初始化大语言模型
llm = OpenAI(temperature=0.5, model_name="text-davinci-003")# 初始化对话链
conversation = ConversationChain(llm=llm,memory=ConversationBufferMemory()
)# 使用context manager进行token counting
with get_openai_callback() as cb:# 第一天的对话# 回合1conversation("我姐姐明天要过生日,我需要一束生日花束。")print("第一次对话后的记忆:", conversation.memory.buffer)# 回合2conversation("她喜欢粉色玫瑰,颜色是粉色的。")print("第二次对话后的记忆:", conversation.memory.buffer)# 回合3 (第二天的对话)conversation("我又来了,还记得我昨天为什么要来买花吗?")print("/n第三次对话后时提示:/n",conversation.prompt.template)print("/n第三次对话后的记忆:/n", conversation.memory.buffer)# 输出使用的tokens
print("\n总计使用的tokens:", cb.total_tokens)

这里,我使用了 get_openai_callback 上下文管理器来监控与 ConversationChain 的交互。这允许我们计算在这些交互中使用的总 Tokens 数。

输出:

总计使用的tokens: 966

下面,我再添加了一个 additional_interactions 异步函数,用于演示如何在多个并发交互中计算 Tokens。

当我们讨论异步交互时,指的是我们可以启动多个任务,它们可以并发(而不是并行)地运行,并且不会阻塞主线程。在 Python 中,这是通过 asyncio 库实现的,它使用事件循环来管理并发的异步任务。

import asyncio
# 进行更多的异步交互和token计数
async def additional_interactions():with get_openai_callback() as cb:await asyncio.gather(*[llm.agenerate(["我姐姐喜欢什么颜色的花?"]) for _ in range(3)])print("\n另外的交互中使用的tokens:", cb.total_tokens)# 运行异步函数
asyncio.run(additional_interactions())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/595413.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何对APP进行安全加固

引言 如今,移动应用市场蓬勃发展,APP数量呈现爆炸性增长。随着5G技术的广泛应用,APP的增长趋势持续增强。然而,由于APP的泛滥,网络攻击者的目标也在逐渐转移,数亿的移动互联网用户面临着病毒攻击的威胁&am…

计算机毕业设计——SpringBoot 房屋销售租赁平台 房屋购物网站(附源码)

1,绪论 1.1 背景调研 在房地产行业持续火热的当今环境下,房地产行业和互联网行业协同发展,互相促进融合已经成为一种趋势和潮流。本项目实现了在线房产平台的功能,多种技术的灵活运用使得项目具备很好的用户体验感。 这个项目的…

杨中科 ASP.NET MVC

ASP.NET Core 入门 什么是ASP.NET CORE 1、ASP.NET Core是.NET中做Web开发的框架 2、ASP.NET Core MVC 传统MVC项目,前后端都做在一起 3、ASP.NET Core Web API: 前后端分离、多端开发。(是属于MVC中的一部分) 4、ASPNET Core MVC其实包含Web API,不过…

Python入学测试题 江苏某线下培训机构出题

入学测试题 温馨提示:为检测自身的情况,请认真作答,不要进行上网搜答案 1. 实现冒泡排序 for i in range(len(nums)):for j in range(0, len(nums) - i - 1):if nums[j] > nums[j 1]:nums[j], nums[j 1] nums[j 1], nums[j] 2. 二维…

未来十年不变的AI是什么?吴恩达等专家关于2024年AI发展趋势的预测

随着2024年的到来,人工智能领域正迎来前所未有的变革和发展。从深度学习到自然语言处理,AI技术的每一个分支都在经历着快速的进步。在这个关键的时刻,业界专家们提出了对未来趋势的深刻洞察,预测了将形成AI发展主流的关键方向。智…

雍禾医疗以患者为中心 雍禾植发医生文志清专注做精每一台手术

随着生活方式的改变,晚睡、长期使用电子产品等原因让脱发成为消费者不可忽视的问题。有相关数据显示,76%消费者关注自己的头皮健康状况,53%的人群受到脱发困扰,已经有20%消费者认识到专业毛发医疗机构的重要性,选择到专…

V8R6小版本升级步骤(单机环境)

在KingbaseES V8R6版本提供了sys_upgrade的升级工具。 sys_upgade介绍 sys_upgrade实现KingbaseES服务器实例版本升级。 sys_upgrade 允许将存储在KingbaseES数据文件中的数据升级到一个更高的KingbaseES主版本,而无需进行主版本升级(例如从 V8R6C4 到 V8R6C5)通常…

STL——vector详解

目录 💡基本概念 💡存放内置数据类型 💡存放自定义数据类型 💡存放自定义数据类型指针 💡vector容器嵌套容器 💡vector构造函数 💡vector赋值操作 💡vector容量和大小 &…

大数据技术原理与应用期末考试题

大数据技术原理与应用期末考试题 一、单选题 1.下面哪个选项属于大数据技术的“数据存储和管理”技术层面的功能? A、利用分布式文件系统、数据仓库、关系数据库等实现对结构化、半结构化和非结构化海量数据的存储和管理 B、利用分布式并行编程模型和计算框架,结合机器学习…

YOLOv8改进 | 主干篇 | ConvNeXtV2全卷积掩码自编码器网络

一、本文介绍 本文给大家带来的改进机制是ConvNeXtV2网络,ConvNeXt V2是一种新型的卷积神经网络架构,它融合了自监督学习技术和架构改进,特别是加入了全卷积掩码自编码器框架和全局响应归一化(GRN)层。我将其替换YOLOv8的特征提取网络,用于提取更有用的特征。经过我的实…

西电期末1018.logistic方程

一.题目 二.分析与思路 根据题目递归即可&#xff0c;用while函数判断是否到达1000项&#xff0c;内部用abs函数&#xff08;绝对值函数&#xff09;判断是否收敛&#xff0c;最后按照结果输出即可。 三.代码实现 #include<bits/stdc.h>//万能头 int main() {double …

【SI PI 学习路线图】

一、【题干】 SI/PI基本理论 SI/PI经典书籍 软件操作 Hyperlynx Sigrity SIWAVE HFSS Q3D ADS 相关软件Help文档 二、行动路线图 01 1.信号完整性基本概念介绍 2.时域和频域的关系 3.S参数的介绍 4.仿真实例&#xff1a;S参数仿真 平板项目低速信号仿真 02 1.TDR基本概念…

Flink 任务指标监控

目录 状态监控指标 JobManager 指标 TaskManager 指标 Job 指标 资源监控指标 数据流监控指标 任务监控指标 网络监控指标 容错监控指标 数据源监控指标 数据存储监控指标 JobManager 指标 TaskManager 指标 Job 指标 当使用 Apache Flink 进行流处理任务时&…

NLP基础——中文分词

简介 分词是自然语言处理&#xff08;NLP&#xff09;中的一个基本任务&#xff0c;它涉及将连续的文本序列切分成多个有意义的单元&#xff0c;这些单元通常被称为“词”或“tokens”。在英语等使用空格作为自然分隔符的语言中&#xff0c;分词相对简单&#xff0c;因为大部分…

React实现简单登录

一 实现效果(样式是之前设置的&#xff09; 二 具体实现代码 2.1 Login.js import {useNavigate} from "react-router-dom"; import React from "react"; // import ./style2.cssfunction Login(){const navigateuseNavigate()func…

nginx在国产服务器上stream配置项无法识别的问题

最近在搭建k8sranchar&#xff0c;需要用到nginx做负载均衡&#xff0c;之前在系统中也会用到&#xff0c;之前一直使用http选项&#xff0c;做转发配置。 基本格式如下图所示&#xff1a; 但是在ranchar的安装中默认方式使用stream配置项。 使用yum默认安装的nginx不支持该关…

Yapi安装配置(CentOs)

环境要求 nodejs&#xff08;7.6) mongodb&#xff08;2.6&#xff09; git 准备工作 清除yum命令缓存 sudo yum clean all卸载低版本nodejs yum remove nodejs npm -y安装nodejs,获取资源,安装高版本nodejs curl -sL https://rpm.nodesource.com/setup_8.x | bash - #安装 s…

Spring Cloud Config相关面试题及答案(2024)

1、什么是 Spring Cloud Config&#xff0c;它解决了哪些问题&#xff1f; Spring Cloud Config 是一个为微服务架构提供集中化外部配置支持的项目。它是构建在 Spring Cloud 生态系统之上&#xff0c;利用 Spring Boot 的开发便利性&#xff0c;简化了分布式系统中的配置管理…

现在的人们如何看待数据隐私?

PrimiHub一款由密码学专家团队打造的开源隐私计算平台&#xff0c;专注于分享数据安全、密码学、联邦学习、同态加密等隐私计算领域的技术和内容。 在当前时代&#xff0c;每一次点击、触摸或按键都留下了数字痕迹。但是我们对自己的个人数据几乎没有控制的权限&#xff0c;这让…

百度自由DIY小程序源码:PHP+MySQL组合开发 带完整的搭建教程

随着移动互联网的快速发展&#xff0c;小程序已成为企业与用户互动的重要平台。然而&#xff0c;对于许多中小企业和开发者来说&#xff0c;从零开始开发一款小程序需要投入大量的时间和资源。 以下是部分代码示例&#xff1a; 系统特色功能一览&#xff1a; 1.高度自定义&…