使用langchain异步获取网络信息

文章目录

  • 前言
  • 发现问题
  • 分析问题
  • 阻塞?
  • 阻塞!
  • 附:用什么阻塞streamlit进程

前言

从来没想过在接入通义千问的时候还会遇到NotImplementedError。实在难以理解,处理过后才明白问题。现在总结后给出结果。

发现问题

我们来看个例子。就比如这段代码:

摘自ranying666/langchain_tongyi_practical中的5-langchain-search-document_loaders.py

loader = AsyncChromiumLoader(["https://dataea.cn/okr-keyresult-checklist/"])
html = loader.load()
html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(html)

如果把这段直接接入streamlit,那就直接报错:

NotImplementedError

比较离谱的是,只有这么一个错误,没有其他信息。

分析问题

很难理解为什么是这个错。

随着排查的进行,发现问题好像出在AsyncChromiumLoader中。

AsyncChromiumLoader继承自BaseLoader,而BaseLoaderload方法中,调用的却是lazy_load。先不用看这个方法的具体内容,就看这个方法的名字,你大概也能猜出来什么问题了:

懒加载带来的未能实例化的问题。

简单地说,就是:streamlit已在前面飞,loader.load()还在后面追。终于,追不上了,就爆炸了。而刚好的是,lazy_load中抛出的异常就是这个NotImplementedError

众所周知,streamlit构建网页的过程是单线程的。

所以,当我们需要请求内容的时候,使用异步请求的AsyncChromiumLoader就会出现这种问题。

那么,该怎么办呢?

阻塞?

怎么办呢?阻塞,对吧?很容易想到。

于是会想当然的这么用:

loader = AsyncChromiumLoader(["https://dataea.cn/okr-keyresult-checklist/"])
html = loader.load()
while html is None:time.sleep(1)
html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(html)

看着很直观,检测html是否有返回值。

如果真这么简单的话我也不会把它写在这里(🤷‍♂️)。

结果就是,还是报错。这又是为什么呢?逻辑没问题呀?

逻辑是没问题,问题出在底层。作为一个异步函数,他怎么可能没有返回值呢?

我们来回顾一下load方法:

# Sub-classes should not implement this method directly. Instead, they should implement the lazy load method.
def load(self) -> List[Document]:"""Load data into Document objects."""return list(self.lazy_load())

那么,lazy_load是怎么回事呢?

def lazy_load(self) -> Iterator[Document]:"""A lazy loader for Documents."""if type(self).load != BaseLoader.load:return iter(self.load())raise NotImplementedError(f"{self.__class__.__name__} does not implement lazy_load()")

这里比较有意思的是,对实例化的AsyncChromiumLoader对象(就是这个self),判断AsyncChromiumLoader.loadBaseLoader.load是否一致。

其实这里比较的是地址信息,因为子类如果重写了这个load方法,那么地址就会被改变。如果不一致的话,就会返回一个迭代器,这个迭代器就是为了后续过程中无论返回的是否是list,都能够迭代。

听起来没问题。

但是,懒加载呢?

async def aload(self) -> List[Document]:"""Load data into Document objects."""return [document async for document in self.alazy_load()]async def alazy_load(self) -> AsyncIterator[Document]:"""A lazy loader for Documents."""iterator = await run_in_executor(None, self.lazy_load)done = object()while True:doc = await run_in_executor(None, next, iterator, done)  # type: ignore[call-arg, arg-type]if doc is done:breakyield doc  # type: ignore[misc]

比较神奇的就出现在这里了。alazy_load方法最终给出来的就是一个Document类的迭代器,然后最终通过yield给到调用方,直到doc在迭代过程中达到了done

但是呢,doc变量的结果是await run_in_executor(None, next, iterator, done),即使run_in_executor返回的是一个迭代器对象,最终由await进行处理,所以是有返回值的,但是返回的是未来需要返回的,是asyncio.futures.Future类。这一点完全可以类比Java中的Future对象。

所以,最终而言,AsyncChromiumLoader.load并不是直到结束才返回值,而是在执行的过程中不断地通过yield给出返回值,只是在await最终处理为AsyncIterator[Document]类型。

阻塞!

为了让streamlit等待异步请求,就需要主线程停下来,直到请求结束了才能继续执行。

那这回该怎么办呢?直接用asyncioplaywright给阻塞掉。

首先,我们需要利用asyncio创建一个阻塞事件,并设置所有的事件都需要在阻塞事件结束后执行。

其次,在执行这个阻塞之间的时候,我们依然使用异步请求,只不过是所有的事件都在等我们。

于是,可以给出代码如下:

import asyncio
import platform
from playwright.async_api import async_playwright
# 阻塞事件
async def fetch_page_content(url):async with async_playwright() as p:browser = await p.chromium.launch()page = await browser.new_page()await page.goto(url)content = await page.content()await browser.close()return content
# 阻塞主进程
@st.cache_data
def load_documents(url):loop = Noneif platform.system() == 'Windows':loop = asyncio.ProactorEventLoop()elif platform.system() == 'Linux':loop = asyncio.new_event_loop()elif platform.system() == 'Darwin':loop = asyncio.SelectorEventLoop()else:return Nonehtml_content = loop.run_until_complete(fetch_page_content(url))html2text = Html2TextTransformer()document = Document(page_content=html_content)docs_transformed = list(html2text.transform_documents([document]))return docs_transformed

其实这里面最核心的就是asyncio下的run_until_complete函数:

def run_until_complete(self, future):"""Run until the Future is done.If the argument is a coroutine, it is wrapped in a Task.WARNING: It would be disastrous to call run_until_complete()with the same coroutine twice -- it would wrap it in twodifferent Tasks and that can't be good.Return the Future's result, or raise its exception."""self._check_closed()self._check_running()new_task = not futures.isfuture(future)future = tasks.ensure_future(future, loop=self)if new_task:# An exception is raised if the future didn't complete, so there# is no need to log the "destroy pending task" messagefuture._log_destroy_pending = Falsefuture.add_done_callback(_run_until_complete_cb)try:self.run_forever()except:if new_task and future.done() and not future.cancelled():# The coroutine raised a BaseException. Consume the exception# to not log a warning, the caller doesn't have access to the# local task.future.exception()raisefinally:future.remove_done_callback(_run_until_complete_cb)if not future.done():raise RuntimeError('Event loop stopped before Future completed.')return future.result()

这个函数最大的特点就是会创建一个任务并执行。直到任务执行完成或者报错中断之前,其他所有任务都得等着这个任务的回调函数。

于是,这个函数就阻塞了streamlit的进程,直到异步任务完成。

附:用什么阻塞streamlit进程

其实这段文字本来应该接在上面这段的。但是这个坑实在太神奇了,单独拉出来说明。

这里面还有一个很神奇的坑:用什么东西阻塞。

就像上面这段代码,针对WindowsLinuxDarwin,分别采用了ProactorEventLoopnew_event_loopSelectorEventLoop阻塞streamlit进程。

如果在Linux平台中使用ProactorEventLoop,那么streamlit进程依然不会阻塞,因为他们都只能在各自的操作系统中起作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/852430.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习500问——Chapter11:迁移学习(1)

文章目录 11.1 迁移学习基础知识 11.1.1 什么是迁移学习 11.1.2 为什么需要迁移学习 11.1.3 迁移学习的基本问题有哪些 11.1.4 迁移学习有哪些常用概念 11.1.5 迁移学习与传统机器学习有什么区别 11.1.6 迁移学习的核心及度量准则 11.1.7 迁移学习与其他概念的区别 11.1.8 什么…

服务器再升级!64线程服务器震撼上线,全新渲染体验等你来解锁

秉承着 “科技赋能创意,连接创造价值”的使命, 经过精心的策划和筹备, 蓝海创意云 64线程服务器, 以全新的面貌,优惠的价格, 与大家见面了! 诚邀您一起,解锁全新的渲染体验&am…

《软件定义安全》之八:软件定义安全案例

第8章 软件定义安全案例 1.国外案例 1.1 Fortinet:传统安全公司的软件定义方案 Fortinet的软件定义安全架构强调与数据中心的结合,旨在将安全转型为软件定义的模式,使安全运维能够与数据中心的其他部分一样灵活、弹性。在Fortinet看来&…

MySQL为何不支持函数索引的使用

MySQL不支持函数索引的使用,主要基于以下几个原因: 索引机制:MySQL的索引主要是基于B树结构来构建的,这种结构通过保持数据的顺序性和层级性来实现高效的查询。然而,当在查询条件中使用函数时,MySQL需要先…

随着开源程序的发展,越来越多的程序员开始关注并加入开源大模型的行列,对于开源行业和开源项目不同人有不同的关注点,推荐几个热门项目

随着开源软件的普及和开源项目的蓬勃发展,越来越多的程序员开始关注并加入开源大模型的行列。开源项目的好处是显而易见的,它们提供了一种共享的方式,让人们能够自由地使用、复制、修改和分发软件。这种模式不仅有利于技术的进步和创新&#…

网络协议五

一、RPC协议 【整体都没仔细研究 无论是什么 RPC,底层都是 Socket 编程 二、结合双十一项目 【没仔细研究 VPC 1. 创建一个 VPC 并指定一个 IP 段 解释:虚拟私有云(VPC)是一个虚拟的网络环境,它与互联网隔离&…

ABAP 私人小笔记

最近没事&#xff0c;把笔记上的东西慢慢移到CSDN上&#xff0c;持续完善中~ 1&#xff1a;/nsxi_monitor查看接口日志 2&#xff1a;内表数据拼接 2.1、给内表加一个空行并修改他的值 APPEND INITIAL LINE TO GT_ALV ASSIGNING FIELD-SYMBOL(<FS>) .2.2、将内表数据拼接…

亿达四方:一站式SolidWorks代理服务,打造设计竞争力

在当今瞬息万变的设计与制造领域&#xff0c;高效、精准的3D设计软件已成为推动企业创新与发展的核心驱动力。作为业界知名的SolidWorks一站式代理服务商&#xff0c;亿达四方致力于为企业搭建从软件采购到技术应用的全方位桥梁&#xff0c;全面赋能设计团队&#xff0c;助力企…

stable-diffusion.cpp 文字生成图片

纯 C/C 中 [Stable Diffusion] 的推断 https://github.com/CompVis/stable-diffusion ## 特点 - 基于 [ggml]&#xff08;https://github.com/ggerganov/ggml&#xff09; 的普通 C/C 实现&#xff0c;工作方式与 [llama.cpp]&#xff08;https://github.com/ggerganov/llam…

微信小程序请求request封装

公共基础路径封装 // config.js module.exports {// 测试BASE_URL: https://cloud.chejj.cn,// 正式// BASE_URL: https://cloud.mycjj.com };请求封装 // request.js import config from ../config/baseUrl// 请求未返回时的loading const showLoading () > wx.showLoadi…

蓝桥杯软件测试第十五届蓝桥杯模拟赛1期题目解析

PS 需要第十五界蓝桥杯模拟赛1期功能测试模板、单元测试被测代码、自动化测试被测代码请加&#x1f427;:1940787338 备注&#xff1a;15界蓝桥杯省赛软件测试模拟赛1期 题目1 功能测试用例1&#xff08;测试用例&#xff09;&#xff08;15分&#xff09; 【前期准备】 按步…

后端开发面试题5(附答案)

前言 在下首语言是golang,所以会用他作为示例。 原文参见 @arialdomartini的: Back-End Developer Interview Questions 数据库相关问题 1. 如果要你将一个项目从MySQL迁移至PostgreSQL中,你会如何迁移? 迁移一个项目从MySQL到PostgreSQL涉及多个步骤,主要包括数据备份、…

网页元素解析元素标签和style变更

前言 如何解析html标签&#xff1f; 如何给标签增加样式&#xff1f; <div class"related-tags"><span>相关主题推荐&#xff1a;</span>a<a hrefhttp://www.csdn.net/tag/标签 target"_blank">标签</a><a href"h…

【STM32】输入捕获应用-测量脉宽或者频率(方法1)

图1 脉宽/频率测量示意图 1 测量频率 当捕获通道TIx 上出现上升沿时&#xff0c;发生第一次捕获&#xff0c;计数器CNT 的值会被锁存到捕获寄存器CCR中&#xff0c;而且还会进入捕获中断&#xff0c;在中断服务程序中记录一次捕获&#xff08;可以用一个标志变量来记录&#…

CC攻击的有效应对方案

随着互联网的发展&#xff0c;网络安全问题愈发突出。CC攻击&#xff08;Challenge Collapsar Attack&#xff09;&#xff0c;一种针对Web应用程序的分布式拒绝服务&#xff08;DDoS&#xff09;攻击方式&#xff0c;已经成为许多网络管理员和网站拥有者不得不面对的重大挑战。…

跨越式发展:中小型企业如何争取水库枢纽乙级资质

1. 明确目标&#xff0c;深入了解资质要求 首先&#xff0c;全面研究水利行业乙级设计资质的具体标准&#xff0c;包括企业规模、人员构成、技术能力、财务状况、管理体系等&#xff0c;确保每项要求都能精准对应并准备充分。 2. 强化内部管理与体系建设 建立或优化组织架构…

Fantasy Icons Megapack(梦幻盔甲宝石图标魔法道具图标集)

所有图标都具备高质量&#xff0c;并以专业水平实施。任何幻想风格游戏的上佳选择。 - 可更新的超级资源包&#xff1b; - 每个图标的大小均为 256x256 像素 (PNG)&#xff1b; - 总计 2672 个独一无二的图标&#xff1b; - 所有图标均具有透明背景。 超级资源包内置&#xff1…

Redisson原理解析

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习…

如何通过“小猪APP分发”轻松实现应用分发

你是否也在为应用分发发愁&#xff1f; 还记得那些日子吗&#xff1f;你花费了大量的时间和精力开发了一款出色的应用&#xff0c;但却在分发和推广环节遇到了瓶颈。是的&#xff0c;无论你的应用多么优秀&#xff0c;如果不能顺利分发给用户&#xff0c;那一切都是徒劳的。别…

[c++刷题]贪心算法.N01

题目如上: 首先通过经验分析&#xff0c;要用最少的减半次数&#xff0c;使得数组总和减少至一半以上&#xff0c;那么第一反应就是每次都挑数组中最大的数据去减半&#xff0c;这样可以是每次数组总和值减少程度最大化。 代码思路:利用大根堆去找数据中的最大值&#xff0c;…