NIO-Selector详解
Selector概述
Selector选择器,也可以称为多路复⽤器。它是Java NIO的核⼼组件之⼀,⽤于检查⼀个或多个Channel的状态是否处于可读、可写、可连接、可接收等。通过⼀个Selector选择器管理多个Channel,可以实现⼀个线程管理多个Channel对应的⽹络连接。使⽤单线程管理多个Channel可以避免多线程的线程上下⽂切换带来的额外开销。
SelectableChannel可选择通道
只有SelectableChannel才能被Selector管理,⽐如所有的Socket通道。⽽FileChannel并没有继承SelectableChannel,因此不能被Selector管理。
Channel注册到Selector上
Channel通过注册的⽅式关联Selector。⼀个Channel可以注册到多个Selector上,但在某⼀个Selector上只能注册⼀次。注册时需要告知Selector,Selector需要对通道的哪个操作感兴趣。
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException{return register(sel, ops, null); }
通道的操作如下:
-
可读:SelectionKey.OP_READ
-
可写:SelectionKey.OP_WRITE
-
可连接:SelectionKey.OP_CONNECT
-
可接收:SelectionKey.OP_ACCEPT
⽐如channel调⽤register⽅法进⾏注册到Selector,并告知Selector对哪个操作感兴趣:
channel.register(selector, SelectionKey.OP_READ);
也可以同时注册多个操作:
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
选择器会查询每个⼀个channel的操作事件,如果是该channel注册的操作已就绪,则进⾏响应。注意,这⾥channel的操作指的是channel完成某个操作的条件,表示该channel对于该操作已处于就绪状态。⽐如ServerSocketChannel已准备好接收新的连接,那么它注册的 SelectionKey.OP_ACCEPT
操作就处于就绪状态。⼜⽐如SocketChannel已准备好去连接Server服务器,那么它注册的SelectionKey.OP_CONNECT
操作就处于就绪状态。于是Selector就可以触发之后的动作。
SelectionKey选择键
SelectionKey封装了Channel和注册的操作。
当Selector调⽤select()⽅法时,会轮询所有注册在它身上的Channel,查看是否有处于某个操作(已注册到selector上的)就绪状态的Channel,把这些Channel放⼊到SelectionKey的集合中。
Selector的使用
-
创建Selector 通过Selector的open⽅法创建Selector对象。
// 创建Selector Selector selector = Selector.open();
-
Channel注册到Selector上 Channel必须处于非阻塞模式才能注册到Selector上
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; /*** @author zhupanlin* @version 1.0* @description: selector的使用:注册channel到selector上* @date 2024/1/26 11:02*/ public class Demo1 { public static void main(String[] args) throws IOException {// 1.创建SelectorSelector selector = Selector.open();// 2.获得ChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 3.设置成非阻塞的模式serverSocketChannel.configureBlocking(false);// 4.绑定端口serverSocketChannel.bind(new InetSocketAddress(9001));// 5.注册channel到selector上SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);}}
-
Selector轮询就绪状态的Channel
Selector通过调⽤select⽅法轮询已就绪的通道操作。select⽅法是阻塞的,直到⾄少有⼀个通道的注册操作已就绪。当完成select⽅法调⽤后,被选中的已就绪的所有channel通过Selector的selectedKeys()⽅法获得,该⽅法获得的是⼀个SelectionKey集合,其中每⼀个SelectionKey都表示⼀个Channel。于是可以根据SelectionKey的注册操作来做具体的业务处理。
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; import java.util.Set; /*** @author zhupanlin* @version 1.0* @description: Selector轮询就绪状态的Channel* @date 2024/1/26 11:10*/ public class Demo2 { public static void main(String[] args) throws IOException {Selector selector = Selector.open();// serverSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);// 绑定端口serverSocketChannel.bind(new InetSocketAddress(9001));// 注册SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// selector轮询while (true){// 阻塞等待某个操作就绪状态的channelselector.select();// 获得一个集合,里面包含了这次selector执行select方法获得的发生就绪状态的多个channelSet<SelectionKey> selectionKeys = selector.selectedKeys();// 遍历Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey key = iterator.next();if (key.isReadable()){// 处理读状态的业务}else if (key.isAcceptable()){// 处理接收状态的业务}else if (key.isConnectable()){// 处理连接状态的业务}else if (key.isWritable()){// 处理写状态的业务}// 保证下次channel有就绪状态的操作发生时可以被selector轮询到iterator.remove();}}}}
Selector示例
-
实现NIO通信的服务端
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; /*** @author zhupanlin* @version 1.0* @description: 服务端demo* @date 2024/1/26 11:45*/ public class ServerDemo { public static void main(String[] args) throws IOException {// 获得ChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 设置成非阻塞serverSocketChannel.configureBlocking(false);// 绑定端口号serverSocketChannel.bind(new InetSocketAddress(9001));// 获得SelectorSelector selector = Selector.open();// 把channel注册到selector上面serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 让selector轮询监听while (true){// 阻塞直到有通道就绪selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();// 获取有动作的selectionKey == channelIterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey selectionKey = iterator.next();handle(selectionKey);// 删除key,表示处理完成iterator.remove();}}} private static void handle(SelectionKey selectionKey) throws IOException {if (selectionKey.isAcceptable()){// 当服务端处于接收的就绪状态// 获得selectionKey中的channelServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();// 接收客户端连接,获得socketChannelSocketChannel socketChannel = serverSocketChannel.accept();// 设置成非阻塞状态,否则无法被selector复用socketChannel.configureBlocking(false);// 把socketChannel注册到selector上,让selector对socketChannel的read操作感兴趣socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);}else if (selectionKey.isReadable()){// 当socketChannel处于读数据的就绪状态SocketChannel socketChannel = (SocketChannel) selectionKey.channel();// 读取socketChannel中的数据//设置成非阻塞socketChannel.configureBlocking(false);// 创建BufferByteBuffer buffer = ByteBuffer.allocate(1024);// 读数据int len = 0;while ((len = socketChannel.read(buffer)) > 0){// 翻转buffer.flip();System.out.println(new String(buffer.array(), 0, len));// 清除buffer中的数据buffer.clear();}socketChannel.register(selectionKey.selector(), SelectionKey.OP_WRITE); }else if (selectionKey.isWritable()){}} }
-
客户端
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /*** @author zhupanlin* @version 1.0* @description: 客户端demo* @date 2024/1/26 11:28*/ public class ClientDemo { public static void main(String[] args) throws IOException {// 创建ChannelSocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9001));// 设置成非阻塞模式socketChannel.configureBlocking(false);// 得到bufferByteBuffer buffer = ByteBuffer.allocate(1024);// 把数据写入到buffer中buffer.put("hello selector".getBytes());// 反转bufferbuffer.flip();// 把buffer中的数据写入到channel中socketChannel.write(buffer);// 关闭socketChannel.close();}}