目录
第三章 Netty
1.什么是Netty?
2.为什么需要使用Netty?
3.Netty的发展历程
4.谁在使用Netty?
5.为什么上述这些分布式产品都使用Netty?
6.第一个Netty应用
7.如何理解Netty是NIO的封装
8.logback日志使用的加强
9.EventLoop(NioEventLoop)
9.1 作用
9.2 类结构图
9.3 如何使用EventLoop?
10.EventLoopGroup(NioEventLoopGroup)
10.1 测试EventLoopGroup
11.NioEventLoop VS DefaultEventLoopGroup
11.1 注意点1
11.2 注意点2
11.3 注意点3
11.4 Netty中Reactor模型总结
第三章 Netty
1.什么是Netty?
Netty: Home
Netty is an asynchronous event-driven network application frameworkfor rapid development of maintainable high performance protocol servers & clients.
Netty是一个异步事件驱动的网络应用框架。用于快速开发可维护的高性能协议服务器和客户端。
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
Netty是一个NIO客户服务器框架,它能够快速和容易地开发网络应用,如协议服务器和客户端。它大大简化和精简了网络编程,如TCP和UDP套接字服务器。
'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.
快速和简单 "并不意味着开发出来的应用程序会出现可维护性或性能问题。Netty的设计是经过精心设计的,其经验来自于许多协议的实施,如FTP、SMTP、HTTP以及各种基于二进制和文本的遗留协议。因此,Netty成功地找到了一种方法来实现开发的简易性、性能、稳定性和灵活性,而没有任何妥协。
- 异步事件驱动的概念
异步事件驱动的概念:
Netty是一个异步事件驱动的网络应用框架。事件驱动是指:服务器监控不同的事件,如ACCEPT,READ,WRITE等事件。异步是指:Netty其实沿用了Reactor响应式编程,开启一个或多个Boss线程专门处理所有客户端的连接操作,当连接操作处理完毕后。该客户端后续对应的读写操作会交给worker线程去做,当然对于很多个客户端而言,我们是需要多个worker线程的。但是worker线程处理客户端时的操作决定了是同步还是异步,先说同步:如果worker线程处理某个客户端读写操作,worker线程一直等待到该客户端读或写操作完成后才返回给客户端,这个过程中,该worker线程是不可以处理其他客户端的!这就是同步!再说异步:worker线程在处理客户端读或写操作时,当接收到这一读或写操作的请求后,立即返回一个确认操作,然后异步开启一个新线程去处理此次对应的读或写的操作!等这个异步线程处理完毕后,再返回结果给到对应的客户端。异步线程处理的过程中,不会阻塞当前worker线程的,worker线程可以处理其他客户端的读或写的操作!这是十分高性能的处理,对于客户端而言也很优化。
回调这里存在疑惑:!!!后续学netty的时候结合案例分析一下!!!
这个异步肯定是基于回调机制的:在worker线程交付读或写的任务给异步线程的时候,会把特定的客户端信息注册给异步线程,等到异步线程处理完后再通过回调函数给该特定的客户端。异步线程能返回任务结果 是通过调用客户端提供的回调函数
2.为什么需要使用Netty?
1.Netty是完成网络通信的框架,底层封装了NIO
2.NIO存在很多问题,比如:
API复杂难用,尤其是ByteBuffer的指针切换来切换去的(时刻切换读模式或写模式),在Netty中使用ByteBuf这一核心结构来封装解决!
需要掌握丰富的知识,比如多线程和网络编程。Netty也都进行了封装,比如:EventLoopGroup
可靠性无法保证,断线重连,半包粘包,网络拥塞问题等问题都需要我们自己去考虑,但是在Netty中这些问题都得到了很好的解决和封装!而且Netty处理的非常优秀
空轮询 bug,若Selector的轮询结果为空,也没有调用wakeup或新消息处理,Selector.select()被唤醒而发生空轮询,CPU使用率100%。在Netty中也对该问题进行了解决。
空轮询bug参考下面文章:
Netty笔记-NIO空轮询Bug的分析与解决代码 - 掘金
3.Netty的发展历程
1. Trustin Lee 2004年开发了Netty,成功入职了Arreo通信公司
2. 2008年,Trustin Lee,加入JBoss,发布了Netty3
3. 2012年,Trustin Lee,单干,发布了Netty4
4. 2013年,发布了Netty5。 引入JDK的新特性,比如 ForkJoinPool等
使用 ForkJoinPool 提升了复杂性
没有带来明显的性能提升
同时维护太多分支太耗费精力
4.谁在使用Netty?
1. 框架,gRPC、Dubbo、Spring WebFlux、Spring Cloud Gateway
2. 大数据,Spark、Hadoop、Flink
3. 消息队列,RocketMQ、ActiveMQ
4. 搜索引擎,Elasticsearch
5. 分布式协调器,Zookeeper
6. 数据库,Cassandra、Neo4j
7. 负载均衡,Ribbon
5.为什么上述这些分布式产品都使用Netty?
为什么这些技术都使用Netty?
因为这些技术都是分布式技术,对于分布式技术而言都逃不开一个话题,那就是多节点集群的搭建,像最常说的就是主从集群,主节点是一个单独的JVM进程,从节点也是一个单独的JVM进程。主节点和从节点之前需要维护数据,传输数据时需要跨进程通信,一旦涉及到跨进程通信,无论是位于一台机器上的不同进程,还是位于多台机器上的不同进程,有一个统一的解决方案就是:RPC网络通信。RPC有很多,比如说:Dubbo,gRPC,但是它们无非都是Netty的套壳,使得我们使用Dubbo,grpc做网络通信时更加方便,性能更高了。
netty是基于传输层TCP/UDP协议的快速开发框架,Dubbo基于netty开发了一套专属的协议,Dubbo协议,也有内部的Triple协议-新一代网络通信协议,Dubbo甚至还兼容了Grpc协议做了新版协议(后续会总结)。Grpc也有基于Netty的自己一套协议,这里称之为Grpc协议。
像我们平常开发,javaEE走的其实是应用层协议,http协议。
但是对于分布式框架底层都是要基于TCP/UDP这种操作系统内核提供的传输层协议做定制封装成自己的专属协议的,但是自己去封装做网络通信太麻烦,需要考虑网络的问题太多,所以都使用netty作为自己封装协议的基础,因为netty就是基于TCP/UDP的呀。
所以你java开发一个分布式产品,网络通信你只能用netty,也只有netty成熟,稳定,牛逼,高性能。
6.第一个Netty应用
- 依赖
<dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.8.0</version>
</dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.45.Final</version>
</dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope>
</dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version>
</dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.9</version>
</dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.9</version>
</dependency><dependency><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId><version>2.5</version>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version>
</dependency>
- 引入logback日志---》参考之前NIO笔记总结的步骤即可
- 服务端
package com.messi.netty_core_02.netty01;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer {private static final Logger log1 = LoggerFactory.getLogger(MyNettyServer.class);private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);public static void main(String[] args) {//构建一个服务端的处理器ServerBootstrap serverBootstrap = new ServerBootstrap() ;//处理器绑定一个服务端的SocketChannel,这里Netty就是开启一个NioServerSocketChannel通道serverBootstrap.channel(NioServerSocketChannel.class);//启动一组线程(NioEventLoopGroup),可以理解成线程池,就是管理EventLoop的。//实际上就是我们的NIO版本中实现的Reactor模型中:Boss线程通过死循环监控Accept事件。Worker线程通过死循环监控read或write事件//每个EventLoop都是一个线程,每个线程都有自己的Selector监控器serverBootstrap.group(new NioEventLoopGroup());//上面使用EventLoop监控到了各个事件的发生,下面就需要使用Handler接口处理器类来处理接收到的事件//这里就开始处理接收到的IO事件(read或write)// 注释:Accept连接事件Netty已经内置帮你做好了(因为连接代码是可复用的),你只需要手动处理IO事件// 所以这里要进行初始化NioSocketChannel,也就是通过Handler,通过Pipline来流水线处理serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//添加一个解码器,把通过网络接收到客户端发送的所有ByteBuffer解码成字符串,客户端通过网络传输给我们服务端的所有数据都是ByteBuffer//网络传输的过程中不能传输字符串//这里可以添加很多个Handler,不可以按照你的需求在后面不断的追加,完成你的业务。//new StringDecoder()这种是内置的处理器,你也可以自定义Handler然后添加进去ch.pipeline().addLast(new StringDecoder()) ;//添加一个InboundHandler解析器,所有我们从外部读入的数据这里都是Inbound。就是输入数据。//因为我们是addLast,所以前面要加一个字符串解码器,这里又追加了一个输出处理器// 按照流水线的先后处理规则:先执行字符串解码器,然后再执行这个输出处理器ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){//Adapter是适配器,我们不需要全部实现所有的方法,只需要执行我们自己想要的方法,比如:我们就实现一个读操作对应的方法channelRead()//上面执行完解码器,把ByteBuffer数据处理转为字符串,那么到了这里,我们Inbound就是输入到这里,所以这里应该对应的就是读操作,我们实现读操作channel就可以//这个方法的参数:msg。如果第一个处理器把ByteBuffer解码成json,那么该msg就是json//我们后续就可以处理json转变成java对象,就会把该java对象接着传递下去了,后续可能还会添加新的处理器处理该java对象,你如果没有第一个解码处理器,这里msg拿到的就是ByteBuffer //如果拿到的是ByteBuffer,那么你就需要自己去进行解码操作,你自己看着办@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log1.info("服务端读取到的数据为:{}",msg);}});}});serverBootstrap.bind(8000);}}
- 客户端
package com.messi.netty_core_02.netty01;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;public class MyNettyClient {public static void main(String[] args) throws InterruptedException {//初始化一个客户端启动器Bootstrap bootstrap = new Bootstrap();//客户端也要开启一个NioSocketChannel通道,它是做IO和连接的bootstrap.channel(NioSocketChannel.class);//客户端也做成多线程组了,为什么?我在细节中有总结bootstrap.group(new NioEventLoopGroup());bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//客户端发数据给服务端是要进行编码的 同理服务端ch.pipeline().addLast(new StringEncoder());}});//客户端请求连接服务端,是一个异步操作。底层会开启一个新的线程处理这个阻塞式的连接操作,当该异步新线程处理完毕后,会回调把结果传输给main线程//使用ChannelFuture类型的connect进行接收到异步新线程处理的结果ChannelFuture connect = bootstrap.connect("localhost",8000);//但是要注意一点就是:一切IO操作都是建立在连接完成的前提下的,所以这里要进行阻塞式等待连接完成后才能进行下面的操作connect.sync();//连接成功后,通过这个连接获取到Channel通道,也就是当初NIO版本对应的SocketChannel,此时这里肯定是main线程处理Channel channel = connect.channel();//往服务端写数据,写数据写的是经过pipline流水线进行编码Handler处理后的"梅西",即ByteBuffer类型的字节,底层再转换成二进制字节流channel.writeAndFlush("梅西");}
}
- 测试
启动服务端和客户端,成功
- 细节
1.
你可以把流水线连接的Handler设为一个Controller,在某一个Handler中可以调用Service层的代码,但是Service层的业务代码不会在这里写:
2.
为什么客户端也是多线程?
通常客户端与服务端做通信,发数据是要经过很多步骤的:1.与服务端建立连接 2.通信:read或write数据
我们要把步骤1做成一个或多个线程,步骤2也做成一个或多个线程。并且是异步的,所谓异步就是是啥前面也解释了。
解释一下为什么:建立连接时开启一个线程。当调用read方法后,执行这个读IO,我们也开启一个线程。当调用write方法时,执行这个写IO,同样也开启一个线程。为什么?因为当read这个IO时间很久的话,如果是单线程,对于这个客户端而言,是不是就阻塞了write这个IO呢?对吧!Netty把客户端也做成了多线程,然后EventLoopGroup事件循环组监控每一个事件的触发。
每开启一个客户端,客户端所在的计算机服务器都这样做。是不是大大的提升了客户端这边的性能和效率?对吧。
3.
这不是建造者设计模式:
建造者设计模式也是点点点添加,如果少添加哪一个点的时候,该对象就意味着创建不成功,建造者设计模式创建的对象对应的类一般是不提供无参构造器的,需要一个个点点点。每一个点都是建造者设计模式创建对象的必须步骤。
但是对于这里的ServerBootstrap对象并不是这样,它是有无参构造器的,当使用无参构造器new出来的时候已经创建完成。只不过后续点点点是对该对象的一系列属性值进行赋值操作(而并不是创建对象的必须步骤)。所以这并不是建造者设计模式。
7.如何理解Netty是NIO的封装
我们前面说netty就是NIO的封装,他把nio的那些难用的操作封装起来变得简单易用了。那么既然是封
装,我们就说,不管你咋操作,咋封装,那些该有的东西还是得有,因为你不是重新创造。那我们看看
netty中哪些东西是源自于NIO的,或者说NIO有哪些东西在netty中体现着。他们的对应关系是啥。
再或者说就是netty的什么地方封装了NIO的什么东西。我们先来一段NIO的代码,前面真的写烂了。
- NIO版本伪代码
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8000));Selector selector = Selector.open();
ssc.register(selector,SelectionKey.OP_ACCEPT):while(true) {selector.select();Iterator<SelectionKey> iterator = selector.selectionKeys().iterator();while(iterator.hasNext()) {SelectionKey sscKey = iterator.next();iterator.remove();if(sscKey.isAcceptable()) {//处理连接操作 } else if(sscKey.isWritable()) {//这里省略获取sc这个SocketChannel的过程ByteBuffer byteBuffer = Charset.forName("UTF-8").encode("xxx");int write = sc.write(byteBuffer);} }
- 一张图胜过千言万语
8.logback日志使用的加强
只要你配置了日志相关的pom依赖,那么即使你不配置logback.xml这一配置文件,netty内部也会自动识别到这个pom依赖,然后会进行netty内部默认的日志输出
- 在很多情况下,我们引入logback.xml日志是为了做定制化:
情况1:比如下面这个:当出现Error错误异常时才会打印输出
输出:
情况2:
输出:
总结如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--整个文件的根元素 <configuration-->
<configuration><!-- name:定义了一个名为"STDOUT"的日志附加器(appender) ,用于控制台的输出class:指定了附加器的实现类,表示将日志消息输出到控制台--><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><!--定义了附加器的编码器,用于格式化输出的日志消息--><encoder><!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern></encoder></appender><!--自定义日志输出器:输出com.messi包及其子包下所有DEBUG日志级别及其以上的日志信息。并且不向上传递,意思就是不进行输出<root相关的日志输出--><logger name="com.messi" level="DEBUG" additivity="false"><!--控制台输出--><appender-ref ref="STDOUT"/></logger><!--root日志输出器:输出除com.messi包及其子包之外所有包下的日志输出,并且日志输出级别是DEBUG及其以上--><root level="DEBUG"><!--控制台输出--><appender-ref ref="STDOUT"/></root></configuration>
<!--
使用logback日志的步骤
1.引入logback依赖
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
2.引入该logback.xml配置文件3.安装idea相关插件:log support4.logback相关快捷键:logd logi 等等-->
9.EventLoop(NioEventLoop)
9.1 作用
EventLoop翻译过来就是事件循环。
事件就是我们之前说的那些NIO监听的事件,比如ACCEPT以及IO事件:READ或WRITE
循环就是我们之前在NIO的时候的那个死循环,while循环里面不断的监听事件的触发
NioEventLoop由NioEventLoopGroup管理,它对应到后续Reactor模型中的worker线程或boss线程
NioEventLoop就是死循环监控事件的触发,真正的处理交给Handler去做。如何监控的?肯定是由每一个NioEventLoop专属的Selector监控器监管呀,把Channel注册到Selector上,然后Selector监控Channel上可能触发的事件。
9.2 类结构图
我们可以看到NioEventLoop这个类继承了两个类,一个是SingleThreadEventExecutor。另外一个是EventLoop。
SingleThreadEventExecutor:
这个是单线程的线程池,实际上这里面就是new了一个Thread,框架里面喜欢封装一下,顺便提供了很多内置功能,做了增强。
这其实就是我们之前在worker或Boss里面new的那个线程。可见框架也是这么做的。
EventLoop:
这里面就是处理所有的IO操作,也就是我们之前那个while死循环,然后去处理IO操作
总结:
可见这个NioEventLoop就是我们之前的worker或boss线程,当然了每一个NioEventLoop对应一种角色,但是我们这个NioEventLoopGroup分组,里面有多个NioEventLoop,由NioEventLoopGroup去管理NioEventLoop。这多个NioEventLoop,他们逮到哪个事件操作就处理哪一个,就充当什么角色,比如说:当逮到accept时,该处理accept的NioEventLoop就充当boss线程的角色。当逮到IO事件(read或write)时,对应处理的NioEventLoop就充当worker线程的角色。
9.3 如何使用EventLoop?
NioEventLoop的构造方法:
常见的创建对象的方式就是new,也就是使用构造函数创建,但是NioEventLoop不可以new创建。因为它的构造方法不是通过public修饰的,它的修饰符为default。
如果一个方法没有public修饰符而是default,那么只能在声明该方法的包中去访问它。这意味着其他包的类无法访问该方法。你发现没,这里说的是"方法",构造方法也是一种方法,所以同理即可。所以这个限制导致我们无法在NioEventLoop这个类以外的包去创建这个类,那么我们开发的时候就很容易就不在同一个包,所以人家Netty本来就不想让我们去创建。
那么应该如何创建呢?
创建的方法使用的是NioEventLoopGroup去创建。于是就引出了后续研究NioEventLoopGroup。
10.EventLoopGroup(NioEventLoopGroup)
1.EventLoopGroup就是编程角度开放出来的编程接口,其实就是一个NioEventLoopGroup
2.我们上面看到的EventLoop是一个只有一个线程的线程池,也就是单线程,但是EventLoopGroup是一个组,也就是里面有多个线程。你可以理解为EventLoopGroup是EventLoop的池化,也就是工厂。EventLoopGroup来创建并且管理多个EventLoop。
- NioEventLoopGroup类图
- EventLoopGroup这个类的构造方法
我们看到它其实是一个能继承的方法,也就是不断给子类继承最后成为public,不管怎么说,它是能new出来的。而我们看到new的时候可以通过参数指定这个组里面有几个线程,而内部逻辑是一个三目运算:如果你没指定具体线程数,那么就是0,newThreads == 0为true,那么值就是DEFAULT_EVENT_LOOP_THREADS。如果你要是指定了,newThreads==0为false,那么就按照你的参数来。
DEFAULT_EVENT_LOOP_THREADS初始化的值如下:
DEFAULT_EVENT_LOOP_THREADS初始化值就是取1和SystemPropertyUtil.getInt("io.netty.eventLoopThreads",NettyRuntime.availables()*2)的最大值,也就是说你要是在启动的时候设置了jvm参数,指定了io.netty.eventLoopThreads他就会读取你这个指定,要是你没指定,那么就是当前你电脑CPU处理器核数*2,这里的核数是指逻辑核数,并不是你真的有8个或16个CPU!为什么虚拟化出多个核?其实就是为了线程并行能力的提升,并且也进一步提升并发的性能。
10.1 测试EventLoopGroup
- 通过上述分析,我们来对比之前的代码
但是有两个问题:
1.可能Netty获取到的CPU核数不是所在计算机的精确核数,可能你有8个核,实际上Netty按照二倍去开启线2程数
2.为什么有时候我们通过指定JVM参数去初始化线程数呢?
因为现在服务都docker容器化,一旦容器化上云,一切变得不确定,可能获取到的核数极其不精确,所以在这些场景下,我们通过JVM参数进行指定。
11.NioEventLoop VS DefaultEventLoopGroup
这两个都是EventLoop,到底有啥不同呢?我们说EventLoop本质是一个单线程的线程池,那么他们的区别自然要去看这个单线程的里面的run方法的实现逻辑。我们去看下DefaultEventLoop和NioEventLoop的run方法,如下:
DefaultEventLoop:
NioEventLoop:
所以区别为:
1.NioEventLoop是一个线程,处理IO事件操作。处理write,read事件监控的
2.DefaultEventLoop就是一个简单普通的线程,就是执行你提交的任务的
后续如果在netty环境中进行多线程开发的时候,要优先使用DefaultEventLoop而不是自己去开线程,线程池,因为DefaultEventLoop是netty体系中的一员,使用它更好的和netty融合。但是本质上它就是线程池,你使用JDK的也没事,只不过和netty更好的整合了。
- 测试
11.1 注意点1
EventLoop会绑定Channel,并且同一EventLoop支持多个Channel访问
1.EventLoop会绑定Channel
EventLoop实际上就是一个线程,每个线程里面封装了一个Selector,所以它能监控到channel,如果客户端1理解过去,加入是EventLoop1处理的,那么以后客户端1的IO事件操作,就都是由这个EventLoop1来进行处理
2.EventLoop可以支持多个Channel访问
假如我们的eventLoopGroup里面只有两个eventLoop,但是客户端却有三个连接,那么一个EventLoop。实际上是可以处理多个channel的,也就是他还是能处理够的。
debug检验:
当一个客户端与服务端建立连接后,进行第一次交互(读或写)后,服务端会开启一个EventLoop处理这次交互!此时该EventLoop就与该交互数据的通道SocketChannel绑定了,当下一次相同的客户端使用该相同的SocketChannel与服务端交互数据时,服务端还是该EventLoop进行处理。ofcourse,当客户端数量过多,导致服务端EventLoop不够用的时候,同一个EventLoop可以复用处理多个客户端的操作。
细节:
11.2 注意点2
我们前面说的R模型是一个主从模型,我们说主也就是boss,它只处理一个连接事件,所以就很简单,消耗少,所以只需要少量的线程,一般一个就行了,一个线程当主Boss
而从worker是用来处理IO事件的,这个相对是耗时的,所以他会有多个,用来处理多个客户端连接后对应的IO事件
但是我们上面目前serverBootstrap.group(new NioEventLoopGroup());就这一句话,里面创建了16个线程,但是并没有区分boss还是worker,好像内部线程谁都能干boss或worker的。
但是从业务角度区分,netty也是提供这种设计的,所以我们可以如下进行设计代码:
package com.messi.netty_core_02.netty02;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyReactorServer {private static final Logger log = LoggerFactory.getLogger(MyNettyReactorServer.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);//创建boss线程对应的EventLoopGroup,用来处理accept事件,线程数一个就够了//如果你不显示指定线程数,那么NioEventLoopGroup默认创建的线程数为:JVM参数指定值 或 Netty计算出本机CPU核心数*2NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);//创建worker线程对应的EventLoopGroup,用来处理客户端连接后对应的所有IO事件(read或write)NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);/*** 提供一个两个参数的重载方法,* EventLoopGroup parentGroup,EventLoopGroup childGroup* parentGroup就是boss线程的,用来处理连接* childGroup就是worker线程的,用来处理IO事件*/serverBootstrap.group(bossEventLoopGroup,workerEventLoopGroup);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("读取到的数据为:{}",msg);}});}});//服务端监听一个端口serverBootstrap.bind(8000);}}
package com.messi.netty_core_02.netty02;import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;public class MyNettyReactorClient {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(new NioEventLoopGroup());bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}});//异步开启一个新线程ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000));//阻塞到该异步新线程执行任务完毕future.sync();Channel channel = future.channel();channel.writeAndFlush("hello,netty!");System.out.println("MyNettyReactorClient.main");}}
11.3 注意点3
worker线程的阻塞问题
存在一个问题:假如我们现在有一个Boss线程,两个worker线程。在这个设计中,boss用来负责处理连接的事件。而worker负责处理IO事件(read或write事件)。但是在实际开发的过程中,我们还有另外一种场景需要线程处理,即核心业务功能,这是非IO事件,是极其耗时 阻塞的一种业务处理。你如果这个时候把该操作交给worker线程去处理,那么当一个worker线程处理该耗时的非IO事件(业务处理),该worker线程就阻塞了,它无法再去处理新的客户端连接后的IO事件!所以这不太好,此时我们就搞一种新的设计:
当一个客户端请求连接服务端时,首先使用boss线程处理客户端连接事件,然后使用worker线程处理IO事件(read或write),当客户端还需要进行核心耗时的业务操作时,worker线程接收到后,会异步开启一个新线程DefaultEventLoop去处理该耗时的非IO非连接操作。为了提升客户端的并发性能,worker线程可以直接返回一个确认给客户端,客户端收到后会接着执行客户端自己下面的逻辑而无需等待该业务处理完。此时我们提升了两个点:1.服务端不会阻塞worker线程处理新的IO事件 2.客户端先接收到服务端的确认,不等待,继续执行下面的业务逻辑。当服务端的DefaultEventLoop执行完后通过回到函数机制把结果传递给客户端。ofcourse,DefaultEventLoop一定会保留客户端的一些信息的。
DefaultEventLoop具体是怎么操作的?
经过前面boss线程(NioEventLoop)处理连接,worker线程(NioEventLoop)处理IO事件,最后通过Handler处理器获取到待处理的数据,然后使用DefaultEventLoop进行处理这非IO操作(非IO操作对应的数据就是Handler获取到的待处理数据)。这样该非IO操作的处理就逃离了NioEventLoop的处理。
为什么使用DefaultEventLoop去充当一个新线程,而不是使用JDK的线程池?
1.简单好用,无需自己手动创建。
2.与netty体系完美融合
- 代码
package com.messi.netty_core_02.netty02;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyReactorServer2 {private static final Logger log = LoggerFactory.getLogger(MyNettyReactorServer2.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);//这个和NioEventLoopGroup底层调用的是同一个构造方法DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.group(bossEventLoopGroup,workerEventLoopGroup) ;serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端读取到的结果为:{}",msg);}});}});serverBootstrap.bind(8000);}}
package com.messi.netty_core_02.netty02;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;public class MyNettyReactorClient2 {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap() ;bootstrap.channel(NioSocketChannel.class);bootstrap.group(new NioEventLoopGroup());bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}});ChannelFuture connect = bootstrap.connect(new InetSocketAddress(8000));connect.sync();Channel channel = connect.channel();channel.writeAndFlush("hello,netty");}}
- 测试
服务端输出:
我们看到一共出现三种线程:
1.main线程
这是初始化ServerBootstrap还有那些group以及添加Handler处理器等主流程的执行
2.nioEventLoopGroup-3-1线程
这个就是处理的事件。包括像连接事件,IO事件的处理。
3,defaultEventLoopGroup-4-1线程
上述的nioEventLoopGroup-3-1线程(worker线程)只是接入了业务操作事件,而真正的业务操作(handler里面的操作),交给了defaultEventLoopGroup线程去异步处理,此时NioEventLoop只是做了一个接管,,但是拿到I之后的事情将会交给异步线程DefaultEventLoop去做。
defaultEventLoopGroup-4-1线程这个就是defaultEventLoop用来做异步处理的,处理了处理器里面的业务,输出了hello,netty。这就是netty整合进去帮你绑定DefaultEventLoopGroup了,在源码中封装了。
11.4 Netty中Reactor模型总结
当有一个客户端与服务端连接,然后交互。具体过程如下:服务端使用一个NioEventLoop(Boss线程)接收一个客户端的连接并且处理,处理完连接后,Boss线程把该客户端后续的操作全盘交出,另外一个NioEventLoop(Worker线程)进行处理该客户端发生的IO事件(读或写的操作)。但是如果该客户端想要执行一些业务操作(非IO操作),那么NioEventLoop(worker线程)会通过Handler处理器把该执行该业务所需要的业务数据交给一个DefaultEventLoop(异步开启的一个新线程),该新线程负责处理该客户端的非IO事件(业务操作)。这样异步开启一个新线程处理耗时的业务操作(非IO操作),可以完美的把NioEventLoop(worker线程)解放,NioEventLoop只需要专注于处理IO事件,而无需处理耗时的业务操作,是不是可以有更多的时间去处理新客户端的IO事件,提高了并发访问性能,提高了系统的吞吐性能。
还有一个问题:客户端请求服务端一个业务操作(非IO),NioEventLoop(worker线程)接收到后会立即返回一个确认,然后异步开启一个新线程(DefaultEventLoop)去处理该业务操作,当异步处理完后,通过回调机制再把结果传递给客户端。如何把结果回调传给客户端的?其实就是保留了客户端的一些信息。