分别用BIO、NIO实现客户端服务器通信
- BIO
- NIO
- NIO演示(无Selector)
- NIO演示(Selector)
前言:
Java I/O模型发展以及Netty网络模型的设计思想
BIO
Java BIO是Java平台上的BIO(Blocking I/O)模型,是Java中用于实现同步阻塞网络编程的一种方式。 在Java中,使用BIO模型需要通过Socket和ServerSocket类来完成网络连接和数据传输,但是由于BIO是同步阻塞的,所以会导致线程阻塞和资源浪费的问题。
因此,在高并发的网络编程场景中,通常会选择使用NIO(Non-blocking I/O)模型或者Netty等框架来实现。
服务端类代码
package com.yu.io.bio;import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Minimal blocking-I/O (BIO) echo-style server.
 *
 * <p>Listens on port 8081 and serves one client at a time on the calling thread:
 * reads a single message, prints it, answers with a fixed response, then closes
 * the connection and waits for the next client.
 */
public class BIOServer {

    /**
     * Runs the accept loop forever.
     *
     * @param args unused
     * @throws IOException if the listening socket cannot be opened or {@code accept} fails
     */
    public static void main(String[] args) throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(8081)) {
            while (true) {
                System.out.println("wait connect...");
                Socket accepted = serverSocket.accept();
                // Close every client socket once it has been served, and keep the
                // server alive when a single client misbehaves.
                try (Socket clientSocket = accepted) {
                    handle(clientSocket);
                } catch (IOException clientFailure) {
                    System.out.println("client error : " + clientFailure.getMessage());
                }
            }
        }
    }

    /**
     * Serves one connected client: one blocking read followed by one fixed reply.
     *
     * @param clientSocket the accepted connection; the caller closes it
     * @throws IOException if reading from or writing to the socket fails
     */
    private static void handle(Socket clientSocket) throws IOException {
        System.out.println(clientSocket.getPort() + " connected");
        System.out.println("start read...");
        byte[] readArr = new byte[1024];
        int read = clientSocket.getInputStream().read(readArr);
        // read == -1 means the client closed without sending anything.
        if (read != -1) {
            System.out.println("read info : " + new String(readArr, 0, read, StandardCharsets.UTF_8));
        }
        System.out.println("end read...");

        byte[] resBytes = "server response".getBytes(StandardCharsets.UTF_8);
        clientSocket.getOutputStream().write(resBytes);
        clientSocket.getOutputStream().flush();
        // Log what was actually sent; logging the request buffer here would also
        // throw when read == -1.
        System.out.println("response info : " + new String(resBytes, StandardCharsets.UTF_8));
    }
}
客户端类代码
package com.yu.io.bio;import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Minimal blocking-I/O (BIO) client: connects to 127.0.0.1:8081, sends one
 * message, prints the single reply it reads, and disconnects.
 */
public class BIOClient {

    /**
     * Performs one request/response exchange with the server.
     *
     * @param args unused
     * @throws IOException if the connection cannot be established or the exchange fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources guarantees the socket is released even if the exchange fails.
        try (Socket clientSocket = new Socket("127.0.0.1", 8081)) {
            byte[] resBytes = "client response".getBytes(StandardCharsets.UTF_8);
            clientSocket.getOutputStream().write(resBytes);
            clientSocket.getOutputStream().flush();
            System.out.println("response info : " + new String(resBytes, StandardCharsets.UTF_8));

            byte[] readArr = new byte[1024];
            int read = clientSocket.getInputStream().read(readArr);
            // read == -1 means the server closed the connection without replying.
            if (read != -1) {
                System.out.println("read info : " + new String(readArr, 0, read, StandardCharsets.UTF_8));
            }
        }
    }
}
NIO
Java NIO 能够支持非阻塞网络编程,可以理解为 New I/O 或者 Non-blocking I/O。我更喜欢称之为 New I/O,因为它不仅仅实现了非阻塞的网络编程方式,同时也封装了常用的网络编程 API,更重要的是引入了多路复用器 Selector 的概念。
下面的代码只是展示NIO非阻塞的实现,并没有展示NIO的真正用法
NIO演示(无Selector)
服务端类代码
package com.yu.io.nio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Non-blocking NIO server that does NOT use a {@code Selector}.
 *
 * <p>The main loop polls {@code accept()} and then every connected channel in
 * turn. Because nothing ever blocks, the loop spins continuously; that is the
 * deliberate weakness this demo illustrates.
 */
public class NIOServer {

    /** Every currently connected client channel. */
    public static List<SocketChannel> socketChannelList = new ArrayList<>();

    /**
     * Starts the server on port 8081 and polls forever.
     *
     * @param args unused
     * @throws IOException if the listening channel cannot be opened or accepting fails
     */
    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.socket().bind(new InetSocketAddress(8081));
            serverSocketChannel.configureBlocking(false);
            System.out.println("server start...");
            while (true) {
                // In non-blocking mode accept() returns null when nobody is waiting.
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    System.out.println(socketChannel.socket().getPort() + " connected");
                    socketChannel.configureBlocking(false);
                    socketChannelList.add(socketChannel);
                }
                // Removals are collected and applied after the loop so the list is
                // never modified while it is being iterated.
                List<SocketChannel> rmChannelList = new ArrayList<>();
                for (SocketChannel channel : socketChannelList) {
                    try {
                        doChannel(rmChannelList, channel);
                    } catch (IOException ioException) {
                        // The client dropped the connection abruptly.
                        System.out.println(channel.socket().getPort() + " disconnected");
                        closeQuietly(channel);
                        rmChannelList.add(channel);
                    }
                }
                socketChannelList.removeAll(rmChannelList);
            }
        }
    }

    /**
     * Polls one client channel once and answers any recognised keyword.
     *
     * @param rmChannelList collector for channels that must be dropped after this pass
     * @param channel the non-blocking client channel to poll
     * @throws IOException if reading from or writing to the channel fails
     */
    private static void doChannel(List<SocketChannel> rmChannelList, SocketChannel channel) throws IOException {
        ByteBuffer readByteBuffer = ByteBuffer.allocate(2048);
        int read = channel.read(readByteBuffer);
        if (read == -1) {
            // Orderly shutdown by the peer: release the channel as well as forgetting it.
            rmChannelList.add(channel);
            closeQuietly(channel);
            return;
        }
        if (read <= 0) {
            return;
        }
        // Decode only the bytes actually received, not the zero padding of the buffer.
        String readStr = new String(readByteBuffer.array(), 0, read, StandardCharsets.UTF_8);
        System.out.println(channel.socket().getPort() + " : " + readStr);
        if (readStr.contains("hello")) {
            reply(channel, "hello! I am robot.");
        }
        if (readStr.contains("old")) {
            reply(channel, "I am 1 years old.");
        }
        if (readStr.contains("bey")) {
            reply(channel, "see you.");
        }
    }

    /** Writes the whole message to the channel and logs it. */
    private static void reply(SocketChannel channel, String message) throws IOException {
        ByteBuffer sendByteBuffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        // A non-blocking write may accept only part of the buffer.
        while (sendByteBuffer.hasRemaining()) {
            channel.write(sendByteBuffer);
        }
        System.out.println("me : " + message);
    }

    /** Closes a channel that is already being discarded; a close failure changes nothing. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // The channel is being dropped anyway.
        }
    }
}
客户端类代码
package com.yu.io.nio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
 * Non-blocking NIO client that does NOT use a {@code Selector}.
 *
 * <p>Connects to 127.0.0.1:8081, greets the server, then polls the channel for
 * replies. After 10 received messages it says goodbye and disconnects. The
 * polling loop spins while no data is available, which is the weakness this
 * demo illustrates.
 */
public class NIOClient {

    /**
     * Runs the scripted conversation with the server.
     *
     * @param args unused
     * @throws IOException if connecting, reading or writing fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources releases the channel on every exit path.
        try (SocketChannel clientSocketChannel = SocketChannel.open()) {
            clientSocketChannel.connect(new InetSocketAddress("127.0.0.1", 8081));
            clientSocketChannel.configureBlocking(false);
            // getLocalPort() identifies this client; getPort() would report the server's port.
            send(clientSocketChannel,
                    "hello! I am client " + clientSocketChannel.socket().getLocalPort() + ".");

            int msgSize = 0;
            ByteBuffer readByteBuffer = ByteBuffer.allocate(1024);
            while (msgSize < 10) {
                readByteBuffer.clear();
                int read = clientSocketChannel.read(readByteBuffer);
                if (read == -1) {
                    // The server closed the connection; polling further would spin forever.
                    System.out.println("robot disconnected");
                    return;
                }
                if (read <= 0) {
                    continue;
                }
                // Decode only the bytes actually received, not the zero padding of the buffer.
                String readStr = new String(readByteBuffer.array(), 0, read, StandardCharsets.UTF_8);
                System.out.println("robot : " + readStr);
                msgSize++;
                if (readStr.contains("hello")) {
                    send(clientSocketChannel, "how old are you?.");
                }
                if (readStr.contains("old")) {
                    send(clientSocketChannel, "en, place say hello!");
                }
            }
            send(clientSocketChannel, "bey bey!");
        }
    }

    /** Writes the whole message to the channel and logs it. */
    private static void send(SocketChannel channel, String message) throws IOException {
        ByteBuffer sendByteBuffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        // A non-blocking write may accept only part of the buffer.
        while (sendByteBuffer.hasRemaining()) {
            channel.write(sendByteBuffer);
        }
        System.out.println("me : " + message);
    }
}
NIO演示(Selector)
无Selector的NIO演示中,显然会出现空转的情况,以及无效连接的处理问题,这些问题都会影响性能。
NIO提供Selector多路复用器,优化上述问题
以下demo实现服务器与客户端通信,相互发消息,并由服务器将消息转发给其他客户端(广播功能)
服务端类代码
server main类
import java.io.IOException;public class NIOServerMain {public static void main(String[] args) throws IOException {NIOSelectorServer server = new NIOSelectorServer();server.start();}
}
server run类
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * Selector-based NIO chat server.
 *
 * <p>A single thread multiplexes the listening channel and all client channels
 * through one {@link Selector}. Every message received from a client is
 * forwarded to all other clients, and a few keywords trigger canned replies to
 * the sender.
 */
public class NIOSelectorServer {

    /**
     * Binds to port 8081, registers the listening channel with a selector and
     * enters the event loop.
     *
     * @throws IOException if the channel or selector cannot be set up, or selecting fails
     */
    public void start() throws IOException {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.socket().bind(new InetSocketAddress(8081));
            serverSocketChannel.configureBlocking(false);
            // Register with the selector (multiplexer) for incoming connections.
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server start...");
            run(selector);
        }
    }

    /**
     * Event loop: blocks until the selector reports ready keys, then dispatches each one.
     */
    private void run(Selector selector) throws IOException {
        while (true) {
            // Block until at least one channel is ready.
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectedKey = iterator.next();
                iterator.remove();
                try {
                    if (selectedKey.isValid() && selectedKey.isAcceptable()) {
                        doAccept(selector, selectedKey);
                    }
                    if (selectedKey.isValid() && selectedKey.isReadable()) {
                        doReadChannel(selectedKey, selector);
                    }
                } catch (IOException ioException) {
                    if (selectedKey.channel() instanceof SocketChannel) {
                        // A client connection broke.
                        disConnect(selectedKey, "exception");
                    } else {
                        // The listening channel's key must never be treated as a client.
                        System.out.println("accept failed : " + ioException.getMessage());
                    }
                }
            }
        }
    }

    /**
     * Handles an accept event: registers the new client channel for reads.
     */
    private void doAccept(Selector selector, SelectionKey selectedKey) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) selectedKey.channel();
        SocketChannel socketChannel = serverChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when the pending connection is already gone.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println(socketChannel.socket().getPort() + " connected");
    }

    /**
     * Handles a read event: prints the message, forwards it to the other
     * clients and sends keyword-triggered replies to the sender.
     */
    private void doReadChannel(SelectionKey selectedKey, Selector selector) throws IOException {
        SocketChannel channel = (SocketChannel) selectedKey.channel();
        ByteBuffer readByteBuffer = ByteBuffer.allocate(2048);
        int read = channel.read(readByteBuffer);
        if (read == -1) {
            // The client closed the connection.
            disConnect(selectedKey, "read = -1");
            return;
        }
        if (read <= 0) {
            return;
        }
        readByteBuffer.flip();
        String readStr = StandardCharsets.UTF_8.decode(readByteBuffer).toString();
        System.out.println(channel.socket().getPort() + " : " + readStr.length() + " - " + readStr);
        // Forward the message to the other clients.
        broadcast(selectedKey, selector, readStr);
        if (readStr.contains("hello")) {
            sendMsg(channel, "hello! I am robot.");
        }
        if (readStr.contains("old")) {
            sendMsg(channel, "I am 1 years old.");
        }
        if (readStr.contains("bye")) {
            sendMsg(channel, "see you.");
        }
    }

    /**
     * Drops a client: cancels its key and closes its channel.
     *
     * @param type short reason tag included in the log line
     */
    private void disConnect(SelectionKey selectedKey, String type) {
        SocketChannel channel = (SocketChannel) selectedKey.channel();
        System.out.println(channel.socket().getPort() + " disconnected. " + type);
        selectedKey.cancel();
        try {
            channel.close();
        } catch (IOException ignored) {
            // The connection is being discarded; a close failure must not stop the server.
        }
    }

    /**
     * Sends a message to one client and logs it.
     */
    private void sendMsg(SocketChannel channel, String s) throws IOException {
        // Encode with the same charset doReadChannel decodes with.
        ByteBuffer sendByteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
        // A non-blocking write may accept only part of the buffer.
        while (sendByteBuffer.hasRemaining()) {
            channel.write(sendByteBuffer);
        }
        System.out.println("me : " + s);
    }

    /**
     * Broadcast: forwards a message to every connected client except the sender.
     */
    private void broadcast(SelectionKey fromSelectedKey, Selector selector, String readStr) {
        int fromPort = ((SocketChannel) fromSelectedKey.channel()).socket().getPort();
        String forwarded = "(转发自 " + fromPort + ")" + readStr;
        for (SelectionKey otherKey : selector.keys()) {
            if (otherKey == fromSelectedKey || !otherKey.isValid()) {
                continue;
            }
            if (!(otherKey.channel() instanceof SocketChannel)) {
                continue;
            }
            try {
                sendMsg((SocketChannel) otherKey.channel(), forwarded);
            } catch (IOException recipientFailure) {
                // Drop the broken recipient, not the sender. Cancelling only marks the
                // key; it leaves the key set at the next select(), so iteration is safe.
                disConnect(otherKey, "broadcast failed");
            }
        }
    }
}
客户端代码
一共构造了两个客户端(消息客户端和looker客户端), looker客户端优先启动,随后启动消息客户端,消息客户端与服务器的通信会被转发给looker客户端
look client main类
import java.io.IOException;public class NIOClientLookMain {public static void main(String[] args) throws IOException, InterruptedException {NIOSelectorClient client = new NIOSelectorLookClient();client.start(8081, 100);}
}
msg client main类
import java.io.IOException;public class NIOClientMain {public static void main(String[] args) throws IOException, InterruptedException {NIOSelectorClient lookClient = new NIOSelectorClient();lookClient.start(8081, 10);}
}
client run类
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * Selector-based NIO chat client.
 *
 * <p>Connects to a server on 127.0.0.1, greets it, then reacts to incoming
 * messages until {@code size} messages have been received. If the connection
 * breaks, it keeps trying to start a fresh session every 5 seconds.
 */
public class NIOSelectorClient {

    /** Server port, remembered so a reconnect can reuse it. */
    protected int port;

    /** Number of received messages after which the session ends. */
    protected int size;

    /**
     * Runs one complete client session.
     *
     * @param port server port on 127.0.0.1
     * @param size number of messages to receive before saying goodbye
     * @throws IOException if connecting or channel I/O fails
     * @throws InterruptedException if interrupted while waiting between reconnect attempts
     */
    public void start(int port, int size) throws IOException, InterruptedException {
        this.port = port;
        this.size = size;
        // try-with-resources releases channel and selector on every exit path,
        // including a failed connect during a reconnect attempt.
        try (SocketChannel clientSocketChannel = SocketChannel.open();
             Selector selector = Selector.open()) {
            clientSocketChannel.connect(new InetSocketAddress("127.0.0.1", port));
            clientSocketChannel.configureBlocking(false);
            clientSocketChannel.register(selector, SelectionKey.OP_READ);
            // Send the opening message.
            sendMsg(clientSocketChannel,
                    "hello! I am client " + clientSocketChannel.socket().getLocalPort() + ".");
            run(selector);
            // run() closes the channel when the connection was lost; only say
            // goodbye on a connection that is still usable.
            if (clientSocketChannel.isOpen()) {
                sendMsg(clientSocketChannel, "bye bye!");
            }
        }
    }

    /**
     * Event loop: waits for readable keys and handles them until {@code size}
     * messages have been received or the connection is lost.
     *
     * @param selector selector the client channel is registered with
     * @throws IOException if selecting fails
     * @throws InterruptedException if interrupted while waiting between reconnect attempts
     */
    protected void run(Selector selector) throws IOException, InterruptedException {
        int msgSize = 0;
        while (msgSize < size) {
            if (selector.select() == 0) {
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                if (!selectionKey.isValid() || !selectionKey.isReadable()) {
                    continue;
                }
                try {
                    if (doReadChannel(selectionKey, selector)) {
                        msgSize++;
                    }
                } catch (IOException e) {
                    selectionKey.cancel();
                    closeQuietly(selectionKey);
                    System.out.println("robot disconnect, restart connect...");
                    reconnectUntilSuccess();
                    // The replacement session has run to completion; this one is over.
                    return;
                }
            }
        }
    }

    /**
     * Reads one message from the server and sends keyword-triggered replies.
     *
     * @param selectedKey key whose channel is readable
     * @param selector selector the key belongs to (unused here)
     * @return {@code true} if a message was received, {@code false} if no data was available
     * @throws IOException if the read fails or the server has closed the connection
     */
    protected boolean doReadChannel(SelectionKey selectedKey, Selector selector) throws IOException {
        SocketChannel channel = (SocketChannel) selectedKey.channel();
        ByteBuffer readByteBuffer = ByteBuffer.allocate(2048);
        int read = channel.read(readByteBuffer);
        if (read == -1) {
            // A channel at end-of-stream stays readable forever, so returning
            // false here would make select() spin. Report it as a disconnect.
            throw new IOException("connection closed by server");
        }
        if (read <= 0) {
            return false;
        }
        readByteBuffer.flip();
        String readStr = StandardCharsets.UTF_8.decode(readByteBuffer).toString();
        System.out.println("robot : " + readStr);
        if (readStr.contains("hello")) {
            sendMsg(channel, "how old are you?.");
        }
        if (readStr.contains("old")) {
            sendMsg(channel, "en, place say hello!");
        }
        return true;
    }

    /**
     * Sends a message to the server and logs it.
     *
     * @param channel connected channel to write to
     * @param sendStr text to send
     * @throws IOException if the write fails
     */
    protected void sendMsg(SocketChannel channel, String sendStr) throws IOException {
        // Encode with the same charset doReadChannel decodes with.
        ByteBuffer sendByteBuffer = ByteBuffer.wrap(sendStr.getBytes(StandardCharsets.UTF_8));
        // A non-blocking write may accept only part of the buffer.
        while (sendByteBuffer.hasRemaining()) {
            channel.write(sendByteBuffer);
        }
        System.out.println("me : " + sendStr);
    }

    /**
     * Starts a brand-new client session with the remembered port and size.
     *
     * @throws IOException if the new session cannot connect or fails
     * @throws InterruptedException propagated from the new session
     */
    protected void reConnect() throws IOException, InterruptedException {
        NIOSelectorClient client = new NIOSelectorClient();
        client.start(port, size);
    }

    /** Retries {@link #reConnect()} every 5 seconds until a session completes without an I/O error. */
    private void reconnectUntilSuccess() throws InterruptedException {
        while (true) {
            try {
                reConnect();
                return;
            } catch (IOException ioException) {
                System.out.println("restart connecting(5s) ");
                Thread.sleep(5000);
            }
        }
    }

    /** Closes the channel behind a key that is already being discarded. */
    private static void closeQuietly(SelectionKey selectionKey) {
        try {
            selectionKey.channel().close();
        } catch (IOException ignored) {
            // The connection is already broken; nothing more can be done.
        }
    }
}
look client run类
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;public class NIOSelectorLookClient extends NIOSelectorClient{@Overridepublic void start(int port, int size) throws IOException, InterruptedException {this.port = port;this.size = size;SocketChannel clientSocketChannel = SocketChannel.open();clientSocketChannel.connect(new InetSocketAddress("127.0.0.1",port));clientSocketChannel.configureBlocking(false);Selector selector = Selector.open();clientSocketChannel.register(selector, SelectionKey.OP_READ);//发送开始数据sendMsg(clientSocketChannel, "I am looker. " + clientSocketChannel.socket().getLocalPort() + ".");run(selector);}@Overrideprotected boolean doReadChannel(SelectionKey selectedKey, Selector selector) throws IOException {SocketChannel channel = (SocketChannel)selectedKey.channel();ByteBuffer readByteBuffer = ByteBuffer.allocate(2048);int read = channel.read(readByteBuffer);String readStr = "";readByteBuffer.flip();readStr += StandardCharsets.UTF_8.decode(readByteBuffer);if (read > 0) {System.out.println("robot : " + readStr.length() + " - "+ readStr);if (readStr.contains("bye")) {sendMsg(channel, "bye.");}return true;}return false;}@Overrideprotected void reConnect() throws IOException, InterruptedException {NIOSelectorClient client = new NIOSelectorLookClient();client.start(port, size);}
}