一,阻塞IO线程池模型(BIO)
这是传统的网络编程方案所采用的线程模型。
即有一个主循环,socket.accept阻塞等待,当建立连接后,创建新的线程/从线程池中取一个,把该socket连接交由新线程全权处理。
这种方案优点即实现简单,缺点则是方案的伸缩性受到线程数的限制。
// 循环监听
while (true) {// 阻塞监听客户端请求client = server.accept();System.out.println(client.getRemoteSocketAddress() + "客户端连接成功!");// 将该客户端请求通过线程池放入HandlMsg线程中进行处理executorService.execute(new HandleMsg(client));
}
二,Reactor单线程模型
有了NIO后,可以采用IO多路复用机制了。
这是一个单Reactor单线程模型,时序图见下文,该方案只有一个线程,所有Channel的连接均注册在了该Reactor上,由一个线程全权负责所有的任务。
这种方案实现简单,且不受线程数的限制,但受限于使用场景,仅适合于IO密集的应用,不太适合CPU密集的应用,且适合于CPU资源紧张的应用上。
三,Reactor线程池模型
Reactor负责全部IO任务(包括每个Channel的连接和读写),线程池负责业务逻辑的处理。
虽然该方案可以充分利用CPU资源,但是这个方案比单线程版本多了进出Thread Pool的两次上下文切换。
四,主从Reactor模型(Netty的线程模型)
- MainReactor负责连接任务,SubReactor负责IO读写、业务计算。
- MainReactor和每个SubReactor都是单独的线程,可以调整SubReactor的数量适应CPU资源紧张的应用。
- 该方案有一个不太明显的缺点,即Session没有分优先级,所有Session平等对待均分到所有的线程中,这样可能会导致优先级低耗资源的Session堵塞高优先级的Session。( TODO 看下Netty的优化)
五,主从Reactor线程池模型
和主从Reactor模型相比, 只是把业务计算放到线程池里了,IO读写还是在SubReactor线程里。
该模型可以更为灵活的适应大多应用场景,通过:调整SubReactor数量、调整Thread Pool参数等。
注意:
- 如果将IO读写放到线程池里,可能会出现问题:SubReactor选中读就绪事件立马交给线程池,但线程还没来得及read,Channel由于仍然读就绪被select出来重复执行。
- 上图这样把Channel的读写放在SubReactor,那么此SubReactor上不同Channel的读写会阻塞,但可能效率很高也问题不大。
主从Reactor线程池模型代码示例(调试过了,注意细节见注释)
客户端
public class ReactorClient {public static void main(String[] args) throws IOException, InterruptedException {for (int i = 0; i < 4; i++) {new Thread(() -> {try {send();} catch (IOException e) {e.printStackTrace();}}).start();}}static void send() throws IOException {// 阻塞模式读写SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("127.0.0.1", 9090));ByteBuffer writeBuff = ByteBuffer.allocate(20);/*** 分配太小时,客户端表现:接收数据不完整,但正常退出;服务端表现:业务读写正常,但业务结束后会收到2次读就绪事件,一次读返-1,关闭channel,一次读就会报java.io.IOException: Connection reset by peer* TODO 研究下这个原理和如何分配大小*/ByteBuffer readBuff = ByteBuffer.allocate(2000);writeBuff.put(("i am client " + Thread.currentThread().getName()).getBytes());writeBuff.flip();new Thread(new Runnable() {@Override@SneakyThrowspublic void run() {socketChannel.write(writeBuff);System.out.println(Thread.currentThread().getName() + " 已发送数据,等待返回");readBuff.clear();// 阻塞等服务端消息socketChannel.read(readBuff);readBuff.flip();System.out.println(Thread.currentThread().getName() + " 接受服务端消息:" + new String(readBuff.array()));// 正常来讲应放入finallysocketChannel.close();}}).start();}
}
服务端
/*** 主从Reactor多线程模型*/
public class MainSubReactorMultiThread {private static final int SUB_COUNT = 4;public static void main(String[] args) {MainSubReactorMultiThread.MainReactor mainReactor = new MainSubReactorMultiThread.MainReactor(9090);mainReactor.run();}/*** 选择就绪的连接事件*/public static class MainReactor implements Runnable {ServerSocketChannel serverSocketChannel;Selector selector;public MainReactor(int port) {try {serverSocketChannel = ServerSocketChannel.open();selector = Selector.open();serverSocketChannel.socket().bind(new InetSocketAddress(port));serverSocketChannel.configureBlocking(false);// 注册了连接事件SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 并且在selectionKey对象附加了一个Acceptor对象,这是用来处理连接请求的类selectionKey.attach(new MainSubReactorMultiThread.Acceptor(serverSocketChannel));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {while (true) {try {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 开始监听");selector.select();System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 监听到连接件");Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();// 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型dispatcher(selectionKey);iterator.remove();}} catch (IOException e) {e.printStackTrace();}}}private void dispatcher(SelectionKey selectionKey) {Runnable runnable = (Runnable) selectionKey.attachment();// 同线程执行runnable.run();}}/*** 选择就绪的读写事件*/public static class SubReactor implements Runnable {Selector subSelector;int index;public SubReactor(Selector subSelector, int index) {this.subSelector = subSelector;this.index = index;}@Overridepublic void run() {while (true) {try {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 开始监听");int selectNum = subSelector.select();if (selectNum != 0) {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 监听到就绪事件:" + JSON.toJSONString(subSelector.selectedKeys()));} else {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 未监听到事件,继续轮训");continue;}Set<SelectionKey> selectionKeys = subSelector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();// 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型dispatcher(selectionKey);iterator.remove();}} catch (IOException e) {e.printStackTrace();}}}@SneakyThrowsprivate void dispatcher(SelectionKey selectionKey) {while (true) {Runnable runnable = (Runnable) selectionKey.attachment();if (runnable != null) {// 同线程执行runnable.run();return;}System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", runnable对象未添加完成,等待10ms");Thread.sleep(10);}/*** 可能在Acceptor里刚注册channel到selector就被reactor选中执行了,这时注册channel的地方还没执行attach方法,runnable会报NPE,所以要判空*/
// Runnable runnable = (Runnable) selectionKey.attachment();
// runnable.run();}}/*** 处理连接*/public static class Acceptor implements Runnable {private static Selector[] subSelector = new Selector[SUB_COUNT];private ServerSocketChannel serverSocketChannel;/*** 单线程不会冲突*/private int index = -1;@SneakyThrowspublic Acceptor(ServerSocketChannel serverSocketChannel) {for (int i = 0; i < SUB_COUNT; i++) {subSelector[i] = Selector.open();SubReactor subReactor = new SubReactor(subSelector[i], i);new Thread(subReactor).start();}this.serverSocketChannel = serverSocketChannel;}@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);int ind = getNextIndex();/*** 本来以为没必要的,但如果不wakeup,会在下一步register阻塞!底层在等待synchronized同步锁* TODO 研究下原理*/subSelector[ind].wakeup();SelectionKey selectionKey = socketChannel.register(subSelector[ind], SelectionKey.OP_READ);selectionKey.attach(new MainSubReactorMultiThread.ThreadPollWorkHandler(socketChannel));System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "客户端已连接:" + socketChannel.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}}private int getNextIndex() {if (index++ == SUB_COUNT - 1) {index = 0;}return index;}}/*** 处理读写*/public static class ThreadPollWorkHandler implements Runnable {private static ExecutorService executorService = Executors.newCachedThreadPool();private SocketChannel socketChannel;public ThreadPollWorkHandler(SocketChannel socketChannel) {this.socketChannel = socketChannel;}@Overridepublic void run() {try {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "开始处理socket读");/*** 读数据*/ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readLength = socketChannel.read(byteBuffer);if (readLength == -1) {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "客户端已关闭,关闭此通道");socketChannel.close();return;}String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", socket读完成: " + message);/*** 线程池处理业务计算*/TaskHandler taskHandler = new TaskHandler(socketChannel, message);Future<String> taskResult = executorService.submit(taskHandler);/*** 写数据*/ByteBuffer writeBuffer = ByteBuffer.wrap((socketChannel.getRemoteAddress() + ":" + taskResult.get()).getBytes(StandardCharsets.UTF_8));socketChannel.write(writeBuffer);System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "已返回客户端数据,请求处理最终完成");} catch (Exception e) {e.printStackTrace();}}}static class TaskHandler implements Callable<String> {private SocketChannel socketChannel;private String parameter;public TaskHandler(SocketChannel socketChannel, String parameter) {this.socketChannel = socketChannel;this.parameter = parameter;}@Overridepublic String call() throws Exception {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 开始处理业务计算 参数: " + parameter);Thread.sleep(1000);String result = String.format("response(%s) for (%s)", RandomStringUtils.randomAlphanumeric(30), parameter).trim();System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 业务计算完成 返回: " + result);return result;}}
}