一、WebSocket产生背景
在传统的Web通信中,浏览器是基于请求--响应模式。这种方式的缺点是,浏览器必须始终主动发起请求才能获取更新的数据,而且每次请求都需要经过HTTP的握手和头部信息的传输,造成了较大的网络开销。如果客户端需要及时获得服务端数据,要么通过定时轮训、长轮训或Commet机制,但都不能完美解决。
WebSocket解决了这些问题,它提供了一种持久的连接机制,允许服务器实时地向浏览器推送数据,而无需浏览器重新发起请求。这种双向通信的方式使得Web应用程序能够实时地向用户提供更新的数据,比如在线聊天、实时通知等。
WebSocket底层是基于HTTP协议,并使用了类似握手的过程来建立连接。连接一旦建立,就可以通过发送和接收消息来进行实时通信。WebSocket消息可以是文本、二进制或者其他格式。
二、使用Netty创建WebSocket服务端
2.1 设置解析websocket协议的ChannelHandler
websocket基于http协议,因此netty创建websocket和http的代码非常相似,如下
private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel arg0) throws Exception {ChannelPipeline pipeline = arg0.pipeline();// HttpServerCodec: 针对http协议进行编解码pipeline.addLast("httpServerCodec", new HttpServerCodec());// ChunkedWriteHandler分块写处理,文件过大会将内存撑爆pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));// 用于处理websocket, /ws为访问websocket时的uripipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));// 这里需要再进行二次编解码}
}
上面代码前三个handler跟http一致,新增了一个webSocketServerProtocolHandler处理器,该处理器限制了该服务只提供websocket服务,屏蔽了底层握手、编解码、心跳和断开连接等事件。
至此,服务器可以接收到一个完整的WebSocketFrame包,但业务代码不是基于WebSocketFrame数据,例如jforgame消息包为Message实现类。因此我们需要二次消息解码。
这里的二次编码将进行WebSocketFrame到私有消息的转换,是websocket适配最麻烦的地方,相当于把私有协议栈重新实现一遍。对于TextWebSocketFrame可能还简单一点,客户端只需将包含包头(消息id)和包体(具体消息)的数据进行json化即可。而对于BinaryWebSocketFrame,客户端需要引入第三方消息编解码工具,例如protobuf。(如果客户端使用json,就不用多此一举使用二进制格式了)
2.2WebSocketFrame解码为私有协议消息
pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {if (frame instanceof TextWebSocketFrame) {String json = ((TextWebSocketFrame)frame).text();TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);System.out.println(textFrame);list.add(realMsg);} else if (frame instanceof BinaryWebSocketFrame) {throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");}}});
其中,TextFrame是websocket客户端与服务端通信格式,只有两个字段
static class TextFrame {// 消息idString id;// 消息内容String msg;}
注:这里只处理 TextWebSocketFrame文本格式,至于BinaryWebSocketFrame二进制格式,由于使用JavaScript需引入Protobuf等第三方库,这里不做演示。
2.3私有协议消息编码为WebSocketFrame
当服务器向客户端推送消息的时候,需要将私有协议包转为WebSocketFrame。
pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {if (DefaultMessageFactory.getInstance().contains(o.getClass())) {String json = JsonUtils.object2String(o);TextFrame frame = new TextFrame();frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));frame.msg = json;list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));} else if (o instanceof ReferenceCounted) {((ReferenceCounted)o).retain();list.add(o);} else {list.add(o);}}});
注:在二次编码的时候,遇到ReferenceCounted子类,需要 retain()一下才可以传递给下一个handler。
2.4将完整业务消息包丢给业务代码执行
pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));
至此,把websocket底层的协议差异给屏蔽掉了,无论服务端是采用socket还是websocket,业务代码无需做任何改变。
2.5 服务端完整代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.ReferenceCounted;
import jforgame.commons.NumberUtil;
import jforgame.demo.ServerScanPaths;
import jforgame.demo.socket.MessageIoDispatcher;
import jforgame.demo.utils.JsonUtils;
import jforgame.socket.netty.support.DefaultSocketIoHandler;
import jforgame.socket.share.HostAndPort;
import jforgame.socket.share.ServerNode;
import jforgame.socket.support.DefaultMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;public class NWebSocketServer implements ServerNode {private Logger logger = LoggerFactory.getLogger(NWebSocketServer.class);// 避免使用默认线程数参数private EventLoopGroup bossGroup = new NioEventLoopGroup(1);private EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());private List<HostAndPort> nodesConfig;public NWebSocketServer(HostAndPort hostPort) {this.nodesConfig = Arrays.asList(hostPort);}@Overridepublic void start() throws Exception {try {DefaultMessageFactory.getInstance().initMessagePool(ServerScanPaths.MESSAGE_PATH);ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new WebSocketChannelInitializer());for (HostAndPort node : nodesConfig) {logger.info("socket server is listening at " + node.getPort() + "......");serverBootstrap.bind(new InetSocketAddress(node.getPort())).sync();}} catch (Exception e) {logger.error("", e);bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();throw e;}}@Overridepublic void shutdown() throws Exception {if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}}private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel arg0) throws Exception {ChannelPipeline pipeline = arg0.pipeline();// HttpServerCodec: 针对http协议进行编解码pipeline.addLast("httpServerCodec", new HttpServerCodec());// ChunkedWriteHandler分块写处理,文件过大会将内存撑爆pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));// 用于处理websocket, /ws为访问websocket时的uripipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {if (frame instanceof TextWebSocketFrame) {String json = ((TextWebSocketFrame)frame).text();TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);System.out.println(textFrame);list.add(realMsg);} else if (frame instanceof BinaryWebSocketFrame) {throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");}}});pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {if (DefaultMessageFactory.getInstance().contains(o.getClass())) {String json = JsonUtils.object2String(o);TextFrame frame = new TextFrame();frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));frame.msg = json;list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));} else if (o instanceof ReferenceCounted) {((ReferenceCounted)o).retain();list.add(o);} else {list.add(o);}}});pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));}}static class TextFrame {// 消息idString id;// 消息内容String msg;}public static void main(String[] args) throws Exception{NWebSocketServer socketServer = new NWebSocketServer(HostAndPort.valueOf("localhost", 8080));socketServer.start();}}
三、使用js websocket客户端测试代码
3.1js封装weboskcet操作
/*** 对webSocket的封装 */
(function($) {$.config = {url: '', //链接地址};$.init=function(config) {this.config = config;return this;};/*** 连接webcocket*/$.connect = function() {var protocol = (window.location.protocol == 'http:') ? 'ws:' : 'ws:';this.host = protocol + this.config.url;window.WebSocket = window.WebSocket || window.MozWebSocket;if(!window.WebSocket) { // 检测浏览器支持 this.error('Error: WebSocket is not supported .');return;}this.socket = new WebSocket(this.host); // 创建连接并注册响应函数 this.socket.onopen = function() {$.onopen();};this.socket.onmessage = function(message) {$.onmessage(message);};this.socket.onclose = function() {$.onclose();$.socket = null; // 清理 };this.socket.onerror = function(errorMsg) {$.onerror(errorMsg);}return this;}/*** 自定义异常函数* @param {Object} errorMsg*/$.error = function(errorMsg) {this.onerror(errorMsg);}/*** 消息发送*/$.send = function(msgId, msg) {if(this.socket) {var req = {"id" : msgId,"msg" : JSON.stringify(msg)}this.socket.send(JSON.stringify(req));return true;}this.error('please connect to the server first !!!');return false;}/*** 消息二进制数据(测试)*/$.sendBytes = function(msgId, msg) {if(this.socket) {this.socket.send(new TextEncoder().encode("hello"));return true;}this.error('please connect to the server first !!!');return false;}$.close = function() {if(this.socket != undefined && this.socket != null) {this.socket.close();} else {this.error("this socket is not available");}}/*** 消息回調* @param {Object} message*/$.onmessage = function(message) {}/*** 链接回调函数*/$.onopen = function() {}/*** 关闭回调*/$.onclose = function() {}/*** 异常回调*/$.onerror = function() {}})(ws = {});
3.2 业务消息注册及发送
/*** 与服务端的通信协议绑定*/var io_handler = io_handler || {}io_handler.ReqAccountLogin = "101001";io_handler.ResAccountLogin = "101051";var self = io_handler;var msgHandler = {}io_handler.bind = function(msgId, handler) {msgHandler[msgId] = handler
}self.bind(self.ResAccountLogin, function(resp) {alert("角色登录成功-->" + resp)
})io_handler.handle = function(msgId, msg) {msgHandler[msgId](msg);
}
3.3html测试代码
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket 客户端</title><script src="js/ws.js" type="text/javascript"></script>
<script src="js/io_handler.js" type="text/javascript"></script>
<script type="text/javascript">ws.init({url : "localhost:8080/ws"}).connect();//当有消息过来的时候触发ws.onmessage = function(event) {var resp = JSON.parse(event.data)var respMessage = document.getElementById("respMessage");respMessage.value = respMessage.value + "\n" + resp.msg;io_handler.handle(resp.id, resp.msg)}//连接关闭的时候触发ws.onclose = function(event) {var respMessage = document.getElementById("respMessage");respMessage.value = respMessage.value + "\n断开连接";}//连接打开的时候触发ws.onopen = function(event) {var respMessage = document.getElementById("respMessage");respMessage.value = "建立连接";}function sendMsg(msg) { //发送消息 if (window.WebSocket) {var msg = {"accountId" : 123,"password":"abc"};ws.send(io_handler.ReqAccountLogin , msg);}}
</script>
</head>
<body><form onsubmit="return false"><textarea style="width: 300px; height: 200px;" name="message"></textarea><input type="button" onclick="sendMsg(this.form.message.value)"value="发送"><br><h3>信息</h3><textarea style="width: 300px; height: 200px;" id="respMessage"></textarea><input type="button" value="清空"onclick="javascript:document.getElementById('respMessage').value = ''"></form>
</body>
</html>
3.4 客户端运行示例
启动服务器后,点击html测试文件,发送数据
完整代码传送门 --》 jforgame游戏框架