Python Tornado 实现SSE服务端主动推送方案

一、SSE 服务端消息推送

SSEServer-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。相比于 WebSocket,服务器端和客户端工作量都要小很多、简单很多,而 Tornado 又是Python中的一款优秀的高性能web框架,本文带领大家一起实践下 Tornado SSE 的实现。

本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。

本次使用的 Tornado 版本:

tornado==6.3.2

二、短暂性场景下的 SSE 实现

短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。

Tornado 中实现,需要注意的是要关闭 _auto_finish ,这样的话就不会被框架自己主动停止连接了,下面是一个实现的案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutorclass SSE(RequestHandler):def initialize(self):# 关闭自动结束self._auto_finish = Falseprint("initialize")def set_default_headers(self):# 设置为事件驱动模式self.set_header('Content-Type', "text/event-stream")# 不使用缓存self.set_header('Content-Control', "no-cache")# 保持长连接self.set_header('Connection', "keep-alive")# 允许跨域self.set_header('Access-Control-Allow-Origin', "*")def prepare(self):# 准备线程池self.executor = self.application.pool@tornado.gen.coroutinedef get(self):result = yield self.doHandle()self.write(result)# 结束self.finish()@run_on_executordef doHandle(self):tornado.ioloop.IOLoop.current()# 分十次推送信息for i in range(10):time.sleep(1)self.flush()self.callback(f"current: {i}")return f"data: end\n\n"def callback(self, message):# 事件推送message = f"data: {message}\n\n"self.write(message)self.flush()class Application(tornado.web.Application):def __init__(self):handlers = [("/sse", SSE),("/(.*)$", tornado.web.StaticFileHandler, {"path": "resources/static","default_filename": "index.html"})]super(Application, self).__init__(handlers)self.pool = ThreadPoolExecutor(200)def startServer(port):app = Application()httpserver = tornado.httpserver.HTTPServer(app)httpserver.listen(port)print(f"Start server success", f"The prot = {port}")tornado.ioloop.IOLoop.current().start()if __name__ == '__main__':startServer(8020)

运行后可以到浏览器访问:http://localhost:8020/sse,此时就可以看到服务端在不断地推送数据过来了:

在这里插入图片描述

那如何在前端用 JS 获取数据呢,前面提到在 JS 层面,有封装好的 Event Source 组件可以直接拿来使用,例如:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>测试服务器推送技术</title>
</head>
<body><div id="messages"></div>
</body>
<script>const eventSource = new EventSource('http://localhost:8020/sse');// 事件回调eventSource.onmessage = (event) => {console.log(event.data)const messagesDiv = document.getElementById('messages');messagesDiv.innerHTML += '<p>' + event.data + '</p>';};// 异常eventSource.onerror = (error) => {console.error('EventSource failed:', error);eventSource.close();};eventSource.onopen = ()=>{console.log("开启")}</script>
</html>

运行后可以看到服务端分阶段推送过来的数据:

在这里插入图片描述

三、长连接场景下的 SSE 实现

上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有 100 个用户,但是推送时可能只需要给 10 个用户推送,这种方式相当于将一个客户端和一个服务端进行了绑定,一定程度上不利于服务端的横向扩展,但也可以通过一些消息订阅的方式解决类似问题。

下面是一个实现案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor# 单例
def singleton(cls):instances = {}def wrapper(*args, **kwargs):if cls not in instances:instances[cls] = cls(*args, **kwargs)return instances[cls]return wrapper# 订阅推送工具类
@singleton
class Pusher():def __init__(self):self.clients = {}def add_client(self, client_id, callback):if client_id not in self.clients:self.clients[client_id] = callbackprint(f"{client_id} 连接")def send_all(self, message):for client_id in self.clients:callback = self.clients[client_id]print("发送消息给:", client_id)callback(message)def send(self, client_id, message):callback = self.clients[client_id]print("发送消息给:", client_id)callback(message)class SSE(RequestHandler):# 定义推送者pusher = Pusher()def initialize(self):# 关闭自动结束self._auto_finish = Falseprint("initialize")def set_default_headers(self):# 设置为事件驱动模式self.set_header('Content-Type', "text/event-stream")# 不使用缓存self.set_header('Content-Control', "no-cache")# 保持长连接self.set_header('Connection', "keep-alive")# 允许跨域self.set_header('Access-Control-Allow-Origin', "*")@tornado.gen.coroutinedef get(self):# 客户端唯一标识client_id = self.get_argument("client_id")self.pusher.add_client(client_id, self.callback)def callback(self, message):# 事件推送message = f"data: {message}\n\n"self.write(message)self.flush()# 定义推送接口,模拟推送
class Push(RequestHandler):# 定义推送者pusher = Pusher()def prepare(self):# 准备线程池self.executor = self.application.pool@tornado.gen.coroutinedef get(self):# 客户端标识client_id = self.get_argument("client_id")# 推送的消息message = self.get_argument("message")result = yield self.doHandle(client_id, message)self.write(result)@run_on_executordef doHandle(self, client_id, message):tornado.ioloop.IOLoop.current()self.pusher.send(client_id, message)return "success"class Application(tornado.web.Application):def __init__(self):handlers = [("/sse", SSE),("/push", Push),("/(.*)$", tornado.web.StaticFileHandler, {"path": "resources/static","default_filename": "index.html"})]super(Application, self).__init__(handlers)self.pool = ThreadPoolExecutor(200)def startServer(port):app = Application()httpserver = tornado.httpserver.HTTPServer(app)httpserver.listen(port)print(f"Start server success", f"The prot = {port}")tornado.ioloop.IOLoop.current().start()if __name__ == '__main__':startServer(8020)

这里我定义了一个 Pusher 订阅推送工具类,用来存储客户端的连接,以及给指定客户端或全部客户端发送消息,然后我又定义 Push 接口,模拟不定时的指定客户端发送信息的场景。

同样前端也要修改,需要给自己定义 client_id ,例如:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>测试服务器推送技术</title>
</head>
<body><div id="client"></div><div id="messages"></div>
</body>
<script>function generateUUID() {let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {const r = Math.random() * 16 | 0;const v = c === 'x' ? r : (r & 0x3 | 0x8);return v.toString(16);});return uuid;}// 利用uuid 模拟生成唯一的客户端IDlet client_id = generateUUID();document.getElementById('client').innerHTML = "当前 client_id = "+client_id;const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id);// 事件回调eventSource.onmessage = (event) => {console.log(event.data)const messagesDiv = document.getElementById('messages');messagesDiv.innerHTML += '<p>' + event.data + '</p>';};// 异常eventSource.onerror = (error) => {console.error('EventSource failed:', error);eventSource.close();};eventSource.onopen = ()=>{console.log("开启")}</script>
</html>

这里我用 uuid 模拟客户端的唯一ID,在真实使用时可不要这么做。

下面使用浏览器打开三个页面,可以看到三个不同的 client_id :

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在服务端的日志中也能看到这三个客户端的连接:

在这里插入图片描述

下面调用 push 接口来给任意一个客户端发送消息,例如这里发给client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用户 :

在这里插入图片描述

下面看到 client_id2493045e-84dd-4118-8d96-0735c4ac186b的页面:

在这里插入图片描述
已经成功收到推送的消息,反之看另外两个:

在这里插入图片描述
在这里插入图片描述
都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/654687.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[m1pro ] ssh: connect to host localhost port 22: Connection refused

在学习Hadoop 的时候&#xff0c;使用 ssh localhost 遇到以下问题 原因&#xff1a; 本地没有打开远程登录 解决办法&#xff1a;打开远程登录 成功结果

防御保护---防火墙的用户认证

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 一.用户认证概述 防火墙用户认证是一种安全措施&#xff0c;用于验证和授权网络用户的身份。它是防火墙的一部分&#xff0c;旨在确保只有经过身份验证的用户才能访问网络资源。 防火墙用户认证…

JDK1.8新特性(Day24)

Lambda表达式 介绍 Lambda表达式是一种没有名字的函数,也可称为闭包&#xff0c;是Java 8 发布的最重要新特性。本质上是一段匿名内部类&#xff0c;也可以是一段可以传递的代码。还有叫箭头函数的... 闭包 闭包就是能够读取其他函数内部变量的函数,比如在java中,方法内部的局…

智慧文旅:打造无缝旅游体验的关键

随着科技的快速发展和消费者需求的不断升级&#xff0c;旅游业正面临着前所未有的变革压力。智慧文旅作为数字化转型的重要领域&#xff0c;旨在通过智能化、数据化手段为游客提供更加优质、便捷、个性化的服务&#xff0c;打造无缝的旅游体验。本文将深入探讨智慧文旅在打造无…

第九节HarmonyOS 常用基础组件17-ScrollBar

1、描述 滚动条组件ScrollBar&#xff0c;用于配合可滚动组件使用&#xff0c;如List、Grid、Scroll。 2、接口 可包含子组件 ScrollBar(value:{scroller:Scroller, direction?: ScrollBarDirection, state?: BarState}) 3、参数 参数名 参数类型 必填 描述 scrolle…

R-YOLO

Abstract 提出了一个框架&#xff0c;名为R-YOLO&#xff0c;不需要在恶劣天气下进行注释。考虑到正常天气图像和不利天气图像之间的分布差距&#xff0c;我们的框架由图像翻译网络&#xff08;QTNet&#xff09;和特征校准网络&#xff08;FCNet&#xff09;组成&#xff0c;…

spire.doc合并word文档

文章目录 spire.doc合并word文档1. 引入maven依赖2. 需要合并的word3. 合并文档代码4. 合并结果5. 合并产生段落&#xff0c;table样式混乱问题 spire.doc合并word文档 1. 引入maven依赖 <repositories><repository><id>com.e-iceblue</id><name&g…

CH395Q之CH395Q简介(一)

本节主要介绍以下内容&#xff1a; 1、TCP/IP协议栈是什么&#xff08;了解&#xff09; 2、CH395Q是什么&#xff08;了解&#xff09; 3、CH395Q工作命令&#xff08;熟悉&#xff09; 4、CH395Q & W5500 一、TCP/IP协议栈是什么 是一系列网络协议的总和&#xff0…

17. 使用 tslib 库

17. 使用 tslib 库 1. tslib 简介2. tslib 移植2.1 下载 tslib 源码2.2 编译 tslib 源码2.3 tslib 安装目录下的文件夹介绍2.4 在开发板上测试 tslib 3. tslib 库函数介绍3.1 打开触摸屏设备3.2 配置触摸屏设备3.3 读取触摸屏设备 4. 基于 tslib 编写触摸屏应用程序4.1 单点触摸…

强化学习-google football 实验记录

google football 实验记录 1. gru模型和dense模型对比实验 实验场景&#xff1a;5v5(控制蓝方一名激活球员)&#xff0c;跳4帧&#xff0c;即每个动作执行4次 实验点&#xff1a; 修复dense奖励后智能体训练效果能否符合预期 实验目的&#xff1a; 对比gru 长度为16 和 dens…

Mysql-存储引擎-InnoDB

数据文件 下面这条SQL语句执行的时候指定了ENGINE InnoDB存储引擎为InnoDB: CREATE TABLE tb_album (id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,title varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 相册名称,image varc…

Flink Checkpoint 超时问题详解

第一种、计算量大&#xff0c;CPU密集性&#xff0c;导致TM内线程一直在processElement&#xff0c;而没有时间做CP【过滤掉部分数据&#xff1b;增大并行度】 代表性作业为算法指标-用户偏好的计算&#xff0c;需要对用户在商城的曝光、点击、订单、出价、上下滑等所有事件进…

给信息安全专业想做网络安全方面的人一些忠告

别一直打CTF 打CTF是为了打基础&#xff0c;大概知道一些基础就出来吧&#xff0c;千万不要一直打下去出不来了。简历上实习经历&#xff0c;项目经历以及漏洞成果才能构成一个不错的背景&#xff0c;只有ctf比赛会很尴尬。要知道有些人是py打比赛&#xff0c;面试官知道情况&…

小迪安全24WEB 攻防-通用漏洞SQL 注入MYSQL 跨库ACCESS 偏移

#知识点&#xff1a; 1、脚本代码与数据库前置知识 2、Access 数据库注入-简易&偏移 3、MYSQL 数据库注入-简易&权限跨库 #前置知识&#xff1a; -SQL 注入漏洞产生原理分析 -SQL 注入漏洞危害利用分析 -脚本代码与数据库操作流程 -数据库名&#xff0c…

探索设计模式的魅力:深入了解适配器模式-优雅地解决接口不匹配问题

设计模式专栏&#xff1a;http://t.csdnimg.cn/nolNS 目录 一、引言 1. 概述 2. 为什么需要适配器模式 3. 本文的目的和结构 二、简价 1. 适配器模式的定义和特点 定义 特点 2. 适配器模式的作用和适用场景 作用 适用场景 3. 适配器模式与其他设计模式的比较 三、适配…

K8S搭建(centos)二、服务器设置

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

网络防御安全知识(第三版)

配置黑洞路由 --- 黑洞路由即空接口路由&#xff0c;在NAT地址池中的地址&#xff0c;建议配置达到这个地址指 向空接口的路由&#xff0c;不然&#xff0c;在特定环境下会出现环路。&#xff08;主要针对地址池中的地址和出接口地址 不再同一个网段中的场景。&#xff09; …

部署个人知识库管理软件 MrDoc详细教程

效果 一、拉取 MrDoc 代码 进入目录&#xff1a; cd /opt开源版&#xff1a; git clone https://gitee.com/zmister/MrDoc.git专业版&#xff1a; git clone https://{用户名}:{密码}git.mrdoc.pro/MrDoc/MrDocPro.git二、拉取 Docker 镜像 docker pull zmister/mrdoc:v7三…

【MyBatis】#{} 和 ${}

目录 1. #{} 使用示例&#xff1a; 2. ${} 使用示例&#xff1a; SQL注入 使用#{}的情况&#xff1a; 使用${}的情况&#xff1a; MyBatis是一种用于Java语言的持久层框架&#xff0c;它简化了数据库操作的过程。在MyBatis中&#xff0c;我们经常会看到两种不同的参数占…

保障气膜建筑稳定性的关键因素与方法

近年来&#xff0c;气膜建筑因其轻便、柔韧、环保等特点在建筑领域备受瞩目。然而&#xff0c;作为一种依赖气体支撑的结构&#xff0c;如何确保气膜建筑的稳定性成为一个重要的问题。本文将探讨保障气膜建筑稳定性的关键因素与方法&#xff0c;从气压差维持、材料选择、锚固系…