高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案

高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案


目录

  1. 🟢 WebSocket 协议概述
  2. 🔵 在 FastAPI 中实现 WebSocket
  3. 🟣 Django Channels 实现异步实时通信
  4. 🔴 使用 Redis 实现实时推送

🟢 1. WebSocket 协议概述

WebSocket 是 HTML5 规范中提出的一种新协议,旨在实现客户端与服务器之间的全双工通信。与传统的 HTTP 请求/响应模型不同,WebSocket 协议允许持久的、双向的连接,客户端和服务器可以在任意时刻相互发送数据,而无需重新发起 HTTP 请求。WebSocket 可以减少通信延迟和网络资源消耗,特别适用于实时应用程序,如聊天系统、股票交易、游戏、以及其他需要即时数据更新的场景。

🧩 WebSocket 的工作原理

WebSocket 的工作流程可以简单概括如下:

  1. 初始握手:客户端通过 HTTP 发起 WebSocket 连接请求,服务器通过特殊的响应头 Upgrade 升级协议,建立连接。
  2. 持久连接:一旦连接建立,客户端和服务器之间的通信通道就保持打开,直到一方主动关闭连接。双方可以同时发送消息,消息是基于帧的传输方式。
  3. 双向通信:与传统的 HTTP 轮询不同,WebSocket 是全双工的,意味着服务器和客户端可以随时相互发送数据。消息可以是文本帧或者二进制帧,且通过 WebSocket 协议传输的消息有较小的开销,适合频繁的小数据传输场景。
⚡️ WebSocket 和 HTTP 的区别
  • 连接方式:HTTP 是短连接,每次请求/响应后连接断开,而 WebSocket 是持久化连接,只有在连接关闭时才断开。
  • 通信方向:HTTP 是客户端发起请求,服务器响应;WebSocket 支持双向通信,客户端和服务器都可以主动发送消息。
  • 协议开销:WebSocket 建立连接时会使用 HTTP 协议握手,但后续通信数据帧头信息非常小,开销比 HTTP 低。

WebSocket 的这些特性使其成为实时应用开发中的重要工具。


🔵 2. 在 FastAPI 中实现 WebSocket

FastAPI 是一个现代的、快速的 Web 框架,提供了对 WebSocket 支持的简便实现。FastAPI 内部集成了 ASGI(Asynchronous Server Gateway Interface),这意味着它能够轻松处理异步通信任务,比如 WebSocket。以下是如何在 FastAPI 中实现 WebSocket 通信的详细步骤。

📌 FastAPI 中的 WebSocket 代码实现
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()# 管理多个 WebSocket 连接的管理器
class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []# 添加新连接async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)# 移除断开的连接async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)# 向所有连接广播消息async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):await manager.connect(websocket)try:while True:# 接收客户端消息data = await websocket.receive_text()await manager.broadcast(f"Client #{client_id} says: {data}")except WebSocketDisconnect:manager.disconnect(websocket)await manager.broadcast(f"Client #{client_id} disconnected")
🧩 代码解析
  1. ConnectionManager 类:用于管理 WebSocket 连接。它可以接受新连接、断开旧连接以及广播消息给所有连接的客户端。
  2. websocket_endpoint:这个路径函数处理 WebSocket 连接。当客户端连接时,它将接收并广播消息给其他连接的客户端。它使用 client_id 来标识连接的客户端。
  3. 广播消息:通过 manager.broadcast() 方法,可以将来自某个客户端的消息发送给所有连接的客户端。
🚀 运行示例:

要运行这个示例,可以将代码保存为 main.py,然后使用命令 uvicorn main:app --reload 启动 FastAPI 服务器。连接的客户端可以通过 /ws/{client_id} 连接,并且所有客户端都能实时接收消息。

此示例展示了如何通过 FastAPI 简洁地实现 WebSocket 服务,利用 Python 的异步特性来实现高效的实时通信。


🟣 3. Django Channels 实现异步实时通信

Django 默认是同步框架,但通过 Django Channels,可以为其添加异步功能,尤其是支持 WebSocket 的实时通信功能。Django Channels 通过将请求分配给适当的消费者来处理异步通信,而不需要重新设计整个 Django 应用。以下是如何使用 Django Channels 来实现 WebSocket 通信。

📌 安装 Django Channels

首先,需要安装 Django Channels:

pip install channels

然后,在 settings.py 中添加 Channels 配置:

INSTALLED_APPS = [# 其他应用'channels',
]# 指定 ASGI 应用
ASGI_APPLICATION = "myproject.asgi.application"

在项目根目录创建一个 asgi.py 文件:

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from myapp import routingos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')application = ProtocolTypeRouter({"http": get_asgi_application(),"websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
})
📌 Django Channels WebSocket 代码实现

创建 consumers.py 来定义 WebSocket 消费者:

import json
from channels.generic.websocket import AsyncWebsocketConsumerclass ChatConsumer(AsyncWebsocketConsumer):async def connect(self):self.room_name = 'chat'await self.channel_layer.group_add(self.room_name, self.channel_name)await self.accept()async def disconnect(self, close_code):await self.channel_layer.group_discard(self.room_name, self.channel_name)async def receive(self, text_data):message = json.loads(text_data)['message']await self.channel_layer.group_send(self.room_name,{'type': 'chat_message','message': message})async def chat_message(self, event):message = event['message']await self.send(text_data=json.dumps({'message': message}))

routing.py 中定义 WebSocket 路由:

from django.urls import re_path
from . import consumerswebsocket_urlpatterns = [re_path(r'ws/chat/$', consumers.ChatConsumer.as_asgi()),
]
🧩 代码解析
  1. ChatConsumer:这是 WebSocket 消费者类,使用 AsyncWebsocketConsumer 来处理异步 WebSocket 连接。connect() 方法处理客户端连接,disconnect() 方法处理断开,receive() 用于接收并转发消息。
  2. channel_layer:Django Channels 使用频道层来在多个 WebSocket 连接之间共享数据。group_send() 方法用于向一个组中的所有 WebSocket 客户端广播消息。
🚀 运行示例

通过配置 Django Channels 后,启动 Django 开发服务器并连接到 /ws/chat/ 端点,即可开始实时通信。此实现展示了如何利用 Django 的扩展实现异步 WebSocket 通信,方便现有 Django 项目无缝添加实时通信功能。


🔴 4. 使用 Redis 实现实时推送

Redis 是一个强大的内存数据存储系统,支持发布/订阅(Pub/Sub)模式,这使得它非常适合实现实时推送功能。通过将 Redis 集成到 WebSocket 应用中,可以轻松地实现高效的实时数据推送服务。以下展示如何使用 Redis 结合 WebSocket 来实现消息的实时推送。

📌 安装 Redis 和依赖

首先,安装 Redis 和 aioredis 以在 Python 中使用异步 Redis 客户端:

pip install aioredis
📌 实现基于 Redis 的 WebSocket 推送

在 WebSocket 服务中,通过 Redis 的发布/订阅模式实现消息推送。

import aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.on_event("startup")
async def startup_event():global redisredis = await aioredis.create_redis_pool('redis://localhost')@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await manager.connect(websocket)try:while True:# 接收消息data = await websocket.receive_text()await redis.publish("channel:1", data)except WebSocketDisconnect:manager.disconnect(websocket)@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
📌 Redis 订阅消息并推送给 WebSocket 客户端

实现 Redis 消息订阅和推送:

import aioredis
from fastapi import FastAPIapp = FastAPI()@app.on_event("startup")
async def startup_event():redis = await aioredis.create_redis_pool('redis://localhost')pubsub = redis.pubsub()async def reader():await pubsub.subscribe('channel:1')async for message in pubsub.listen():print(f"Received message: {message['data']}")app.loop.create_task(reader())@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
🧩 代码解析
  1. Redis 发布:通过 redis.publish() 方法,消息发布到指定的频道 channel:1,这些消息随后会被其他 Redis 客户端订阅。
  2. Redis 订阅pubsub.listen() 用于订阅 Redis 频道上的消息,收到消息后可以处理或转发给 WebSocket 客户端。
🚀 运行示例

启动 Redis 服务器并运行上述代码,客户端通过 WebSocket 连接,可以实时接收 Redis 频道中的推送消息。此示例展示了如何利用 Redis 高效的发布/订阅机制来实现实时消息推送,结合 WebSocket 实现真正的全双工通信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/53992.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于 Camera Tuning 岗位的一些认识和看法

前言: 之前也写过几篇关于Camera Tuning岗位的相关介绍: IQ Tuning 学习路线 关于入职 ISP Tuning 岗位的几个问题及解答 随着工作时间越长,对这个岗位的认识较之前也有了一些更深刻的理解,这里整理总结一下。 一方面是当做现阶…

PHP一键约课高效健身智能健身管理系统小程序源码

一键约课,高效健身 —— 智能健身管理系统让健康触手可及 🏋️‍♀️ 告别繁琐,一键开启健身之旅 你还在为每次去健身房前的繁琐预约流程而烦恼吗?现在有了“一键约课高效健身智能健身管理系统”,所有问题都迎刃而解…

智能体-AI-Agent-简介

文章目录 一,什么是AI Agent二,扣子个人空间团队空间探索区 一,什么是AI Agent AI智能体并没有什么特别,本质上就是一个帮助你解决工作和学习中的一个工具。 很多自媒体把智能体描述的天花乱坠,那不过是他们畅想的智…

Spring Security认证与授权

1 Spring Security介绍 Spring Security是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架。由于它是Spring生态系统中的一员,因此它伴随着整个Spring生态系统不断修正、升级,在spring boot项目中加入springsecurity更是…

Vue的学习(三)

目录 一、for循环中key的作用 1‌.提高性能‌: ‌2.优化用户体验‌: ‌3.辅助Vue进行列表渲染‌: 4‌.方便可复用组件的使用‌: 二、methods及computed及wacth的区别 三、过滤器 1.Vue 2 过滤器简介 定义过滤器 使用过滤…

用 Swift 写 Android App ?来了解下 Skip 原生级跨平台框架

最近在找资料的时候,机缘巧合发现了一个有趣的商业跨平台框架 Skip ,刚好看到了它发布 1.0 正式版,主要作用是将 Swift 开发引入到 Android 领域,这样 App 就可以共享 Swift 的业务逻辑,在 SwiftUI 中完成 Android App…

C#基础(8)函数

前言 我们先前已经具备了一些自己写小程序的能力,但是依旧还是逃不过大量的代码,那么今天,我们就将学习一个减少我们重复工作量的东西:函数。 在数学中我们知道,函数就是一个固定的公式,会画固定的图。 …

Python | Leetcode Python题解之第395题至少有K个重复字符的最长子串

题目: 题解: class Solution:def longestSubstring(self, s1: str, k: int) -> int:if k 1: return len(s1)n len(s1)res 0for c in range(1, len(set(s1)) 1):# 滑窗中字母种类个数恰好为 cfreq Counter()l cnt tcnt 0 for r, ch in enu…

代码随想录训练营Day3 | 链表理论基础 | 203.移除链表元素 | 707.设计链表 | 206.反转链表

今天任务:学习链表理论基础 链表的类型 链表的存储方式 链表的定义…

开发一款通过蓝牙连接控制水电表的微信小程序

增强软硬件交互 为了更好的解决师生生活中的实际问题,开发蓝牙小程序加强了和校区硬件的交互。 比如通过蓝牙连接控制水电表,减少实体卡片的使用。添加人脸活体检测功能,提高本人认证效率,减少师生等待时间。 蓝牙水电控展示 蓝…

计算机科学基础 -- 超流水线

超流水线的概念 超流水线是一种进一步提高处理器性能的技术,它在传统流水线的基础上,将多个流水线组合在一起,能够在同一个时钟周期内同时执行多条指令。这种设计允许处理器的多个功能单元并行处理不同的指令,从而提高指令级并行…

HashMap常用方法及底层原理

目录 一、什么是HashMap二、HashMap的链表与红黑树1、数据结构2、链表转为红黑树3、红黑树退化为链表 三、存储(put)操作四、读取(get)操作五、扩容(resize)操作六、HashMap的线程安全与顺序1、线程安全2、…

【LeetCode每日一题】2024年9月第二周(上)

2024.9.9 中等 难度评分 1333 链接:2181. 合并零之间的节点 (1)题目描述: (2)示例 (3)分析 整体来说,描述还算清晰的题目,找到0节点所框定的区域&#xff0c…

Python中列表、元组、字典和集合的详细解释

Python中列表、元组、字典和集合的详细解释 1. 列表(List) 定义:列表是可变的有序集合,可以存储多个项目。列表中的项目可以是不同类型的。 特点: 有序:元素的顺序是固定的。可变:可以修改内…

Pandas读取某列、某行数据——loc、iloc区别

loc:通过行、列的名称或标签来索引 iloc:通过行、列的索引位置来寻找数据 首先,我们先创建一个DataFrame生成数据 import pandas as pddata {a:[1,2,3,4,5],b:[6,7,8,9,10],c:[11,12,13,14,15] } data pd.DataFrame(data) print(data) 运行…

工具、环境等其他小问题归纳

此篇文章内容会不定期更新,仅作为学习过程中的笔记记录 一、查询Windows 10环境下python版本与安装路径 若电脑成功安装了python环境,不小心忘了版本。 I、查询版本 1、cmd窗口快捷查询 Win R 输入cmd 进入窗口; 直接输入 python --version …

QT如何ui上的QTableWidget控件如何使用

在Qt中,QTableWidget是一个常用的控件,用于在UI上展示和操作表格数据。如果你是在Qt Designer中设计UI,那么你可以直接将QTableWidget从Widget Box拖拽到你的窗体上。如果你是在代码中创建UI,那么你需要通过编程方式添加QTableWid…

[数据集][目标检测]血细胞检测数据集VOC+YOLO格式2757张4类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):2757 标注数量(xml文件个数):2757 标注数量(txt文件个数):2757 标注…

关于武汉芯景科技有限公司的IIC电平转换芯片XJ9517开发指南(兼容PCF9517)

一、芯片引脚介绍 1.芯片引脚 2.引脚描述 二、系统结构图 三、功能描述 1.电平转换 2.芯片使能/失能 EN 引脚为高电平有效,内部上拉至 VCC(B),允许用户选择中继器何时有效。这可用于在上电时隔离行为不良的从机,直到…

4052A/4052B/4052C/4052D/4052E/4052F/4052G /4052H信号/频谱分析仪

4052A/4052B/4052C/4052D/4052E/4052F/4052G /4052H信号/频谱分析仪 苏州新利通 Ceyear 4052具备出色的测试动态范围、相位噪声、幅度精度和测试速度,具备频谱分析、I/Q分析、实时频谱分析、瞬态分析、矢量信号分析、脉冲分析、音频分析等丰富的测试功能。 Ceyear…