fastapi实现websocket在线聊天

最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题

1.问题难点

在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中,当进行通信时,可能无法找到已连接的连接对象。

2.解决方案

使用使用redis的订阅发布机制,使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。

3.方案设计

  • 每个进程启动的时候都进行一个消息的订阅。
  • 通过http请求,进行消息发布。
  • 每个进程收到发布的消息后,进行判断是否由自己进行处理。

4.代码实现

①在服务启动时,进行消息订阅,并一直监听消息通道。当有消息发布时,进行消息处理。

# 初始化app
app = FastAPI(title="Ws Chat", description="测试", version="1.0.0")
app.openapi_version = "3.0.0"app.include_router(chat.app, prefix='/api/chat', tags=['Chat'])@app.on_event('startup')
async def on_startup():print(f"订阅初始化:{os.getpid()}")# 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/loop = asyncio.get_event_loop()loop.create_task(register_pubsub())async def reader(channel):# 进行消息的消费async for msg in channel.listen():  # 监听通道# print(msg)msg_data = msg.get("data")if msg_data and isinstance(msg_data, str):msg_data_dict = json.loads(msg_data)print(f"chat:{msg_data_dict}")sender = msg_data_dict.get("sender")# 进行消息处理await chat.cm.handle_websocket_message(msg_data_dict, sender)async def register_pubsub():pool = aioredis.from_url("redis://{}".format(host), db=db, password=password, port=port, encoding="utf-8", decode_responses=True)psub = pool.pubsub()async with psub as p:# 消息订阅await p.subscribe("chat")await reader(p)await p.unsubscribe("chat")

②websocket处理类

from fastapi import WebSocket, WebSocketDisconnectclass ConnectionManager:def __init__(self):# 保存当前所有的链接的websocket对象self.websocket_connections = {}async def connect(self, websocket: WebSocket, client_id):# 添加连接并发送欢迎消息await websocket.accept()self.websocket_connections[client_id] = websocketawait websocket.send_json({"type": "system","msg": "Welcome to the chat app!","sender": "system","recipient": client_id})try:# 处理消息while True:# 获取信息message = await websocket.receive_json()# 处理发送信息await self.handle_websocket_message(message, client_id)except WebSocketDisconnect:# 连接断开时移除连接del self.websocket_connections[client_id]async def handle_websocket_message(self, message: dict, client_id):# 处理私聊消息if message.get("type") == "private_message":recipient = message.get("recipient")msg = message.get("msg")recipient_conn = self.websocket_connections.get(recipient)if recipient_conn:# 在线await recipient_conn.send_json({"type": "private_message","sender": client_id,"msg": msg,"recipient": recipient})async def broadcast(self, message: dict):# 循环变量给所有在线激活的链接发送消息-全局广播for connection in self.websocket_connections:await connection.send_text(message)async def close(self, websocket: WebSocket, client_id):# 断开客户端的链接await websocket.close()del self.websocket_connections[client_id]async def disconnect(self, user_id):websocket: WebSocket = self.websocket_connections[user_id]await websocket.close()del self.websocket_connections[user_id]

③websocket连接

from app.chat_manager.server import ConnectionManagercm = ConnectionManager()@app.websocket("/connect_chat")
async def connect_chat(websocket: WebSocket, user_code: str):try:await cm.connect(websocket, user_code)except WebSocketDisconnect:# 连接断开时移除连接del cm.websocket_connections[user_code]

④http请求进行消息发布

@app.post("/create_chat", summary="发起聊天")
async def create_chat(param: DiagnosisChatSch, r=Depends(get_redis)):""""""ws_param = {"type": "private_message","msg": param.msg,"sender": param.sender,"recipient": param.recipient}# 进行消息发布await r.publish('diagnosis_chat', json.dumps(ws_param))return {'code': 200, 'msg': '成功', 'data': ''}

5.源码

github源码地址:https://github.com/zhangyukuo/fastapi_ws_chat

6.参考文章

https://www.cnblogs.com/a00ium/p/16931133.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/219236.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows Subsystem for Linux (WSL) 安装与使用笔记

文章目录 Part.I IntroductionPart.II 安装Chap.I 安装流程Chap.II 迁移至其他盘 Part.III 使用Chap.I 一些信息Chap.II 配置下载软件的源Chap.III 安装 pip Reference Part.I Introduction Windows Subsystem for Linux 简写为 WSL,是 Windows 的一个 Linux 子系统…

常用的建表但范式、反规范化

规范化: 规范化是用于数据库设计的一系列原理和技术,它可以减少表中数据的冗余,增加数据完整性和一致性。通常有很多范式。 第一范式(1NF): 常用的三种范式: 表中的字段都是不可再分割的原子属…

vue/uniapp - 返回上一页并onLoad/onShow刷新数据列表接口

目录 详情页(detail.vue):列表页(list.vue)大佬最后 在uni中,返回页面是不会触发 onLoad方法的; 如果我们只想在特定情况下返回上一页才需要刷新数据,那么用onShow的话,那刷新就太频繁了; 这时候&#xf…

蝴蝶Butterfly 数据集VOC+yolo-2000张(labelImg标注)

蝴蝶被誉为“会飞的花朵”,是一类非常美丽的昆虫。蝴蝶大多数体型属于中型至大型,翅展在15~260毫米之间,有2对膜质的翅。体躯长圆柱形,分为头、胸、腹三部分。体及翅膜上覆有鳞片及毛,形成各种色彩斑纹。今天要介绍的是…

C++类模板案例-实现一个通用的数组类

案例中用到的方法 可以对内置数据类型以及自定义数据类型的数据进行存储将数组中的数据类型存储到堆区构造函数中可以传入数组的容量提供对应的拷贝构造函数以及operator防止浅拷贝问题提供尾插法和尾删法对数组中的数据进行增加和删除可以通过下标的方式访问数组中的元素可以…

算法导论复习(二)

算法导论第二次复习以 分治法 为专题 文章目录 分治算法是什么归并排序Strassen矩阵乘法最近点对 求解递推表达式 分治算法是什么 归并排序 代码如下&#xff1a; #include <iostream> #include <vector>using namespace std;// 归并函数&#xff0c;将两个有序数…

代码随想Day36 | 435. 无重叠区间、763.划分字母区间、56. 合并区间

435. 无重叠区间 这道题和前一天的射箭题目思想类似&#xff0c;用总区间个数-不重叠的区间个数等于需要去除的区间个数。首先对左边界排序&#xff0c;如果当前的左边界大于等于上一区间的右边界&#xff0c;则说明是一个不重叠的区间&#xff0c;否则&#xff0c;更新上一重…

Redis生产实战-热key、大key解决方案、数据库与缓存最终一致性解决方案

生产环境中热 key 处理 热 key 问题就是某一瞬间可能某条内容特别火爆&#xff0c;大量的请求去访问这个数据&#xff0c;那么这样的 key 就是热 key&#xff0c;往往这样的 key 也是存储在了一个 redis 节点中&#xff0c;对该节点压力很大 那么对于热 key 的处理就是通过热…

nginx 优化和安装防盗链以及实验举例

目录 nginx编译安装常用模块 生产中建议设置 nginx 内核限制文件优化 先将 nginx编译安装直至 systemctl命令使用 nginx服务 安全优化 隐藏 nginx版本号 查看版本号 隐藏版本信息 修改用户与组 限制单个 IP的访问频率和连接数 防盗链相关设置 在源主机里配置防盗链 …

【C++11】右值引用与移动语义

一.左值与右值 左值&#xff1a;可以取地址的表示数据的表达式&#xff0c;左值可以出现在赋值符号左边 右值&#xff1a;不能取地址的表示数据的表达式&#xff0c;右值不能出现在赋值符号左边 int fun() {return 0; } int main() {int a 0;//a->左值const int b 1;//b-&…

【lesson14】MySQL表的基本查询(1)

文章目录 表的基本操作介绍retrieveselect列建表基本测试 where子句建表基本测试 表的基本操作介绍 CRUD : Create(创建), Retrieve(读取)&#xff0c;Update(更新)&#xff0c;Delete&#xff08;删除&#xff09; retrieve select列 建表 基本测试 插入数据 全列查询 …

利用断路器状态统计sentinel熔断次数

最近项目需要sentinel熔断时记录熔断的次数&#xff0c;在经过一阵搜索后决定利用断路器的状态变化来实现此功能 然而&#xff0c;遇到了这样的一个情况&#xff0c;断路器的状态在第一次熔断时正常从close–>open&#xff0c;但在后续&#xff08;熔断时间内blocked或者熔断…

RocketMQ的延迟消息是如何实现的❓

RocketMQ 作为一款强大的分布式消息中间件&#xff0c;提供了丰富的功能&#xff0c;其中之一就是延迟消息。在本篇博客中&#xff0c;我们将深入探讨 RocketMQ 延迟消息的实现机制&#xff0c;了解消息的定时投递和消费流程。 1. 定时消息的发送 RocketMQ 实现延迟消息的第一…

linux应用软件下载站收集

一、这是一个别人问题帖&#xff0c;里面有很多下载站点。 谁知道可以自由下载Linux软件的论坛或者平台&#xff1f;类似52破解论坛。国内国外都可以&#xff0c;我在搜索引擎找不到&#xff1f; - 知乎

2023年度影响力出海品牌传音移动互联:开放合作 赋能更多中国企业高效出海

伴随着全球化的脚步&#xff0c;出海成为许多中国企业的“必选项”&#xff0c;与之配套的出海服务相关业务也得到了极大的发展。近日&#xff0c;第五届鲸鸣奖颁奖典礼上&#xff0c;传音移动互联凭借为企业提供高效优质的出海解决方案&#xff0c;荣获鲸鸣奖“2023年度影响力…

SpringBoot 引入nacos 【最新 | 可运行】

SpringBoot 引入nacos 首先要了解在 Springboot 中只支持那些 Springboot 的版本&#xff08;我真的被这个搞死了&#xff09;,可以如下图参考&#xff1a; 下面我们就开始吧 下载 Nacos nacos 下载地址&#xff0c;这里可以选择你要下载的版本&#xff0c;我选择下载了2.2.…

<JavaEE> 文件IO -- 数据流和文件内容操作(Reader 和 Writer 、InputStream 和 OutputStream)

目录 一、数据流概述 二、流的关闭 2.1 使用 close() 方法 2.2 使用 try-finally 2.3 使用 try-with-resources 三、字符流的读写 3.1 Reader 类 3.2 Writer 类 四、字节流的读写 4.1 InputStream 类 4.2 OutputStream 类 一、数据流概述 1&#xff09;在 Java 中&…

[c]零钱兑换

题目比较简单&#xff0c;看答案就能看懂什么意思 #include<stdio.h> int main() {int count 0;int n;scanf("%d", &n);for (int i 0; i < n; i){for (int k 0; k <n/2; k){for (int j 0; j < n/5 ; j){if (i 2 * k 5 * j n){count;}}}}p…

【Python基础】迭代器

文章目录 [toc]什么是迭代可迭代对象判断数据类型是否是可迭代类型 迭代器对可迭代对象进行迭代的本质获取可迭代对象的迭代器通过迭代器获取数据StopIteration异常 自定义迭代器__iter__()方法__next__()方法判断数据类型是否是可迭代类型自定义迭代器案例分离模式整合模式 fo…

这套软件测试技巧|软测经典面试题真的有用,今天面试大部分都遇到了!!!

祝同学们都能够顺利找到心仪的工作拿高薪&#xff0c;废话不多说&#xff0c;下面上题了~ 46、您以往是否曾经从事过性能测试工作&#xff1f;如果有&#xff0c;请尽可能的详细描述您以往的性能测试工作的完整过程。 &#xff08;以自己最熟悉的性能测试项目为例&#xff09; …