Netty网络基础的通俗理解(网络操作系统)

写在前面

说来惭愧,最近半年没怎么学习技术,时间基本都花在工作以及去熟悉了解金融领域的知识去了。从大一到现在,我一直有个持续学习技术的习惯,如果太久没学习技术,我心里就开始有点焦虑或者说不充实,所以最近想深入理解下他的核心原理以及源码的学习。

想起去年,我一个朋友跟我说,他们的技术大佬曾经对他们说:

如果有时间,一定要系统性地学习 Netty。如果没有掌握 Netty 的核心原理,那么永远都是 Java 的初学者。

Netty 无疑在 Java 网络编程生态处于一个统治级的地位,他的生态位与 spring 之于 Java 服务端编程是一样的。可想而知,Netty 的强大!

所以,我计划系统性的学习下 Netty。然在深入学习 netty 之前,我觉得我非常有必要熟悉并掌握一些网络操作系统的基础知识,比如常见的 IO 模型、TCP 传输的顺序、丢包、流量控制问题,网络编程粘包半包问题,IO多路复用技术(select、poll、epoll)。

Http 服务器是如何并发处理更多连接的?

这个问题可以很形象的类别为:一个上市公司(服务端 server)如何接更多的外部项目(客户端 client socket)?假如你作为老板,在资源有限的情况下,你会怎么做?

方式 1:将项目转包给独立的项目组(多线程方式)

公司来了一个新的项目,但是项目不一定是你自己做,老板可以成立一个新的项目组,一个项目做完了,那这个项目组就可以解散。而作为你这边,因为项目交给了项目组了,你又可以去接新的项目了。(这就相当于你是一个代理,在那里监听来的请求。一旦建立了一个连接,就会有一个已连接 Socket,这时候你可以创建一个线程,然后将基于已连接 Socket 的交互交给这个新的线程来做。)

其实这种方式还有一种原理是类似的多进程的方式,就相当于成立新的子公司来接更多的项目,成本较大。

存在问题:

1)项目接的太多了,如果每个项目都成立单独的项目组,就要招聘超过1 万人,你肯定养不起。也就是 C10K 问题,一台机器要维护 1 万个连接,操作系统是无法承受的

2)有可能一个项目周期长,会导致项目得到不到释放去接下一个项目以及项目组人力工作量不饱和。项目组人力工作量不饱和体现在项目组的进度会依赖第三方,而第三方又没有准备好。(线程被占用时间较长,得不到释放)。

(了解即可,因为如果不存在 c10k 问题,线程稍微慢点释放也问题不大,所以本质还是 c10k 的问题)

方式 2:一个项目组支撑多个项目(IO 多路复用,一个线程维护多个 Socket)

优秀的你,可能会想到一个项目组管理多个项目。这个时候,每个项目组都应该有个项目进度墙,将自己组看的项目列在那里,然后每天通过项目墙看每个项目的进度。(这里其实是有个字细节的,项目墙每个项目的进度是谁在维护的呢,我认为这里应该是类似于项目组经理每天去询问每个项目的进度,然后记录在进度墙上。)

由于 Socket 是文件描述符,因而某个线程盯的所有的 Socket,都放在一个文件描述符集合 fd_set 中,这就是项目进度墙,然后调用 select 函数来监听文件描述符集合是否有变化

ps:这里的监听还是线性扫描注册 socket,轮询该 fd 数据是否就绪了。

那些发生变化的文件描述符在 fd_set 对应的位(数据就绪状态位)都设为 1,表示 Socket 可读或者可写,从而可以进行读写操作,然后再调用 select,接着盯着下一轮的变化。

存在问题:

1)项目组经理更新进度墙需要将全部项目都过一遍的方式来查看进度。(cpu 轮询消耗)

2)所有项目都记录在进度墙上,进度墙面积有限(所有的 Socket,都放在一个文件描述符集合 fd_set 中,这大大影响了一个项目组能够支撑的最大的项目数量(够同时盯的项目数量由 FD_SETSIZE 限制,最大数量是 1024)

方式 3:一个项目组支撑多个项目(IO 多路复用,从“主动询问”到“有事通知”)

上面 select 函数的问题的,如果改成事件通知的方式,情况就会好很多,项目组不需要通过轮询挨个盯着这些项目,而是当项目进度发生变化的时候,主动通知项目组,然后项目组再根据项目进展情况做相应的操作。能完成这件事情的函数叫 epoll,它在内核中的实现不是通过轮询的方式,而是通过注册 callback 函数的方式,当某个文件描述符发送变化的时候,就会主动通知。

这种通知方式使得监听的 Socket 数据增加的时候,效率不会大幅度降低,能够同时监听的 Socket 的数目也非常的多了。上限就为系统定义的、进程打开的最大文件描述符个数。

存在问题:

1)这里的“有事通知”是事件触发之后通知,比如一个读事件触发之后,业务需要自己做读数据操作(数据在内核态复制到用户态这个操作,这个过程是阻塞的),读完成后对读出的数据进行业务操作

所以针对如上存在的问题,可以做个优化,比如读事件触发之后不通知,在读完成事件时通知业务。读操作由操作系统完成,回调的业务只需要关系如何处理数据做业务就行了,这也就是 AIO 模型的思想。

ps:以上几种 IO 方式都是操作系统直接提供支持的。

被讨论最多的 IO 模型

这块的内容笔者已经接触过很多次,但在面试中每次遇到并会议起这几个模型时,都觉得自己理解的不够透彻。每一次学习并看回IO模型,都有不一样的理解。

1. 同步阻塞

同步阻塞常用于本地文件读取,比如 Java 中 InputStream,OutputStream的实现就是同步阻塞 IO。这也是我们做业务开发接触的最多的IO。除了用于本地读取文件,在 NIO 还没推出之前,也会用于网络程序的编写。

bio 服务端:

package cn.comein.institution.controller.test.bio;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class BioServer {static class BioThreadFactory implements ThreadFactory {final AtomicInteger NUM = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"socket-executor-" + NUM.getAndIncrement());}}private static final ExecutorService executor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new BioThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws InterruptedException {start(9527);}private static void start(int port){try {ServerSocket serverSocket = new ServerSocket(port);while (true){Socket socket = serverSocket.accept();executor.submit(()->{try(InputStreamReader inputStreamReader = new InputStreamReader(socket.getInputStream());BufferedReader bufferedReader = new BufferedReader(inputStreamReader);PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);){String content = bufferedReader.readLine();System.out.println("context="+content);printWriter.println(content);}catch (Exception e){e.printStackTrace();}finally {if(socket != null){try{socket.close();}catch (Exception e){}}}});}}catch (Exception e){e.printStackTrace();}}
}

bio 客户端:

package cn.comein.institution.controller.test.bio;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class BioClient {public static void main(String[] args) {sendSome("127.0.0.1",9527,"hello world!!");}private static void sendSome(String address,int port,String content){try {Socket socket = new Socket(address, port);InputStreamReader inputStreamReader = new InputStreamReader(socket.getInputStream());BufferedReader bufferedReader = new BufferedReader(inputStreamReader);PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);printWriter.println(content);String echo = bufferedReader.readLine();System.out.println("echo="+echo);}catch (Exception e){e.printStackTrace();}}
}

这种方式编写的 BIO 程序实际上就是 方式一 的变体,后端通过一个线程池来处理多个客户端的请求处理,避免了创建大量的线程导致服务端的资源耗尽或者宕机。

然而尽管使用线程池,由于读写底层的通讯依然采用同步阻塞模型,在前一个 Socket 读写未完成之前,后续的 Socket 只能在队列中等待,最终导致连接积压在队列中,如果队列满了达到线程池最大线程数了,最终会拒绝连接。因此无法从根本上解决问题

2. 同步非阻塞

曾经年少无知、知识浅薄,既然有了同步阻塞,Java 还推出个 NIO 干哈?我业务开发根本就用不到,难不成我本地文件读写还用同步非阻塞 IO么?这怎么行啊,我后续的操作是依赖于文件读取的结果的呀!!

原来原来原来还是自己 too native。人家 NIO 的主要目的是用于网络编程的。

在 JDK 1.4 推出 NIO 之前,基于 Java 的所有 Socket 通讯都采用同步阻塞模式(BIO),这种一请求一应答的通讯模型简化了上层的应用开发,但是在性能和可靠性方面却存在着巨大的瓶颈。因此,在很长一段时间里,大型的应用服务器都采用 C 或者 C++ 语言开发,因为他们可以直接使用操作系统提供的同步阻塞 IO 模型能力或者 AIO。由于 Java 传统 BIO 的拙劣表现,才使得 Java 支持非阻塞 IO 的呼声日渐高涨,最终,JDK1.4 版本提供了新的 NIO 类库,Java 也终于也支持非阻塞IO了。(可以理解目前NIO是使用上文提到的方式三实现的。)

另外,笔者在这里对同步和阻塞的概念非常的执着,当时一直没想明白,NIO 都说是同步非阻塞,同步体现在哪里,非阻塞又体现在哪里呢?网上的文章对这两个词的解释理解基本千篇一律,抄来抄去,没点自己的思想,辣鸡。

1. IO 模型中的同步和异步主要描述的是程序在等待 IO 操作完成的过程中,数据的提交方式或者说数据的获取方式,是由用户主动去获取(同步),还是由系统自动通知(异步)。
结合上文方式三理解,虽然是系统自动通知,但是通知的是读事件,而非读完成。读数据这个操作,还是需要用户程序自己去读取(需要时间的)。2. 阻塞和非阻塞主要描述的是程序在等待 IO 操作完成的过程中,线程的状态。阻塞意味着线程会被挂起,直到 IO 操作完成;非阻塞则意味着线程在等待 IO 操作完成的过程中,仍然可以进行其他任务。
这样就好理解了,NIO 的 Selector 线程在 socket 数据未就绪时,并不会阻塞在IO操作上,而是继续处理其它 socket。

NIO 服务端:

package cn.comein.institution.controller.test.nio;import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class NioServer {static class BioThreadFactory implements ThreadFactory {final AtomicInteger NUM = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"socket-executor-" + NUM.getAndIncrement());}}private static final ExecutorService executor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new BioThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) {start(9527);}private static void start(int port){try {Handler handler = new Handler();Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(port),1024);serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,handler);while (true){selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();if(!CollectionUtils.isEmpty(selectionKeys)){Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey next = iterator.next();Handler attachment = (Handler) next.attachment();attachment.handle(next);iterator.remove();}}}}catch (Exception e){e.printStackTrace();}}private static class Handler {public void handle(SelectionKey selectionKey){if (selectionKey.isAcceptable()){handleAccept(selectionKey);return;}if (selectionKey.isReadable()){handleRead(selectionKey);return;}}private void handleAccept(SelectionKey selectionKey){try{ServerSocketChannel channel =(ServerSocketChannel) selectionKey.channel();Selector selector = selectionKey.selector();// 获取连接channel,设置为非阻塞,注册readSocketChannel accept = channel.accept();accept.configureBlocking(false);accept.register(selector,SelectionKey.OP_READ,selectionKey.attachment());}catch (Exception e){e.printStackTrace();}}private void handleRead(SelectionKey selectionKey){try {SocketChannel channel =(SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 读取数据int read = channel.read(byteBuffer);if ( read > 0 ){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("content="+content);if(!StringUtils.isEmpty(content)){byte[] respBytes = content.getBytes(StandardCharsets.UTF_8);byteBuffer = ByteBuffer.allocateDirect(respBytes.length);byteBuffer.put(respBytes);byteBuffer.flip();channel.write(byteBuffer);}}}catch (Exception e){e.printStackTrace();}}}}

NIO 客户端:

package cn.comein.institution.controller.test.nio;import org.springframework.util.CollectionUtils;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;public class NioClient {public static void main(String[] args) {sendSome("127.0.0.1",9527,"hello world!!");}private static void sendSome(String address,int port,String content){try {Selector selector = Selector.open();SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);Handler handler = new Handler();handler.setContent(content);if (socketChannel.connect(new InetSocketAddress(address, port))) {socketChannel.register(selector, SelectionKey.OP_READ,handler);handler.doWrite(socketChannel);}else {socketChannel.register(selector, SelectionKey.OP_CONNECT,handler);}while (true){selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();if(!CollectionUtils.isEmpty(selectionKeys)){Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey next = iterator.next();Handler attachment = (Handler) next.attachment();attachment.handle(next);iterator.remove();}}}}catch (Exception e){e.printStackTrace();}}private static class Handler {private String content;public void setContent(String content) {this.content = content;}public void handle(SelectionKey selectionKey){if (selectionKey.isConnectable()){handleConnect(selectionKey);return;}if (selectionKey.isReadable()){handleRead(selectionKey);return;}}private void handleConnect(SelectionKey selectionKey){try{SocketChannel channel =(SocketChannel) selectionKey.channel();if (channel.finishConnect()) {Selector selector = selectionKey.selector();channel.configureBlocking(false);channel.register(selector,SelectionKey.OP_READ,selectionKey.attachment());doWrite(channel);}}catch (Exception e){e.printStackTrace();}}private void handleRead(SelectionKey selectionKey){try {SocketChannel channel =(SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 读取数据int read = channel.read(byteBuffer);if ( read > 0 ){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("echo="+content);}channel.close();System.exit(0);}catch (Exception e){e.printStackTrace();}}public void doWrite(SocketChannel socketChannel){try{byte[] bytes = content.getBytes(StandardCharsets.UTF_8);ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length);byteBuffer.put(bytes);byteBuffer.flip();socketChannel.write(byteBuffer);}catch (Exception e){e.printStackTrace();}}}
}

3. 异步IO

有了同步非阻塞 IO 的基础知识,异步 IO 的提出就是为了解决同步非阻塞 IO 中同步的问题(用户主动去获取数据),由 IO 事件触发改为 IO 操作完成通知。

说白了就是同步非阻塞IO需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间

服务端:

package cn.comein.institution.controller.test.aio;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;public class AioServer {public static void main(String[] args) {start(9527);System.out.println("---service exit--");}private static void start(int port){try {AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port),1024);AcceptHandler handler = new AcceptHandler();serverSocketChannel.accept(serverSocketChannel,handler);}catch (Exception e){e.printStackTrace();}}private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsynchronousServerSocketChannel> {@Overridepublic void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);attachment.accept(attachment,this);result.read(byteBuffer,byteBuffer,new ReadHandler(result));}@Overridepublic void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {exc.printStackTrace();}}private static class ReadHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public ReadHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();byte[] bytes = new byte[attachment.remaining()];attachment.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("content="+content);ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);byteBuffer.put(content.getBytes(StandardCharsets.UTF_8));byteBuffer.flip();asynchronousSocketChannel.write(byteBuffer,byteBuffer,new WriteHandler(asynchronousSocketChannel));}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}private static class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public WriteHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {if (attachment.hasRemaining()){asynchronousSocketChannel.write(attachment,attachment,this);}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}}

客户端:

package cn.comein.institution.controller.test.aio;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;public class AioClient {public static void main(String[] args) {for (int i = 0; i < 100; i++) {sendSome("127.0.0.1",9527,"hello world!!");}System.out.println("client exit");}private static void sendSome(String address,int port,String content){try{ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);byteBuffer.put(content.getBytes(StandardCharsets.UTF_8));byteBuffer.flip();AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();socketChannel.connect(new InetSocketAddress(address,port),byteBuffer,new ConnectHandler(socketChannel) );}catch (Exception e){}}private static class ConnectHandler implements CompletionHandler<Void, ByteBuffer> {private AsynchronousSocketChannel asynchronousSocketChannel;public ConnectHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Void result, ByteBuffer attachment) {asynchronousSocketChannel.write(attachment,attachment,new WriteHandler(asynchronousSocketChannel));}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}private static class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public WriteHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {if (attachment.hasRemaining()){asynchronousSocketChannel.write(attachment,attachment,this);}else {ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);asynchronousSocketChannel.read(byteBuffer,byteBuffer,new ReadHandler(asynchronousSocketChannel));}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}private static class ReadHandler implements CompletionHandler<Integer,ByteBuffer> {private final AsynchronousSocketChannel asynchronousSocketChannel;public ReadHandler(AsynchronousSocketChannel asynchronousSocketChannel){this.asynchronousSocketChannel = asynchronousSocketChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();byte[] bytes = new byte[attachment.remaining()];attachment.get(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println("echo="+content);try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();try{asynchronousSocketChannel.close();}catch (Exception e){e.printStackTrace();}}}}

AIO 的编程风格跟CompletableFuture 的思想分割很像,在完成之后将回调CompletionHandler,做业务操作,比如服务端接收接收到数据完成之后会回调对于的handler做业务操作,我们不需要自己将channel里的数据读到buffer里面。

这三种 IO 模型的通俗理解和小结

IO操作分两个阶段:

1. 等待数据准备好(读到内核缓存)
2. 将数据从内核读到用户空间(进程空间)
一般来说第1步花费的时间远远大于第2步。

同步阻塞IO:第1步上阻塞第2步上也阻塞

同步非阻塞IO:第1步上非阻塞第2步阻塞

异步非阻塞IO:第1步上非阻塞第2步上非阻塞

再举个形象的例子,假如我们去饭店点餐,饭店人很多,如果我们付了钱后站在收银台等着饭端上来我们才离开,这就成了同步阻塞了。如果我们付了钱后给你一个号就可以离开,饭好了老板会叫号,你过来取。这就是同步非阻塞模型。如果我们付了钱后给我一个号就可以坐到坐位上该干啥干啥,饭好了老板会把饭端上来送给你,这就是异步非阻塞IO模型了。

IO 多路复用技术

有的人也喜欢把方式二方式三叫做 IO 复用模型,这是因为同步阻塞模型的关键就是 IO 多路复用技术.

在 IO 编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者 IO 多路复用技术进行处理。IO 多路复用技术通过把多个 IO 的阻塞复用到同一个 select 的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,IO多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源。

目前支持 IO 多路复用的系统调用有selectpselectpollepoll,在 Linux 网络编程过程中,很长一段时间都使用select做轮询和网络事件通知,然而select的一些固有缺陷导致了它的应用受到了很大的限制,最终Linux不得不在新的内核版本中寻找select的替代方案,最终选择了epollepollselect 的原理比较类似,为了克服select的缺点,epoll 作了很多重大改进。

简单理解总结如下:select 最大的缺陷就是单个进程所打开的 FD 是有一定限制的,它由 FD_ SETSIZE 设置,默认值是1024,epoll 并没有这个限制,它所支持的FD上限是操作系统的最大文件句柄数,这个数字远远大于 1024。并且监听的 Socket 数据增加的时候,效率不会大幅度降低。因而,epoll 被称为解决 C10K 问题的利器。

单服务器高性能模式:Reactor 与 Proactor

好,IO 多路复用技术可以解决C10K 问题的问题。既然上游接请求没问题了, 下游怎么处理呢?我们一般都会想到,通过一个线程池来处理业务。

Reactor

没错,IO 多路复用结合线程池,“大神们”给它取了一个很牛的名字:Reactor,中文是“反应堆”。联想到“核反应堆”,听起来就很吓人,实际上这里的“反应”不是聚变、裂变反应的意思,而是“事件反应”的意思,可以通俗地理解为“来了一个事件我就有相应的反应”,这里的“我”就是 Reactor,具体的反应就是我们写的代码,Reactor 会根据事件类型来调用相应的代码进行处理。Reactor 模式也叫 Dispatcher 模式(在很多开源的系统里面会看到这个名称的类,其实就是实现 Reactor 模式的)(比如Spring mvc 的DispatcherServlet),更加贴近模式本身的含义,即 IO 多路复用统一监听事件,收到事件后分配(Dispatch)给某个进程。

Reactor 模式的核心组成部分包括 Reactor 和处理资源池(进程池或线程池),其中 Reactor 负责监听和分配事件,处理资源池负责处理事件。初看 Reactor 的实现是比较简单的,但实际上结合不同的业务场景,Reactor 模式的具体实现方案灵活多变。

1. 单 Reactor 单进程 / 线程

单 Reactor 单进程 / 线程的方案示意图如下(以进程为例):

只有一个进程,无法发挥多核 CPU 的性能,并且 Handler 在处理某个连接上的业务时,整个进程无法处理其他连接的事件,很容易导致性能瓶颈。

因此,单 Reactor 单进程的方案在实践中应用场景不多,只适用于业务处理非常快速的场景,目前比较著名的开源软件中使用单 Reactor 单进程的是 Redis。

2. 单 Reactor 多线程

为了克服单 Reactor 单进程 / 线程方案的缺点,引入多进程 / 多线程是显而易见的。

单 Reator 多线程方案能够充分利用多核多 CPU 的处理能力,但Reactor 承担所有事件的监听和响应,只在主线程中运行,瞬间高并发时会成为性能瓶颈

3. 多 Reactor 多线程

为了解决单 Reactor 多线程的问题,最直观的方法就是将单 Reactor 改为多 Reactor。

目前著名的开源系统 Nginx 采用的是多 Reactor 多进程,采用多 Reactor 多线程的实现有 Memcache 和 Netty。

Proactor

Reactor 是非阻塞同步网络模型,因为真正的 read 和 send 操作都需要用户进程同步操作。这里的“同步”指用户进程在执行 read 和 send 这类 I/O 操作的时候是同步的,如果把 I/O 操作改为异步就能够进一步提升性能,这就是异步网络模型 Proactor。Proactor 中文翻译为“前摄器”比较难理解,与其类似的单词是 proactive,含义为“主动的”,因此我们照猫画虎翻译为“主动器”反而更好理解。Reactor 可以理解为“来了事件我通知你,你来处理”,而 Proactor 可以理解为“来了事件我来处理,处理完了我通知你”。这里的“我”就是操作系统内核,“事件”就是有新连接、有数据可读、有数据可写的这些 I/O 事件,“你”就是我们的程序代码。

Proactor 模型示意图是:

理论上 Proactor 比 Reactor 效率要高一些,异步 I/O 能够充分利用 DMA 特性,让 I/O 操作与计算重叠,但要实现真正的异步 I/O,操作系统需要做大量的工作。目前 Windows 下通过 IOCP 实现了真正的异步 I/O,而在 Linux 系统下的 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 Reactor 模式为主。所以即使 Boost.Asio 号称实现了 Proactor 模型,其实它在 Windows 下采用 IOCP,而在 Linux 下是用 Reactor 模式(采用 epoll)模拟出来的异步模型。

什么是 DMA 特性

开启学习Netty源码之旅

通过上面文章的学习,发现高级语言 Java 实现某一个功能或者特性是离不开底层操作系统的支持,比如我们熟悉的CAS以及本章的同步阻塞 api NIO,本质都是对底层细节的包装。

好了,到这里学习 Netty 网络基础知识就差不多了。有了这些知识,相信学习 Netty 源码过程中应该会少很多阻碍吧。

笔者最近买了一本Netty权威指南(第二版),计划着能跟着它的书一起去学习,另外,极客时间也有一个关于 netty 的专栏,想买,但还没买,后期再看看。

参考文章说明:
1、Netty的朴素解释
2、Netty权威指南(第二版)
3、从 0 开始学架构(第十九章)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/229260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

半导体设备之外延炉简述

半导体设备对整个半导体行业起着重要的支撑作用。因半导体制造工艺复杂&#xff0c;各个环节需要的设备也不同&#xff0c;从流程工序分类来看&#xff0c;半导体设备主要可分为晶圆制造设备&#xff08;前道工序&#xff09;、封装测试设备&#xff08;后道工序&#xff09;等…

使用C/C++实现DNS协议栈

使用C/C实现DNS协议栈 DNS&#xff0c;全称域名系统(Domain Name System)&#xff0c;是用于将域名转换为IP地址的分布式数据库系统。实现一个完整的DNS协议栈是一个相对复杂的任务&#xff0c;但本文将为您提供一个简化的概述和实际的案例&#xff0c;以帮助您入门。 1. 基…

20来岁,大专毕业,学软件测试可行吗?

转行软件测试找不到工作&#xff01; 转行软件测试找不到工作&#xff01; 转行软件测试找不到工作&#xff01; 重要的事情说三遍&#xff01;千万别听培训班咨询老师给你画饼 &#xff1b;我就是某某软件测试培训班出来的&#xff0c;大专&#xff0c;其他专业毕业&#x…

磁盘坏道扫描工具 Macrorit Disk Scanner v6.7.0 中文免费版 -供大家学习研究参考

非常方便实用的磁盘坏道修复软件。Wipe Bad Disk功能强大好用&#xff0c;通过特殊的算法来强制将硬盘的坏道删除清空格式化&#xff0c;从而拯救因产生坏道而不敢继续使用的硬盘!要注意的是经过这块软件清空的硬盘数据基本上是不能被恢复的&#xff0c;所以操作前请一定要备份…

PyQt5连接mysql失败解决

一&#xff1a;背景 最近研究一个项目&#xff0c;里面用的Pyqt5编写的桌面应用&#xff0c;跑了下源码发现连接数据库那块出来问题&#xff0c;最终调试发现里面用的QtSql去连接mysql提示驱动找不到。 具体报错信息如下&#xff1a; Could not parse stylesheet of object …

记录 | gpu docker启动报错libnvidia-ml.so.1: file exists: unknown

困扰了两天的问题&#xff0c;记录一下 问题出在启动一个本身已经安装 cuda 的镜像上&#xff0c;具体来说&#xff0c;我是启动地平线天工开物工具链镜像的时候出现的问题&#xff0c;具体报错如下&#xff1a; docker: Error response from daemon: failed to create task …

linux 设备子系统 摘要

Linux设备模型提取了设备操作的共同属性&#xff0c;进行抽象&#xff0c;并将这部分共同的属性在内核中实现&#xff0c;而为需要新添加设备或驱动提供一般性的统一接口&#xff0c;这使得驱动程序的开发变得更简单了&#xff0c;而程序员只需要去学习接口就行了。 在内核里&…

数字病理图像分析的开源软件qupath学习 ①

介绍&#xff1a;QuPath是一种新的生物图像分析软件&#xff0c;旨在满足对用户友好、可扩展、开源解决方案日益增长的需求&#xff0c;用于数字病理学和全玻片图像分析。除了提供全面的肿瘤识别和高通量生物标志物评估工具外&#xff0c;QuPath 还为研究人员提供了强大的批处理…

写递归函数的一些思考

当编写递归函数时&#xff0c;有几个关键的思考点可以帮助你设计和实现递归算法&#xff1a; 定义递归的基本情况&#xff1a;确定递归函数应该在何时终止&#xff0c;即递归的基本情况。这是一个递归的出口条件&#xff0c;确保递归不会无限进行下去。基本情况应该是可以直接求…

FL Studio 21.1.0.3713中文版最新安装激活图文教程及系统配置要求

FL Studio 21.1.0.3713中文版是一款功能强大的编曲软件&#xff0c;它也能够剪辑、混音、录音&#xff0c;它的矢量界面&#xff0c;能更好用在4K、5K甚至8K显示器上。完全重新设计混音器、动态缩放、具有 6 种布局风格、外加 3个用户自定义面板管理音轨、多推子选择和调整、混…

正大杯获奖作品在哪可以看见

通过网盘分享的文件&#xff1a;2023年第十三届正大杯最新国家一等奖完整获奖作品报告等全套资料 链接:https://pan.baidu.com/s/1SPA4LumSCI4BZdCRXXnW6Q?pwdc8bj 提取码:c8bj 2023年第十三届最新正大杯国家一等奖完整获奖作品等全套资料获取方式链接https://ex59573j43x.fe…

root登录提示:Access denied

一、问题&#xff1a; 在使用xshell工具用root账号登录服务器时提示Access denied&#xff0c;拒绝访问&#xff0c;SSH服务器拒绝了密码&#xff0c;但用其它用户又可以连接. 二、原因 是因为sshd的设置不允许root用户用密码远程登录的问题 三、解决办法 使用可以登录的账…

【C++11特性篇】C++11中の【override】【final】关键字——帮助用户检测是否重写

前言 大家好吖&#xff0c;欢迎来到 YY 滴C系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; 目录 一.【override】【final】关键字——帮…

实战经验:如何利用房产小程序提升客户满意度?

在当今的数字化时代&#xff0c;房产中介公司需要不断地适应市场变化&#xff0c;提供更加便捷、高效的服务。小程序作为一种轻量级的应用程序&#xff0c;具有无需下载、易于分享、随时可用等优点&#xff0c;可以为房产中介公司提供一个新的销售渠道。本文将介绍如何使用乔拓…

代码随想录算法训练营Day4 | 24.两两交换链表中的节点、19.删除链表的倒数第 N 个节点、面试题. 链表相交、142.环形链表II

LeetCode 24 两两交换链表中的节点 本题要注意的条件&#xff1a; 遍历终止条件改变引用指向的时候&#xff0c;需要保存一些节点记录 为了更好的操作链表&#xff0c;我定义了一个虚拟的头节点 dummyHead 指向链表。如下图所示 既然要交换链表中的节点&#xff0c;那么肯定…

小姐姐跳舞,AI 视频生成太酷了

大家好&#xff0c;我是章北海 最近AI视频领域的研究进展神速&#xff0c;看得眼花缭乱。 这里老章就把最近几天看过印象深刻的四个项目介绍给大家&#xff0c;同时附上项目相关简介、论文、代码等资料&#xff0c;感兴趣的同学可以深度研究一下。 《SMPLer-X:放大表达性人体…

uniapp实现地图电子围栏功能

该功能使用uniapp中内置组件map实现 效果图预览&#xff1a; 实现过程&#xff1a; 1.文档&#xff1a; 2.代码&#xff1a; <template><view><map :style"width: 100%; height:screenHeight" :latitude"latitude" :longitude"longit…

mysql8支持远程访问

上面的localhost要改为%号就打开了远程访问 ALTER USER root% IDENTIFIED WITH mysql_native_password BY fengzi2141;

【第1期】SpringSecurity基于角色和权限的细粒度接口权限控制

SpringSecurity 细粒度权限控制 一、Role 和 Authority的区别 角色用来表示某一类权限的集合&#xff0c;权限粒度更小&#xff0c;方便细粒度控制 二、创建用户、角色、权限相关表&#xff1a; CREATE TABLE common_user (id bigint(20) NOT NULL COMMENT 主键id,login_na…

ES查询流程

在ES中查询分为两类&#xff1a;1.基于文档ID查询&#xff0c;2.按照非文档ID查询。 基于文档id查询 1.基于文档ID查询 当执行如下查询时&#xff1a; GET /megacorp/employee/1ES在执行上述查询的具体过程如下&#xff1a; 1、客户端向 Node 1 发送获取请求&#xff0c;此…