1:pom.xml配置
< dependency> < groupId> io.netty</ groupId> < artifactId> netty-all</ artifactId> < version> 4.1.73.Final</ version> </ dependency>
2:Netty作为HTTP服务器
import io. netty. bootstrap. ServerBootstrap ;
import io. netty. buffer. ByteBuf ;
import io. netty. channel. * ;
import io. netty. channel. nio. NioEventLoopGroup ;
import io. netty. channel. socket. SocketChannel ;
import io. netty. channel. socket. nio. NioServerSocketChannel ;
import io. netty. handler. codec. http. * ;
import io. netty. util. CharsetUtil ; public class HttpServer { private final int port; public HttpServer ( int port) { this . port = port; } public void start ( ) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup ( 1 ) ; EventLoopGroup workerGroup = new NioEventLoopGroup ( ) ; try { ServerBootstrap b = new ServerBootstrap ( ) ; b. group ( bossGroup, workerGroup) . channel ( NioServerSocketChannel . class ) . childHandler ( new ChannelInitializer < SocketChannel > ( ) { @Override public void initChannel ( SocketChannel ch) throws Exception { ChannelPipeline p = ch. pipeline ( ) ; p. addLast ( new HttpServerCodec ( ) ) ; p. addLast ( new HttpObjectAggregator ( 1024 * 1024 ) ) ; p. addLast ( new LargeJsonHandler ( ) ) ; } } ) . option ( ChannelOption . SO_BACKLOG , 128 ) . childOption ( ChannelOption . SO_KEEPALIVE , true ) ; ChannelFuture f = b. bind ( port) . sync ( ) ; f. channel ( ) . closeFuture ( ) . sync ( ) ; } finally { workerGroup. shutdownGracefully ( ) ; bossGroup. shutdownGracefully ( ) ; } } public static void main ( String [ ] args) throws Exception { new HttpServer ( 8080 ) . start ( ) ; }
} class LargeJsonHandler extends SimpleChannelInboundHandler < FullHttpRequest > { @Override public void channelRead0 ( ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { if ( HttpUtil . is100ContinueExpected ( request) ) { send100Continue ( ctx) ; } ByteBuf content = request. content ( ) ; String jsonStr = content. toString ( CharsetUtil . UTF_8 ) ; System . out. println ( jsonStr) ; FullHttpResponse response = new DefaultFullHttpResponse ( HttpVersion . HTTP_1_1 , HttpResponseStatus . OK ) ; response. headers ( ) . set ( HttpHeaderNames . CONTENT_TYPE , "text/plain; charset=UTF-8" ) ; response. content ( ) . writeBytes ( "OK" . getBytes ( CharsetUtil . UTF_8 ) ) ; response. headers ( ) . set ( HttpHeaderNames . CONTENT_LENGTH , response. content ( ) . readableBytes ( ) ) ; ctx. writeAndFlush ( response) ; } private static void send100Continue ( ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse ( HttpVersion . HTTP_1_1 , HttpResponseStatus . CONTINUE ) ; ctx. writeAndFlush ( response) ; } @Override public void channelReadComplete ( ChannelHandlerContext ctx) { } @Override public void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause) throws Exception { cause. printStackTrace ( ) ; ctx. close ( ) ; }
}
注意:如果发送的JSO数据如果大于1M,是会分包发送的,每次发送都会执行channelReadComplete方法,所以不可以关闭通道,发送完数据才执行channelRead0方法
3:Netty作为webSocket服务器
package com. example. slave. netty. server ; import io. netty. bootstrap. ServerBootstrap ;
import io. netty. channel. * ;
import io. netty. channel. nio. NioEventLoopGroup ;
import io. netty. channel. socket. SocketChannel ;
import io. netty. channel. socket. nio. NioServerSocketChannel ;
import io. netty. handler. codec. http. HttpObjectAggregator ;
import io. netty. handler. codec. http. HttpServerCodec ;
import io. netty. handler. codec. http. websocketx. WebSocketServerProtocolHandler ;
public class CustomWebSocket { private final int port; public CustomWebSocket ( int port) { this . port = port; } public void start ( ) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup ( 1 ) ; EventLoopGroup workerGroup = new NioEventLoopGroup ( ) ; try { ServerBootstrap b = new ServerBootstrap ( ) ; b. group ( bossGroup, workerGroup) . channel ( NioServerSocketChannel . class ) . childHandler ( new ChannelInitializer < SocketChannel > ( ) { @Override public void initChannel ( SocketChannel ch) throws Exception { ChannelPipeline p = ch. pipeline ( ) ; p. addLast ( new HttpServerCodec ( ) ) ; p. addLast ( new HttpObjectAggregator ( 1024 * 1024 ) ) ; p. addLast ( new WebSocketServerProtocolHandler ( "/ws" , "WebSocket" , true , 65536 * 10 ) ) ; p. addLast ( new MyWebSocketHandler ( ) ) ; } } ) . option ( ChannelOption . SO_BACKLOG , 128 ) . childOption ( ChannelOption . SO_KEEPALIVE , true ) ; ChannelFuture f = b. bind ( port) . sync ( ) ; f. channel ( ) . closeFuture ( ) . sync ( ) ; } finally { workerGroup. shutdownGracefully ( ) ; bossGroup. shutdownGracefully ( ) ; } } }
public class MyWebSocketHandler extends SimpleChannelInboundHandler < TextWebSocketFrame > { public static ChannelGroup channelGroup; static { channelGroup = new DefaultChannelGroup ( GlobalEventExecutor . INSTANCE ) ; } @Override public void channelActive ( ChannelHandlerContext ctx) throws Exception { System . out. println ( "与客户端建立连接,通道开启!" ) ; channelGroup. add ( ctx. channel ( ) ) ; } @Override public void channelInactive ( ChannelHandlerContext ctx) throws Exception { System . out. println ( "与客户端断开连接,通道关闭!" ) ; channelGroup. remove ( ctx. channel ( ) ) ; } @Override public void channelReadComplete ( ChannelHandlerContext ctx) { ctx. writeAndFlush ( Unpooled . EMPTY_BUFFER ) . addListener ( ChannelFutureListener . CLOSE ) ; }
@Override public void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause) { ctx. close ( ) ; } @Override protected void channelRead0 ( ChannelHandlerContext ctx, TextWebSocketFrame msg) { System . out. println ( "服务器收到的数据:" + msg. text ( ) ) ; sendAllMessage ( ) ; } private void sendMessage ( ChannelHandlerContext ctx) { String message = "你好," + ctx. channel ( ) . localAddress ( ) + " 给固定的人发消息" ; ctx. channel ( ) . writeAndFlush ( new TextWebSocketFrame ( message) ) ; } private void sendAllMessage ( ) { String message = "我是服务器,这里发送的是群消息" ; channelGroup. writeAndFlush ( new TextWebSocketFrame ( message) ) ; }
}