1 选择器与注册
选择器是什么?选择器和通道关系是什么?
简单的说,选择器的使用是完成IO的多路复用,其主要工作是通道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监听多个通道的IO(输入输出)状况。选择器和通道的关系是监控和被监控的关系。
选择器提供了独特的API方法,能够选出(select)所监控的通道已经发生了哪些IO事件,包括读写就绪的IO操作事件。
在NIO编程中,一般是一个单线程处理一个选择器,一个选择器可以监控很多通道。所以,通过选择器,一个单线程可以处理成千上万甚至更多的通道。在极端情况狂(数万个连接),只用一个线程就可以处理所有的通道,这样会大量减少线程之间上下文切换的开销。
通道和选择器之间的关联通过register(注册)的方式完成。调用通道的register(Selector sel,int ops)方法,可以将通道实例注册到一个选择器中。register方法有两个参数:第一个参数指定通道注册到的选择器实例;第二个参数指定选择器要监控的IO事件类型。
可供选择监控的通道IO事件类型包括以下四种:
(1) 可读:SelectionKey.OP_READ。
(2) 可写:SelectionKey.OP_WRITE。
(3) 连接:SelectionKey.OP_CONNECT。
(4) 接收:SelectionKey.OP_ACCEPT。
以上事件类型常量定义在SelectionKey类中。如果选择器要监听通道的多种事件,可以用"按位或"运算符来实现。例如,同时监听可读和可写IO事件:
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE
什么是IO事件?
这里的IO事件不是对通道的IO操作,而是通道处于某个IO操作的就绪状态,表示通道具备执行某个IO操作的条件。例如,某个SocketChannel传输通道如果完成了和对端的三次握手,就会发生“链接就绪”事件;某个ServerSocketChannel服务器连接监听通道,在监听到一个新连接到来时,则会发生“接受就绪”事件等。
【说明】socket连接事件的核心原理和TCP连接的建立过程有关。
2 SelectableChannel
并不是所有的通道都是可以被选择器监控或选择的。例如,FileChannel就不能被选择器复用。判读一个通道能否被选择器监控或选择有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道),如果是,就可以被选择,否则不能被选择。
SelectableChannel类,提供了实现通道可选择性所需要的公共方法。Java NIO中所有网路连接socket通道都继承了SelectableChannel类,都是可选择的。
3 SelectionKey
通道和选择器的监控关系注册成功后,就可以选择就绪事件,具体的选择工作可调用Selector的select()方法来完成。通过select()方法,选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的那些感兴趣的IO事件。换句话说,一旦在通道中发生了某些IO事件,并且是在选择器中注册过的IO事件,就会被选择器选中,并放入SelectionKey(选择键)的集合中。
SelectionKey是什么呢?简单的说,SelectionKey就是那些被选择器选中的IO事件。前面讲到,一个IO事件发生(就绪装填达成)后,如果之前在选择器中注册过,就会被选择器选中,并放入SelectionKey中;如果之前没有注册过,那么即使发生了IO事件,也不会被选择器选中。SelectionKey和IO的关系可以简单地理解为SelectionKey就是被选中了的IO事件。
在实际编程时,SelectionKey的功能是很强大的。通过SelectionKey,不仅可以获得通道的IO事件类型(比如SelectionKey.OP_READ),还可以获得选择器实例。
4 选择器使用流程
选择器的使用主要有以下三步:
(1)获取选择器实例。选择器实例是通过调用静态工厂方法open()来获取的。具体如下:
Selector selector = Selector.open();
Selector的类方法open()的内部是向选择器SPI发出请求,通过默认的SelectorProvider(选择器提供者)对象获取一个新的选择器实例。Java中的SPI(服务提供者接口)是一种可以扩展的服务提供和发现机制。Java通过SPI的方式提供选择器的默认实现版本。也就是说,其他的服务提供者可以通过SPI的方式提供定制化版本的选择器的动态替换或者扩展。
(2)将通道注册到选择器实例。要实现选择器管理通道,需要将通道注册到相应的选择器上,简单的示例代码如下:
//1.获取Selector选择器Selector selector = Selector.open();//2.获取通道ServerSocketChannel serverChannel = ServerSocketChannel.open();ServerSocket serverSocket = serverChannel.socket();//3.设置为非阻塞serverChannel.configureBlocking(false);//4.绑定连接InetSocketAddress address = new InetSocketAddress(8080);serverSocket.bind(address);//5.将通道注册到选择器上,并注册的IO事件为:"接收新连接"serverChannel.register(selector, SelectionKey.OP_ACCEPT);
上面通过调用通道的register()方法将 ServerSocketChannel 注册到一个选择器上。当然,在注册之前,需要准备好通道。
这里需要注意:注册到选择器的通道必须处于非阻塞模式下,否则将抛出异常。这意味着,FileChannel不能与选择器一起使用,因为FileChannel只有阻塞模式,不能切换到非阻塞模式;而socket相关的所有通道都可以。其次,一个通道并一定支持所有的四种IO事件。例如,服务器监听通道ServerSocketChannel 仅支持Accept(接收到新连接)IO事件,而传输通道SocketChannel则不同,它不支持Accept类型的IO时间。
如何判断通道支持哪些事件呢?可以在注册之前通过通道的validOps()方法来获取该通道支持的所有IO事件集合。
(3)选出感兴趣的IO就绪时间(选择键集合)。通过Selector的select()方法,选出已经注册的、已经就绪的IO事件,并且保存到SelectionKey集合中。SelectionKey集合保存在选择器实例内部,其元素为SelectionKey类型实例。调用选择器的selectionKeys()方法,可以取得选择键集合。
接下来,迭代集合的每一个选择键,根据具体IO事件类型执行对应的业务操作。大致的处理流程如下:
public void test() throws IOException{//1.获取Selector选择器Selector selector = Selector.open();//2.获取通道ServerSocketChannel serverChannel = ServerSocketChannel.open();ServerSocket serverSocket = serverChannel.socket();//3.设置为非阻塞serverChannel.configureBlocking(false);//4.绑定连接InetSocketAddress address = new InetSocketAddress(8080);serverSocket.bind(address);//5.将通道注册到选择器上,并注册的IO事件为:"接收新连接"serverChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("serverChannel 在监听");//6.轮询感兴趣的IO就绪事件(选择键集合)while(selector.select() > 0){if(null == selector.selectedKeys()) continue;//7.获取键集合Iterator<SelectionKey> it = selector.selectedKeys().iterator();while(it.hasNext()){//8.获取单个的选择键,并处理SelectionKey key = it.next();if(null == key) continue;//IO事件ServerSocketChannel 服务器监听通道有新连接if(key.isAcceptable()){//业务处理}//IO事件传输通道连接成功else if(key.isConnectable()){}//IO事件传输通道可读else if(key.isReadable()){}//IO事件传输通道可写else if(key.isWritable()){}//处理完成后,移除选择键it.remove();}}}
处理完成后,需要将选择键从SelectionKey集合中移除,以防止下一次循环时被重复处理。SelectionKey集合不能添加元素,则将抛出异常。
用于选择就绪的IO事件select()方法有多个重载的实现版本,具体如下:
1、select():阻塞调用,直到至少有一个通道发生了注册的IO事件。
2、select(long timeout):和select()一样,但最长阻塞时间为timeout指定毫秒数。
3、selectNow():非阻塞,不管有没有IO事件都会立刻返回。
select()方法的返回值是整数类型(int),表示发生了IO事件的数量,即从上一次select到这一次select之间有多少通道发生了IO事件,更加准备地说是发生了选择器感兴趣(注册过)的IO事件数。
5 使用NIO实现Discard服务器的实战案例
Discard服务器的功能很简单:仅读取客户端通道的输入数据,读取完成后直接关闭客户端通道,并且直接抛弃掉(Discard)读取到的数据。
public class NioDiscardServer {public static void startServer() throws IOException{//1.创建一个 Selector 选择器Selector selector = Selector.open();//2. 获取通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//3.设置为非阻塞serverSocketChannel.configureBlocking(false);//4.绑定监听端口serverSocketChannel.bind(new InetSocketAddress(8080));System.out.println("服务器启动成功");//5. 将通道注册到选择器上,并注册的IO事件为接收新连接serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//6. 轮询感兴趣的IO就绪时间(选择键集合)while(selector.select() > 0){//7. 获取选择键集合Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while(selectedKeys.hasNext()){//8. 获取单个的选择键,并处理SelectionKey selectionKey = selectedKeys.next();//9.判断key是具体的什么事件if(selectionKey.isAcceptable()){System.out.println("发生了 新连接到来事件 "+ selectionKey.channel());//10. 若选择键的IO事件是"连接就绪"事件,就获取客户端连接SocketChannel socketChannel = serverSocketChannel.accept();//11. 切换非阻塞模式socketChannel.configureBlocking(false);//12. 将该通道注册到selector选择器上SelectionKey channelSK = socketChannel.register(selector,SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_CONNECT);}if(selectionKey.isWritable()){System.out.println("发生了写就绪事件 " + selectionKey.channel());}if(selectionKey.isConnectable()){System.out.println("发生了客户端 连接成功事件 " + selectionKey.channel());}if(selectionKey.isReadable()){System.out.println("发生了读 就绪事件 " + selectionKey.channel());//13. 若选择键的IO事件是“可读”事件,读取数据SocketChannel socketChannel = (SocketChannel) selectionKey.channel();//14. 读取数据ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int length = 0;while((length = socketChannel.read(byteBuffer)) > 0){byteBuffer.flip();Logger.info(new String(byteBuffer.array(), 0, length));byteBuffer.clear();}socketChannel.close();}//15.移除选择键selectedKeys.remove();}}//16.关闭连接serverSocketChannel.close();}public static void main(String[] args) throws IOException {startServer();}
}
5.1 代码分析
实现DiscardServer共分为16步,其中第7~15是循环执行的,不断查询,将感兴趣的IO事件选择到选择键集合中,然后通过.selector.selectedKeys()获取该选择键集合,并且进行迭代处理。在事件处理过程中,对于新建立的socketChannel客户端传输通道,也要注册到同一个选择器上,这样就能使用同一个选择线程不断地对所有的注册通道进行选择键的查询。
在DiscardServer程序中,涉及两次选择器注册:一次是注册serverChannel(服务器通道);另一次是注册接收的socketChannel客户端传输通道。serverChannel所注册的是新连接的IO事件SelectionKey.OP_ACCEPT,socketChannel所注册的是可读IO事件SelectionKey.OP_READ。
注册完成后,如果有事件发生,则DiscardServer在对选择键进行处理时先判断类型,然后进行相应的处理:
(1)如果是SelectionKey.OP_ACCEPT新连接事件类型,代表serverChannel接收到新的客户端连接,发生了新连接时间,则通过服务器通道的accept方法获取新的socketChannel传输通道,并且将新通道注册到选择器。
(2)如果是SelectionKey.OP_READ可读事件类型,代表某个客户端通道有数据可读,则读取选择键中socketChannel传输通道的数据,进行业务处理,这里是直接丢弃数据。