一、场景描述
产业大脑平台是一个典型的审核系统,用户发布到平台的信息需要经过审核员审核后生效。
用户发布信息->审核员审核信息->用户信息生效,这一流程可能发生在用户的同一次登录周期内。为了使客户端能实时响应信息的状态变化,可通过短轮询、长轮询+事件订阅/发布、websocket+事件订阅/发布等方式实现。下面第二节简述轮询方式,第三节详述websocket,第四节详述服务端基于EventEmitter的事件订阅/发布方式和客户端基于mitt的事件订阅/发布方式。
综合websocket、服务端与客户端的事件订阅/发布机制,可实现客户端订阅服务端事件。简化流程如下所示:
二、轮询方式
(一)短轮询
短轮询是指由客户端周期性发起ajax请求,服务端执行查询并返回最新状态。
客户端伪代码如下:
function polling(){axios.get('/api/xxx').then(ret)=>{...setTimeout(polling, 5000)}).catch(err=>{...setTimeout(polling, 10000)})
}
polling()
(二)长轮询+事件订阅/发布
长轮询与短轮询原理相同,区别是客户端ajax发起连接时,在请求头中添加Connection: keep-alive信息: axios.get('/api/xxx',{headers:{'Connection': 'keep-alive'}})
,且服务端接受到客户端的请求后并不立即回复,而是添加一个事件监听,当出现需要推送给客户端的信息时,触发该事件。
长轮询相比短轮询大大降低了HTTP连接的频次,有效提升了通信效率。
三、websocket
websocket是一种网络通信协议,与http协议不同的是:http协议只能由客户端向服务端发起请求,而websocket是双向通信,一旦连接建立,也可由服务端主动向客户端推送数据。
相比长轮询,websocket是更彻底解决服务端向客户端推送信息的机制,在客户端的整个登录周期内,只需要建立一次TCP连接。
(一)websocket客户端
WebSocket是HTML5自带的模块,用法相当简单。假设已经在本地1001端口建立了websocket服务端(具体方法见后文),则客户端向服务端发起websocket连接以及使用方法如下:
var ws = new WebSocket("wss://localhost:1001");ws.onopen = function(evt) { console.log("Connection open ..."); ws.send("Hello WebSockets!");
};ws.addEventListener('open',(evt)=>{//若要指定多个回调函数,可使用addEventListener方法
})ws.onmessage = function(evt) {console.log( "Received Message: " + evt.data);ws.close();
};ws.onclose = function(evt) {console.log("Connection closed.");
}; //客户端向服务端发送信息
ws.send('message')//客户端的几种状态:
switch(ws.readyState){case WebSocket.CONNECTING://值为0,表示正在连接breakcase WebSocket.OPEN://值为1,表示连接成功breakcase WebSocket.CLOSING://值为2,表示连接正在关闭breakcase WebSocket.CLOSED://值为3,表示连接已经关闭,或打开连接失败break
}
(二)websocket服务端
nodejs-websocket是常用的websocket服务端模块,通过 pnpm i nodejs-websocket -S
安装。使用方法如下:
const ws=require('nodejs-websocket')
const server=ws.createServer(connection=>{connection.on('text',data=>{connection.send(data)console.log(data)})connection.on('close',(code, reason)=>{console.log("websocket连接断开")console.log(code, reason)})connection.on('error',(err)=>{console.log("websocket连接异常")console.log(err)})
})
server.listen(1001,()=>{console.log("websocket running")
}).on('connection',connection=>{console.log('建立连接成功')//可以使用wss://localhost:1001访问该服务,connection.path为'/'//也可以使用wss://localhost:1001/xxx访问该服务,connection.path为'/xxx'//可以利用path传递一些特殊信息,比如userid,用于建立事件监听console.log('path= '+connection.path)
})
1. server对象
(1)方法
- server.listen(port, [host], [callback]): 传入端口和主机地址后,开启一个 websocket 服务
- server.close([callback]): 关闭 websocket 服务
- server.connections: 返回包含所有 connection 的数组,可以用来广播所有消息
(2)事件
通过server.on(‘event’,callback)订阅事件。
- listening():调用
server.listen
会触发当前事件 - close(): 当服务关闭时触发该事件,如果有任何一个connection保持链接,都不会触发该事件
- error(errObj):发生错误时触发,此事件后会直接调用close事件
- connection(conn):建立新链接(完成握手后)触发,conn 是连接的实例对象
2. connection对象
(1)方法
- connection.sendText(str, [callback]):发送字符串给另一侧,可以由服务端发送字符串数据给客户端
- connection.beginBinary():要求连接开始传输二进制,返回一个
WritableStream
- connection.sendBinary(data, [callback]): 发送一个二进制块,类似
connection.beginBinary().end(data)
- connection.send(data, [callback]): 发送一个字符串或者二进制内容到客户端,如果发送的是文本,类似于
sendText()
,如果发送的是二进制,类似于sendBinary()
,callback
将监听发送完成的回调 - connection.close([code, [reason]]):开始关闭握手(发送一个关闭指令)
- connection.server:如果服务是 nodejs 启动,这里会保留 server 的引用
- connection.readyState:一个常量,表示连接的当前状态
- connection.outStream: 存储
connection.beginBinary()
返回的OutStream
对象,没有则返回 null - connection.path:表示建立连接的路径
- connection.headers:只读请求头的 name 的 value 对应的 object 对象
- connection.protocols:客户端请求的协议数组,没有则返回空数组
- connection.protocol:同意连接的协议,如果有这个协议,它会包含在
connection.protocols
数组里面
(2)事件
- close(code, reason): 连接关闭时触发
- error(err):发生错误时触发,如果握手无效,也会发出响应
- text(str):收到文本时触发,str 时收到的文本字符串
- binary(inStream):收到二进制内容时触发,
inStream
时一个ReadableStream
- connect():连接完全建立后发出
var server = ws.createServer(conn=> {conn.on('binary', function(inStream) {// 创建空的buffer对象,收集二进制数据var data = new Buffer(0)// 读取二进制数据的内容并且添加到buffer中inStream.on('readable', function() {var newData = inStream.read()if (newData)data = Buffer.concat([data, newData], data.length + newData.length)})inStream.on('end', function() {// 读取完成二进制数据后,处理二进制数据process_my_data(data)})})conn.on('close', function(code, reason) {console.log('Connection closed')})}).listen(1001)
四、事件订阅/发布
(一)服务端基于EventEmitter的事件订阅/发布
Node.js是基于事件驱动实现异步操作的,事件驱动依赖的就是events模块。events模块导出一个EventEmitter类。用法如下:
const EventEmitter=require('events').EventEmitter
const emitter=new EventEmitter()//订阅事件
function listener1(...args){console.log('listener1',args)}
function listener2(...args){console.log('listener2',args)}
emitter.on('someEvent',listener1)
emitter.on('someEvent',listener2)//订阅单次事件
emitter.once('someEvent',(...args)=>{console.log('once',args)})//发布事件
emitter.emit('someEvent','arg1','arg2','arg3')//移除事件监听
emitter.removeListener('someEvent',listener1)//移除所有事件监听,若指定事件,则移除该事件的所有监听器
emitter.removeAllListeners([event])//默认EventEmitter不能超过10个监听器
//setMaxListeners(n)函数可用于改变监听器的默认限制数量
emitter.setMaxListeners(100)
(二)客户端基于mitt的事件订阅/发布方法
mitt是一个十分小巧的事件发布/订阅库,大约只有200字节左右,安装方法 pnpm i mitt -S
。用法如下:
import mitt from 'mitt'const emitter = mitt()// 订阅事件
emitter.on('foo', e => console.log('foo', e) )// 订阅所有的事件
emitter.on('*', (type, e) => console.log(type, e) )// 触发事件
emitter.emit('foo', { a: 'b' })// 清除所有的订阅者
emitter.all.clear()// 使用事件处理函数的引用,方便移除监听
function onFoo() {}
emitter.on('foo', onFoo) // listen
emitter.off('foo', onFoo) // unlisten