基于Netty构建Websocket服务端

除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。
项目目录:
在这里插入图片描述
引入pom依赖:

 <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.69.Final</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>

编写SocketServer:

package com.lzq.websocket.config;import com.lzq.websocket.handlers.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.TimeUnit;@Slf4j
@Configuration
public class WebSocketConfig implements CommandLineRunner {private static final Integer PORT = 8888;@Overridepublic void run(String... args) throws Exception {new WebSocketConfig().start();}public void start() {// 创建EventLoopGroupEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpServerCodec());// 最大数据长度pipeline.addLast(new HttpObjectAggregator(65536));// 添加接收websocket请求的url匹配路径pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));// 10秒内收不到消息强制断开连接// pipeline.addLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS));pipeline.addLast(new WebSocketHandler());}});ChannelFuture future = serverBootstrap.bind(PORT).sync();log.info("websocket server started, port={}", PORT);// 处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭// 阻塞future.channel().closeFuture().sync();} catch (Exception e) {log.error("websocket server exception", e);throw new RuntimeException(e);} finally {log.info("websocket server close");// 关闭EventLoopGroupbossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

编写WebSocketHandler:

package com.lzq.websocket.handlers;import com.lzq.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {private WebSocketServerHandshaker webSocketServerHandshaker;private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket";@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 创建连接时执行NettyConfig.group.add(ctx.channel());log.info("client channel active, id={}", ctx.channel().id().toString());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 关闭连接时执行NettyConfig.group.remove(ctx.channel());log.info("client channel disconnected, id={}", ctx.channel().id().toString());}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 服务端接收客户端发送过来的数据结束之后调用ctx.flush();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete handshake = (WebSocketServerProtocolHandler.HandshakeComplete) evt;log.info("client channel connected, id={}, url={}", ctx.channel().id().toString(), handshake.requestUri());}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {// 处理客户端http握手请求handlerHttpRequest(ctx, (FullHttpRequest) msg);} else if (msg instanceof WebSocketFrame) {// 处理websocket连接业务handlerWebSocketFrame(ctx, (WebSocketFrame) msg);}}/*** 处理websocket连接业务** @param ctx* @param frame*/private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {log.info("handlerWebSocketFrame>>>>class={}", frame.getClass().getName());// 判断是否是关闭websocket的指令if (frame instanceof CloseWebSocketFrame) {webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());return;}// 判断是否是ping消息if (frame instanceof PingWebSocketFrame) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}if (!(frame instanceof TextWebSocketFrame)) {throw new RuntimeException("不支持消息类型:" + frame.getClass().getName());}String text = ((TextWebSocketFrame) frame).text();if ("ping".equals(text)) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}log.info("WebSocket message received: {}", text);/*** 可通过客户传输的text,设计处理策略:* 如:text={"type": "messageHandler", "userId": "111"}* 服务端根据type,采用策略模式,自行派发处理** 注意:这里不需要使用线程池,因为Netty 采用 Reactor线程模型(目前使用的是主从Reactor模型),* Handler已经是线程处理,每个用户的请求是线程隔离的*/// 返回WebSocket响应ctx.writeAndFlush(new TextWebSocketFrame("server return:" + text));/*// 群发TextWebSocketFrame twsf = new TextWebSocketFrame(new Date().toString()+ ctx.channel().id()+ " : "+ text);NettyConfig.group.writeAndFlush(twsf);*/}/*** 处理客户端http握手请求** @param ctx* @param request*/private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {log.info("handlerHttpRequest>>>>class={}", request.getClass().getName());// 判断是否采用WebSocket协议if (!request.getDecoderResult().isSuccess() || !("websocket".equals(request.headers().get("Upgrade")))) {sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);webSocketServerHandshaker = wsFactory.newHandshaker(request);if (webSocketServerHandshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());} else {webSocketServerHandshaker.handshake(ctx.channel(), request);}}private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {if (response.getStatus().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);response.content().writeBytes(buf);buf.release();}// 服务端向客户端发送数据ChannelFuture f = ctx.channel().writeAndFlush(response);if (response.getStatus().code() != 200) {f.addListener(ChannelFutureListener.CLOSE);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 非正常断开时调用log.error("client channel execute exception, id={}", ctx.channel().id().toString(), cause);ctx.close();}
}

NettyConfig:

package com.lzq.websocket.config;import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;public class NettyConfig {/*** 存储接入的客户端的channel对象*/public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

使用Apifox测试:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/239585.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深圳鼎信|输电线路防山火视频监控预警装置:森林火灾来袭,安全不留白!

受线路走廊制约和环保要求影响&#xff0c;输电线路大多建立在高山上&#xff0c;不仅可以减少地面障碍物和人类活动的干扰&#xff0c;还能提高线路的抗灾能力和可靠性。但同时也会面临其它的难题&#xff0c;例如森林火灾预防。今天&#xff0c;深圳鼎信智慧将从不同角度分析…

在电路实际设计中PCI 的三态和 OD、OC 信号要有上拉。

在PCI总线以及其他数字电路设计中,三态(Tri-state)、开漏(Open Drain, OD)和开集(Open Collector, OC)是常见的输出信号类型。这些信号类型通常需要外部上拉电阻来确保信号线在不被任何设备驱动时能够被拉到高电平状态。 三态(Tri-state)信号 ,三态信号具有高电平、低…

signaltap立即触发的错误解决方法

signaltap点下run analysis后没有等到触发条件满足就触发了&#xff0c;原因是触发方式设置错误&#xff0c;应修改触发方式&#xff1a; 将Trigger flow control 从State-based 改为Sequential。

trino-435版本windows下源码编译

一、源码下载地址 https://github.com/trinodb/trino/tags 二、编译环境及工具准备 1、maven &#xff08;1&#xff09;版本&#xff1a;3.6.3 &#xff08;2&#xff09;settings.xml配置 <?xml version"1.0" encoding"UTF-8"?> <settin…

Jmeter 性能测试 —— 评估一个系统TPS与并发数!

问题&#xff1a;性能压测&#xff0c;如何评估一个系统的TPS和并发数&#xff1f; 1、对于新系统 由业务部门或开发人员预估交易量和TPS指标 可以参考公式&#xff1a;并发用户 在线用户数 * 10%。 当一个系统还没有上线时&#xff0c;我们可以预判的是这个系统准备要给多…

hyperf 十八 数据库 一

教程地址&#xff1a;Hyperf 一、安装 1.1 hyperf框架 composer require hyperf/db-connection 1.2 其他框架 composer require hyperf/database 二、配置 配置项类型默认值备注driverstring无数据库引擎hoststring无数据库地址databasestring无数据库默认 DBusernamest…

【数据结构】队列的使用|模拟实现|循环队列|双端队列|面试题

一、 队列(Queue) 1.1 概念 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c;队列具有先进先出FIFO(First In First Out) 入队列&#xff1a;进行插入操作的一端称为队尾&#xff08;Tail/Rear&#xff09; 出队列…

Vue 初始化數組后操作另一個數組onMounted和watch

Vue 的父组件和子组件的生命周期钩子函数执行顺序可以归类为以下 4 部分&#xff1a; 1、加载渲染过程 父 beforeCreate -> 父 created -> 父 beforeMount -> 子 beforeCreate -> 子 created -> 子beforeMount -> 子 mounted -> 父 mounted 注意&#x…

深度剖析:Golang中结构体方法的高级应用

深度剖析&#xff1a;Golang中结构体方法的高级应用 引言结构体方法的基础回顾结构体的定义和用法方法的定义和绑定基本语法和用法 高级特性与应用封装、继承和多态方法集与接口的关系结构体方法的匿名字段和嵌入结构体 性能优化与最佳实践接收器类型的选择&#xff1a;指针还是…

文档 - - - Docsify文档创建

目录 1. Docsify 介绍2. 创建 Docsify 项目2.1 安装 Node.js2.1 安装 docsfiy-cli2.3 初始化项目2.4 运行项目2.5 使用 Python 运行项目&#xff08;扩展&#xff0c;不推荐有bug&#xff09; 3. 配置 Docsify 项目3.1 修改等待加载文字3.2 添加网站 ico 图标3.3 创建新页面写文…

Redux与React环境准备、实现counter(及传参)、异步获取数据

环境说明&#xff1a; 一&#xff1a;说明 在React中使用redux&#xff0c;官方要求安装两个其他插件&#xff1a;Redux Toolkit和react-redux 1. Redux ToolKit(RTK) - 官方推荐编写Redux逻辑的方式&#xff0c;是一套工具的集合集&#xff0c;简化书写方式 &#xff08;简化…

【数据结构之单链表】

数据结构学习笔记---003 数据结构之单链表1、什么是单链表?1.1、概念及结构 2、单链表接口的实现2.1、单链表的SList.h2.1.1、定义单链表的结点存储结构2.1.2、声明单链表各个接口的函数 2.2、单链表的SList.c2.2.1、遍历打印链表2.2.2、销毁单链表2.2.3、打印单链表元素2.2.4…

跨域问题的解决

1.什么是跨域&#xff1f; 浏览器从一个域名的网页去请求另外一个域名的资源时&#xff0c;域名、端口或者协议不同都是跨域 2.跨域的解决方案 设置CORS响应头∶后端可以在HTTP响应头中添加相关的CORS标头&#xff0c;允许特定的源&#xff08;域名、协议、端口)访问资源。S…

VM进行TCP/IP通信

OK就变成这样 vm充当服务端的话也是差不多的操作 点击连接 这里我把端口号换掉了因为可能被占用报错了&#xff0c;如果有报错可以尝试尝试换个端口号 注&#xff1a; 还有一个点在工作中要是充当服务器&#xff0c;要去网络这边看下他的ip地址 拉到最后面

【github】github设置项目为私有

点击setting change to private 无脑下一步

day6 力扣公共前缀--go实现---对字符串的一些思考

今日份知识&#xff1a; curl -x 指定方法名 请求的url -d 请求体body里面的内容 //curl命令 curl -x Get 127.0.0.1:8080/add/user -d jinlicurl如果不指定方法&#xff0c;默认使用get方法&#xff0c;在go里面&#xff0c;get方法到底可以不可以把内容数据写在body里面传…

web架构师编辑器内容-创建业务组件和编辑器基本行为

编辑器主要分为三部分&#xff0c;左侧是组件模板库&#xff0c;中间是画布区域&#xff0c;右侧是面板设置区域。 左侧是预设各种组件模板进行添加 中间是使用交互手段来更新元素的值 右侧是使用表单的方式来更新元素的值。 大致效果&#xff1a; 左侧组件模板库 最初的模板…

基于JSP+Servlet+Mysql的调查管理系统

基于JSPServletMysql的调查管理系统 一、系统介绍二、功能展示1.项目内容2.项目骨架3.数据库3.登录4.注册3.首页5.系统管理 四、其它1.其他系统实现五.获取源码 一、系统介绍 项目名称&#xff1a;基于JSPServlet的调查管理系统 项目架构&#xff1a;B/S架构 开发语言&#…

在Next.js和React中搭建Cesium项目

在Next.js和React中搭建Cesium项目&#xff0c;需要确保Cesium能够与服务端渲染(SSR)兼容&#xff0c;因为Next.js默认是SSR的。Cesium是一个基于WebGL的地理信息可视化库&#xff0c;通常用于在网页中展示三维地球或地图。下面是一个基本的步骤&#xff0c;用于在Next.js项目中…

群组推荐模型---SoAGREE(Social-Enhanced Attentive Group Recommendation)

SoAGREE 概要方法Hierarchical Attention Network LearningAttentive User Representation Learning NCF(Neural Collaborative Filtering) 概要 此论文是在AGREE(Attentive Group Recommendation)模型上的进一步增强&#xff0c;有兴趣的朋友可以去看上一篇博客讲述的就是AGR…