netty-学习

Netty

    • Netty 的核心概念
    • Netty 的主要特性
    • Netty 的应用场景
    • Netty 的基本使用
        • 服务器端
        • 处理器
      • 总结
  • 代码分析
    • 1.心跳检测
      • 代码解析
        • 类和成员变量
        • `userEventTriggered`方法
        • 总结
    • 4.参数详解
        • `ChannelHandlerContext ctx`
        • `Object evt`
      • 事件来源
      • 示例:配置 `IdleStateHandler`
      • 事件处理
      • 示例回调
    • 2.Netty WebSocket 服务器启动器类
      • 1.代码解析
        • 类和成员变量
        • 资源关闭方法
        • `run` 方法
      • 2.总结
      • 3.Netty WebSocket 处理器 HandlerWebSocket
      • 代码解析
        • 类和成员变量
        • `channelActive` 方法
        • `channelInactive` 方法
        • `channelRead0` 方法
        • `userEventTriggered` 方法
        • `getToken` 方法
      • 总结
    • 完整过程
      • 1. 客户端连接到 WebSocket 服务器
      • 2. 服务器端初始化和配置
        • Netty 服务器启动
      • 3. 通道初始化
      • 4. 握手和连接事件
        • WebSocket 协议处理和心跳检测
        • 心跳检测
      • 5. 消息处理
      • 6. 连接关闭
      • 总结

Netty 是一个基于 Java 的异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络应用。Netty 提供了丰富的 API,支持多种传输协议和多种编解码方式,广泛应用于高性能的网络服务器和客户端的开发。

Netty 的核心概念

  1. Channel:Netty 中的基本网络操作抽象。它代表一个打开的连接,比如一个到远程服务器的 TCP 连接。Channel 提供了异步的网络 I/O 操作,如读、写、连接和绑定。

  2. EventLoop:Netty 中处理 I/O 操作的核心。EventLoop 是一个处理 I/O 事件、运行任务和处理定时任务的循环。每个 Channel 都会分配给一个 EventLoop。

  3. ChannelFuture:Netty 中的异步操作结果。ChannelFuture 提供了在操作完成时通知的机制,允许你在操作完成后执行一些特定的操作(如写操作完成后的回调处理)。

  4. ChannelHandler:用于处理 Channel 的 I/O 事件和数据。ChannelHandler 是处理网络事件和数据的核心接口。你可以实现 ChannelHandler 来处理入站和出站数据。

  5. ChannelPipeline:Netty 中的责任链模式实现。每个 Channel 都有一个 ChannelPipeline,它持有一个 ChannelHandler 的链。当一个 I/O 事件发生时,事件在 ChannelPipeline 中传播,通过其中的 ChannelHandler 进行处理。

  6. ByteBuf:Netty 中的数据容器,比 Java 的 NIO ByteBuffer 更高效和灵活。ByteBuf 提供了丰富的 API,支持动态扩展、零拷贝等特性。

Netty 的主要特性

  1. 异步和事件驱动:Netty 基于异步和事件驱动的编程模型,能够处理大量并发连接,并提供高吞吐量和低延迟。

  2. 多协议支持:Netty 支持多种协议,如 HTTP、WebSocket、TCP、UDP 等。你可以轻松实现自定义协议。

  3. 高效的内存管理:Netty 提供了高效的内存管理机制,避免了频繁的垃圾回收,提升了应用的性能。

  4. 丰富的工具类和扩展点:Netty 提供了大量的工具类和扩展点,方便开发者定制和扩展。

Netty 的应用场景

  1. 高性能的网络服务器:如 HTTP 服务器、游戏服务器、聊天服务器等。
  2. 分布式系统中的网络通信:如 RPC 框架、消息中间件等。
  3. 代理服务器:如反向代理、网关等。
  4. 任何需要高并发和低延迟的网络应用

Netty 的基本使用

以下是一个简单的 Netty 服务器示例:

服务器端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {private final int port;public NettyServer(int port) {this.port = port;}public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 处理连接事件EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理I/O事件try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture f = b.bind(port).sync(); // 绑定端口f.channel().closeFuture().sync(); // 等待服务器关闭} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;new NettyServer(port).start();}
}
处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg); // 将接收到的消息写回去}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush(); // 刷新所有写入到客户端的消息}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close(); // 发生异常时关闭连接}
}

总结

Netty 是一个强大的网络编程框架,适用于构建高性能、高可用性的网络应用。它提供了丰富的 API 和灵活的扩展机制,使得开发高性能的网络应用变得更加简单和高效。通过异步事件驱动模型和高效的内存管理,Netty 可以处理大量并发连接并提供低延迟的服务。


代码分析

1.心跳检测

public class HandlerHeartBeat extends ChannelDuplexHandler {private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}}
}

代码解析

这是一个Netty的处理器类HandlerHeartBeat,继承了ChannelDuplexHandler。这个类主要用于处理心跳检测逻辑,以确保连接的存活性。以下是对代码的详细解析:

类和成员变量
public class HandlerHeartBeat extends ChannelDuplexHandler {private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);
}
  • HandlerHeartBeat:继承自ChannelDuplexHandler,它是Netty提供的一个用于处理双向事件的处理器类。
userEventTriggered方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}
}
  • userEventTriggered:这是Netty中的一个回调方法,当用户事件触发时会调用这个方法。在心跳检测中,当连接变为空闲时,Netty会触发一个IdleStateEvent事件。

  • if (evt instanceof IdleStateEvent):检查事件是否为IdleStateEventIdleStateEvent是Netty提供的一个特殊事件,用于表示连接的空闲状态。

  • IdleStateEvent e = (IdleStateEvent) evt:将事件强制转换为IdleStateEvent

  • 处理READER_IDLE状态

    if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();
    }
    
    • IdleState.READER_IDLE:表示读取通道空闲,即长时间没有从客户端读取到数据。
    • ctx.channel().attr(...):获取与通道相关的属性。在这里,通过通道的ID作为键来获取用户ID。
    • logger.info(...):记录日志,说明哪个用户没有发送心跳包导致连接断开。
    • ctx.close():关闭连接。
  • 处理WRITER_IDLE状态

    else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");
    }
    
    • IdleState.WRITER_IDLE:表示写入通道空闲,即长时间没有向客户端发送数据。
    • ctx.writeAndFlush("heart"):向客户端发送一个心跳消息 "heart",保持连接的活跃状态。
总结

这个处理器类HandlerHeartBeat主要用于处理心跳检测逻辑,以确保客户端和服务器之间的连接在长时间没有数据交互时保持活跃或及时关闭:

  • 如果长时间没有读取到数据(READER_IDLE),则关闭连接并记录日志。
  • 如果长时间没有向客户端发送数据(WRITER_IDLE),则发送一个心跳消息 "heart" 以保持连接活跃。
    在Netty中,userEventTriggered 方法的参数是 ChannelHandlerContextObject 类型的事件。以下是详细解释:

4.参数详解

ChannelHandlerContext ctx
  • 类型ChannelHandlerContext
  • 作用ChannelHandlerContext 提供了各种操作以触发 IO 操作和事件处理(如读取、写入、连接、断开等)。它关联了一个 Channel,并且允许访问 ChannelPipeline 中的其他 ChannelHandler
Object evt
  • 类型Object
  • 作用:这是传递给该方法的事件对象。在心跳检测的场景下,这个事件对象通常是 IdleStateEvent,它表示连接的空闲状态(读空闲、写空闲、读写空闲)。

事件来源

在Netty中,空闲状态检测通常是通过 IdleStateHandler 来实现的。IdleStateHandler 会监测通道的读写操作,如果通道在指定的时间内没有读或写操作,就会触发 IdleStateEvent 事件。

示例:配置 IdleStateHandler

以下是一个示例,展示如何将 IdleStateHandler 添加到 ChannelPipeline 中,以便在通道空闲时触发 IdleStateEvent

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;public class NettyServer {public static void main(String[] args) throws Exception {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 添加 IdleStateHandler,配置读、写空闲时间p.addLast(new IdleStateHandler(60, 30, 0)); // 读空闲60秒,写空闲30秒// 添加自定义的心跳检测处理器p.addLast(new HandlerHeartBeat());}});b.bind(8080).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

事件处理

IdleStateHandler 发现通道空闲时,会触发 IdleStateEvent,并调用 HandlerHeartBeat 中的 userEventTriggered 方法。此时,ctxevt 参数的具体信息如下:

  • ctx:当前通道的上下文,提供了通道的各种操作方法。
  • evt:具体的事件对象,这里是 IdleStateEvent,表示通道的空闲状态。

示例回调

假设配置的读空闲时间是60秒,写空闲时间是30秒:

  • 如果60秒内没有读取到任何数据,IdleStateHandler 会触发 IdleStateEvent,其中状态为 IdleState.READER_IDLE
  • 如果30秒内没有向通道写入数据,IdleStateHandler 会触发 IdleStateEvent,其中状态为 IdleState.WRITER_IDLE

HandlerHeartBeat 中:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}
}
  • IdleStateEvent:包含了通道的空闲状态(读空闲、写空闲、读写空闲)。
  • IdleState.READER_IDLE:表示读空闲事件。
  • IdleState.WRITER_IDLE:表示写空闲事件。

2.Netty WebSocket 服务器启动器类

@Component
public class NettyWebSocketStarter implements Runnable {private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;/*** boss线程组,用于处理连接*/private EventLoopGroup bossGroup = new NioEventLoopGroup(1);/*** work线程组,用于处理消息*/private EventLoopGroup workerGroup = new NioEventLoopGroup();/*** 资源关闭——在容器销毁时关闭*/@PreDestroypublic void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}@Overridepublic void run() {try {//创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();//设置几个重要的处理器// 对http协议的支持,使用http的编码器,解码器pipeline.addLast(new HttpServerCodec());//聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest//保证接收的http请求的完整性pipeline.addLast(new HttpObjectAggregator(64 * 1024));//心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit// readerIdleTime  读超时事件 即测试段一定事件内未接收到被测试段消息// writerIdleTime  为写超时时间 即测试端一定时间内想被测试端发送消息//allIdleTime  所有类型的超时时间pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HandlerHeartBeat());//将http协议升级为ws协议,对websocket支持pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});//启动ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

1.代码解析

这段代码定义了一个 Netty WebSocket 服务器启动器类 NettyWebSocketStarter,该类实现了 Runnable 接口,可以在独立的线程中运行。这个类负责配置并启动 Netty WebSocket 服务器。

类和成员变量
@Component
public class NettyWebSocketStarter implements Runnable {private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;/*** boss线程组,用于处理连接*/private EventLoopGroup bossGroup = new NioEventLoopGroup(1);/*** work线程组,用于处理消息*/private EventLoopGroup workerGroup = new NioEventLoopGroup();
}
  • NettyWebSocketStarter:实现了 Runnable 接口,使得该类可以在独立的线程中运行。
  • logger:用于记录日志的 Logger 对象。
  • appConfig:注入的应用配置类,用于获取配置项,如 WebSocket 端口。
  • handlerWebSocket:注入的 WebSocket 处理器,用于处理 WebSocket 消息。
  • bossGroupworkerGroup:分别用于处理连接和处理消息的线程组。
资源关闭方法
/*** 资源关闭——在容器销毁时关闭*/
@PreDestroy
public void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();
}
  • @PreDestroy:在 Spring 容器销毁之前调用 close 方法,优雅地关闭 bossGroupworkerGroup,释放资源。
run 方法
@Override
public void run() {try {//创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();//设置几个重要的处理器// 对http协议的支持,使用http的编码器,解码器pipeline.addLast(new HttpServerCodec());//聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest//保证接收的http请求的完整性pipeline.addLast(new HttpObjectAggregator(64 * 1024));//心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit// readerIdleTime  读超时事件 即测试段一定事件内未接收到被测试段消息// writerIdleTime  为写超时时间 即测试端一定时间内想被测试端发送消息//allIdleTime  所有类型的超时时间pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HandlerHeartBeat());//将http协议升级为ws协议,对websocket支持pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});//启动ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}
  1. 创建并配置 ServerBootstrap

    • ServerBootstrap 是 Netty 用于引导服务器的启动助手类。
    • serverBootstrap.group(bossGroup, workerGroup) 设置了用于处理连接的 bossGroup 和用于处理消息的 workerGroup
    • serverBootstrap.channel(NioServerSocketChannel.class) 设置了服务器通道类型为 NioServerSocketChannel,它适用于 NIO 传输。
  2. 添加处理器

    • serverBootstrap.handler(new LoggingHandler(LogLevel.DEBUG)):添加一个日志处理器,用于记录调试级别的日志。
    • serverBootstrap.childHandler(new ChannelInitializer<Channel>() {...}):添加一个子处理器,用于初始化每个新连接的通道。
  3. 初始化通道

    • HttpServerCodec:添加 HTTP 编解码器,支持 HTTP 协议。
    • HttpObjectAggregator:添加 HTTP 对象聚合器,确保接收完整的 HTTP 请求。
    • IdleStateHandler:添加空闲状态处理器,用于检测读超时(60秒)。
    • HandlerHeartBeat:添加心跳检测处理器,用于处理空闲事件。
    • WebSocketServerProtocolHandler:添加 WebSocket 协议处理器,将 HTTP 协议升级为 WebSocket 协议。
    • handlerWebSocket:添加自定义的 WebSocket 消息处理器。
  4. 启动服务器

    • ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();:绑定端口并启动服务器。
    • logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());:记录服务器启动成功的日志。
    • channelFuture.channel().closeFuture().sync();:等待服务器通道关闭。
  5. 异常处理和资源释放

    • 如果发生异常,记录堆栈跟踪并优雅地关闭 bossGroupworkerGroup,释放资源。

2.总结

NettyWebSocketStarter 是一个用于启动 Netty WebSocket 服务器的类。它通过配置一系列处理器(如 HTTP 编解码器、心跳检测处理器、WebSocket 协议处理器等)来初始化服务器,并在指定端口上启动服务器。同时,通过 @PreDestroy 注解确保在 Spring 容器销毁时优雅地关闭资源。该类实现了 Runnable 接口,使其可以在独立线程中运行,通常可以用于多线程环境中启动 Netty 服务器。

3.Netty WebSocket 处理器 HandlerWebSocket

/*** 设置通道共享*/
@ChannelHandler.Sharable
@Component("handlerWebSocket")
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);//    @Resource
//    private ChannelContextUtils channelContextUtils;@Resourceprivate RedisComponet redisComponet;/*** 当通道就绪后会调用此方法,通常我们会在这里做一些初始化操作** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// Channel channel = ctx.channel();logger.info("有新的连接加入。。。");}/*** 当通道不再活跃时(连接关闭)会调用此方法,我们可以在这里做一些清理工作** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("有连接已经断开。。。");//channelContextUtils.removeContext(ctx.channel());}/*** 读就绪事件 当有消息可读时会调用此方法,我们可以在这里读取消息并处理。** @param ctx* @param textWebSocketFrame* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {//接收心跳Channel channel = ctx.channel();logger.info("接收到消息,{}", textWebSocketFrame.text());// Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));//String userId = attribute.get();//redisComponet.saveUserHeartBeat(userId);}//用于处理用户自定义的事件  当有用户事件触发时会调用此方法,例如连接超时,异常等。@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String url = complete.requestUri();String token = getToken(url);if (token == null) {ctx.channel().close();return;}TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);if (null == tokenUserInfoDto) {ctx.channel().close();return;}/*** 用户加入*///channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());}}/*** 获取url中的token** @param url* @return*/private String getToken(String url) {if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {return null;}String[] queryParams = url.split("\\?");if (queryParams.length < 2) {return url;}String[] params = queryParams[1].split("=");if (params.length != 2) {return url;}return params[1];}
}

代码解析

这段代码定义了一个 Netty WebSocket 处理器 HandlerWebSocket,继承自 SimpleChannelInboundHandler<TextWebSocketFrame>。该处理器主要用于处理 WebSocket 的各种事件,如连接建立、消息接收、连接关闭等。以下是对代码的详细解析:

类和成员变量
@ChannelHandler.Sharable
@Component("handlerWebSocket")
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);@Resourceprivate RedisComponet redisComponet;
}
  • @ChannelHandler.Sharable:注解表明这个处理器是可以在多个 Channel 之间共享的。
  • @Component("handlerWebSocket"):将这个类注册为 Spring 的一个组件,并指定组件名称为 handlerWebSocket
  • logger:用于记录日志的 Logger 对象。
  • redisComponet:注入的 Redis 组件,用于与 Redis 进行交互。
channelActive 方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("有新的连接加入。。。");
}
  • channelActive:当通道就绪时调用这个方法,通常用于初始化操作。
  • logger.info:记录有新的连接加入的日志。
channelInactive 方法
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("有连接已经断开。。。");//channelContextUtils.removeContext(ctx.channel());
}
  • channelInactive:当通道不再活跃(连接关闭)时调用这个方法,通常用于清理工作。
  • logger.info:记录有连接断开的日志。
channelRead0 方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {Channel channel = ctx.channel();logger.info("接收到消息,{}", textWebSocketFrame.text());// Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));// String userId = attribute.get();// redisComponet.saveUserHeartBeat(userId);
}
  • channelRead0:当有消息可读时调用这个方法,读取并处理消息。
  • Channel:获取当前的通道。
  • logger.info:记录接收到的消息。
  • 注释掉的部分代码:可以用于获取用户 ID 并保存心跳到 Redis。
userEventTriggered 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String url = complete.requestUri();String token = getToken(url);if (token == null) {ctx.channel().close();return;}TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);if (null == tokenUserInfoDto) {ctx.channel().close();return;}// 用户加入// channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());}
}
  • userEventTriggered:处理用户自定义事件,如连接超时、异常等。
  • WebSocketServerProtocolHandler.HandshakeComplete:处理 WebSocket 握手完成事件。
  • getToken:从 URL 中获取 token。
  • ctx.channel().close():关闭通道,如果 token 为空或无效。
getToken 方法
private String getToken(String url) {if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {return null;}String[] queryParams = url.split("\\?");if (queryParams.length < 2) {return url;}String[] params = queryParams[1].split("=");if (params.length != 2) {return url;}return params[1];
}
  • getToken:从 URL 中提取 token 参数。
  • StringTools.isEmpty(url):检查 URL 是否为空。
  • url.indexOf("?"):检查 URL 是否包含查询参数。
  • url.split("\\?"):分割 URL 获取查询参数部分。
  • params.length:检查参数长度是否符合预期。

总结

HandlerWebSocket 是一个 Netty WebSocket 处理器,用于处理各种 WebSocket 事件。它包含以下功能:

  1. channelActive:当通道就绪时,记录新的连接加入。
  2. channelInactive:当通道不再活跃时,记录连接断开。
  3. channelRead0:读取并处理接收到的 WebSocket 消息,记录消息内容。
  4. userEventTriggered:处理 WebSocket 握手完成事件,从 URL 中提取 token 并验证,如果无效则关闭连接。
  5. getToken:从 URL 中提取 token 参数,用于验证用户身份。

通过这些方法,HandlerWebSocket 能够有效地管理 WebSocket 连接的生命周期和消息处理,确保连接的可靠性和安全性。



完整过程

当然可以,我会详细解释从访问 WebSocket 接口到完成所有步骤的整个流程,包括各个代码片段是如何协同工作的。

1. 客户端连接到 WebSocket 服务器

当客户端尝试连接到 WebSocket 服务器时,例如通过 ws://localhost:5051/ws?token=16515151,Netty 会接受这个连接请求并触发一系列的事件来处理这个连接。

2. 服务器端初始化和配置

在服务器端,我们使用了 NettyWebSocketStarter 类来初始化和配置 Netty 服务器。

Netty 服务器启动
@Component
public class NettyWebSocketStarter implements Runnable {private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;private EventLoopGroup bossGroup = new NioEventLoopGroup(1);private EventLoopGroup workerGroup = new NioEventLoopGroup();@PreDestroypublic void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}@Overridepublic void run() {try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(64 * 1024));pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HandlerHeartBeat());pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

3. 通道初始化

当客户端连接到服务器时,Netty 会初始化通道并调用配置的处理器。这里的处理器包括 HttpServerCodecHttpObjectAggregatorIdleStateHandlerHandlerHeartBeatWebSocketServerProtocolHandler 以及我们的 HandlerWebSocket

4. 握手和连接事件

WebSocket 协议处理和心跳检测

WebSocketServerProtocolHandler 处理 WebSocket 协议的握手。当握手完成时,会触发 userEventTriggered 方法。

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String url = complete.requestUri();String token = getToken(url);if (token == null) {ctx.channel().close();return;}TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);if (null == tokenUserInfoDto) {ctx.channel().close();return;}// 用户加入// channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());}
}
  • 握手完成:当 WebSocket 握手完成时,WebSocketServerProtocolHandler 触发 HandshakeComplete 事件。
  • 提取 Token:从 URL 中提取 token
  • 验证 Token:从 Redis 中验证 Token 是否有效。
  • 管理连接:将用户信息和通道关联(注释部分)。
心跳检测

IdleStateHandler 会检测连接的空闲状态(例如60秒内没有读取到数据),触发相应的事件。

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}
}
  • 读空闲:如果长时间未读取到数据(READER_IDLE),则关闭连接。
  • 写空闲:如果长时间未写入数据(WRITER_IDLE),发送心跳消息。

5. 消息处理

当客户端发送消息时,Netty 会触发 channelRead0 方法,处理接收到的 WebSocket 消息。

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {Channel channel = ctx.channel();logger.info("接收到消息,{}", textWebSocketFrame.text());// Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));// String userId = attribute.get();// redisComponet.saveUserHeartBeat(userId);
}
  • 接收消息:记录接收到的消息内容。
  • 心跳更新(注释部分):可以从消息中提取用户ID,并在 Redis 中更新心跳记录。

6. 连接关闭

当连接关闭时,Netty 会调用 channelInactive 方法。

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("有连接已经断开。。。");//channelContextUtils.removeContext(ctx.channel());
}
  • 记录日志:记录连接断开的日志。
  • 清理资源(注释部分):从上下文中移除断开的连接。

总结

整个过程包括以下步骤:

  1. 客户端连接:客户端通过 WebSocket URL 连接到服务器(如 ws://localhost:5051/ws?token=16515151)。
  2. 服务器初始化NettyWebSocketStarter 启动服务器,并配置各个处理器。
  3. 握手和协议升级WebSocketServerProtocolHandler 处理握手并升级协议。
  4. 事件触发和处理
    • 握手完成:在 userEventTriggered 中处理握手完成事件,提取并验证 token。
    • 心跳检测:在 HandlerHeartBeat 中处理读空闲和写空闲事件。
  5. 消息处理:在 HandlerWebSocketchannelRead0 方法中处理接收到的 WebSocket 消息。
  6. 连接关闭:在 HandlerWebSocketchannelInactive 方法中处理连接关闭事件。

通过这些步骤,Netty 能够高效地管理 WebSocket 连接,确保连接的可靠性和数据的实时性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/23680.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux上的电子邮件服务器安装教程

&#x1f341; 作者&#xff1a;知识浅谈&#xff0c;CSDN签约讲师&#xff0c;CSDN博客专家&#xff0c;华为云云享专家&#xff0c;阿里云专家博主 &#x1f4cc; 擅长领域&#xff1a;全栈工程师、爬虫、ACM算法&#xff0c;大数据&#xff0c;深度学习 &#x1f492; 公众号…

浏览器内置对象 window 用法集锦,看这篇就够了

文章导读&#xff1a;AI 辅助学习前端&#xff0c;包含入门、进阶、高级部分前端系列内容&#xff0c;当前是 javascript 的部分&#xff0c;瑶琴会持续更新&#xff0c;适合零基础的朋友&#xff0c;已有前端工作经验的可以不看&#xff0c;也可以当作基础知识回顾。 上面文章…

[网鼎杯 2020 青龙组]singal

记录下angr初使用 这道题是很简单的逻辑 32位 我们提取opcode (你可以用convert) 我是用的IDApython\ import idc adr0x00403040 step4#距离 op[] n10#多少个数据 while(n):op.append(hex(idc.get_wide_dword(adr)))adrstepn-1 print(op)然后我又下断点,提取每个"i&q…

持续总结中!2024年面试必问 20 道 Kafka面试题(十)

上一篇地址&#xff1a;持续总结中&#xff01;2024年面试必问 20 道 Kafka面试题&#xff08;九&#xff09;-CSDN博客 十九、Kafka的ACK机制是什么&#xff1f; Kafka的ACK&#xff08;Acknowledgement&#xff0c;确认&#xff09;机制是确保消息被成功发送和接收的重要部分…

MySQL—多表查询—内连接

一、引言 &#xff08;1&#xff09;内连接查询语法 内连接查询的是两张表的交集部分的数据。&#xff08;也就是绿色部分展示的数据&#xff09; &#xff08;2&#xff09;内连接有两种形式&#xff1a; 1、隐式内连接 语法结构&#xff1a; 2、显示内连接 语法结构&#xf…

pycharm FuncAnimation画动态图不显示, 以及画图

网上的一些方法给出了解决措施&#xff0c;如&#xff1a;https://blog.csdn.net/qq_41725313/article/details/122048864?spm1001.2101.3001.6650.2&utm_mediumdistribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogOpenSearchComplete%7ERate-2-122048864-blog-139…

红酒:如何避免红酒过度氧化

红酒过度氧化是影响其品质的重要因素&#xff0c;尤其是在储存和运输过程中。过度氧化的红酒会失去原有的果香和口感&#xff0c;变得平淡无味。因此&#xff0c;避免红酒过度氧化至关重要。以下是一些进一步的措施&#xff0c;可以帮助您保护云仓酒庄雷盛红酒的品质&#xff1…

QT学习过程中遇到的问题自记

文章目录 前言问题1问题2问题3 前言 学习QT嵌入式实战开发(从串口通信到JSON通信微课视频版)的过程中遇到的几个小问题 问题1 1.将书中的示例代码导入自己的电脑&#xff0c;然后点击工程进去&#xff0c;不能运行&#xff0c;报错 no kits are enabled for this project… 我…

LangChain :构建个人AI代理从这里开始

LangChain&#xff0c;一个强大的工具&#xff0c;允许根据用户输入创建对语言模型和其他工具的复杂调用链。就像拥有一个私人助理&#xff0c;可以根据手头的任务做出决定。本文来分享一下在 LangChain 中使用 Agents 的心路历程。 LangChain中代理的概念 在 LangChain 中&a…

指针还是学不会?跟着小代老师学,进入深入理解指针(4)

指针还是学不会&#xff1f;跟着小代老师学&#xff0c;进入深入理解指针&#xff08;4&#xff09; 1回调函数2qsort使用举例2.1使用qsort函数排序整行数据2.2使用qsort排序结构体数据 3qsort函数的模拟实现 1回调函数 回调函数就是一个通过函数指针调用的函数。 如果你把函数…

小球的种类(ball)

小球的种类 题目描述 小红有 n n n种不同颜色的小球&#xff0c;第 i i i种颜色的小球有 a i a_i ai​个&#xff0c;放在同一个盒子中。 小红每次任意取出 k k k个小球并丢弃&#xff0c;直到盒子中剩余的球数小于 k k k个为止。 小红希望最终盒子里的小球颜色种类尽可能少…

荧光标记Avidin与特定生物分子的靶向结合-星戈瑞

亲和素Avidin是一种具有生物学特性的蛋白质&#xff0c;能够与生物素&#xff08;biotin&#xff09;进行高亲和力结合。通过荧光标记技术&#xff0c;我们可以将荧光基团与Avidin结合&#xff0c;形成荧光标记Avidin&#xff0c;从而实现对特定生物分子的靶向结合和可视化。 …

LeetCode-239.滑动窗口最大值

给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回 滑动窗口中的最大值 。 示例 1&#xff1a; 输入&#xff1a;nums [1,3,-1,-3,5,3,6,7], k 3 输…

【无标题】2024.6.6

2024.6.6 【一天高考&#xff01;&#xff01;&#xff01; “夏天周而复始、该相逢的人会再相逢”】 Thursday 五月初一 <theme oi-“DP”> 来学习一下DP的优化 其实考试时我应该很难用到优化的 P2569 [SCOI2010] 股票交易 DP柿子比较好推&#xff0c; T&#xff0…

I2C通信外设

I2C外设介绍 主机&#xff0c;就是拥有主动控制总线的权利。从机&#xff0c;只能在从机允许的情况下&#xff0c;才能控制总线。 多主机模型可分为固定多主机和可变多主机。固定多主机就是总线上&#xff0c;有2个或2个以上固定的主机&#xff0c;上面固定为主机&#xff0c;下…

【Unity | Editor强化工具】资产快速访问工具

经常在Project窗口中翻找资产相对麻烦&#xff0c;Unity自带的Favorite功能又和Project窗口强绑定&#xff0c;且只能在双列视图下使用&#xff0c;故制作了一个可以在独立窗口中列举常用资产的小工具&#xff1a; Unity Asset Quick Access 。 CSDN弄了个Github加速计划&…

在敏捷项目中如何使用WBS?

工作分解结构 (WBS) 是管理规划、监控和控制项目或计划范围的关键要素&#xff0c;在项目管理的许多不同分支中都有应用。它的主要目的是将复杂的项目分解成更易于管理的小块&#xff0c;通常以简单的流程图形式呈现。 WBS 通常与瀑布法等传统项目管理方法相关联&#xff0c;在…

教你申请永久免费的 us.kg 域名 支持接入 Cloudflare

本文首发于只抄博客&#xff0c;欢迎点击原文链接了解更多内容。 前言 之前的永久免费域名 eu.org 已经很久没有审批新的域名了&#xff0c;今天给大家推荐的 us.kg 不需要审批&#xff0c;注册账号申请域名后直接可以使用&#xff0c;并且它也可以像 eu.org 一样接入 Cloudfl…

2024中国机器人开发大会

具身智能 『 具有身体的智能 』 忘了是谁 小脑大模型&#xff1a;运动&#xff1b;大脑大模型&#xff1a;认知&#xff1b; 机器人操作系统 | 黄晓庆 云端&#xff0c;机器人的大脑&#xff0c;每个人的数字化&#xff0c;人类的第三台计算机&#xff1b;每个人生产力的提…

联合(union)和枚举(enum)学习(c语言)

前言 Hello,亲爱的小伙伴们&#xff0c;好久不见&#xff0c;今天我们继续来学习新的内容-----联合和枚举 如果喜欢作者菌的文章的话&#xff0c;就不要吝啬手中的三连呀&#xff0c;万分感谢&#xff01;&#xff01; 联合&#xff08;共用体&#xff09;&#xff08;union&…