Netty之WebSocket协议开发

一、WebSocket产生背景

在传统的Web通信中,浏览器是基于请求--响应模式。这种方式的缺点是,浏览器必须始终主动发起请求才能获取更新的数据,而且每次请求都需要经过HTTP的握手和头部信息的传输,造成了较大的网络开销。如果客户端需要及时获得服务端数据,要么通过定时轮训、长轮训或Commet机制,但都不能完美解决。

WebSocket解决了这些问题,它提供了一种持久的连接机制,允许服务器实时地向浏览器推送数据,而无需浏览器重新发起请求。这种双向通信的方式使得Web应用程序能够实时地向用户提供更新的数据,比如在线聊天、实时通知等。

WebSocket底层是基于HTTP协议,并使用了类似握手的过程来建立连接。连接一旦建立,就可以通过发送和接收消息来进行实时通信。WebSocket消息可以是文本、二进制或者其他格式。

二、使用Netty创建WebSocket服务端

2.1 设置解析websocket协议的ChannelHandler

websocket基于http协议,因此netty创建websocket和http的代码非常相似,如下

private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel arg0) throws Exception {ChannelPipeline pipeline = arg0.pipeline();// HttpServerCodec: 针对http协议进行编解码pipeline.addLast("httpServerCodec", new HttpServerCodec());// ChunkedWriteHandler分块写处理,文件过大会将内存撑爆pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));// 用于处理websocket, /ws为访问websocket时的uripipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));// 这里需要再进行二次编解码}
}

上面代码前三个handler跟http一致,新增了一个webSocketServerProtocolHandler处理器,该处理器限制了该服务只提供websocket服务,屏蔽了底层握手、编解码、心跳和断开连接等事件。

至此,服务器可以接收到一个完整的WebSocketFrame包,但业务代码不是基于WebSocketFrame数据,例如jforgame消息包为Message实现类。因此我们需要二次消息解码。

这里的二次编码将进行WebSocketFrame到私有消息的转换,是websocket适配最麻烦的地方,相当于把私有协议栈重新实现一遍。对于TextWebSocketFrame可能还简单一点,客户端只需将包含包头(消息id)和包体(具体消息)的数据进行json化即可。而对于BinaryWebSocketFrame,客户端需要引入第三方消息编解码工具,例如protobuf。(如果客户端使用json,就不用多此一举使用二进制格式了)

2.2WebSocketFrame解码为私有协议消息

 pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {if (frame instanceof TextWebSocketFrame) {String json =  ((TextWebSocketFrame)frame).text();TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);System.out.println(textFrame);list.add(realMsg);} else if (frame instanceof BinaryWebSocketFrame) {throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");}}});

其中,TextFrame是websocket客户端与服务端通信格式,只有两个字段

    static class TextFrame {// 消息idString id;// 消息内容String msg;}

注:这里只处理 TextWebSocketFrame文本格式,至于BinaryWebSocketFrame二进制格式,由于使用JavaScript需引入Protobuf等第三方库,这里不做演示。

2.3私有协议消息编码为WebSocketFrame

当服务器向客户端推送消息的时候,需要将私有协议包转为WebSocketFrame。

 pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {if (DefaultMessageFactory.getInstance().contains(o.getClass())) {String json = JsonUtils.object2String(o);TextFrame frame = new TextFrame();frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));frame.msg = json;list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));} else if (o instanceof ReferenceCounted) {((ReferenceCounted)o).retain();list.add(o);} else {list.add(o);}}});

注:在二次编码的时候,遇到ReferenceCounted子类,需要 retain()一下才可以传递给下一个handler。

2.4将完整业务消息包丢给业务代码执行

pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));

至此,把websocket底层的协议差异给屏蔽掉了,无论服务端是采用socket还是websocket,业务代码无需做任何改变。

2.5 服务端完整代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.ReferenceCounted;
import jforgame.commons.NumberUtil;
import jforgame.demo.ServerScanPaths;
import jforgame.demo.socket.MessageIoDispatcher;
import jforgame.demo.utils.JsonUtils;
import jforgame.socket.netty.support.DefaultSocketIoHandler;
import jforgame.socket.share.HostAndPort;
import jforgame.socket.share.ServerNode;
import jforgame.socket.support.DefaultMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;public class NWebSocketServer implements ServerNode {private Logger logger = LoggerFactory.getLogger(NWebSocketServer.class);// 避免使用默认线程数参数private EventLoopGroup bossGroup = new NioEventLoopGroup(1);private EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());private List<HostAndPort> nodesConfig;public NWebSocketServer(HostAndPort hostPort) {this.nodesConfig = Arrays.asList(hostPort);}@Overridepublic void start() throws Exception {try {DefaultMessageFactory.getInstance().initMessagePool(ServerScanPaths.MESSAGE_PATH);ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new WebSocketChannelInitializer());for (HostAndPort node : nodesConfig) {logger.info("socket server is listening at " + node.getPort() + "......");serverBootstrap.bind(new InetSocketAddress(node.getPort())).sync();}} catch (Exception e) {logger.error("", e);bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();throw e;}}@Overridepublic void shutdown() throws Exception {if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}}private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel arg0) throws Exception {ChannelPipeline pipeline = arg0.pipeline();// HttpServerCodec: 针对http协议进行编解码pipeline.addLast("httpServerCodec", new HttpServerCodec());// ChunkedWriteHandler分块写处理,文件过大会将内存撑爆pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));// 用于处理websocket, /ws为访问websocket时的uripipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {if (frame instanceof TextWebSocketFrame) {String json =  ((TextWebSocketFrame)frame).text();TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);System.out.println(textFrame);list.add(realMsg);} else if (frame instanceof BinaryWebSocketFrame) {throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");}}});pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {if (DefaultMessageFactory.getInstance().contains(o.getClass())) {String json = JsonUtils.object2String(o);TextFrame frame = new TextFrame();frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));frame.msg = json;list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));} else if (o instanceof ReferenceCounted) {((ReferenceCounted)o).retain();list.add(o);} else {list.add(o);}}});pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));}}static class TextFrame {// 消息idString id;// 消息内容String msg;}public static void main(String[] args) throws Exception{NWebSocketServer socketServer = new NWebSocketServer(HostAndPort.valueOf("localhost", 8080));socketServer.start();}}

三、使用js websocket客户端测试代码

3.1js封装weboskcet操作

/*** 对webSocket的封装 */
(function($) {$.config = {url: '', //链接地址};$.init=function(config) {this.config = config;return this;};/*** 连接webcocket*/$.connect = function() {var protocol = (window.location.protocol == 'http:') ? 'ws:' : 'ws:';this.host = protocol + this.config.url;window.WebSocket = window.WebSocket || window.MozWebSocket;if(!window.WebSocket) { // 检测浏览器支持  this.error('Error: WebSocket is not supported .');return;}this.socket = new WebSocket(this.host); // 创建连接并注册响应函数  this.socket.onopen = function() {$.onopen();};this.socket.onmessage = function(message) {$.onmessage(message);};this.socket.onclose = function() {$.onclose();$.socket = null; // 清理  };this.socket.onerror = function(errorMsg) {$.onerror(errorMsg);}return this;}/*** 自定义异常函数* @param {Object} errorMsg*/$.error = function(errorMsg) {this.onerror(errorMsg);}/*** 消息发送*/$.send = function(msgId, msg) {if(this.socket) {var req = {"id" : msgId,"msg" : JSON.stringify(msg)}this.socket.send(JSON.stringify(req));return true;}this.error('please connect to the server first !!!');return false;}/*** 消息二进制数据(测试)*/$.sendBytes = function(msgId, msg) {if(this.socket) {this.socket.send(new TextEncoder().encode("hello"));return true;}this.error('please connect to the server first !!!');return false;}$.close = function() {if(this.socket != undefined && this.socket != null) {this.socket.close();} else {this.error("this socket is not available");}}/*** 消息回調* @param {Object} message*/$.onmessage = function(message) {}/*** 链接回调函数*/$.onopen = function() {}/*** 关闭回调*/$.onclose = function() {}/*** 异常回调*/$.onerror = function() {}})(ws = {});

3.2 业务消息注册及发送

/*** 与服务端的通信协议绑定*/var io_handler = io_handler || {}io_handler.ReqAccountLogin = "101001";io_handler.ResAccountLogin = "101051";var self = io_handler;var msgHandler = {}io_handler.bind = function(msgId, handler) {msgHandler[msgId] = handler
}self.bind(self.ResAccountLogin, function(resp) {alert("角色登录成功-->" + resp)
})io_handler.handle = function(msgId, msg) {msgHandler[msgId](msg);
}

3.3html测试代码 

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket 客户端</title><script src="js/ws.js" type="text/javascript"></script>
<script src="js/io_handler.js" type="text/javascript"></script>
<script type="text/javascript">ws.init({url : "localhost:8080/ws"}).connect();//当有消息过来的时候触发ws.onmessage = function(event) {var resp = JSON.parse(event.data)var respMessage = document.getElementById("respMessage");respMessage.value = respMessage.value + "\n" + resp.msg;io_handler.handle(resp.id, resp.msg)}//连接关闭的时候触发ws.onclose = function(event) {var respMessage = document.getElementById("respMessage");respMessage.value = respMessage.value + "\n断开连接";}//连接打开的时候触发ws.onopen = function(event) {var respMessage = document.getElementById("respMessage");respMessage.value = "建立连接";}function sendMsg(msg) { //发送消息 if (window.WebSocket) {var msg = {"accountId" : 123,"password":"abc"};ws.send(io_handler.ReqAccountLogin , msg);}}
</script>
</head>
<body><form onsubmit="return false"><textarea style="width: 300px; height: 200px;" name="message"></textarea><input type="button" onclick="sendMsg(this.form.message.value)"value="发送"><br><h3>信息</h3><textarea style="width: 300px; height: 200px;" id="respMessage"></textarea><input type="button" value="清空"onclick="javascript:document.getElementById('respMessage').value = ''"></form>
</body>
</html>

3.4 客户端运行示例

启动服务器后,点击html测试文件,发送数据

完整代码传送门 --》 jforgame游戏框架 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/724845.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

爆肝!Claude3与ChatGPT-4到底谁厉害,看完你就知道了!

前言&#xff1a; 相信大家在pyq都被这张图片刷屏了把~ 昨天&#xff0c;为大家介绍了一下什么是Claude&#xff0c;今天咱终于弄到号了&#xff08;再被ban了3个号之后终于是成功的登上去了&#xff0c;如果各位看官觉得咱文章写的不错&#xff0c;麻烦点个小小的关注~你们的…

【详识C语言】自定义类型之三:联合

本章重点 联合 联合类型的定义 联合的特点 联合大小的计算 联合&#xff08;共用体&#xff09; 联合类型的定义 联合也是一种特殊的自定义类型 这种类型定义的变量也包含一系列的成员&#xff0c;特征是这些成员公用同一块空间&#xff08;所以联合也叫共用体&#xff09;…

mysql 数据库查询 查询字段用逗号隔开 关联另一个表并显示

文章目录 问题描述解决方案 问题描述 如下如所示&#xff1a; 表一&#xff1a;wechat_dynamically_config表&#xff0c;重点字段&#xff1a;wechat_object 表二&#xff1a;wechat_object表&#xff0c;重点字段&#xff1a;wxid 需求&#xff1a;根据wechat_dynamically_…

模仿Gitee实现站外链接跳转时进行确认

概述 如Gitee等网站&#xff0c;在有外部链接的时候如果不是同域则会出现一个确认页面。本文就带你看看这个功能应该如何实现。 效果 实现 1. 实现思路 将打开链接作为参数传递给一个中间页面&#xff0c;在页面加载的时候判断链接的域名和当前网站是否同域&#xff0c;同域…

Redis线程模型解析

引言 Redis是一个高性能的键值对&#xff08;key-value&#xff09;内存数据库&#xff0c;以其卓越的读写速度和灵活的数据类型而广受欢迎。在Redis 6.0之前的版本中&#xff0c;它采用的是一种独特的单线程模型来处理客户端的请求。尽管单线程在概念上似乎限制了其扩展性和并…

软考65-上午题-【面向对象技术】-面向对象分析、设计、测试

一、面向对象分析OOA 1-1、面向对象分析的定义 面向对象分析的目的&#xff1a;为了获得对应用问题的理解。理解的目的是确定系统的功能、性能要求。 面向对象分析包含5个活动&#xff1a;&#xff08;背&#xff01;&#xff09; 认定对象&#xff1b;&#xff08;重要一点…

QT和OPENGL安装和集成

1.QT安装 1.1官网下载&#xff1a; 网址&#xff1a;https://download.qt.io/archive/qt/ 1.2 开始安装 点击运行 首先注册sign up 然后Login in 选择安装目录 改为D盘&#xff1a; 选择安装项&#xff1a; 准备安装 开始安装&#xff1a; 安装完成&#xff1a; 1.3测试 …

SPI 接口

SPI 接口 SPI 简介寻址方式通信过程极性和相位IIC 和 SPI 的异同相同点不同点 SPI 简介 SPI&#xff08;Serial Peripheral Interface&#xff09;是串行外设接口的缩写&#xff0c;SPI是一种高速的、全双工、同步的串行通信总线&#xff1b;SPI采用主从方式工作&#xff0c;一…

UART 接口

UART 接口 1. UART 协议原理与编程1.1 UART 简介1.2 UART 帧格式1.3 UART 缺点1.4 Verilog 代码 2. RS232、RS485 协议原理2.1 RS232 协议简介2.1.1 RS232 接口2.1.2 RS232 信号2.1.3 RS232 缺点 2.2 RS4852.2.1 RS485协议简介2.2.2 RS458 信号2.2.3 RS458 接口2.2.4 RS485 优点…

Cocos Creator 3.8.x 制作模糊效果(比如游戏弹窗需要的模糊效果)

接着上一个讨论的话题,关于3.8.x的后效,今天来分享自定义后效来制作模糊效果,并将他应用到弹窗中做背景,话不多说开整。 一:最终效果 首先咱们来看官网自定义后效怎么搞的,从它的实例开始:自定义后效 二:定义PostProcessSettings给节点提供资源(通过编辑器修改参数的…

搭建Zabbix监控系统

简介 在企业网络运维过程中&#xff0c;管理员必须随时关注各服务器和网络的运行状况&#xff0c;以便及时发现问题.尽可能减少故障的发生。当网络中的设备,服务器等数量较多时&#xff0c;为了更加方便、快捷地获得各种监控信息&#xff0c;通常会借助于一些集中监测软件。 一…

FISCO BCOS区块链平台上的智能合约压力测试指南

引言 在当今的分布式系统中&#xff0c;区块链技术因其去中心化、安全性和透明性而备受关注。随着区块链应用的不断扩展&#xff0c;对其性能和稳定性的要求也越来越高。因此&#xff0c;对区块链网络进行压力测试显得尤为重要。 目录 引言 1. 配置FISCO BCOS节点 2. 安装和…

Windows安装MySQL详细教程

1.1 下载MySQL压缩包 官网下载链接[点击跳转] 按图中选择&#xff0c;然后点击【Download】 点击图中箭头所指方向直接下载 1.2 解压下载好的压缩包后找到【bin】文件夹&#xff0c;并记下文件路径&#xff08;下文将以路径 D:\mysql-8.0.36-winx64\bin 为例&#xff09; 1.…

【Python】成功解决TypeError: ‘int‘ object is not iterable

【Python】成功解决TypeError: ‘int’ object is not iterable &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到…

SmartX 携手 openGauss 社区发布联合方案评测与性能最佳实践 | 附优化方法与测试数据

近日&#xff0c;北京志凌海纳科技有限公司&#xff08;以下简称 “SmartX”&#xff09;携手 openGauss 社区完成了 openGauss 数据库基于 SmartX 超融合平台&#xff08;SMTX OS&#xff09;和 SmartX 分布式存储平台&#xff08;SMTX ZBS&#xff09;的性能测试和调优。 结…

Python-sklearn-LinearRegression

目录 1 手动实现/使用sklearn实现线性回归训练 1.1 单特征线性回归&#xff08;One Feature&#xff09; 1.2 多特征线性回归&#xff08;Multiple Features&#xff09; 1.3 多项式线性回归&#xff08;Polynomial&#xff09; 1 手动实现/使用sklearn实现线性回归训练 1…

flowable的java class task,也叫服务任务

源码地址12级程序猿-新年正当红/flowable-ui和服务任务 启动flowable-ui-app 浏览器输入下面的地址 http://localhost:8080/flowable-ui/#/ 在服务任务这里设置java类的路径 com.dmg.flowabledemo.task.MyServiceTask 当请假任务完成之后&#xff0c;自动触发这个服务任务…

Android开发社招面试总结,Android程序员面试必备的知识点

导语 学历永远是横在我们进人大厂的一道门槛&#xff0c;好像无论怎么努力&#xff0c;总能被那些985,211 按在地上摩擦&#xff01; 不仅要被“他们”看不起&#xff0c;在HR挑选简历&#xff0c;学历这块就直接被刷下去了&#xff0c;连证明自己的机会也没有&#xff0c;学…

关于Java并发多线程的一点思考

写在开头 在过去的2023年双11活动中&#xff0c;天猫的累计访问人次达到了8亿&#xff0c;京东超60个品牌销售破10亿&#xff0c;直播观看人数3.0亿人次&#xff0c;订单支付频率1分钟之内可达百万级峰值&#xff0c;这样的瞬间高并发活动&#xff0c;给服务端带来的冲击可想而…

HplusAdmin ASP.NET基本权限管理系统

HplusAdmin 介绍 一套ASP.NET WebForm(不用控件) hplusasp.netsqlserver 基本权限管理系统 http://hplus.baocaige.top 暂不开源&#xff0c;需要的滴滴或者留下邮箱&#xff01;&#xff01;&#xff01; 账号 普通账号 账号&#xff1a;user 密码&#xff1a;Aa123456普…