1、实时通信有哪些实现方式?
特性 | 轮询(Polling) | WebSocket | SSE (Server-Sent Events) |
---|---|---|---|
通信方向 | 单向(客户端 → 服务端) | 双向(客户端 ↔ 服务端) | 单向(服务端 → 客户端) |
连接方式 | 客户端定时发起HTTP请求 | 持久连接(基于TCP协议) | 持久连接(基于HTTP协议) |
实现复杂度 | 简单 | 较复杂(需处理握手、协议转换等) | 简单 |
实时性 | 低(依赖轮询间隔) | 高(支持实时双向通信) | 高(服务端实时推送) |
性能开销 | 高(频繁建立/关闭连接) | 低(单个持久连接) | 低(单个持久连接) |
示例场景 | 定时获取天气、股票数据 | 在线聊天、多人游戏、实时协作 | 新闻推送、通知、实时股票行情 |
- 轮询:开发简单,但是效率太低了,适合对实时性要求不高的场景。在项目中,基本是不推荐使用,这种做法比较 low。
- WebSocket:开发起来较为复杂。性能好,可以双向实时通信,适合需要交互的场景。推荐使用 Socket.IO 包来开发。
- SSE:开发简单。性能好,但只能单向实时通信,适合服务端向客户端主动推送数据的场景。
另外值得一提的是,现在各个 AI 平台的消息推送
,也都是使用SSE
来实现的。
2、SSE 的基础实现
// 设置正确的响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');// 定时发送数据const intervalId = setInterval(() => {const data = {message: `当前时间是 ${new Date().toLocaleTimeString()}`,};res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 2000);// 处理客户端断开连接
req.on('close', () => {clearInterval(intervalId); // 清除定时器res.end(); // 结束响应
});
- 想要用
SSE来推送数据
,顶部
要按照这个格式来设置响应头
,明确指明为event-stream
。这样才能被识别成SSE。 中间
部分,简单的写了一个定时执行
。每隔两秒钟,计算一下当前的时间。- 然后注意了,这里用的是
res.write
,这是实现SSE的关键代码
。使用res.write可以不断的,分多次发送数据,而不需要一次性发送完整的响应
。 - 还有要注意的是,我们这里
发送数据的格式
,按照SSE的规则
,必须是以data:开头,以两个\n结束
。\n是换行的意思。 - 最下面,如果
连接断开了
,就停止定时器
,并结束响应
。
3、前端部分实现
SSE默认不支持在header中传递数据
,那就直接在URL里传token好了
// 定义管理员令牌,用于身份验证,在实际使用时需将 'xxxx' 替换为真实有效的令牌
const token = 'xxxx';// 创建一个 EventSource 对象,用于建立与服务器的 SSE 连接
// 这里使用模板字符串将 token 拼接到请求的 URL 中,该 URL 指向服务器上处理订单流数据的接口
const eventSource = new EventSource(`http://localhost:3000/admin/charts/stream_order?token=${ token }`);// 当服务器向客户端发送消息时,会触发 onmessage 事件
eventSource.onmessage = function (event) {// 使用 try...catch 块来捕获可能出现的异常,例如数据解析失败的情况try {// 服务器发送的数据存储在 event.data 中,通常是 JSON 格式的字符串// 使用 JSON.parse 方法将其解析为 JavaScript 对象const data = JSON.parse(event.data);// 将解析后的数据打印到控制台,方便调试和查看console.log(data);} catch (error) {// 如果解析数据过程中出现错误,将错误信息打印到控制台console.error('解析数据出错:', error);}
};// 当 SSE 连接出现错误时,会触发 onerror 事件
eventSource.onerror = function (error) {// 将连接错误信息打印到控制台,方便调试和排查问题console.error('SSE 连接出错:', error);
};
4、测试
检查
,选择网络
。然后刷新一下
,就能看到发出的请求
了
注意
:请求的状态是pending
,等待 2 秒钟
后,状态变为200
的时候我们再点进去。这是因为如果点早了
,浏览器还没有将它识别成SSE
。
5、 SSE 小结
- 在
Node
部分,设置好响应头
,并用res.write
不断的发数据出去就好。 前端
部分,用new EventSource
建立连接。并通过onmessage
事件,来接收数据。
6、实践
redis 封装
6.1、后端代码实现
6.1.1、新建 SSE 处理工具类
在根目录
新建streams
文件夹,里面再新建一个sse-handler.js
,用于处理服务器发送事件(Server-Sent Events, SSE)的工具类,代码如下:
const {setKey, getKey} = require('../utils/redis');// 定义一个名为 SSEHandler 的类,用于处理服务器发送事件(SSE)的连接和数据广播
class SSEHandler {// 构造函数,在创建 SSEHandler 实例时会自动调用constructor() {// 使用 ES6 的 Set 数据结构来存储与浏览器建立 SSE 连接的响应对象(res)// Set 可以确保存储的元素唯一,避免重复this.clients = new Set();}/*** 初始化 SSE 数据流,处理新的客户端连接* @param {Object} res - Express 响应对象,用于向客户端发送数据* @param {Object} req - Express 请求对象,包含客户端的请求信息*/initStream(res, req) {// 设置响应头,指定响应内容类型为 text/event-stream,这是 SSE 的标准内容类型res.setHeader('Content-Type', 'text/event-stream');// 设置缓存控制,禁止浏览器缓存响应内容,确保每次请求都能获取最新数据res.setHeader('Cache-Control', 'no-cache');// 设置连接类型为 keep-alive,保持与客户端的长连接res.setHeader('Connection', 'keep-alive');// 刷新响应头,将设置的响应头信息发送给客户端res.flushHeaders();// 将当前客户端的响应对象添加到 clients 集合中,以便后续广播数据时使用this.clients.add(res);// 监听客户端连接关闭事件,当客户端断开连接时触发回调函数req.on('close', () => {// 从 clients 集合中删除当前客户端的响应对象this.clients.delete(res);// 打印日志,提示客户端已断开连接console.log('Client disconnected');});}/*** 向所有连接的客户端广播数据* @param {Object} data - 要广播的数据对象,会被序列化为 JSON 字符串发送给客户端*/async broadcastData(data) {await setKey('sse_broadcast_data', data);// 遍历 clients 集合中的每个客户端响应对象this.clients.forEach((client) => {// 检查客户端响应是否已经结束,如果未结束则继续发送数据if (!client.finished) {// 按照 SSE 的格式,以 "data: " 开头,后面跟上 JSON 序列化后的数据,以两个换行符 "\n\n" 结尾// 并将其写入客户端响应流,发送给客户端client.write(`data: ${JSON.stringify(data)}\n\n`);}});}
}// 将 SSEHandler 类导出,以便其他模块可以引入和使用
module.exports = SSEHandler;
6.1.2、定义和管理统计查询的 SQL 语句·
创建utils/stats-query.js
文件,用于定义和管理统计查询的 SQL 语句:
const statsQueries = {order: "SELECT DATE_FORMAT(`createdAt`, '%Y-%m') AS `month`, COUNT(*) AS `value` FROM `Orders` GROUP BY `month` ORDER BY `month` ASC",user: "SELECT DATE_FORMAT(`createdAt`, '%Y-%m') AS `month`, COUNT(*) AS `value` FROM `Users` GROUP BY `month` ORDER BY `month` ASC",// 可以添加更多类型的统计查询
};function getStatsQuery(type) {return statsQueries[type];
}module.exports = {getStatsQuery
};
6.1.3、 广播服务的实现
创建utils/broadcast-service.js
文件,实现广播服务:
const {sequelize} = require('../models');
const SSEHandler = require('../streams/sse-handler');// 直接在 broadcast-service.js 中定义统计查询配置
const {getStatsQuery} = require('./stats-query');
// 存储不同类型的 SSE 处理程序
const sseHandlers = {};async function broadcastStats(type) {try {if (!getStatsQuery(type)) {console.error(`Invalid stats type: ${type}`);return;}if (!sseHandlers[type]) {sseHandlers[type] = new SSEHandler();}const [results] = await sequelize.query(getStatsQuery(type));const data = {months: results.map(item => item.month),values: results.map(item => item.value)};sseHandlers[type].broadcastData(data);console.log(`${type} stats broadcasted successfully`);} catch (error) {console.error(`Error broadcasting ${type} stats:`, error);}
}function initSSEStream(type, res, req) {if (!getStatsQuery(type)) {console.error(`Invalid stats type: ${type}`);return;}if (!sseHandlers[type]) {sseHandlers[type] = new SSEHandler();}sseHandlers[type].initStream(res, req);
}module.exports = {broadcastStats,initSSEStream
};
6.1.4、新建 charts.js
文件实现 ECharts
路由(仅展示关键代码)
在这一部分,我们将创建一个 charts.js 文件,用于实现与 ECharts 相关的路由功能。该文件主要负责处理不同类型的统计数据请求,并利用 Redis 进行数据缓存,同时支持通过 SSE(Server-Sent Events)进行实时数据推送。
const {getStatsQuery} = require('../../utils/stats-query');
const {initSSEStream} = require('../../utils/broadcast-service');
const {getKey, setKey} = require('../../utils/redis');// 获取统计数据的通用路由
// 此路由根据请求参数中的 type 来获取相应的统计数据
router.get('/:type', async (req, res) => {// 从请求参数中获取统计数据的类型const type = req.params.type;// 根据类型获取对应的统计查询语句const query = getStatsQuery(type);// 如果未找到对应的查询语句,返回错误信息if (!query) {return failure(res, new Error('Invalid stats type'));}try {// 首先尝试从 Redis 中获取缓存的数据let cachedData = await getKey(`stats_data_${type}`);// 如果 Redis 中存在缓存数据,直接返回该数据if (cachedData) {return success(res, 'Stats data fetched successfully', {data: cachedData});}// 若 Redis 中没有缓存数据,执行数据库查询const [results] = await sequelize.query(query);// 对查询结果进行处理,将月份和对应的值分别提取出来const data = {months: results.map(item => item.month),values: results.map(item => item.value)};// 将处理后的数据存储到 Redis 中,以便后续请求可以直接使用缓存数据await setKey(`stats_data_${type}`, data);// 返回查询成功的响应,并将数据发送给客户端success(res, 'Stats data fetched successfully', {data});} catch (error) {// 若出现错误,返回错误响应failure(res, error);}
});/*** SSE 统计不同类型数据* GET /admin/charts/stream/:type* 此路由用于处理 SSE 连接,实时推送不同类型的统计数据*/
router.get('/stream/:type', async (req, res) => {// 从请求参数中获取统计数据的类型const type = req.params.type;// 初始化 SSE 数据流,开始向客户端推送数据initSSEStream(type, res, req);
});
6.1.5、表单新增数据
时 删除redis缓存
以order
为例
// 定义一个处理 POST 请求的路由,路径为根路径('/')
// 当客户端向该路径发送 POST 请求时,此中间件函数会被调用
// req 表示请求对象,包含客户端发送的请求信息
// res 表示响应对象,用于向客户端发送响应
// next 是 Express 中的中间件函数,用于将控制权传递给下一个中间件
router.post('/', async function (req, res, next) {try {// 生成一个唯一的订单号// 使用 uuidv4 函数生成一个通用唯一识别码(UUID)// 然后使用 replace 方法将 UUID 中的连字符(-)替换为空字符串,得到一个无连字符的订单号const outTradeNo = uuidv4().replace(/-/g, '');// 调用 getMembership 函数,根据请求对象 req 获取会员信息// 这个函数可能会从数据库、缓存或其他数据源中获取与当前请求相关的会员信息const membership = await getMembership(req);// 使用 Sequelize 的 create 方法创建一个新的订单记录// 传入一个包含订单信息的对象,这些信息将被插入到数据库的 Order 表中const order = await Order.create({// 订单号,使用前面生成的唯一订单号outTradeNo: outTradeNo,// 用户 ID,从请求对象中获取当前用户的 IDuserId: req.userId,// 订单主题,使用会员的名称subject: membership.name,// 会员时长(月),从会员信息中获取会员的持续月数membershipMonths: membership.durationMonths,// 订单总金额,使用会员的价格totalAmount: membership.price,// 订单状态,初始状态设为 0,通常表示待支付status: 0,});// 删除 Redis 中存储的订单统计数据缓存// 当有新订单创建时,之前的统计数据可能不再准确,所以需要删除缓存// 后续请求统计数据时会重新从数据库获取最新数据await delKey('stats_data_order');// 调用广播服务,将订单统计数据的更新广播出去// 这可能会触发前端页面或其他相关服务更新订单统计信息await broadcastStats('order');// 调用 success 函数,向客户端发送成功响应// 第一个参数是响应对象 res,用于发送响应// 第二个参数是成功消息,告知客户端订单创建成功// 第三个参数是包含订单信息的对象,客户端可以使用这些信息进行后续处理success(res, '订单创建成功。', { order });} catch (error) {// 如果在订单创建过程中出现错误,调用 failure 函数向客户端发送错误响应// 第一个参数是响应对象 res,用于发送响应// 第二个参数是捕获到的错误对象,客户端可以根据错误信息进行相应处理failure(res, error);}
});
6.2、前端代码实现
6.2.1、前端charts
封装
在前端开发中,为了高效管理 ECharts 图表,我们封装了 ChartManager 类,它能处理图表的初始化、数据获取、SSE 连接以及错误处理等操作。以下是详细的代码及说明:
// 配置对象,可根据实际情况灵活修改,用于存储与图表管理相关的配置信息
const config = {// API 请求的基础 URL,用于获取图表数据和建立 SSE 连接API_BASE_URL: 'http://localhost:3000',// 最大重试次数,当 SSE 连接失败时会进行重试,达到该次数后停止重试MAX_RETRIES: 5,// 重试延迟时间(毫秒),每次重试之间的间隔时长RETRY_DELAY: 3000
};/*** ChartManager 类,负责管理 ECharts 图表的初始化、数据获取、SSE 连接以及错误处理等功能*/
class ChartManager {/*** 构造函数,用于初始化图表管理器的基本属性* @param {string} chartId - 图表容器的 ID,用于查找对应的 DOM 元素* @param {string} type - 图表类型,例如 'order' 或 'user',决定获取数据的接口路径* @param {string} token - 用于身份验证的令牌,在请求数据时使用* @param {Object} [option={}] - 可选的 ECharts 配置选项,用于自定义图表样式*/constructor(chartId, type, token, option = {}) {this.chartId = chartId;this.type = type;this.token = token;this.chart = null;this.initialDataFetched = false;this.sseSource = null;this.retryCount = 0;this.maxRetries = config.MAX_RETRIES;this.retryDelay = config.RETRY_DELAY;this.option = {title: {text: `月度${type === 'order' ? '订单' : '用户'}统计`,textStyle: { color: '#333' }},tooltip: {trigger: 'axis',axisPointer: { type: 'shadow' }},grid: {left: '3%',right: '4%',bottom: '3%',containLabel: true},xAxis: {type: 'category',data: [],axisTick: { alignWithLabel: true }},yAxis: {type: 'value'},series: [{name: '数量',type: 'bar',barWidth: '60%',data: []}],...option};}/*** 初始化图表,包含创建 ECharts 实例、获取初始数据以及建立 SSE 连接等操作*/async init() {try {const chartDom = document.getElementById(this.chartId);if (!chartDom) {throw new Error(`未找到指定 ID 的图表容器:${this.chartId}`);}this.chart = echarts.init(chartDom);this.chart.setOption(this.option);await this.fetchInitialData();this.connectSSE();} catch (error) {console.error(`图表 ${this.chartId} 初始化失败:`, error);this.showError('图表初始化失败,请尝试刷新页面重试', error.message);}}/*** 异步获取初始数据*/async fetchInitialData() {try {const url = `${config.API_BASE_URL}/admin/charts/${this.type}?token=${this.token}`;const response = await this.fetchData(url);const { data } = response;if (data.data.months.length === 0) {this.showError('当前暂无数据,10 秒后将尝试重新获取');setTimeout(() => this.fetchInitialData(), 10000);return;}if (data.data.months.length!== data.data.values.length) {this.showError('数据格式存在错误:月份和数量数组长度不匹配');return;}this.option.xAxis.data = data.data.months;this.option.series[0].data = data.data.values;this.initialDataFetched = true;this.chart.setOption(this.option);} catch (error) {this.showError('数据加载失败,5 秒后将尝试重新加载', error.message);setTimeout(() => this.fetchInitialData(), 5000);}}/*** 建立 SSE 连接,实时监听服务器发送的数据*/connectSSE() {if (this.sseSource &&!this.sseSource.closed) {this.sseSource.close();}const url = `${config.API_BASE_URL}/admin/charts/stream/${this.type}?token=${this.token}`;this.sseSource = new EventSource(url);this.sseSource.onmessage = (event) => {try {const responseData = JSON.parse(event.data);const data = responseData.data || responseData;if (!data ||!Array.isArray(data.months) ||!Array.isArray(data.values)) {this.showError('SSE 返回的数据格式有误,请检查后端接口');return;}if (data.months.length === 0) {this.showError('暂无数据,请创建订单后再进行查看');return;}if (data.months.length!== data.values.length) {this.showError('SSE 数据格式错误:月份和数量数组长度不匹配');return;}this.option.xAxis.data = data.months;this.option.series[0].data = data.values;this.chart.setOption(this.option);} catch (parseError) {console.error('解析 SSE 数据时出现错误:', parseError);this.showError('实时数据解析失败,请检查网络连接或尝试刷新页面', parseError.message);}};this.sseSource.onerror = (error) => {console.error('SSE 连接出现错误:', error);this.showError('实时数据连接中断,正在尝试重新连接...', error.message);this.handleSseError();};}/*** 处理 SSE 连接错误,执行重试操作*/handleSseError() {this.retryCount++;if (this.retryCount <= this.maxRetries) {setTimeout(() => this.connectSSE(), this.retryDelay);} else {this.showError('重连失败,请手动刷新页面');this.retryCount = 0;}}/*** 发起网络请求获取数据* @param {string} url - 请求的 URL* @returns {Promise<Object>} - 返回解析后的 JSON 数据*/async fetchData(url) {try {const response = await fetch(url);if (!response.ok) {throw new Error(`HTTP 请求失败,状态码:${response.status}`);}const contentType = response.headers.get('content-type');if (!contentType ||!contentType.includes('application/json')) {throw new Error('响应数据并非 JSON 格式');}return await response.json();} catch (error) {this.showError('网络请求出现问题', error.message);throw error;}}/*** 显示错误信息并更新图表标题* @param {string} message - 错误消息内容* @param {string} [errorCode=null] - 可选的错误代码*/showError(message, errorCode = null) {const errorDiv = document.createElement('div');errorDiv.className = 'error-message';errorDiv.innerHTML = `${message}${errorCode? `<small>错误码: ${errorCode}</small>` : ''}<button onclick="this.parentElement.remove()">关闭</button>`;document.body.prepend(errorDiv);this.option.series[0].data = [];this.chart.setOption(this.option);}/*** 销毁图表和 SSE 连接,释放相关资源*/destroy() {if (this.chart) {this.chart.dispose();this.chart = null;}if (this.sseSource &&!this.sseSource.closed) {this.sseSource.close();}}
}// 将 ChartManager 类暴露到全局作用域,方便在其他地方使用
window.ChartManager = ChartManager;
6.2.1、前端demo
实现
面是一个前端示例,展示了如何使用 ChartManager 类创建实时统计图表:
<!DOCTYPE html>
<html lang="zh-CN"><head><meta charset="UTF-8"><!-- 让页面在移动设备上能正确显示 --><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>实时统计图表</title><!-- 引入 ECharts 库,用于创建各种类型的图表 --><script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script><!-- 引入自定义的图表管理脚本,包含 ChartManager 类等逻辑 --><script src="charts.js"></script><style>body {/* 设置页面整体字体为 Arial 无衬线字体 */font-family: Arial, sans-serif;/* 使用 Flexbox 布局,让内容水平和垂直居中 */display: flex;justify-content: center;align-items: center;/* 让内容垂直排列 */flex-direction: column;/* 页面四周添加 20px 的外边距 */margin: 20px;}.chart-container {/* 设置图表容器的宽度为 800px */width: 800px;/* 设置图表容器的高度为 500px */height: 500px;/* 图表容器四周添加 20px 的外边距 */margin: 20px;/* 图表容器添加 1px 宽的浅灰色边框 */border: 1px solid #eee;/* 图表容器四个角设置 4px 的圆角 */border-radius: 4px;}.error-message {/* 错误消息文本颜色设置为红色 */color: red;/* 错误消息四周添加 20px 的外边距 */margin: 20px;/* 错误消息内容四周添加 10px 的内边距 */padding: 10px;/* 错误消息背景颜色设置为浅红色 */background-color: #ffebee;/* 错误消息框四个角设置 4px 的圆角 */border-radius: 4px;}</style>
</head><body>
<!-- 订单统计图表的容器,后续 ECharts 图表会渲染到这个容器中 -->
<div class="chart-container" id="orderChart"></div>
<!-- 用户统计图表的容器,后续 ECharts 图表会渲染到这个容器中 -->
<div class="chart-container" id="userChart"></div>
<script>// 当文档的 DOM 内容加载完成后执行以下逻辑document.addEventListener('DOMContentLoaded', async () => {try {// 这里的 token 用于身份验证,实际使用时需要替换为从后端获取的真实 tokenconst token = '';// 自定义的 ECharts 配置选项,可用于覆盖默认配置const customOption = {// 修改图表标题的文本颜色为蓝色title: {textStyle: { color: 'blue' }}};// 创建订单统计图表的管理实例,传入容器 ID、图表类型、token 和自定义配置const orderChart = new ChartManager('orderChart', 'order', token, customOption);// 创建用户统计图表的管理实例,传入容器 ID、图表类型、token 和自定义配置const userChart = new ChartManager('userChart', 'user', token, customOption);// 使用 Promise.all 并行初始化订单图表和用户图表await Promise.all([orderChart.init(), userChart.init()]);// 监听窗口的 beforeunload 事件,当用户关闭或刷新页面时执行以下逻辑window.addEventListener('beforeunload', () => {// 销毁订单图表实例,释放相关资源orderChart.destroy();// 销毁用户图表实例,释放相关资源userChart.destroy();});} catch (error) {// 如果在初始化过程中出现错误,在控制台输出错误信息console.error('图表初始化失败:', error);// 调用 showGlobalError 函数显示全局错误消息showGlobalError('图表初始化失败,请检查浏览器控制台');}});/*** 显示全局错误消息的函数* @param {string} message - 要显示的错误消息内容*/function showGlobalError(message) {// 创建一个新的 div 元素用于显示错误消息const errorDiv = document.createElement('div');// 为错误消息 div 元素添加 error-message 类名,以便应用相应的样式errorDiv.className = 'error-message';// 设置错误消息 div 元素的文本内容为传入的消息errorDiv.textContent = message;// 将错误消息 div 元素添加到页面 body 元素的最前面document.body.prepend(errorDiv);}
</script>
</body></html>