nc localhost port
Selector
以Mac为例,初始化得到的Selector的实例为KQueueSelectorImpl。
public abstract class SelectorImpl extends AbstractSelector{// The set of keys with data ready for an operationprotected Set<SelectionKey> selectedKeys;// 注册在 Selector 上的全部channel:客户端 、服务端protected HashSet<SelectionKey> keys;// Public views of the key setsprivate Set<SelectionKey> publicKeys; // Immutableprivate Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not additionprotected SelectorImpl(SelectorProvider sp) {super(sp);keys = new HashSet<>();selectedKeys = new HashSet<>();publicKeys = Collections.unmodifiableSet(keys);publicSelectedKeys = Util.ungrowableSet(selectedKeys);}
}
如上伪代码所示:属性只存在 keys & publicKeys 、selectedKeys & publicSelectedKeys 两种类型。SelectionKey接口的唯一实现类为SelectionKeyImpl。
ServerSocketChannel 绑定端口& SocketChannel 建立连接 均选择在同一个Selector注册自身感兴趣的事件:最后都会被抽象化为 SelectionKeyImpl 类型【channel & Selector】。
public abstract class SelectorImpl{protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment){//SelectionKeyImpl 是抽象类SelectionKey的唯一实现类SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);k.attach(attachment);synchronized (publicKeys) {implRegister(k);}k.interestOps(ops);return k;}
}public class KQueueSelectorImpl extends SelectorImpl{private HashMap<Integer,MapEntry> fdMap;protected void implRegister(SelectionKeyImpl ski) {// 获取当前channel对应的文件描述符int fd = IOUtil.fdVal(ski.channel.getFD());fdMap.put(Integer.valueOf(fd), new MapEntry(ski));totalChannels++;keys.add(ski);}
}
ServerSocketChannel负责监听全部客户端连接;每个SocketChannel连接建立之后将自身注册至Selector之上。
属性fdMap是一个非常重要的变量,Selector监听到相关IO事件之后会根据文件描述符fd获取对应的SelectionKey,进而得到对应的channel。
selector.select:通常情况下如果SelectionKey注册的众多Channel尚未存在相关IO事件则一直阻塞等待。
class KQueueSelectorImpl extends SelectorImpl{protected int doSelect(long timeout){...begin();//Selector 阻塞之处,entries即代表当前 Selector 上存在IO事件的channel个数int entries = kqueueWrapper.poll(timeout);...return updateSelectedKeys(entries);}private int updateSelectedKeys(int entries){// 该变量是指最终返回的channel个数int numKeysUpdated = 0;boolean interrupted = false;updateCount++;for (int i = 0; i < entries; i++) {int nextFD = kqueueWrapper.getDescriptor(i);if (nextFD == fd0) {interrupted = true;} else {// 根据文件描述符获取与之对应的 SelectionKeyMapEntry me = fdMap.get(Integer.valueOf(nextFD));if (me != null) {// 从 kqueueWrapper 获取当前channel准备好的事件类型,Ready Opsint rOps = kqueueWrapper.getReventOps(i);SelectionKeyImpl ski = me.ski;if (selectedKeys.contains(ski)) {//如果selectedKeys集合没有显式移除,则该条件始终满足if (me.updateCount != updateCount) {// 设置当前channel中 ReadyOps,正常情况下其实就是 InterestOps 的值if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;me.updateCount = updateCount;}} else {ski.channel.translateAndUpdateReadyOps(rOps, ski);}} else {// 设置当前channel中 ReadyOps,正常情况下其实就是 InterestOps 的值ski.channel.translateAndSetReadyOps(rOps, ski);//正常情况下,当前channel的 readyOps ,interestOps 是一致的if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;// 表明当前key对应的channel是符合条件me.updateCount = updateCount;}}}}}...return numKeysUpdated;}
}
正常流程:
首先,ServerSocketChannel 在Selector注册 interestOps = 16事件,当然同时会被Selector的属性 keys & publicKeys 持有。
其次,SocketChannel发起连接后在ServerSocketChannel监听到其连接事件,将interestOps = 16事件类型重置到readyOps = 16,此时表明 ServerSocketChannel 存在真正的Accept事件,并且将ServerSocketChannel对应的SelectionKey 添加至Selector属性之selectedKeys & publicSelectedKeys 中。
然后,在ServerSocketChannel处理Accept事件中获取到客户端SocketChannel,同样在相同Selector上注册SocketChannel的interestOps = 1事件,同时会被Selector的属性 keys & publicKeys 持有,进一步remove即将集合selectedKeys & publicSelectedKeys元素之ServerSocketChannel对应的SelectionKey删除。
最后,客户端发送数据后Selector监听到集合keys & publicKeys众多channel中部分SocketChannel存在IO事件,此时就将SocketChannel之interestOps = 1重置到readyOps = 1,并且将SocketChannel对应的SelectionKey 添加至Selector属性之selectedKeys & publicSelectedKeys 中。
最终,遍历 publicSelectedKeys元素处理 SocketChannel 对应的 readyOps = 1 事件,最后remove即将集合 publicSelectedKeys 元素之SocketChannel对应的SelectionKey删除。
但是存在两个普遍的反馈:SelectionKey集合没有remove的后果 以及 select 始终返回0的问题。
结论:背后原因都是因为没有显式remove导致的。
需要知道的是translateAndSetReadyOps方法返回true的条件是:当前channel对应的interestOps & readyOps不一样。进而导致 局部变量 numKeysUpdated 始终为初始值0。
而此时 属性 selectedKeys 中元素SelectionKey对应的channel其 interestOps & readyOps 值是一样的。
截止当前,可能会有人疑问select方法时阻塞的啊,没有事件的前提下select为啥持续返回0呢?
for (;;){int count = selector.select();if (count == 0) {System.out.println("count = 0");continue;}遍历selectedKeys{... 没有显式remove}}
如上处理方式:在处理某个SocketChannel IO事件时,由于translateAndSetReadyOps方法返回false导致无法真正处理其IO事件,所以每次select都能再次接收到该channel的事件类型,导致不断循环处理该channel的无效事件。
for (;;){int count = selector.select();遍历selectedKeys{... 没有显式remove}}
如果是如上处理方式则可以接收ServerSocketChannel & SocketChannel 各类事件,但是 导致遍历selectedKeys集合中所有SelectionKey元素。出现问题包含:
- 任何一个SocketChannel IO 事件都会触发遍历selectedKeys集合中所有SelectionKey元素,其中就包括ServerSocketChannel的Accept事件,但是此时是无法从ServerSocketChannel中accept客户端SocketChannel。
- 除了当前SocketChannel,遍历其他SelectionKey对应的SocketChannel时从其缓存区得到的数据为空,但也会自动触发向客户端SocketChannel发送空字符串。
SelectionKey
SelectionKey:A token representing the registration of a SelectableChannel with a Selector。A selection key is created each time a channel is registered with a selector。
A selection key is created each time a channel is registered with a selector。 A key remains valid until it is cancelled by invoking its cancel method, by closing its channel, or by closing its selector。 Cancelling a key does not immediately remove it from its selector; it is instead added to the selector’s ancelled-key set for removal during the next selection operation。The validity of a key may be tested by invoking isValid method。
SelectionKey包含两种operation sets:interest set & ready set。
- Interest set:兴趣集,表示已注册的事件集合,下一次调用方法,将测试是否有此事件的加入。通过SelectionKey的 int interestOps() 方法,可以获取当前 SelectionKey的感兴趣事件。
- Ready set:准备集,表示已准备就绪的事件集合。通过SelectionKey的 int readyOps()方法,可以获取当前 SelectionKey的准备就绪事件。
Nio中 SocketChannel 如果设置为非阻塞方式,在其发起connect连接事件后返回的永远为false,必须显式调用finishConnect来判断连接是否成功建立。相反,如果 SocketChannel 设置为阻塞方式,则connect方法会根据连接情况返回true or false。
SocketChannel 执行connect事件之后,此时连接处于pending(待定)状态,只有显式调用finishConnect之后则连接更新为CONNECTED状态。否则出现异常NotYetConnectedException。
客户端关闭【异常、正常】(直接中断idea进程)后服务端同样会监听到该事件,并必须处理该事件,否则会导致服务器端一直不断收到该客户端的OP_READ事件【该事件如果没有处理则一直处理可读状态】,即selector.select()
会直接通过,并且是可读状态,但是实际上读到的数据是空的。
服务端处理方式:SelectionKey#cancel or SocketChannel#close。