文章目录
- 1. 概述
- 2. TCP 阻塞式IO 网络编程实例
- 2.1 TCP网络编程服务端
- 2.2 ByteBufferUtil
- 2.3 客户端代码
- 2.4 运行截图
- 3. TCP 非阻塞式IO 网络编程实例
- 3.1 服务端
- 3.2 客户端
- 3.3 运行截图
- 4. 多路复用
- 4.1 服务器端
- 4.2 客户端
- 4.3 运行截图
- 5. AIO
- 5.1 AIO 服务端
- 5.2 客户端
- 5.3 运行截图
- 6. Channel / Buffer
- 6.1 Channel
- 6.2 ByteBuffer
- 参考文献
1. 概述
- 网络编程, 就是编写程序, 使两台联网的电脑可以交换数据,
- 套接字是网络数据传输用的软件设备, 用来连接网络的工具
- 在 linux中 socket被认为是文件中的一种, 在网络数据传输过程中, 使用文件I/O的相关函数
- socket 帮助程序员封装了网络的底层细节,如:错误检测、包大小、包分解、包重传、网络地址等,让程序员将网络连接看作可以读/写字节的流
- 套接字常用网络协议: TCP、UDP
之前还有一篇文章: Linux C++ Socket 套接字、select、poll、epoll 实例
套接字进行网络连接流程, 如下图:
服务器端:
- 创建服务器套接字
socket()
- 绑定端口
bind()
- 监听端口
listen()
- 接受客户端请求
accept()
- 读取客户端请求的数据
read()
- 返回客户端要响应的数据
write()
- …
- 关闭与客户端的连接
close()
- 关闭服务器套接字
close()
客户端:
- 创建客户端套接字
socket()
- 连接服务端
connect()
- 请求服务端数据, 发送操作数和操作符到服务器
write()
- 从服务器读取操作结果
read()
- …
- 关闭客户端套接字
close()
流程图如下, 具体代码示例可以看下面的 2. TCP 阻塞式IO 网络编程实例
2. TCP 阻塞式IO 网络编程实例
accept 和 read 都是阻塞的, 当 accept 到新连接, 或者 read 到数据程序才往下走
为了提高服务端处理能力, 一个客户端连接一个线程处理
不能一个线程处理多个客户端, 某个客户端会阻塞这个线程处理其他客户端
2.1 TCP网络编程服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;public class BlockServer {public static void main(String[] args) throws IOException {// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建了服务器ServerSocketChannel ssc = ServerSocketChannel.open();// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();while (true) {// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信System.out.println("等待客户端连接...");SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行System.out.println("接收到客户端连接: " + sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接收客户端发送的数据System.out.println("开始读取客户端中的数据:" + channel);channel.read(buffer); // 阻塞方法,线程停止运行buffer.flip();String request = ByteBufferUtil.read(buffer);System.out.println(request);buffer.clear();System.out.println("已经读取完客户端中的数据:" + channel);}}}
}
2.2 ByteBufferUtil
public class ByteBufferUtil {public static String read(ByteBuffer byteBuffer) throws CharacterCodingException {CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);return charBuffer.toString();}public static ByteBuffer read(String string) throws CharacterCodingException {return StandardCharsets.UTF_8.encode(string);}public static void main(String[] args) throws CharacterCodingException {System.out.println(ByteBufferUtil.read(ByteBufferUtil.read("test")));}}
2.3 客户端代码
public class BlockClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();System.out.println("开始连接服务端...");sc.connect(new InetSocketAddress("localhost", 8080));String str = "test";System.out.println("连接服务端成功,写入数据: " + str);sc.write(ByteBufferUtil.read(str));}
}
2.4 运行截图
3. TCP 非阻塞式IO 网络编程实例
不停的轮询, 看看有没有accept 到新连接, 没有连接不阻塞等待, 继续去看看已经建立的连接有没有read到客户端的新数据, read到新数据处理, read不到不处理
为了提高服务端处理能力, 可以一个客户端连接一个线程处理, 线程不停的轮询自己要处理的客户端
也可以一个线程处理多个客户端, 相较于上面的阻塞I/O模型, 非阻塞不至于某个客户端阻塞这个线程处理其他客户端
3.1 服务端
ssc.configureBlocking(false);
设置为非阻塞模式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;public class NonBlockServer {public static void main(String[] args) throws IOException, InterruptedException {// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建了服务器ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 非阻塞模式// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();while (true) {// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是nullif (sc != null) {System.out.println("接收到客户端连接: " + sc);sc.configureBlocking(false); // 非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {System.out.println("开始读取客户端中的数据:" + channel);// 5. 接收客户端发送的数据int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0if (read > 0) {buffer.flip();System.out.println((ByteBufferUtil.read(buffer)));buffer.clear();System.out.println("已经读取完客户端中的数据:" + channel);} else {TimeUnit.MILLISECONDS.sleep(100);}}}}
}
3.2 客户端
客户端同上
3.3 运行截图
4. 多路复用
可以调用 select/poll/epoll , 阻塞在select/poll/epoll, select/poll/epoll 监听多个客户端连接事件或写入的数据, 然后这些事件可再有多个线程分一分处理掉
4.1 服务器端
打开选择器并将其与通道注册,监听接受连接操作:
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
监听选择器上的事件,返回已就绪的通道数量:
int count = selector.select();
获取所有事件(连接、读取):
Set<SelectionKey> keys = selector.selectedKeys();
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;public class SelectorServer {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {// 绑定端口并打印通道信息channel.bind(new InetSocketAddress(6666));System.out.println(channel);// 打开选择器并将其与通道注册,监听接受连接操作Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);// 无限循环,等待选择器上的事件while (true) {// 监听选择器上的事件,返回已就绪的通道数量int count = selector.select();System.out.println("select count: " + count);// 如果没有就绪的通道,则继续循环等待if (count <= 0) {continue;}// 获取并迭代处理所有就绪的事件// 获取所有事件Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 处理接受连接事件// 判断事件类型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);System.out.println("连接已建立:" + sc);}// 处理读取数据事件else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);if (read == -1) {// 如果读取返回-1,表示连接已关闭key.cancel();sc.close();} else {// 否则,将缓冲区反转并打印读取的数据buffer.flip();System.out.println(new String(buffer.array(), StandardCharsets.UTF_8));}}// 事件处理完毕后,从迭代器中移除,避免重复处理// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {// 打印IO异常堆栈跟踪e.printStackTrace();}}
}
4.2 客户端
import netty.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;public class SelectorClient {public static void main(String[] args) throws IOException {// 创建Socket通道并连接到服务器SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost", 6666));// 初始化输入和输出ByteBufferByteBuffer inputBuffer = ByteBuffer.allocate(512);ByteBuffer serverOutput = ByteBuffer.allocate(512);// 循环接收用户输入并发送给服务器while (true) {// 使用Scanner获取用户输入Scanner in = new Scanner(System.in);String input = in.nextLine();System.out.println("user input: " + input);// 清空输入缓冲区,放入用户输入,然后反转准备写入inputBuffer.clear();inputBuffer.put(input.getBytes(StandardCharsets.UTF_8));inputBuffer.flip();// 将输入数据写入Socket通道sc.write(inputBuffer);System.out.println("send to server " + input);// 循环读取服务器响应while (true) {// 清空服务器响应缓冲区,准备读取数据serverOutput.clear();// 从Socket通道读取数据sc.read(serverOutput);// 如果没有读取到数据,继续尝试读取if (!serverOutput.hasRemaining()) {continue;}// 反转缓冲区,读取数据并打印serverOutput.flip();System.out.println("server response " + ByteBufferUtil.read(serverOutput));// 读取完成后退出内层循环break;}}}
}
4.3 运行截图
5. AIO
异步I/O模型
告诉内核启动某个操作, 并且把数据copy到用户缓冲区再通知我们
5.1 AIO 服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;/*** AIO服务器类,用于演示异步IO的服务器端实现。* 使用AsynchronousServerSocketChannel处理客户端连接和数据传输。*/
public class AIOServer {/*** 程序入口,初始化并启动AIO服务器。* 绑定服务器端口并等待客户端连接。** @param args 命令行参数* @throws IOException 如果绑定端口失败*/public static void main(String[] args) throws IOException {AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();ssc.bind(new InetSocketAddress(6666));ssc.accept(null, new AcceptHandler(ssc));while (true) ;}/*** 关闭客户端通道的方法。* 用于处理读取或写入操作失败时关闭通道。** @param sc 客户端通道*/private static void closeChannel(AsynchronousSocketChannel sc) {try {System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());sc.close();} catch (IOException e) {e.printStackTrace();}}/*** 读取数据的完成处理器,实现读取客户端数据并响应的逻辑。*/private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;public ReadHandler(AsynchronousSocketChannel sc) {this.sc = sc;}/*** 当读取操作完成时被调用。* 解析读取的数据并写回响应到客户端。** @param result 读取操作的结果* @param attachment 读取操作的附加上下文*/@Overridepublic void completed(Integer result, ByteBuffer attachment) {try {if (result == -1) {return;}System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());attachment.flip();String request = Charset.defaultCharset().decode(attachment).toString();System.out.println(request.toString());attachment.clear();attachment.put(("你好:" + request).getBytes());attachment.flip();sc.write(attachment);attachment.clear();// 读取下一个读时间sc.read(attachment, attachment, new ReadHandler(sc));} catch (IOException e) {e.printStackTrace();}}/*** 当读取操作失败时被调用。* 关闭客户端通道并打印异常堆栈跟踪。** @param exc 引发的异常* @param attachment 读取操作的附加上下文*/@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {closeChannel(sc);exc.printStackTrace();}}/*** 接受连接的完成处理器,用于处理客户端的连接请求。*/private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {private final AsynchronousServerSocketChannel ssc;public AcceptHandler(AsynchronousServerSocketChannel ssc) {this.ssc = ssc;}/*** 当接受操作完成时被调用。* 设置读取缓冲区并开始读取客户端发送的数据。** @param sc 接受到的客户端通道* @param attachment 接受操作的附加上下文*/@Overridepublic void completed(AsynchronousSocketChannel sc, Object attachment) {try {System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}ByteBuffer buffer = ByteBuffer.allocate(1024);// 读事件由 ReadHandler 处理System.out.println("开始读");sc.read(buffer, buffer, new ReadHandler(sc));System.out.println("读完成");// 处理完第一个 accept 时,需要再次调用 accept 方法来处理下一个 accept 事件ssc.accept(null, this);}/*** 当接受操作失败时被调用。* 打印异常堆栈跟踪。** @param exc 引发的异常* @param attachment 接受操作的附加上下文*/@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}}
}
5.2 客户端
同 4.2
5.3 运行截图
6. Channel / Buffer
6.1 Channel
Channel: 传输数据的通道
其实和数据流挺像的,不过数据流是单向的而Channel 是双向的,可以向channel中写数据,也可以从channel中读取数据
NIO 基础组件之 Channel
6.2 ByteBuffer
ByteBuffer是Buffer子类,是字节缓冲区,特点如下所示。
大小不可变。一旦创建,无法改变其容量大小,无法扩容或者缩容;
读写灵活。内部通过指针移动来实现灵活读写;
支持堆上内存分配和直接内存分配
一文搞懂ByteBuffer使用与原理
参考文献
- UNIX 网络编程 卷1: 套接字联网API
- TCP/IP网络编程 尹圣雨 著 金国哲 译
- Linux IO模式及 select、poll、epoll详解
- 浅谈select,poll和epoll的区别
- 黑马 Netty 课程