Java中的IO、NIO、AIO:
BIO:在Java1.4之前,我们建立网络连接均使用BIO,属于同步阻塞IO。默认情况下,当有一条请求接入就有一条线程专门接待。所以,在客户端向服务端请求时,会询问是否有空闲线程进行接待,如若没有则一直等待或拒接。当并发量小时还可以接受,当请求量一多起来则会有许多线程生成,在Java中,多线程的上下文切换会消耗计算机有限的资源和性能,造成资源浪费。
NIO:NIO的出现是为了解决再BIO下的大并发量问题。其特点是能用一条线程管理所有连接。如下图所示:
NIO是面向缓冲流的,即数据写入写出都是通过 Channel —— Buffer 这一途径。(双向流通)
AIO:与之前两个IO模型不同的是,AIO属于异步非阻塞模型。当进行读写操作时只须调用api的read方法和write方法,这两种方法均是异步。对于读方法来说,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。换言之就是当调用完api后,操作系统完成后会调用回调函数。
总结:一般IO分为同步阻塞模型(BIO),同步非阻塞模型(NIO),异步阻塞模型,异步非阻塞模型(AIO)
同步阻塞模型指的是当调用io操作时必须等到其io操作结束
同步非阻塞模型指当调用io操作时不必等待可以继续干其他事,但必须不断询问io操作是否完成。
异步阻塞模型指应用调用io操作后,由操作系统完成io操作,但应用必须等待或去询问操作系统是否完成。
异步非阻塞指应用调用io操作后,由操作系统完成io操作并调用回调函数,应用完成放手不管。
NIO的小Demo之服务端
首先,先看下服务端的大体代码
public class ServerHandle implements Runnable{//带参数构造函数public ServerHandle(int port){}//停止方法public void shop(){}//写方法private void write(SocketChannel socketChannel, String response)throws IOException{}//当有连接进来时的处理方法private void handleInput(SelectionKey key) throws IOException{} //服务端运行主体方法@Overridepublic void run() {}
}
复制代码
首先我们先看看该服务端的构造函数的实现:
public ServerHandle(int port){try {//创建选择器selector = Selector.open();//打开监听通道serverSocketChannel = ServerSocketChannel.open();//设置为非阻塞模式serverSocketChannel.configureBlocking(false);//传入端口,并设定连接队列最大为1024serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);//监听客户端请求serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//标记启动标志started = true;System.out.println("服务器已启动,端口号为:" + port);} catch (IOException e){e.printStackTrace();System.exit(1);}}
复制代码
在这里创建了选择器和监听通道,并将该监听通道注册到选择器上并选择其感兴趣的事件(accept)。后续其他接入的连接都将通过该 监听通道 传入。
然后就是写方法的实现:
private void doWrite(SocketChannel channel, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);wirteBuffer.put(bytes);//将写模式改为读模式wirteBuffer.flip();//写入管道channel.write(wirteBuffer);}
复制代码
其次是当由事件传入时,即对连接进来的链接的处理方法
private void handleInput(SelectionKey key) throws IOException{//当该键可用时if (key.isValid()){if (key.isAcceptable()){//返回该密钥创建的通道。ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();通过该通道获取链接进来的通道SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);}if (key.isReadable()){//返回该密钥创建的通道。SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(byteBuffer);if (readBytes > 0){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String expression = new String(bytes, "UTF-8");System.out.println("服务器收到的信息:" + expression);//此处是为了区别打印在工作台上的数据是由客户端产生还是服务端产生doWrite(socketChannel, "+++++" + expression + "+++++");} else if(readBytes == 0){//无数据,忽略}else if (readBytes < 0){//资源关闭key.cancel();socketChannel.close();}}}}
复制代码
这里要说明的是,只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发生。 如在构造方法中有一通道serverSocketChannel注册了accept事件。当其就绪时就可以通过调用selector的selectorKeys()方法,访问”已选择键集“中的就绪通道。
压轴方法:
@Overridepublic void run() {//循环遍历while (started) {try {//当没有就绪事件时阻塞selector.select();//返回就绪通道的键Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();SelectionKey key;while (iterator.hasNext()){key = iterator.next();//获取后必须移除,否则会陷入死循环iterator.remove();try {//对就绪通道的处理方法,上述有描述handleInput(key);} catch (Exception e){if (key != null){key.cancel();if (key.channel() != null) {key.channel().close();}}}}}catch (Throwable throwable){throwable.printStackTrace();}}}
复制代码
此方法为服务端的主体方法。大致流程如下:
- 打开ServerSocketChannel,监听客户端连接
- 绑定监听端口,设置连接为非阻塞模式(阻塞模式下不能注册到选择器)
- 创建Reactor线程,创建选择器并启动线程
- 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
- Selector轮询准备就绪的key
- Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
- 设置客户端链路为非阻塞模式
- 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息 异步读取客户端消息到缓冲区
- 调用write将消息异步发送给客户端
NIO的小Demo之客户端
public class ClientHandle implements Runnable{//构造函数,构造时顺便绑定public ClientHandle(String ip, int port){}//处理就绪通道private void handleInput(SelectionKey key) throws IOException{}//写方法(与服务端的写方法一致)private void doWrite(SocketChannel channel,String request) throws IOException{}//连接到服务端private void doConnect() throws IOException{}//发送信息public void sendMsg(String msg) throws Exception{}
}
复制代码
首先先看构造函数的实现:
public ClientHandle(String ip,int port) {this.host = ip;this.port = port;try{//创建选择器selector = Selector.open();//打开监听通道socketChannel = SocketChannel.open();//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式socketChannel.configureBlocking(false);started = true;}catch(IOException e){e.printStackTrace();System.exit(1);}}
复制代码
接下来看对就绪通道的处理办法:
private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){//这里的作用将在后面的代码(doConnect方法)说明if(sc.finishConnect()){System.out.println("已连接事件");}else{System.exit(1);}}//读消息if(key.isReadable()){//创建ByteBuffer,并开辟一个1k的缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);//读取请求码流,返回读取到的字节数int readBytes = sc.read(buffer);//读取到字节,对字节进行编解码if(readBytes>0){buffer.flip();//根据缓冲区可读字节数创建字节数组byte[] bytes = new byte[buffer.remaining()];//将缓冲区可读字节数组复制到新建的数组中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客户端收到消息:" + result);}lse if(readBytes==0){//忽略}else if(readBytes<0){//链路已经关闭,释放资源key.cancel();sc.close();}}}}
复制代码
在run方法之前需先看下此方法的实现:
private void doConnect() throws IOException{if(socketChannel.connect(new InetSocketAddress(host,port))){System.out.println("connect");}else {socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("register");}}
复制代码
当SocketChannel工作于非阻塞模式下时,调用connect()时会立即返回: 如果连接建立成功则返回的是true(比如连接localhost时,能立即建立起连接),否则返回false。
在非阻塞模式下,返回false后,必须要在随后的某个地方调用finishConnect()方法完成连接。 当SocketChannel处于阻塞模式下时,调用connect()时会进入阻塞,直至连接建立成功或者发生IO错误时,才从阻塞状态中退出。
所以该代码在connect服务端后返回false(但还是有作用的),并在else语句将该通道注册在选择器上并选择connect事件。
客户端的run方法:
@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循环遍历selectorwhile(started){try{selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key ;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector关闭后会自动释放里面管理的资源if(selector != null){try{selector.close();}catch (Exception e) {e.printStackTrace();}}}
复制代码
发送信息到服务端的方法:
public void sendMsg(String msg) throws Exception{//覆盖其之前感兴趣的事件(connect),将其更改为OP_READsocketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);}
复制代码
完整代码:
服务端:
/*** Created by innoyiya on 2018/8/20.*/
public class Service {private static int DEFAULT_POST = 12345;private static ServerHandle serverHandle;public static void start(){start(DEFAULT_POST);}public static synchronized void start(int post) {if (serverHandle != null){serverHandle.shop();}serverHandle = new ServerHandle(post);new Thread(serverHandle,"server").start();}
}
复制代码
服务端主体:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** Created by innoyiya on 2018/8/20.*/
public class ServerHandle implements Runnable{private Selector selector;private ServerSocketChannel serverSocketChannel;private volatile boolean started;public ServerHandle(int port){try {//创建选择器selector = Selector.open();//打开监听通道serverSocketChannel = ServerSocketChannel.open();//设置为非阻塞模式serverSocketChannel.configureBlocking(false);//判定端口,并设定连接队列最大为1024serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);//监听客户端请求serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//标记启动标志started = true;System.out.println("服务器已启动,端口号为:" + port);} catch (IOException e){e.printStackTrace();System.exit(1);}}public void shop(){started = false;}private void doWrite(SocketChannel channel, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);wirteBuffer.put(bytes);wirteBuffer.flip();channel.write(wirteBuffer);}private void handleInput(SelectionKey key) throws IOException{if (key.isValid()){if (key.isAcceptable()){ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);}if (key.isReadable()){SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(byteBuffer);if (readBytes > 0){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String expression = new String(bytes, "UTF-8");System.out.println("服务器收到的信息:" + expression);doWrite(socketChannel, "+++++" + expression + "+++++");} else if (readBytes < 0){key.cancel();socketChannel.close();}}}}@Overridepublic void run() {//循环遍历while (started) {try {selector.select();//System.out.println(selector.select());Set<SelectionKey> keys = selector.selectedKeys();//System.out.println(keys.size());Iterator<SelectionKey> iterator = keys.iterator();SelectionKey key;while (iterator.hasNext()){key = iterator.next();iterator.remove();try {handleInput(key);} catch (Exception e){if (key != null){key.cancel();if (key.channel() != null) {key.channel().close();}}}}}catch (Throwable throwable){throwable.printStackTrace();}}}
}
复制代码
客户端:
/*** Created by innoyiya on 2018/8/20.*/
public class Client {private static String DEFAULT_HOST = "localhost";private static int DEFAULT_PORT = 12345;private static ClientHandle clientHandle;private static final String EXIT = "exit";public static void start() {start(DEFAULT_HOST, DEFAULT_PORT);}public static synchronized void start(String ip, int port) {if (clientHandle != null){clientHandle.stop();}clientHandle = new ClientHandle(ip, port);new Thread(clientHandle, "Server").start();}//向服务器发送消息public static boolean sendMsg(String msg) throws Exception {if (msg.equals(EXIT)){return false;}clientHandle.sendMsg(msg);return true;}}
复制代码
客户端主体代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** Created by innoyiya on 2018/8/20.*/public class ClientHandle implements Runnable{private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean started;public ClientHandle(String ip,int port) {this.host = ip;this.port = port;try{//创建选择器selector = Selector.open();//打开监听通道socketChannel = SocketChannel.open();//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式socketChannel.configureBlocking(false);started = true;}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stop(){started = false;}private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){if(sc.finishConnect()){System.out.println("已连接事件");}else{System.exit(1);}}//读消息if(key.isReadable()){//创建ByteBuffer,并开辟一个1M的缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);//读取请求码流,返回读取到的字节数int readBytes = sc.read(buffer);//读取到字节,对字节进行编解码if(readBytes>0){//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作buffer.flip();//根据缓冲区可读字节数创建字节数组byte[] bytes = new byte[buffer.remaining()];//将缓冲区可读字节数组复制到新建的数组中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客户端收到消息:" + result);} else if(readBytes<0){key.cancel();sc.close();}}}}//异步发送消息private void doWrite(SocketChannel channel,String request) throws IOException{byte[] bytes = request.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);//flip操作writeBuffer.flip();//发送缓冲区的字节数组channel.write(writeBuffer);}private void doConnect() throws IOException{if(socketChannel.connect(new InetSocketAddress(host,port))){System.out.println("connect");}else {socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("register");}}public void sendMsg(String msg) throws Exception{//覆盖其之前感兴趣的事件,将其更改为OP_READsocketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);}@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循环遍历selectorwhile(started){try{selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key ;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector关闭后会自动释放里面管理的资源if(selector != null){try{selector.close();}catch (Exception e) {e.printStackTrace();}}}
}
复制代码
测试类:
import java.util.Scanner;/*** Created by innoyiya on 2018/8/20.*/
public class Test {public static void main(String[] args) throws Exception {Service.start();Thread.sleep(1000);Client.start();while(Client.sendMsg(new Scanner(System.in).nextLine()));}
}
复制代码
控制台打印:
服务器已启动,端口号为:12345
register
已连接事件
1234
服务器收到的信息:1234
客户端收到消息:+++++1234+++++
5678
服务器收到的信息:5678
客户端收到消息:+++++5678+++++
复制代码
如有不妥之处,请告诉我。