Reactor模式有点类似事件驱动模式。在事件驱动模式中,当有事件触发时,事件源会将事件分发到Handler(处理器),由Handler负责事件处理。Reactor模式中的反应器角色类似于事件驱动
模式中的事件分发器(Dispatcher)角色。
具体来说,在Reactor模式中有Reactor和Handler两个重要的组件:
(1)Reactor:负责查询IO事件,当检测到一个IO事件时将其发
送给相应的Handler处理器去处理。这里的IO事件就是NIO中选择器查
询出来的通道IO事件。
(2)Handler:与IO事件(或者选择键)绑定,负责IO事件的处
理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果
写到通道等。
完整代码:
package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Logger;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;//反应器
class EchoServerReactor implements Runnable {Selector selector;ServerSocketChannel serverSocket;EchoServerReactor() throws IOException {//Reactor初始化selector = Selector.open();serverSocket = ServerSocketChannel.open();InetSocketAddress address =new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT);//非阻塞serverSocket.configureBlocking(false);//分步处理,第一步,接收accept事件SelectionKey sk =serverSocket.register(selector,0,new AcceptorHandler());// SelectionKey.OP_ACCEPTserverSocket.socket().bind(address);Logger.info("服务端已经开始监听:"+address);sk.interestOps(SelectionKey.OP_ACCEPT);//attach callback object, AcceptorHandler//sk.attach(new AcceptorHandler());}public void run() {try {while (!Thread.interrupted()) {//io事件的查询// 限时阻塞查询selector.select(1000);Set<SelectionKey> selected = selector.selectedKeys();if (null == selected || selected.size() == 0) {continue;}Iterator<SelectionKey> it = selected.iterator();while (it.hasNext()) {//Reactor负责dispatch收到的事件SelectionKey sk = it.next();it.remove(); //避免下次重复处理dispatch(sk);}
// selected.clear();}} catch (IOException ex) {ex.printStackTrace();}}void dispatch(SelectionKey sk) {Runnable handler = (Runnable) sk.attachment();//调用之前attach绑定到选择键的handler处理器对象if (handler != null) {handler.run();}}// Handler:新连接处理器class AcceptorHandler implements Runnable {public void run() {try {SocketChannel channel = serverSocket.accept();Logger.info("接收到一个连接");if (channel != null)new EchoHandler(selector, channel);} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) throws IOException {new Thread(new EchoServerReactor()).start();}
}
结果如下: