1 心跳监测
MyServer.java
public class MyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//加入Netty提供的 IdleStateHandler/*说明:IdleStateHandler 是netty提供的处理空闲状态的处理器long readerIdleTime:表示多长时间没有读,就会发送一个心跳监测包 检测是否连接long writerIdelTime:表示多长时间没有写,就会发送一个心跳监测包 监测是否连接long allIdelTime:表示多长时间没有读写,就会发送一个心跳检测包 监测是否连接*/pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));//加入自定义Handler,对空闲检测进一步处理pipeline.addLast(new MyServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
MyServerHandler.java
@Slf4j
public class MyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()){case READER_IDLE:eventType = "读空闲";break;case WRITER_IDLE:eventType = "写空闲";break;case ALL_IDLE:eventType = "读写空闲";break;}log.info("{},---超时时间---,{}",ctx.channel().remoteAddress(),eventType);log.info("服务器做相应处理");//如果发生空闲,关闭通道ctx.channel().close();}}
}
NettyChatClient.java
@Slf4j
public class NettyChatClient {private String host;private int port;public NettyChatClient(String host, int port) {this.host = host;this.port = port;}private void run(){NioEventLoopGroup loopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder",new StringDecoder());pipeline.addLast("encoder",new StringEncoder());pipeline.addLast(new NettyChatClientHandler());}});ChannelFuture channelFuture = bootstrap.connect(host, port).sync();Channel channel = channelFuture.channel();log.info("客户端连接成功,地址是:{}",channel.remoteAddress());Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()){String msg = scanner.nextLine();channel.writeAndFlush(msg + "\r\n");}}catch (Exception e){e.printStackTrace();}finally {loopGroup.shutdownGracefully();}}public static void main(String[] args) {new NettyChatClient("127.0.0.1",8000).run();}
}
NettyChatClientHandler.java
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg.trim());}
}
服务端运行结果:
2 WebSocket实现服务器和客户端长连接
2.1 需求
(1)Http协议是无状态的,浏览器和服务器之间的请求响应一次,下一次会重新创建连接。
(2)实现基于webSocket的长连接的全双工的交互。
(3)改变Http协议多次请求的约束,实现长连接,服务端可以发送消息给浏览器。
(4)客户端浏览器和服务端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务端也会感知。
服务端代码:MyServer.java
public class MyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//基于http协议,使用和图片的编解码pipeline.addLast(new HttpServerCodec());//以块方式写,添加chunkedwritehandler处理器pipeline.addLast(new ChunkedWriteHandler());/*说明1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求*/pipeline.addLast(new HttpObjectAggregator(8192));/*说明1. 对应websocket ,它的数据是以 帧(frame) 形式传递2. 可以看到WebSocketFrame 下面有六个子类3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接5. 是通过一个 状态码 101*/pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));//自定义handlerpipeline.addLast(new MyTextWebSocketFrameHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
自定义Handler:MyTextWebSocketFrameHandler.java
@Slf4j
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {log.info("服务器接收消息:{}",msg.text());//回复消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间"+ LocalDateTime.now()+ " " +msg.text()));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("发生异常:{}",cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded 被调用,channel id 是:{}",ctx.channel().id().asLongText());log.info("handlerAdded 被调用,channel id 是:{}",ctx.channel().id().asShortText());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved 被调用,channel id 是:{}",ctx.channel().id().asLongText());}
}
客户端代码:hello.html
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<script>var socket;//判断当前浏览器是否支持websocketif(window.WebSocket) {//go onsocket = new WebSocket("ws://localhost:8000/hello");//相当于channelReado, ev 收到服务器端回送的消息socket.onmessage = function (ev) {var rt = document.getElementById("responseText");rt.value = rt.value + "\n" + ev.data;}//相当于连接开启(感知到连接开启)socket.onopen = function (ev) {var rt = document.getElementById("responseText");rt.value = "连接开启了.."}//相当于连接关闭(感知到连接关闭)socket.onclose = function (ev) {var rt = document.getElementById("responseText");rt.value = rt.value + "\n" + "连接关闭了.."}} else {alert("当前浏览器不支持websocket")}//发送消息到服务器function send(message) {if(!window.socket) { //先判断socket是否创建好return;}if(socket.readyState == WebSocket.OPEN) {//通过socket 发送消息socket.send(message)} else {alert("连接没有开启");}}
</script><form onsubmit="return false"><textarea name="message" style="height: 300px; width: 300px"></textarea><input type="button" value="发生消息" onclick="send(this.form.message.value)"><textarea id="responseText" style="height: 300px; width: 300px"></textarea><input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''"></form>
</body>
</html>
服务端运行结果:
客户端运行结果: