概述
Netty是什么
Netty的地位
Netty的优势
HelloWorld
public class HelloClient {public static void main(String[] args) throws InterruptedException {// 1. 启动类new Bootstrap()// 2. 添加 EventLoop.group(new NioEventLoopGroup())// 3. 选择客户端 channel 实现.channel(NioSocketChannel.class)// 4. 添加处理器.handler(new ChannelInitializer<NioSocketChannel>() {@Override // 在连接建立后被调用protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}})// 5. 连接到服务器.connect(new InetSocketAddress("localhost", 8080)).sync().channel()// 6. 向服务器发送数据.writeAndFlush("hello, world");}
}public class HelloServer {public static void main(String[] args) {// 1. 启动器,负责组装 netty 组件,启动服务器new ServerBootstrap()// 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组.group(new NioEventLoopGroup())// 3. 选择 服务器的 ServerSocketChannel 实现.channel(NioServerSocketChannel.class) // OIO BIO// 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler).childHandler(// 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handlernew ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6. 添加具体 handlerch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler@Override // 读事件public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg); // 打印上一步转换好的字符串}});}})// 7. 绑定监听端口.bind(8080);}
}
流程分析
正确理解
正确理解 Netty中各个组件的功能和职责
组件
EventLoop
普通任务和定时任务
@Slf4j
public class TestEventLoop {public static void main(String[] args) {// 1. 创建事件循环组EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
// EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务// 2. 获取下一个事件循环对象System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());// 3. 执行普通任务group.next().execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("ok");});// 4. 执行定时任务group.next().scheduleAtFixedRate(() -> {log.debug("ojbk");}, 0, 1, TimeUnit.SECONDS);log.debug("main");}
}
IO任务
Netty客户端是多线程程序,idea debug 默认
断点模式为ALL
,即会停止主线程以及守护线程,所以当客户端断点自定义Evaluate发送数据时,守护线程的发送数据Channel也被断点停止,所以无法发送数据
选择Thread
只停止当前线程,守护线程仍然可以运行
一个客户端的NIO线程跟Channel建立链接就会建立一个绑定关系,后续客户端的Channel上的IO事件都由一个EventLoop处理,
客户端-Channel-EventLoop
绑定关系
EventLoop的分工细化
第一次细分,Netty建议将EventLoop
职责细分,分为boss和worker
group中传入两个EventLoop,那么boss只负责accept事件,worker负责read事件
上诉优化,worker中的NIOEventLoopGroup除了要负责SocketChannel的NIO连接操作还要负责连接后的读写操作,如果读写较长较重,那么会阻塞影响到worker其他的连接或读写操作,所以,
再次细分,EventLoop有两种实现,NIOEventLoopGroup
能处理IO事件普通任务和定时任务,DefaultEventLoopGroup
只能处理普通任务和定时任务,将读写操作交给它去处理耗时较长的读写操作。
作为对比,第一个没有指定group,默认使用了worker的NIOEventLoopGroup来处理读写操作,而第二则使用了DefaultEventLoop来处理读写操作
切换线程
Channel
正确的链接建立:ChannelFuture
处理异步连接
由于连接的建立是耗时的,所以Channel必须等到连接建立完成再执行获取,否则是无效的
connect方法返回的ChannelFuture若没有阻塞等待连接,那么接下来获取到的Channel是没有建立好连接的Channel
如上两种方法异步等待NIO线程建立完毕
谁发起的调用谁等待链接结果
正确的链接关闭:CloseFuture的关闭
不能直接在主线程或其他线程中直接处理关闭操作,因为nioEventLoopGroup-2-1
属于异步线程,此处close
方法非阻塞,有可能在关闭操作还未完成就执行了关闭后操作
解决方法:
使用阻塞关闭方法,只有当channel真的关闭了才执行后面的方法
优雅的关闭:等待还未执行完的操作执行完后再关闭
为什么Netty是异步设计
Future & Promise
概述
Future
jdk中的Future
Future就是在线程之间传递结果的一个容器,是被动的获取结果,由执行完任务的线程给予的结果,没有暴露主动赋予结果的方法
@Slf4j
public class TestJdkFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.创建线程池ExecutorService service = Executors.newFixedThreadPool(2);//2.提交任务Future<Object> future = service.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {log.debug("执行计算");Thread.sleep(1000);return 50;}});//3.祝线程通过future获取结果,get是阻塞等待方法log.debug("等待结果");log.debug("结果{}",future.get());}
}
Netty 中的 Future
与jdk中的差不多,继承至jdk的Future,做了增强
@Slf4j
public class TestNettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();//提交任务Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("执行计算");System.out.println("执行计算");Thread.sleep(1000);return 50;}});//通过future获取结果,get是阻塞等待方法log.debug("等待结果");log.debug("结果{}",future.get());//异步方式获取结果future.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {//getNow非阻塞等待 立即获取结果log.debug("结果{}",future.getNow());}});}
}
Promise
Promise又继承至Netty的Future,功能更强大,可以主动填充结果,对于网络通信非常有用
@Slf4j
public class TestNettyPromise {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 准备 EventLoop 对象EventLoop eventLoop = new NioEventLoopGroup().next();// 2. 可以主动创建 promise, 结果容器DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);new Thread(() -> {// 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果log.debug("开始计算...");try {int i = 1 / 0;Thread.sleep(1000);promise.setSuccess(80);} catch (Exception e) {e.printStackTrace();promise.setFailure(e);}}).start();// 4. 接收结果的线程log.debug("等待结果...");log.debug("结果是: {}", promise.get());}}
Handler & Pipeline
Pipeline
Inbound
入栈是按入栈顺序出,出栈是按入栈顺序返出
channelRead 是一个调用链,如果中间没调用,那么后面的handler 则调用不到
Outbound
注意ctx.writeAndFlush
和 ch.writeAndFlush
ctx.writeAndFlush
是从当前调用的 handler 往后寻找 OutboundHandler
,若之前没有执行到OutboundHandler
那么找不到OutboundHandler
执行
而ch.writeAndFlush
则是从整个调用链的最前端 tail 处理开始往后寻找OutboundHandler
而且先执行调用链中的InboundHandler
输入,中间的 OutboundHandler
被跳过不影响正常输入执行
如图ch.writeAndFlush
的调用执行流程
ByteBuffer
netty
中 ByteBuf
容量动态扩容,netty
中 ByteBuffer
固定容量
netty 中 ByteBuffer 默认使用直接内存(系统内存、内存条)
例如 扩容 2 的整数倍 2^9=512
扩容至 2^10 =1024
tail 只能处理原始 ByteBuf
如果中途 ByteBuf
被转换成其他数据类型,则 tail 无法自动release
零拷贝 slice
slice 是 netty 中对于零拷贝的体现之一
切片后生成的对象,实际上还是操作原始bytebuf
的内容
使用习惯,切片自己增加引用计数,避免被其他调用者释放
component 组合零拷贝
writeBytes
会发生真正的数据复制,每次writeBytes
都会发生数据复制
addComponents
是使逻辑上连续,没有发生复制
双向通信
实现一个 echo server
编写 server
new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf buffer = (ByteBuf) msg;System.out.println(buffer.toString(Charset.defaultCharset()));// 建议使用 ctx.alloc() 创建 ByteBufByteBuf response = ctx.alloc().buffer();response.writeBytes(buffer);ctx.writeAndFlush(response);// 思考:需要释放 buffer 吗// 思考:需要释放 response 吗}});}}).bind(8080);
编写 client
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf buffer = (ByteBuf) msg;System.out.println(buffer.toString(Charset.defaultCharset()));// 思考:需要释放 buffer 吗}});}}).connect("127.0.0.1", 8080).sync().channel();channel.closeFuture().addListener(future -> {group.shutdownGracefully();
});new Thread(() -> {Scanner scanner = new Scanner(System.in);while (true) {String line = scanner.nextLine();if ("q".equals(line)) {channel.close();break;}channel.writeAndFlush(line);}
}).start();
💡 读和写的误解
我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B
和 B 到 A
的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
例如
public class TestServer {public static void main(String[] args) throws IOException {ServerSocket ss = new ServerSocket(8888);Socket s = ss.accept();new Thread(() -> {try {BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));while (true) {System.out.println(reader.readLine());}} catch (IOException e) {e.printStackTrace();}}).start();new Thread(() -> {try {BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));// 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据for (int i = 0; i < 100; i++) {writer.write(String.valueOf(i));writer.newLine();writer.flush();}} catch (IOException e) {e.printStackTrace();}}).start();}
}
客户端
public class TestClient {public static void main(String[] args) throws IOException {Socket s = new Socket("localhost", 8888);new Thread(() -> {try {BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));while (true) {System.out.println(reader.readLine());}} catch (IOException e) {e.printStackTrace();}}).start();new Thread(() -> {try {BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));for (int i = 0; i < 100; i++) {writer.write(String.valueOf(i));writer.newLine();writer.flush();}} catch (IOException e) {e.printStackTrace();}}).start();}
}