Python使用fastAPI实现一个流式传输接口

1. 使用fastapi实现流式传输

1.1 服务端 fastapi_server.py

编写服务端代码fastapi_server.py。服务端代码主要使用了fastapi和uvicorn两个库。

#!/usr/bin/env python
# coding=utf-8
# @Time    : 2024/1/31 19:13
# @Software: PyCharm
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time
import uvicornapp = FastAPI()async def generate_data():for i in range(1, 11):time.sleep(1)  # 模拟每秒生成一个块的耗时操作yield f"FASTAPI Chunk {i}\n"@app.get("/stream")
async def stream_data():return StreamingResponse(generate_data(), media_type="text/plain")if __name__ == "__main__":uvicorn.run(app, host="127.0.0.1", port=8001)

1.2客户端 (requests)

使用python编写一个客户端stream_client.py,进行流式接收。

#!/usr/bin/env python
# coding=utf-8
# @Time    : 2024/1/31 19:14
#
# stream_client.pyimport requestsurl = "http://127.0.0.1:8001/stream/"  # 替换为你的实际接口地址def test1():try:response = requests.get(url, stream=True) # stream参数为Trueif response.status_code == 200:for chunk in response.iter_content(chunk_size=7):  # 这行很重要哦if chunk:print(chunk.decode("utf-8"), end="")except requests.RequestException as e:print(f"Request failed: {e}")def test2():try:response = requests.get(url, stream=True)if response.status_code == 200:for line in response.iter_lines(decode_unicode=True, chunk_size=8):if line:print("Received SSE event:", line)except requests.RequestException as e:print(f"Request failed: {e}")# test1()
test2()

1.3 在html中流式显示

新建一个client.html文件,放在fastapi_server.py目录下。同时修改fastapi_server.py, 增加一个BlockIterator迭代器对长文本进行切块。

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>Streaming Client (client.html)</title>
</head>
<body>
aaa
<div id="output"></div><script>const outputDiv = document.getElementById('output');// 替换为你的实际接口地址const url = '$FASTAPI$';// 使用 Fetch API 发送请求fetch(url).then(response => {const reader = response.body.getReader();return new ReadableStream({async start(controller) {while (true) {const { done, value } = await reader.read();// 如果读取完毕,中止流if (done) {controller.close();break;}// 将每个块的内容添加到页面上outputDiv.innerHTML += new TextDecoder().decode(value) +"<br>";}}});}).then(stream => {// 使用 TextStream,将流连接到页面上的输出const textStream = new TextStream(stream);return textStream.pipeTo(new WritableStream({write: chunk => {// 在这里你可以处理每个块的数据console.log('Received chunk:', chunk);}}));}).catch(error => console.error('Error:', error));
</script></body>
</html>

服务端增加一个/web_client接口,读取上述网页内容

#!/usr/bin/env python
# coding=utf-8
# @Time    : 2024/1/31 19:13
# @Software: PyCharm
from fastapi import FastAPI
from starlette.responses import HTMLResponse
from fastapi.responses import StreamingResponse
import time
import uvicornapp = FastAPI()CONTENT = """《易经》被誉为诸经之首,大道之源,是中华优秀传统文化的总纲领,是中华民族五千年智慧的结晶。他含盖万有、纲纪群伦,是中华文化的杰出代表;他博大精微、包罗万象,亦是中华文明的源头。其内容涉及哲学、生命、自然、科学、政治、天文、地理、文学、艺术等诸多领域,是各家共同的经典。
《易经》包括《连山》《归藏》《周易》三部易书,现存于世的只有《周易》。《周易》相传是周文王被囚羑里时,研究《易经》所作的结论。"""class BlockIterator:def __init__(self, text, block_size=10):self.text = textself.block_size = block_sizeself.index = 0def __iter__(self):return selfdef __next__(self):if self.index >= len(self.text):raise StopIterationblock = self.text[self.index:self.index + self.block_size]self.index += self.block_sizereturn blockasync def generate_data():for i in BlockIterator(CONTENT, block_size=1):time.sleep(0.05)  # 模拟每秒生成一个块的耗时操作# yield f"FASTAPI Chunk {i}\n"yield i@app.get("/web_client", response_class=HTMLResponse)
async def read_root():with open("static/client.html", "r") as file:html_content = file.read()html_content = html_content.replace("$FASTAPI$", "http://127.0.0.1:8001/stream/")return HTMLResponse(content=html_content)@app.get("/stream")
async def stream_data():return StreamingResponse(generate_data(), media_type="text/plain")if __name__ == "__main__":uvicorn.run(app, host="127.0.0.1", port=8001)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/663260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据 - Hadoop系列《三》- MapReduce(分布式计算引擎)概述

上一篇文章&#xff1a; 大数据 - Hadoop系列《三》- HDFS&#xff08;分布式文件系统&#xff09;概述-CSDN博客 目录 12.1 针对MapReduce的设计构思 1. 如何对付大数据处理场景 2. 构建抽象编程模型 3. 统一架构、隐藏底层细节 12.2 分布式计算概念 12.3 MapReduce定义…

如何通过CVE漏洞编码找到对应的CVE漏洞详情及源码修改地址

背景&#xff1a; 最近正在使用docker进行一些cve漏洞的复现&#xff0c;有时候就要通过CVE的漏洞编码&#xff0c;找到对应的漏洞详情&#xff0c;以及漏洞的源码修改 以我上一篇文章的CVE-2020-17518编码为例 Apache Flink文件上Apache Flink文件上 方法&#xff1a; 通…

为什么golang不支持可重入锁呢?

为什么golang不需要可重入锁&#xff1f; 在工程中使用锁的原因在于为了保护不变量&#xff0c;也可以用于保护内、外部的不变量。 基于此&#xff0c;Go 在互斥锁设计上会遵守这几个原则。如下&#xff1a; 在调用 mutex.Lock 方法时&#xff0c;要保证这些变量的不变性保持…

手写分布式存储系统v0.2版本

引言 上回说到 手写分布式存储系统v0.1版本 &#xff0c;已经实现了通过监听TCP端口并将数据写到本地磁盘的功能&#xff0c;今天咱们就继续往上面添砖加瓦 v0.2版本大致做以下功能 实现滚动写文件 代码优化 一、滚动写文件实现 由于咱们写文件是用的mmap进行文件写入&am…

索引的设计原则(MySQL)

文章目录 文章目录 前言 一、搜索的索引列 二、使用唯一索引 三、使用短索引 四、最左前缀原则 五、不要过度使用 六、尽量使用主键索引 ​​​​​ 前言 索引的设计需要遵循一些原则&#xff0c;创建索引时遵循这些原则&#xff0c;有利于提升查询效率。 一、搜索的索引列 创建…

解决ModuleNotFoundError: No module named ‘pysqlite2‘

目录 一、问题描述 二、问题分析 三、解决方法 四、参考文章 一、问题描述&#xff1a; 新建conda编译环境。安装Jupyter后打不开&#xff0c;报错&#xff1a; 二、问题分析&#xff1a; 缺少sqlite3动态链接库 三、解决方法&#xff1a; SQLite Download Page 下载…

组播目的地址

路由器收到目的地址为224.0.0.5和239.0.0.5的组播报文如何处理? 224.0.0.5为永久组地址,是IANA为路由协议预留的IP地址(也称为保留组地址),用于标识一组特定的网络设备,供路由协议,目前被分配于OSPF协议使用,运行了OSPF协议的网络设备默认都会加入该组播组。 当路由器…

数字地球开放平台农作物长势监测解决方案

利用遥感技术进行产量预测是一种高效而准确的方法&#xff0c;通过监测植被的生长状况、土地利用、气象等因素&#xff0c;可以为农业决策提供有力支持。数字地球开放平台拥有200颗卫星&#xff0c;为您提供一站式卫星遥感服务。 数字地球开放平台将为您介绍一般遥感技术在农作…

游戏开发丨基于Panda3D的迷宫小球游戏

文章目录 写在前面Panda3D程序设计程序分析运行结果系列文章写在后面 写在前面 本期内容 基于panda3d的迷宫中的小球游戏 所需环境 pythonpycharm或anacondapanda3d 下载地址 https://download.csdn.net/download/m0_68111267/88792121 Panda3D Panda3D是一种开放源代码…

如何选择最适合的服务器

许多朋友想做一些网站&#xff0c;应用&#xff0c;游戏&#xff0c;小程序等等&#xff0c;都需要接触一个基础&#xff0c;就是服务器。服务器相当于一台24小时不关机的联网电脑&#xff0c;浏览网页或者应用相当于用户在访问这台电脑里的文件。那么如何选择最适合自己的服务…

在微服务项目中,实现无停机更新有哪些方法和策略?

在微服务项目中&#xff0c;确保线上更新不停掉服务是非常关键的&#xff0c;以保障系统的可用性。以下是一些方法和策略&#xff0c;可以帮助实现无停机更新&#xff1a; 蓝绿部署&#xff08;Blue-Green Deployment&#xff09;&#xff1a; 在蓝绿部署中&#xff0c;维护两个…

【Java高级】通过CompletableFuture类异步、并行获取数据库数据

1. CompletableFuture类简介 CompletableFuture 类是 Java 中用于异步编程和处理异步任务的工具类。它是 Java 8 引入的一部分&#xff0c;提供了一种方便的方式来处理异步操作&#xff0c;例如并行执行多个任务、等待多个任务完成等。 CompletableFuture 可以通过 runAsync …

【Java 数据结构】对象的比较

Java中对象的比较 1. PriorityQueue中插入对象2. 元素的比较2.1 基本类型的比较2.2 对象比较的问题 3. 对象的比较3.1 覆写基类的equals3.2 基于Comparble接口类的比较3.3 基于比较器比较3.4 三种方式对比 4. 集合框架中PriorityQueue的比较方式5. 使用PriorityQueue创建大小堆…

【AI_Design】Midjourney学习笔记

目录 后缀解析Promot合格使用prompt关键词描述 关键词化合作用关键词网站推荐 联合Chatgpt使用总结 后缀解析 –ar&#xff1a;宽高比设置–c&#xff1a;多样性设置&#xff08;数值0-100&#xff0c;默认值0&#xff09;–s&#xff1a;风格化设置&#xff08;数值0-1000&am…

Java面试——计网篇

一、基础篇 1、 TCP/IP 网络模型 对于同一台设备上的进程间通信&#xff0c;有很多种方式&#xff0c;比如有管道、消息队列、共享内存、信号等方式&#xff0c;而对于不同设备上的进程间通信&#xff0c;就需要网络通信&#xff0c;而设备是多样性的&#xff0c;所以要兼容多…

结构体--共用体--枚举 之难点——链表 奋力学习嵌入式的第十六天

结构体 注意&#xff1a; 1.结构体类型 可以定义在 函数里里面 但是此时作用域就被限定在该函数中 2.结构体定义形式 //形式一 限定一类型 后定义变量 struct stu { ... }; struct stu s; //形式二 定义类型的同时 定义变量 struct stu { ... }s1,s2,*s3,s4[10]; struc…

【说明】Nginx配置反向代理但是不包含某个地址

【说明】Nginx配置反向代理但是不包含某个地址 配置 反向代理 /api 这个路径 但是不包含 /api/wechat/service 这个请求 www.weixin.com/api/wechat/service www.weixin.com/api/getUser OK location ^~ /api(?!/wechat/service) { // OK }

影院购票|电影院订票选座小程序|基于微信小程序的电影院购票系统设计与实现(源码+数据库+文档)

电影院订票选座小程序目录 目录 基于微信小程序的电影院购票系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、用户功能实现 2、管理员功能实现 &#xff08;1&#xff09;影院信息管理 &#xff08;2&#xff09;电影信息管理 &#xff08;3&#xff09;已…

STM32F1 - 存储器映射

Memory mapping 1> 外设内存地址映射2> GPIO寄存器映射3> 存储器访问 1> 外设内存地址映射 1> STM32F103ZET6的地址线位宽为32位&#xff0c;所以寻址空间为4GB &#xff08;2 ^ 32 4GB&#xff09;&#xff1b; 2> STM32将&#xff0c;Flash&#xff0c;SR…

PageHelper.startPage(pageNum, pageSize);分页总数不正确,避坑!

今天在调用一个查询列表的接口时&#xff0c;分页总数老是查的不正确&#xff1a; 当pageSize选择10时&#xff0c;总数只有10条 当pageSize选择20时&#xff0c;总数只有15条 实际上总数为15条实在正确的&#xff0c;然后取看代码&#xff1a; Overridepublic AjaxResult pro…