写在前面
说来惭愧,最近半年没怎么学习技术,时间基本都花在工作以及去熟悉了解金融领域的知识去了。从大一到现在,我一直有个持续学习技术的习惯,如果太久没学习技术,我心里就开始有点焦虑或者说不充实,所以最近想深入理解下他的核心原理以及源码的学习。
想起去年,我一个朋友跟我说,他们的技术大佬曾经对他们说:
如果有时间,一定要系统性地学习 Netty。如果没有掌握 Netty 的核心原理,那么永远都是 Java 的初学者。
Netty 无疑在 Java 网络编程生态处于一个统治级的地位,他的生态位与 spring 之于 Java 服务端编程是一样的。可想而知,Netty 的强大!
所以,我计划系统性的学习下 Netty。然在深入学习 netty 之前,我觉得我非常有必要熟悉并掌握一些网络操作系统的基础知识,比如常见的 IO 模型、TCP 传输的顺序、丢包、流量控制问题,网络编程粘包半包问题,IO多路复用技术(select、poll、epoll)。
Http 服务器是如何并发处理更多连接的?
这个问题可以很形象的类别为:一个上市公司(服务端 server)如何接更多的外部项目(客户端 client socket)?假如你作为老板,在资源有限的情况下,你会怎么做?
方式 1:将项目转包给独立的项目组(多线程方式)
公司来了一个新的项目,但是项目不一定是你自己做,老板可以成立一个新的项目组,一个项目做完了,那这个项目组就可以解散。而作为你这边,因为项目交给了项目组了,你又可以去接新的项目了。(这就相当于你是一个代理,在那里监听来的请求。一旦建立了一个连接,就会有一个已连接 Socket,这时候你可以创建一个线程,然后将基于已连接 Socket 的交互交给这个新的线程来做。)
其实这种方式还有一种原理是类似的多进程的方式,就相当于成立新的子公司来接更多的项目,成本较大。
存在问题:
1)项目接的太多了,如果每个项目都成立单独的项目组,就要招聘超过1 万人,你肯定养不起。也就是 C10K 问题,一台机器要维护 1 万个连接,操作系统是无法承受的
2)有可能一个项目周期长,会导致项目得到不到释放去接下一个项目以及项目组人力工作量不饱和。项目组人力工作量不饱和体现在项目组的进度会依赖第三方,而第三方又没有准备好。(线程被占用时间较长,得不到释放)。
(了解即可,因为如果不存在 c10k 问题,线程稍微慢点释放也问题不大,所以本质还是 c10k 的问题)
方式 2:一个项目组支撑多个项目(IO 多路复用,一个线程维护多个 Socket)
优秀的你,可能会想到一个项目组管理多个项目。这个时候,每个项目组都应该有个项目进度墙,将自己组看的项目列在那里,然后每天通过项目墙看每个项目的进度。(这里其实是有个字细节的,项目墙每个项目的进度是谁在维护的呢,我认为这里应该是类似于项目组经理每天去询问每个项目的进度,然后记录在进度墙上。)
由于 Socket 是文件描述符,因而某个线程盯的所有的 Socket,都放在一个文件描述符集合 fd_set 中,这就是项目进度墙,然后调用 select 函数来监听文件描述符集合是否有变化
ps:这里的监听还是线性扫描注册 socket,轮询该 fd 数据是否就绪了。
那些发生变化的文件描述符在 fd_set 对应的位(数据就绪状态位)都设为 1,表示 Socket 可读或者可写,从而可以进行读写操作,然后再调用 select,接着盯着下一轮的变化。
存在问题:
1)项目组经理更新进度墙需要将全部项目都过一遍的方式来查看进度。(cpu 轮询消耗)
2)所有项目都记录在进度墙上,进度墙面积有限(所有的 Socket,都放在一个文件描述符集合 fd_set 中,这大大影响了一个项目组能够支撑的最大的项目数量(够同时盯的项目数量由 FD_SETSIZE 限制,最大数量是 1024)
方式 3:一个项目组支撑多个项目(IO 多路复用,从“主动询问”到“有事通知”)
上面 select 函数的问题的,如果改成事件通知的方式,情况就会好很多,项目组不需要通过轮询挨个盯着这些项目,而是当项目进度发生变化的时候,主动通知项目组,然后项目组再根据项目进展情况做相应的操作。能完成这件事情的函数叫 epoll,它在内核中的实现不是通过轮询的方式,而是通过注册 callback 函数的方式,当某个文件描述符发送变化的时候,就会主动通知。
这种通知方式使得监听的 Socket 数据增加的时候,效率不会大幅度降低,能够同时监听的 Socket 的数目也非常的多了。上限就为系统定义的、进程打开的最大文件描述符个数。
存在问题:
1)这里的“有事通知”是事件触发之后通知,比如一个读事件触发之后,业务需要自己做读数据操作(数据在内核态复制到用户态这个操作,这个过程是阻塞的),读完成后对读出的数据进行业务操作
所以针对如上存在的问题,可以做个优化,比如读事件触发之后不通知,在读完成事件时通知业务。读操作由操作系统完成,回调的业务只需要关系如何处理数据做业务就行了,这也就是 AIO 模型的思想。
ps:以上几种 IO 方式都是操作系统直接提供支持的。
被讨论最多的 IO 模型
这块的内容笔者已经接触过很多次,但在面试中每次遇到并会议起这几个模型时,都觉得自己理解的不够透彻。每一次学习并看回IO模型,都有不一样的理解。
1. 同步阻塞
同步阻塞常用于本地文件读取,比如 Java 中 InputStream,OutputStream的实现就是同步阻塞 IO。这也是我们做业务开发接触的最多的IO。除了用于本地读取文件,在 NIO 还没推出之前,也会用于网络程序的编写。
bio 服务端:
package cn.comein.institution.controller.test.bio;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class BioServer {static class BioThreadFactory implements ThreadFactory {final AtomicInteger NUM = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"socket-executor-" + NUM.getAndIncrement());}}private static final ExecutorService executor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new BioThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws InterruptedException {start(9527);}private static void start(int port){try {ServerSocket serverSocket = new ServerSocket(port);while (true){Socket socket = serverSocket.accept();executor.submit(()->{try(InputStreamReader inputStreamReader = new InputStreamReader(socket.getInputStream());BufferedReader bufferedReader = new BufferedReader(inputStreamReader);PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);){String content = bufferedReader.readLine();System.out.println("context="+content);printWriter.println(content);}catch (Exception e){e.printStackTrace();}finally {if(socket != null){try{socket.close();}catch (Exception e){}}}});}}catch (Exception e){e.printStackTrace();}}
}
bio 客户端:
package cn.comein.institution.controller.test.bio;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class BioClient {public static void main(String[] args) {sendSome("127.0.0.1",9527,"hello world!!");}private static void sendSome(String address,int port,String content){try {Socket socket = new Socket(address, port);InputStreamReader inputStreamReader = new InputStreamReader(socket.getInputStream());BufferedReader bufferedReader = new BufferedReader(inputStreamReader);PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);printWriter.println(content);String echo = bufferedReader.readLine();System.out.println("echo="+echo);}catch (Exception e){e.printStackTrace();}}
}
这种方式编写的 BIO 程序实际上就是 方式一
的变体,后端通过一个线程池来处理多个客户端的请求处理,避免了创建大量的线程导致服务端的资源耗尽或者宕机。
然而尽管使用线程池,由于读写底层的通讯依然采用同步阻塞
模型,在前一个 Socket 读写未完成之前,后续的 Socket 只能在队列中等待,最终导致连接积压在队列中,如果队列满了达到线程池最大线程数了,最终会拒绝连接。因此无法从根本上解决问题
2. 同步非阻塞
曾经年少无知、知识浅薄,既然有了同步阻塞,Java 还推出个 NIO 干哈?我业务开发根本就用不到,难不成我本地文件读写还用同步非阻塞 IO么?这怎么行啊,我后续的操作是依赖于文件读取的结果的呀!!
原来原来原来还是自己 too native。人家 NIO 的主要目的是用于网络编程的。
在 JDK 1.4 推出 NIO 之前,基于 Java 的所有 Socket 通讯都采用同步阻塞模式(BIO),这种一请求一应答的通讯模型简化了上层的应用开发,但是在性能和可靠性方面却存在着巨大的瓶颈。因此,在很长一段时间里,大型的应用服务器都采用 C 或者 C++ 语言开发,因为他们可以直接使用操作系统提供的同步阻塞 IO 模型能力或者 AIO。由于 Java 传统 BIO 的拙劣表现,才使得 Java 支持非阻塞 IO 的呼声日渐高涨,最终,JDK1.4 版本提供了新的 NIO 类库,Java 也终于也支持非阻塞IO了。(可以理解目前NIO是使用上文提到的方式三
实现的。)
另外,笔者在这里对同步和阻塞的概念非常的执着,当时一直没想明白,NIO 都说是同步非阻塞,同步体现在哪里,非阻塞又体现在哪里呢?网上的文章对这两个词的解释理解基本千篇一律,抄来抄去,没点自己的思想,辣鸡。
1. IO 模型中的同步和异步主要描述的是程序在等待 IO 操作完成的过程中,数据的提交方式或者说数据的获取方式,是由用户主动去获取(同步),还是由系统自动通知(异步)。
结合上文方式三理解,虽然是系统自动通知,但是通知的是读事件,而非读完成。读数据这个操作,还是需要用户程序自己去读取(需要时间的)。2. 阻塞和非阻塞主要描述的是程序在等待 IO 操作完成的过程中,线程的状态。阻塞意味着线程会被挂起,直到 IO 操作完成;非阻塞则意味着线程在等待 IO 操作完成的过程中,仍然可以进行其他任务。
这样就好理解了,NIO 的 Selector 线程在 socket 数据未就绪时,并不会阻塞在IO操作上,而是继续处理其它 socket。
NIO 服务端:
package cn.comein.institution.controller.test.nio;import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class NioServer {static class BioThreadFactory implements ThreadFactory {final AtomicInteger NUM = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"socket-executor-" + NUM.getAndIncrement());}}private static final ExecutorService executor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new BioThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) {start(9527);}private static void start(int port){try {Handler handler = new Handler();Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(port),1024);serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,handler);while (true){selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();if(!CollectionUtils.isEmpty(selectionKeys)){Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey next = iterator.next();Handler attachment = (Handler) next.attachment();attachment.handle(next);iterator.remove();}}}}catch (Exception e){e.printStackTrace();}}private static class Handler {public void handle(SelectionKey selectionKey){if (selectionKey.isAcceptable()){handleAccept(selectionKey);return;}if (selectionKey.isReadable()){handleRead(selectionKey);return;}}private void handleAccept(SelectionKey selectionKey){try{ServerSocketChannel channel =(ServerSocketChannel) selectionKey.channel();Selector selector = selectionKey.selector();// 获取连接channel,设置为非阻塞,注册readSocketChannel accept = channel.accept();accept.configureBlocking(false);accept.register(selector,SelectionKey.OP_READ,selectionKey.attachment());}catch (Exception e){e.printStackTrace();}}private void handleRead(SelectionKey selectionKey){try {SocketChannel channel =(SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 读取数据int read = channel.read(byteBuffer);if ( read > 0 ){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("content="+content);if(!StringUtils.isEmpty(content)){byte[] respBytes = content.getBytes(StandardCharsets.UTF_8);byteBuffer = ByteBuffer.allocateDirect(respBytes.length);byteBuffer.put(respBytes);byteBuffer.flip();channel.write(byteBuffer);}}}catch (Exception e){e.printStackTrace();}}}}
NIO 客户端:
package cn.comein.institution.controller.test.nio;import org.springframework.util.CollectionUtils;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;public class NioClient {public static void main(String[] args) {sendSome("127.0.0.1",9527,"hello world!!");}private static void sendSome(String address,int port,String content){try {Selector selector = Selector.open();SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);Handler handler = new Handler();handler.setContent(content);if (socketChannel.connect(new InetSocketAddress(address, port))) {socketChannel.register(selector, SelectionKey.OP_READ,handler);handler.doWrite(socketChannel);}else {socketChannel.register(selector, SelectionKey.OP_CONNECT,handler);}while (true){selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();if(!CollectionUtils.isEmpty(selectionKeys)){Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey next = iterator.next();Handler attachment = (Handler) next.attachment();attachment.handle(next);iterator.remove();}}}}catch (Exception e){e.printStackTrace();}}private static class Handler {private String content;public void setContent(String content) {this.content = content;}public void handle(SelectionKey selectionKey){if (selectionKey.isConnectable()){handleConnect(selectionKey);return;}if (selectionKey.isReadable()){handleRead(selectionKey);return;}}private void handleConnect(SelectionKey selectionKey){try{SocketChannel channel =(SocketChannel) selectionKey.channel();if (channel.finishConnect()) {Selector selector = selectionKey.selector();channel.configureBlocking(false);channel.register(selector,SelectionKey.OP_READ,selectionKey.attachment());doWrite(channel);}}catch (Exception e){e.printStackTrace();}}private void handleRead(SelectionKey selectionKey){try {SocketChannel channel =(SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 读取数据int read = channel.read(byteBuffer);if ( read > 0 ){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("echo="+content);}channel.close();System.exit(0);}catch (Exception e){e.printStackTrace();}}public void doWrite(SocketChannel socketChannel){try{byte[] bytes = content.getBytes(StandardCharsets.UTF_8);ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length);byteBuffer.put(bytes);byteBuffer.flip();socketChannel.write(byteBuffer);}catch (Exception e){e.printStackTrace();}}}
}
3. 异步IO
有了同步非阻塞 IO 的基础知识,异步 IO 的提出就是为了解决同步非阻塞 IO 中同步的问题(用户主动去获取数据),由 IO 事件触发改为 IO 操作完成通知。
说白了就是同步非阻塞IO需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间
服务端:
package cn.comein.institution.controller.test.aio;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;public class AioServer {public static void main(String[] args) {start(9527);System.out.println("---service exit--");}private static void start(int port){try {AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port),1024);AcceptHandler handler = new AcceptHandler();serverSocketChannel.accept(serverSocketChannel,handler);}catch (Exception e){e.printStackTrace();}}private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsynchronousServerSocketChannel> {@Overridepublic void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);attachment.accept(attachment,this);result.read(byteBuffer,byteBuffer,new ReadHandler(result));}@Overridepublic void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {exc.printStackTrace();}}private static class ReadHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public ReadHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();byte[] bytes = new byte[attachment.remaining()];attachment.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("content="+content);ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);byteBuffer.put(content.getBytes(StandardCharsets.UTF_8));byteBuffer.flip();asynchronousSocketChannel.write(byteBuffer,byteBuffer,new WriteHandler(asynchronousSocketChannel));}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}private static class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public WriteHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {if (attachment.hasRemaining()){asynchronousSocketChannel.write(attachment,attachment,this);}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}}
客户端:
package cn.comein.institution.controller.test.aio;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;public class AioClient {public static void main(String[] args) {for (int i = 0; i < 100; i++) {sendSome("127.0.0.1",9527,"hello world!!");}System.out.println("client exit");}private static void sendSome(String address,int port,String content){try{ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);byteBuffer.put(content.getBytes(StandardCharsets.UTF_8));byteBuffer.flip();AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();socketChannel.connect(new InetSocketAddress(address,port),byteBuffer,new ConnectHandler(socketChannel) );}catch (Exception e){}}private static class ConnectHandler implements CompletionHandler<Void, ByteBuffer> {private AsynchronousSocketChannel asynchronousSocketChannel;public ConnectHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Void result, ByteBuffer attachment) {asynchronousSocketChannel.write(attachment,attachment,new WriteHandler(asynchronousSocketChannel));}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}private static class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public WriteHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {if (attachment.hasRemaining()){asynchronousSocketChannel.write(attachment,attachment,this);}else {ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);asynchronousSocketChannel.read(byteBuffer,byteBuffer,new ReadHandler(asynchronousSocketChannel));}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}private static class ReadHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public ReadHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();byte[] bytes = new byte[attachment.remaining()];attachment.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("echo="+content);try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}}
AIO 的编程风格跟CompletableFuture 的思想分割很像,在完成之后将回调CompletionHandler,做业务操作,比如服务端接收接收到数据完成之后会回调对于的handler做业务操作,我们不需要自己将channel里的数据读到buffer里面。
这三种 IO 模型的通俗理解和小结
IO操作分两个阶段:
1. 等待数据准备好(读到内核缓存)
2. 将数据从内核读到用户空间(进程空间)
一般来说第1步花费的时间远远大于第2步。
同步阻塞IO:第1步上阻塞第2步上也阻塞
同步非阻塞IO:第1步上非阻塞第2步阻塞
异步非阻塞IO:第1步上非阻塞第2步上非阻塞
再举个形象的例子,假如我们去饭店点餐,饭店人很多,如果我们付了钱后站在收银台等着饭端上来我们才离开,这就成了同步阻塞了。如果我们付了钱后给你一个号就可以离开,饭好了老板会叫号,你过来取。这就是同步非阻塞模型。如果我们付了钱后给我一个号就可以坐到坐位上该干啥干啥,饭好了老板会把饭端上来送给你,这就是异步非阻塞IO模型了。
IO 多路复用技术
有的人也喜欢把方式二
、方式三
叫做 IO 复用模型,这是因为同步阻塞模型的关键就是 IO 多路复用技术.
在 IO 编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者 IO 多路复用技术进行处理。IO 多路复用技术通过把多个 IO 的阻塞复用到同一个 select 的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,IO多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源。
目前支持 IO 多路复用的系统调用有select
、pselect
、poll
、epoll
,在 Linux 网络编程过程中,很长一段时间都使用select
做轮询和网络事件通知,然而select
的一些固有缺陷导致了它的应用受到了很大的限制,最终Linux不得不在新的内核版本中寻找select
的替代方案,最终选择了epoll
。epoll
与select
的原理比较类似,为了克服select
的缺点,epoll
作了很多重大改进。
简单理解总结如下:select 最大的缺陷就是单个进程所打开的 FD 是有一定限制的,它由 FD_ SETSIZE 设置,默认值是1024,epoll
并没有这个限制,它所支持的FD
上限是操作系统的最大文件句柄数,这个数字远远大于 1024。并且监听的 Socket 数据增加的时候,效率不会大幅度降低。因而,epoll 被称为解决 C10K 问题的利器。
单服务器高性能模式:Reactor 与 Proactor
好,IO 多路复用技术可以解决C10K 问题的问题。既然上游接请求没问题了, 下游怎么处理呢?我们一般都会想到,通过一个线程池来处理业务。
Reactor
没错,IO 多路复用结合线程池,“大神们”给它取了一个很牛的名字:Reactor,中文是“反应堆”。联想到“核反应堆”,听起来就很吓人,实际上这里的“反应”不是聚变、裂变反应的意思,而是“事件反应”的意思,可以通俗地理解为“来了一个事件我就有相应的反应”,这里的“我”就是 Reactor,具体的反应就是我们写的代码,Reactor 会根据事件类型来调用相应的代码进行处理。Reactor 模式也叫 Dispatcher 模式(在很多开源的系统里面会看到这个名称的类,其实就是实现 Reactor 模式的)(比如Spring mvc 的DispatcherServlet),更加贴近模式本身的含义,即 IO 多路复用统一监听事件,收到事件后分配(Dispatch)给某个进程。
Reactor 模式的核心组成部分包括 Reactor 和处理资源池(进程池或线程池),其中 Reactor 负责监听和分配事件,处理资源池负责处理事件。初看 Reactor 的实现是比较简单的,但实际上结合不同的业务场景,Reactor 模式的具体实现方案灵活多变。
1. 单 Reactor 单进程 / 线程
单 Reactor 单进程 / 线程的方案示意图如下(以进程为例):
只有一个进程,无法发挥多核 CPU 的性能,并且 Handler 在处理某个连接上的业务时,整个进程无法处理其他连接的事件,很容易导致性能瓶颈。
因此,单 Reactor 单进程的方案在实践中应用场景不多,只适用于业务处理非常快速的场景,目前比较著名的开源软件中使用单 Reactor 单进程的是 Redis。
2. 单 Reactor 多线程
为了克服单 Reactor 单进程 / 线程方案的缺点,引入多进程 / 多线程是显而易见的。
单 Reator 多线程方案能够充分利用多核多 CPU 的处理能力,但Reactor 承担所有事件的监听和响应,只在主线程中运行,瞬间高并发时会成为性能瓶颈
3. 多 Reactor 多线程
为了解决单 Reactor 多线程的问题,最直观的方法就是将单 Reactor 改为多 Reactor。
目前著名的开源系统 Nginx 采用的是多 Reactor 多进程,采用多 Reactor 多线程的实现有 Memcache 和 Netty。
Proactor
Reactor 是非阻塞同步网络模型,因为真正的 read 和 send 操作都需要用户进程同步操作。这里的“同步”指用户进程在执行 read 和 send 这类 I/O 操作的时候是同步的,如果把 I/O 操作改为异步就能够进一步提升性能,这就是异步网络模型 Proactor。Proactor 中文翻译为“前摄器”比较难理解,与其类似的单词是 proactive,含义为“主动的”,因此我们照猫画虎翻译为“主动器”反而更好理解。Reactor 可以理解为“来了事件我通知你,你来处理”,而 Proactor 可以理解为“来了事件我来处理,处理完了我通知你”。这里的“我”就是操作系统内核,“事件”就是有新连接、有数据可读、有数据可写的这些 I/O 事件,“你”就是我们的程序代码。
Proactor 模型示意图是:
理论上 Proactor 比 Reactor 效率要高一些,异步 I/O 能够充分利用 DMA 特性,让 I/O 操作与计算重叠,但要实现真正的异步 I/O,操作系统需要做大量的工作。目前 Windows 下通过 IOCP 实现了真正的异步 I/O,而在 Linux 系统下的 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 Reactor 模式为主。所以即使 Boost.Asio 号称实现了 Proactor 模型,其实它在 Windows 下采用 IOCP,而在 Linux 下是用 Reactor 模式(采用 epoll)模拟出来的异步模型。
什么是 DMA 特性
开启学习Netty源码之旅
通过上面文章的学习,发现高级语言 Java 实现某一个功能或者特性是离不开底层操作系统的支持,比如我们熟悉的CAS以及本章的同步阻塞 api NIO,本质都是对底层细节的包装。
好了,到这里学习 Netty 网络基础知识就差不多了。有了这些知识,相信学习 Netty 源码过程中应该会少很多阻碍吧。
笔者最近买了一本Netty权威指南(第二版),计划着能跟着它的书一起去学习,另外,极客时间也有一个关于 netty 的专栏,想买,但还没买,后期再看看。
参考文章说明:
1、Netty的朴素解释
2、Netty权威指南(第二版)
3、从 0 开始学架构(第十九章)