支持协议
- TCP/UDP
- HTTP/HTTPS
- WebSocket
- SPDY/HTTP2
- MQTT/CoAP
服务端
常用类
ServerBootstrap
服务端配置类
//设置线程组、parentGroup处理连接、childGroup处理I/O
group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
//Channel通过何种方式获取新的连接(NioServerSocketChannel、NioSocketChannel)
channel(Class<? extends C> channelClass)
//ServerChannel一些配置项、ChannelOption.SO_BACKLOG
option(ChannelOption option, T value)
//ServerChannel的子Channel的选项
childOption(ChannelOption childOption, T value)
//自定义ChannelInboundHandlerAdapter、编码解码器等
childHandler(ChannelHandler childHandler)
NioServerSocketChannel
服务端通道
Bootstrap
客户端配置类
NioSocketChannel
客户端通道
EventLoopGroup
事件循环组:本质上是一个线程组,其中每个 EventLoop 线程负责处理所分配 Channel 的 I/O 事件,也可以执行定时任务。
NioEventLoopGroup
ChannelFuture
表示一次异步 I/O 操作的结果,该操作可能尚未完成。
ChannelInboundHandlerAdapter
在接收到新的请求(入站事件)时进行回调,这个类定义了一系列回调方法。常见的回调如下:
channelRead(ChannelHandlerContext ctx, Object msg) // 收到消息调用
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) // 异常调用
channelInactive(ChannelHandlerContext ctx) throws Exception //连接断开
channelActive(ChannelHandlerContext ctx) throws Exception //连接建立
ChannelOutboundHandlerAdapter
在对请求进行响应(出站操作)时被调用,这个类定义了一系列适配方法。常见的适配方法如下:
代码示例
1、新建通道处理类处理每一次I/O
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;/*** Handles a server-side channel.*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)// Discard the received data silently.((ByteBuf) msg).release(); // (3)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}
2、创建一个netty服务端
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Discards any incoming data.*/
public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// Bind and start to accept incoming connections.// 服务端绑定端口ChannelFuture f = b.bind(port).sync(); // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.// 一直阻塞知道通道关闭、再优雅的关闭线程组shutdownGracefully()f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;if (args.length > 0) {port = Integer.parseInt(args[0]);}new DiscardServer(port).run();}
}
客户端
代码示例
public static void main(String[] args) throws Exception {String host = "192.168.32.29";int port = 8009;EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeClientHandler());}});// Start the client.ChannelFuture f = b.connect(host, port).sync(); // (5)// Wait until the connection is closed.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}
数据解析
1、为了正确解析传输的数据,解决粘包、半包等问题,需要明确包的固定长度或者固定的解析方法,如长度字段、特殊字符结尾等等。
2、编写编码、解码类 注意:新的编码、解码器需要在通道内配置 ChannelInitializer
public class TimeDecoder extends ByteToMessageDecoder { // (1)@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)if (in.readableBytes() < 4) {return; // (3)}//自定义UnixTime的POJO、从通道读取四个字节、做一次转换out.add(new UnixTime(in.readUnsignedInt())); //(4)}
}
编码器
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {@Overrideprotected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {out.writeInt((int)msg.value());}
}
也可以通过实现 ChannelOutboundHandler 将回传的 POJO(UnixTime)转换为 ByteBuf。这比编写解码器要简单得多,因为在对消息进行编码时无需处理数据包的拆分和重组。
public class TimeEncoder extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {UnixTime m = (UnixTime) msg;ByteBuf encoded = ctx.alloc().buffer(4);encoded.writeInt((int)m.value());ctx.write(encoded, promise); // (1)}
}
将 TimeDecoder 应用到通道中,并添加在 TimeClientHandler 之前
// Register the decoder in the channel pipeline, ahead of the client handler.
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});