Java BIO、NIO(通信/群聊系统、零拷贝)、AIO
BIO、NIO、AIO特点和场景
BIO(Blocking I/O)、NIO(Non-blocking I/O)、AIO(Asynchronous I/O)是Java中用于处理I/O操作的三种不同的I/O模型,它们具有不同的特点和适用场景。
-
BIO(Blocking I/O):
- BIO是传统的I/O模型,它是同步阻塞的,也就是说当一个I/O操作发生时,应用程序会被阻塞,直到操作完成。
- 在BIO中,每个I/O操作通常需要一个单独的线程来处理,这意味着如果有大量并发的I/O操作,就需要创建大量线程,这可能会导致资源消耗和性能问题。
- BIO适用于简单的网络通信场景,但在高并发情况下,性能较差。
-
NIO(Non-blocking I/O):
-
NIO是一种更现代的I/O模型,它引入了非阻塞的概念,允许一个线程处理多个I/O操作。NIO使用了选择器(Selector)来监听多个通道的事件,从而减少了线程的数量。
-
在NIO中,应用程序可以异步地发起I/O操作,然后继续执行其他任务,而不需要等待I/O操作完成。这提高了并发性能,特别适用于需要处理大量并发连接的网络应用程序。
-
NIO适用于构建高性能的网络服务器和客户端,如Netty等框架。
-
-
AIO(Asynchronous I/O):
- AIO是Java 7引入的一种异步I/O模型,它进一步提高了I/O操作的效率。与NIO不同,AIO的异步I/O操作可以在后台完成,应用程序不需要等待。
- AIO适用于那些需要大量I/O操作并且不希望阻塞主线程的应用程序,比如高性能的文件和网络服务器。
- 在AIO中,操作系统负责通知应用程序I/O操作的完成,而不需要应用程序不断地轮询或等待。
总结:
- BIO适用于简单的同步I/O操作,但在高并发场景下性能较差。
- NIO适用于需要处理大量并发连接的网络应用程序,提供了非阻塞I/O和选择器的支持。
- AIO适用于需要高性能异步I/O操作的应用程序,它可以在后台进行I/O操作而不阻塞主线程。
对比表
特点 | BIO (Blocking I/O) | NIO (Non-blocking I/O) | AIO (Asynchronous I/O) |
---|---|---|---|
阻塞 | 阻塞式 I/O | 非阻塞式 I/O | 异步 I/O |
并发处理 | 低并发处理 | 中等并发处理 | 高并发处理 |
编程模型 | 同步编程模型 | 同步和部分异步编程模型 | 异步编程模型 |
缓冲区 | 需要自己手动管理缓冲区 | 使用缓冲区进行数据传输 | 使用缓冲区进行数据传输 |
文件操作支持 | 支持 | 支持 | 支持 |
网络操作支持 | 支持 | 支持 | 支持 |
效率 | 低效率,线程池开销大 | 相对高效,线程开销小 | 高效率,异步处理 |
适用场景 | 低并发、少连接数 | 中等并发、中等连接数 | 高并发、大连接数 |
复杂性 | 相对简单 | 相对复杂 | 较复杂 |
主要类和接口 | InputStream /OutputStream | Selector /Channel | AsynchronousChannel |
典型应用 | 单线程或多线程处理少量连接 | 网络服务器、文件操作等 | 高性能服务器、文件操作等 |
总的来说,BIO适用于连接数较少的场景,NIO适用于中等并发场景,而AIO适用于高并发的异步操作场景,如网络服务器、文件传输等。选择适当的I/O模型取决于应用程序的性能和并发需求。
BIO 案例
telnet客户端连接测试
cmd telnet [ip] [端口]
可以输入字符发送,或者ctrl+],然后使用send命令,send hello
启动Telnet客户端
服务端逻辑
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class BIOServer {public static void main(String[] args) throws Exception {//线程池机制思路//1. 创建一个线程池//2. 如果有客户端连接,就创建一个线程,与之通讯ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();//创建ServerSocketServerSocket serverSocket = new ServerSocket(8989);System.out.println("服务器启动了");while (true) {System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());//监听,等待客户端连接System.out.println("等待连接....");//会阻塞在accept()final Socket socket = serverSocket.accept();System.out.println("连接到一个客户端");//就创建一个线程,与之通讯(单独写一个方法)newCachedThreadPool.execute(() -> {//可以和客户端通讯handler(socket);});}}/*** 与客户端通讯** @param socket*/public static void handler(Socket socket) {try {System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());byte[] bytes = new byte[1024];//通过socket获取输入流InputStream inputStream = socket.getInputStream();//循环的读取客户端发送的数据while (true) {System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());System.out.println("read....");//没有可读数据时会阻塞int read = inputStream.read(bytes);if (read != -1) {System.out.println(new String(bytes, 0, read));//输出客户端发送的数据} else {break;}}} catch (Exception e) {e.printStackTrace();} finally {System.out.println("关闭和client的连接");try {socket.close();} catch (Exception e) {e.printStackTrace();}}}
}
总结
- 每个请求都需要创建独立的线程,与对应的客户端进行数据
Read
,业务处理,数据Write
。 - 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
- 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在
Read
操作上,造成线程资源浪费。
NIO三大核心组件关系
Channel
Channel
是用于进行非阻塞I/O操作的抽象,它代表了一个可以进行读写操作的数据源或数据目的地。Channel
提供了一种通用的、统一的接口,用于与不同类型的I/O设备(如文件、套接字、管道等)进行交互。
以下是关于Channel
的一些主要特点和常见的类型:
-
不同类型的
Channel
:FileChannel
:用于文件的读写操作,支持文件的随机读写。SocketChannel
:用于TCP套接字通信,可以连接到远程服务器或接受客户端连接。ServerSocketChannel
:用于监听传入的TCP连接请求,允许服务器接受客户端连接。DatagramChannel
:用于UDP通信,支持无连接的数据包传输。Pipe.SinkChannel
和Pipe.SourceChannel
:用于在同一虚拟机内的两个线程之间进行通信,支持管道传输。
-
通用操作:
- 所有类型的
Channel
都支持通用的操作,如读取数据到Buffer
、从Buffer
写入数据、关闭通道等。 Channel
的读写操作通常是非阻塞的,因此可以在一个线程中同时处理多个Channel
。
- 所有类型的
-
FileChannel:
FileChannel
是用于文件的I/O操作的主要通道类型。- 可以使用
FileChannel
来读取、写入、映射文件以及文件锁定等。 FileChannel
支持文件的随机访问,可以通过position()
方法设置读写位置。
-
SocketChannel 和 ServerSocketChannel:
SocketChannel
用于客户端套接字通信,可以连接到远程服务器。ServerSocketChannel
用于服务器套接字通信,用于监听传入的连接请求并创建对应的SocketChannel
来处理客户端连接。
-
DatagramChannel:
DatagramChannel
用于UDP通信,支持无连接的数据包传输。- 通过
DatagramChannel
可以发送和接收UDP数据报。
-
Pipe:
Pipe.SinkChannel
和Pipe.SourceChannel
通常用于在同一虚拟机内的两个线程之间进行通信。- 一个线程将数据写入
SinkChannel
,另一个线程从SourceChannel
读取数据。
总之,Channel
是Java NIO中用于进行非阻塞I/O操作的关键组件之一,它提供了统一的接口,可以用于处理不同类型的I/O设备。开发者可以根据应用程序的需求选择合适类型的Channel
,并使用它们来进行数据的读写操作,从而实现高效的I/O通信。
Buffer
Buffer
是用于进行数据读取和写入的重要抽象。它是Java NIO(New I/O)库的一部分,用于处理非阻塞I/O操作。Buffer
提供了一种方便的方式来操作底层数据源(通常是字节数组)的数据,同时可以在不同的I/O通道之间传递数据。
以下是关于NIO中的Buffer
的主要特点和用法:
-
缓冲区类型:Java NIO提供了多种类型的缓冲区,主要有以下几种:
ByteBuffer
:用于处理字节数据。CharBuffer
:用于处理字符数据。ShortBuffer
、IntBuffer
、LongBuffer
:分别用于处理短整数、整数和长整数数据。FloatBuffer
、DoubleBuffer
:用于处理浮点数和双精度浮点数数据。- 这些不同类型的缓冲区都是
Buffer
的子类,提供了不同数据类型的读写方法。
-
容量(Capacity):每个缓冲区都有一个固定的容量,表示它可以存储的数据元素数量。容量在缓冲区创建时被指定,并且不能更改。
-
位置(Position):位置表示下一个读取或写入操作将从缓冲区的哪个位置开始。初始位置通常为0,并且通过读取或写入数据而逐渐增加。位置不能超过缓冲区的容量。
-
限制(Limit):限制是一个不可读写的索引,表示在读取或写入操作时不能超过的位置。初始限制通常等于容量,但可以通过设置限制来限制读取或写入的数据范围。
-
标记(Mark):标记是一个临时的索引,可以通过
mark()
方法设置,并在需要时通过reset()
方法回到标记的位置。标记的主要用途是在读取或写入一些数据后能够返回到之前的位置。 -
读写操作:
Buffer
提供了一系列读取和写入数据的方法,如get()
、put()
、read()
、write()
等。这些方法根据当前位置(Position)和限制(Limit)来确定读写的数据范围。 -
翻转(Flip):通过
flip()
方法,可以将缓冲区从写模式切换到读模式。这个操作会将限制设置为当前位置,并将位置重置为0,准备开始读取数据。 -
清空(Clear):通过
clear()
方法,可以将缓冲区从读模式切换到写模式。这个操作会将限制设置为容量,位置重置为0,准备开始写入数据。 -
压缩(Compact):通过
compact()
方法,可以将未读取的数据复制到缓冲区的开头,然后将位置设置为复制后的数据后面。这个操作用于在写模式下向缓冲区写入数据之前,压缩未读取的数据。 -
重绕(Rewind):通过
rewind()
方法,可以将位置重置为0,保持限制不变,进入写模式。这个操作用于重新读取已经读取过的数据。 -
标记和重置:
mark()
方法用于设置标记位置,reset()
方法用于将位置重置为标记位置。
Buffer
在Java NIO中是一个重要的基础构建块,它允许开发者高效地进行数据的读取和写入操作,特别适用于非阻塞I/O编程。在使用Buffer
时,需要注意正确管理位置、限制、标记和容量,以确保数据的正确读写。
Selector
Selector
是一个多路复用器,用于管理多个Channel
的事件,以便一个线程可以同时处理多个通道上的I/O操作。Selector
是非常重要的,因为它可以帮助实现非阻塞I/O,提高网络和文件I/O的效率。
以下是关于Selector
的一些主要特点和用法:
-
多路复用:
Selector
允许一个线程同时监视多个Channel
上的I/O事件,如读就绪、写就绪、连接就绪、接收就绪等。通过使用Selector
,一个线程可以有效地管理多个I/O通道,而不需要为每个通道创建一个单独的线程。 -
SelectableChannel
:SelectableChannel
是可以在Selector
上注册的Channel
的子类,例如SocketChannel
、ServerSocketChannel
、DatagramChannel
等。通过将Channel
注册到Selector
上,可以监视它们的I/O事件。 -
SelectionKey
:当一个Channel
被注册到Selector
上时,会返回一个SelectionKey
对象,它表示了该Channel
在Selector
上的注册状态和感兴趣的事件。SelectionKey
包括事件类型(如读、写、连接、接收)、关联的Channel
等信息。 -
select()
方法:Selector
提供了select()
方法,用于阻塞等待至少一个通道在感兴趣的事件上就绪。一旦有一个或多个通道就绪,select()
方法会返回,并返回就绪通道的数量。这个方法可以帮助减少CPU的轮询开销。 -
selectedKeys()
方法:Selector
还提供了selectedKeys()
方法,用于获取就绪的SelectionKey
集合,然后可以遍历这些SelectionKey
来处理相应的事件。 -
register()
方法:SelectableChannel
可以通过register()
方法将自己注册到Selector
上,并指定感兴趣的事件类型。通常在创建Channel
后,需要将其注册到一个Selector
上才能开始监视事件。 -
cancel()
方法:通过SelectionKey
的cancel()
方法可以取消注册的通道,停止监视相应的事件。 -
非阻塞操作:使用
Selector
可以实现非阻塞I/O操作,当一个通道就绪时,可以处理它的I/O事件,而不需要等待。这允许一个线程有效地管理多个通道的状态。
Selector
在高性能的网络服务器、文件I/O操作以及需要管理多个通道的应用程序中非常有用。通过结合Selector
、Channel
和Buffer
等NIO组件,可以实现高效的非阻塞I/O编程,提高应用程序的性能和响应性。但需要注意,正确使用Selector
需要处理一些细节,如注册和取消注册通道、处理事件等,以确保程序的正确性和可维护性。
Buffer 缓冲区
这四个字段 mark
、position
、limit
和 capacity
是 Buffer
类中非常重要的属性,它们用于管理缓冲区的状态和限制。以下是对每个字段的详细解释:
-
mark
:mark
是一个标记位置,用于记录缓冲区中的某个特定位置。初始值为-1,表示没有设置标记。你可以通过mark()
方法来设置标记位置,然后通过reset()
方法来恢复到标记位置。这对于在读写模式之间切换时定位到特定位置非常有用。 -
position
:position
表示当前读取或写入的位置。初始值为0,即缓冲区的起始位置。position
在读写操作中会随着操作的进行而递增,标记了下一个要读取或写入的位置。 -
limit
:limit
表示读取或写入操作的限制位置。它限制了可以读取或写入的最大位置。初始时,limit
通常等于缓冲区的容量,表示可以读取或写入整个缓冲区。当使用flip()
方法切换到读模式时,limit
会被设置为当前的position
,这样就限制了读取操作只能在已写入数据的范围内进行。limit
的值可以随时进行修改以改变读写操作的限制。 -
capacity
:capacity
表示缓冲区的容量,即它可以容纳的数据元素的数量。一旦分配,缓冲区的容量通常不会更改。
这些字段共同管理了缓冲区的状态,允许你在读取和写入操作之间进行切换,并确保操作在正确的范围内进行。例如,position
指示了下一个要读取或写入的位置,而 limit
限制了操作的范围,mark
允许你在需要时回到特定位置。
在使用 Buffer
类时,了解和管理这些字段的值对于正确操作缓冲区非常重要。通常,你会使用 flip()
、clear()
、rewind()
、mark()
、reset()
等方法来更改和管理这些字段的值,以满足你的读写需求。
Channel 通道
FileChannel 类
FileChannel
是 Java NIO 中用于文件 IO 操作的一个重要类,它提供了对文件的读取和写入操作。FileChannel
通常与 ByteBuffer
一起使用,以高效地处理文件的读写。
-
创建 FileChannel:
若要创建一个
FileChannel
对象,通常需要通过FileInputStream
或FileOutputStream
来获取。例如,要创建一个用于读取文件的FileChannel
,可以使用以下代码:javaCopy codeFileInputStream fileInputStream = new FileInputStream("example.txt"); FileChannel fileChannel = fileInputStream.getChannel();
同样,要创建一个用于写入文件的
FileChannel
,可以使用FileOutputStream
。 -
FileChannel 的读取和写入:
FileChannel
提供了一系列方法来执行读取和写入操作,其中最常见的包括:int read(ByteBuffer dst)
:从文件中读取数据到给定的ByteBuffer
。int write(ByteBuffer src)
:将ByteBuffer
中的数据写入到文件。long position()
和FileChannel position(long newPosition)
:获取或设置当前的文件位置。long size()
:获取文件的大小。int read(ByteBuffer dst, long position)
和int write(ByteBuffer src, long position)
:在指定位置读取或写入数据。
-
FileChannel 的文件锁定:
FileChannel
还支持文件锁定机制,允许你在多个线程或进程之间控制文件的访问。通过FileChannel
的lock()
和tryLock()
方法,你可以请求文件的共享锁或排它锁。这对于确保同时只有一个进程能够对文件进行写入操作非常有用。 -
FileChannel 的关闭:
当你完成文件操作后,应该及时关闭
FileChannel
以释放资源。使用close()
方法可以关闭FileChannel
。 -
适用场景:
FileChannel
主要用于文件 IO 操作,特别适用于需要高性能和大文件处理的场景。与传统的InputStream
和OutputStream
相比,FileChannel
更为灵活,对于高并发的文件读写操作也更高效。
应用实例
本地文件写入、读取、单缓冲区读写、拷贝案例
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;import java.io.File;
import java.io.FileInputStream;public class NIOFileChannel {public static void main(String[] args) throws Exception {NIOFileChannel nio = new NIOFileChannel();
// nio.writeFile("c:\\file01.txt");
// nio.readFile("c:\\file01.txt");
// nio.readAndWriteFile("C:\\tmp\\nio\\1.txt", "C:\\tmp\\nio\\2.txt");nio.copyFile("C:\\tmp\\nio\\1.txt", "C:\\tmp\\nio\\22.txt");}/*** 写入文件数据** @param filePath 文件路径*/public void writeFile(String filePath) throws Exception {String str = "hello, world";//创建一个输出流 -> channelFileOutputStream fileOutputStream = new FileOutputStream(filePath);//通过 fileOutputStream 获取对应的 FileChannel//这个 fileChannel 真实类型是 FileChannelImplFileChannel fileChannel = fileOutputStream.getChannel();//创建一个缓冲区 ByteBufferByteBuffer byteBuffer = ByteBuffer.allocate(1024);//将 str 放入 byteBufferbyteBuffer.put(str.getBytes());//对 byteBuffer 进行 flipbyteBuffer.flip();//将 byteBuffer 数据写入到 fileChannelfileChannel.write(byteBuffer);fileOutputStream.close();}/*** 读取文件数据** @param filePath 文件路径*/public void readFile(String filePath) throws Exception {//创建文件的输入流File file = new File(filePath);FileInputStream fileInputStream = new FileInputStream(file);//通过 fileInputStream 获取对应的 FileChannel -> 实际类型 FileChannelImplFileChannel fileChannel = fileInputStream.getChannel();//创建缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());//将通道的数据读入到 BufferfileChannel.read(byteBuffer);//将 byteBuffer 的字节数据转成 StringSystem.out.println(new String(byteBuffer.array()));fileInputStream.close();}/*** 使用一个 Buffer 从通道中读取数据,并将数据写入到另一个通道中** @param filePath1 读取文件路径* @param filePath2 待写入文件路径*/public void readAndWriteFile(String filePath1, String filePath2) throws Exception {FileInputStream fileInputStream = new FileInputStream(filePath1);FileChannel fileChannel01 = fileInputStream.getChannel();FileOutputStream fileOutputStream = new FileOutputStream(filePath2);FileChannel fileChannel02 = fileOutputStream.getChannel();ByteBuffer byteBuffer = ByteBuffer.allocate(512);//循环读取while (true) {//清空 bufferbyteBuffer.clear();int read = fileChannel01.read(byteBuffer);System.out.println("read = " + read);//读完if (read == -1) {break;}//将 buffer 中的数据写入到 fileChannel02--2.txtbyteBuffer.flip();fileChannel02.write(byteBuffer);}//关闭相关的流fileInputStream.close();fileOutputStream.close();}/*** 拷贝文件** @param filePath1 源文件路径* @param filePath2 待拷贝文件路径*/public static void copyFile(String filePath1, String filePath2) throws Exception {//创建相关流FileInputStream fileInputStream = new FileInputStream(filePath1);FileOutputStream fileOutputStream = new FileOutputStream(filePath2);//获取各个流对应的 FileChannelFileChannel sourceCh = fileInputStream.getChannel();FileChannel destCh = fileOutputStream.getChannel();//使用 transferForm 完成拷贝destCh.transferFrom(sourceCh, 0, sourceCh.size());//关闭相关通道和流sourceCh.close();destCh.close();fileInputStream.close();fileOutputStream.close();}
}
关于 Buffer 和 Channel 的注意事项和细节
ByteBuffer
支持类型化的 put
和 get
,put
放入的是什么数据类型,get
就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException
异常。
可以将一个普通 Buffer
转成只读 Buffer
NIO
还提供了 MappedByteBuffer
,可以让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由 NIO
来完成。
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;/*** MappedByteBuffer 可让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次*/
public class MappedByteBufferTest {public static void main(String[] args) throws IOException {modifyFile("C:\\tmp\\nio\\1.txt");}// 直接在内存中修改文件public static void modifyFile(String filePath) throws IOException {RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "rw");//获取对应的通道FileChannel channel = randomAccessFile.getChannel();/*** FileChannel.MapMode.READ_WRITE表示映射后的内存可以读取和写入。* 0表示从文件的起始位置开始映射。* 5表示映射到内存的大小,即将文件的前5个字节映射到内存中。这意味着我们可以在内存中修改这5个字节的内容。*/MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);mappedByteBuffer.put(0, (byte) 'H');mappedByteBuffer.put(3, (byte) '9');// 这段代码中,尝试将第6个字节的内容修改为字符 'Y',但是我们之前只映射了5个字节的数据,所以会抛出越界异常
// mappedByteBuffer.put(5, (byte) 'Y');randomAccessFile.close();System.out.println("修改成功");}
}
前面的读写操作案例,都是通过一个 Buffer
完成的,NIO
还支持通过多个 Buffer
(即 Buffer
数组)完成读写操作,即 Scattering
和 Gathering
Selector 选择器
Selector
(选择器)是Java NIO中的一个关键类,用于实现非阻塞I/O操作。它允许一个线程同时管理多个通道(Channel),从而能够高效地处理多个客户端连接和请求。以下是关于Selector
的总结和要点:
基本介绍:
Selector
是Java NIO中的核心组件,用于实现非阻塞I/O。- 它能够监测多个注册的通道上是否有事件发生,并响应这些事件。
- 多个通道可以注册到同一个
Selector
中,实现多路复用,从而提高系统的并发性能。 - 使用
Selector
可以避免为每个连接创建一个线程,减少系统开销和线程切换的开销。
Selector的特点和用途:
Selector
允许单线程管理多个通道,即管理多个连接和请求。- 通过
Selector
,线程可以在某个通道上进行读写操作,如果没有数据可用,则线程可以在其他通道上执行IO操作,从而充分利用非阻塞I/O的特性。 - 非阻塞I/O避免了频繁的I/O阻塞,提高了线程的运行效率。
- 一个I/O线程可以并发处理多个客户端连接和读写操作,解决了传统同步阻塞I/O模型中一连接一线程的性能问题。
Selector的相关方法:
selector.select()
:阻塞方法,等待至少一个通道准备就绪,然后返回就绪通道的数量。selector.select(timeout)
:阻塞方法,等待一段时间(timeout毫秒)或至少一个通道准备就绪,然后返回就绪通道的数量。selector.wakeup()
:唤醒阻塞在select()
方法上的线程,用于强制使select()
方法立即返回。selector.selectNow()
:非阻塞方法,立即返回,不等待任何通道就绪。
总的来说,Selector
是Java NIO中非常重要的组件,用于实现高性能的非阻塞I/O操作。它可以让一个线程管理多个通道,处理多个客户端连接,提高系统的并发性能和可扩展性。Selector
的使用有助于避免多线程之间的上下文切换开销,提高系统的响应速度。
NIO 网络客户端/服务端案例
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.util.Set;public class NIOServer {public static void main(String[] args) throws IOException {// 创建ServerSocketChannel 用于监听客户端连接的通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(7777));serverSocketChannel.configureBlocking(false);// 创建SelectorSelector selector = Selector.open();// 将ServerSocketChannel注册到Selector,关注OP_ACCEPT事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {// 监听事件int selectCount = selector.select();if (selectCount == 0) {continue;}// 获取触发事件的SelectionKeySet<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();if (key.isAcceptable()) {// 处理连接请求SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);} else if (key.isReadable()) {// 处理读事件SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int bytesRead = socketChannel.read(buffer);if (bytesRead == -1) {socketChannel.close();key.cancel();} else if (bytesRead > 0) {buffer.flip();byte[] data = new byte[buffer.remaining()];buffer.get(data);System.out.println("Received: " + new String(data));}}keyIterator.remove();}}}
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;public class NIOClient {public static void main(String[] args) throws IOException, InterruptedException {for (int i = 0; i < 5; i++) {int tmpI = i;new Thread(() -> {try {sendMsg("hello" + tmpI);} catch (IOException e) {throw new RuntimeException(e);}}).start();}}public static void sendMsg(String msg) throws IOException {// 创建SocketChannelSocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);// 连接服务器if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 7777))) {while (!socketChannel.finishConnect()) {System.out.println("连接服务器需要时间,可以做其他工作...");}}// 发送数据给服务器ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());socketChannel.write(buffer);// 接收服务器的响应ByteBuffer responseBuffer = ByteBuffer.allocate(1024);int bytesRead = socketChannel.read(responseBuffer);if (bytesRead > 0) {responseBuffer.flip();byte[] responseData = new byte[responseBuffer.remaining()];responseBuffer.get(responseData);System.out.println("Server Response: " + new String(responseData));}// 关闭SocketChannelsocketChannel.close();}
}
NIO 群聊系统
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;public class GroupChatServer {private Selector selector;private ServerSocketChannel listenChannel;private static final int PORT = 6667;public GroupChatServer() {try {// 创建Selectorselector = Selector.open();// 创建ServerSocketChannellistenChannel = ServerSocketChannel.open();// 绑定服务器端口listenChannel.socket().bind(new InetSocketAddress(PORT));// 设置为非阻塞模式listenChannel.configureBlocking(false);// 注册ServerSocketChannel到Selector,关注连接事件listenChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void listen() {try {while (true) {// 监听事件int count = selector.select(2000);if (count > 0) {// 获取触发事件的SelectionKey集合Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if (key.isAcceptable()) {// 处理连接请求SocketChannel clientChannel = listenChannel.accept();clientChannel.configureBlocking(false);clientChannel.register(selector, SelectionKey.OP_READ);System.out.println(clientChannel.getRemoteAddress() + " 上线");}if (key.isReadable()) {// 处理读事件readData(key);}iterator.remove();}}}} catch (IOException e) {e.printStackTrace();} finally {// 处理异常后的操作}}private void readData(SelectionKey key) {SocketChannel channel = null;try {channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int count = channel.read(buffer);if (count > 0) {String message = new String(buffer.array());System.out.println("收到来自 " + channel.getRemoteAddress() + " 的消息: " + message);sendToOtherClients(message, channel);}} catch (IOException e) {try {System.out.println(channel.getRemoteAddress() + " 离线");key.cancel();channel.close();} catch (IOException ex) {ex.printStackTrace();}}}private void sendToOtherClients(String message, SocketChannel selfChannel) throws IOException {System.out.println("服务器转发消息中...");for (SelectionKey key : selector.keys()) {Channel targetChannel = key.channel();if (targetChannel instanceof SocketChannel && targetChannel != selfChannel) {SocketChannel dest = (SocketChannel) targetChannel;ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());dest.write(buffer);}}}public static void main(String[] args) {GroupChatServer server = new GroupChatServer();server.listen();}
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;public class GroupChatClient {private final String HOST = "127.0.0.1";private final int PORT = 6667;private Selector selector;private SocketChannel socketChannel;private String username;public GroupChatClient() {try {// 创建Selectorselector = Selector.open();// 创建SocketChannel并连接服务器socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));// 设置为非阻塞模式socketChannel.configureBlocking(false);// 注册SocketChannel到Selector,关注读事件socketChannel.register(selector, SelectionKey.OP_READ);// 获取本地Socket地址作为用户名username = socketChannel.getLocalAddress().toString().substring(1);System.out.println(username + " is online.");} catch (IOException e) {e.printStackTrace();}}public void sendInfo(String info) {info = username + " 说: " + info;try {// 发送消息给服务器socketChannel.write(ByteBuffer.wrap(info.getBytes()));} catch (IOException e) {e.printStackTrace();}}public void readInfo() {try {int count = selector.select();if (count > 0) {Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int len = channel.read(buffer);if (len > 0) {System.out.println(new String(buffer.array(), 0, len));}}iterator.remove();}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {GroupChatClient chatClient = new GroupChatClient();// 启动一个线程来接收服务器和其他客户端的消息new Thread(() -> {while (true) {chatClient.readInfo();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();// 主线程用于发送消息到服务器while (true) {// 从控制台读取用户输入java.util.Scanner scanner = new java.util.Scanner(System.in);String message = scanner.nextLine();chatClient.sendInfo(message);}}
}
NIO 结合零拷贝
NIO(New I/O)和零拷贝(Zero-Copy)是两个与I/O操作相关的概念,它们可以在高性能的数据传输和处理中发挥重要作用。
零拷贝(Zero-Copy):
零拷贝是一种技术,旨在在数据传输和处理过程中尽量减少或消除数据的拷贝操作。它通过直接将数据从一个位置传输到另一个位置,而不需要在中间进行复制。零拷贝通常与操作系统和硬件的支持相关,以提供最佳性能。
主要特点和优点:
- 减少数据拷贝:零拷贝技术避免了数据在内存之间的复制,从而减少了CPU和内存带宽的开销。
- 提高性能:通过减少数据拷贝和内存复制,零拷贝可以提高数据传输和处理的性能。
- 节省资源:减少了不必要的CPU和内存资源消耗,特别适用于大规模的数据传输和处理任务。
零拷贝通常与操作系统的底层API和硬件的支持密切相关,例如使用mmap
映射文件、sendfile
系统调用等。
在实际应用中,NIO和零拷贝可以结合使用,以提高网络应用程序的性能。例如,你可以使用NIO来管理多个网络连接,并且使用零拷贝来传输大文件或数据块,以避免不必要的数据复制和传输开销。这两个概念在高性能和高吞吐量的应用程序中都非常有用。
案例
服务端
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;public class NIOServer {public static void main(String[] args) throws Exception {InetSocketAddress address = new InetSocketAddress(7001);ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();ServerSocket serverSocket = serverSocketChannel.socket();serverSocket.bind(address);// 创建bufferByteBuffer byteBuffer = ByteBuffer.allocate(4096);while (true) {SocketChannel socketChannel = serverSocketChannel.accept();int readcount = 0;while (-1 != readcount) {try {readcount = socketChannel.read(byteBuffer);} catch (Exception ex) {// ex.printStackTrace();break;}//byteBuffer.rewind(); // 倒带 position = 0 mark 作废}if (readcount > 0) {System.out.println("收到数据长度:" + readcount);}}}
}
客户端
package com.zxbd.project.test;import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;public class NIOClient {public static void main(String[] args) throws Exception {SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("localhost", 7001));String filename = "C:\\Users\\22720\\Downloads\\历史数据.xls";// 得到一个文件channelFileChannel fileChannel = new FileInputStream(filename).getChannel();// 准备发送long startTime = System.currentTimeMillis();// 在Linux下一个transferTo方法就可以完成传输// 在Windows下一次调用transferTo只能发送8m,就需要分段传输文件// transferTo底层使用到零拷贝long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);System.out.println("发送的总的字节数 = " + transferCount + " 耗时: " + (System.currentTimeMillis() - startTime));// 关闭fileChannel.close();}
}
AIO
Java AIO(Asynchronous I/O,异步 I/O)是一种在 Java 中进行非阻塞 I/O 操作的方式,用于处理大量的并发连接或文件操作。与传统的同步 I/O 不同,它允许应用程序在等待 I/O 操作完成时执行其他任务,而不会阻塞整个线程或进程。
以下是 Java AIO 的一些基本介绍:
-
异步操作:Java AIO 提供了异步操作的支持,允许应用程序发起 I/O 请求后继续执行其他任务,而不必等待 I/O 操作完成。当 I/O 操作完成时,系统会通知应用程序。
-
NIO(New I/O)基础:Java AIO 构建在 Java NIO 的基础之上。它使用了通道(Channel)和缓冲区(Buffer)的概念,允许应用程序将数据从通道读取到缓冲区,或将数据从缓冲区写入通道。
-
异步事件驱动:Java AIO 采用了事件驱动的方式来处理异步 I/O 操作。应用程序需要注册事件监听器,以便在 I/O 操作完成时接收通知。这种方式可以有效地管理大量的并发连接。
-
高性能:由于异步操作的特性,Java AIO 可以实现高性能的 I/O 处理,特别适用于处理大量并发连接或需要高吞吐量的应用场景,如网络服务器、文件传输等。
-
复杂性:相对于传统的同步 I/O,Java AIO 的编程模型可能更复杂一些,因为它涉及到事件监听、回调函数等概念。但一旦习惯了这种模型,它可以提供更好的性能和可伸缩性。
-
主要类和接口:Java AIO 的主要类和接口包括
AsynchronousSocketChannel
、AsynchronousServerSocketChannel
、AsynchronousFileChannel
等,以及与异步事件处理相关的回调接口。
示例代码如下所示:
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;public class AIOExample {public static void main(String[] args) throws Exception {// 打开异步文件通道AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("C:\\tmp\\nio\\1.txt"),StandardOpenOption.READ);// 创建读取操作的缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);// 发起异步读取操作fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer attachment) {System.out.println("读取完成,读取字节数:" + result);// 处理读取的数据attachment.flip();while (attachment.hasRemaining()) {System.out.print((char) attachment.get());}System.out.println();// 关闭文件通道try {fileChannel.close();} catch (Exception e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {System.err.println("读取失败:" + exc.getMessage());// 关闭文件通道try {fileChannel.close();} catch (Exception e) {e.printStackTrace();}}});System.out.println("主线程111");}
}
上面的示例演示了如何使用 Java AIO 进行异步文件读取操作。当读取操作完成时,completed
方法将被调用,而当操作失败时,failed
方法将被调用。这种异步编程模型可以用于处理各种类型的异步 I/O 操作。