NIO三大组件

现在互联网环境下,分布式系统大相径庭,而分布式系统的根基在于网络编程,而netty恰恰是java领域的网络编程的王者,如果要致力于并发高性能的服务器程序、高性能的客户端程序,必须掌握netty网络编程。

NIO基础
NIO是从java1.4开始引入的一种新的I/O编程方式,相对于传统的IO来说,NIO更加灵活、高效、可靠,能够更好的处理海量的数据和高并发场景。简单来说,并发能力强。
三大组件
channel
Channel是数据传输的**双向通道,Stream要不就是读,要不就是写。Channel比Stream更加底层。**
常见的Channel有FileChannel、SocketChannel、DatagramChannel、ServerSocketChannel。FileChannel主要用于文件传输,其他三种用于网络通信。
Buffer
当我们有了连接通道,我们需要将拿到的数据放到一个缓冲区域,以便于程序对它的读取/写入操作
ByteBuffer

  • MappedByteBuffer 、DirectByteBuffer、 HeapByteBuffer

ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
CharBuffer
最常用的是ByteBuffer
在这里插入图片描述
select
Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。
在这里插入图片描述
这种方法的弊端:
内存占用高。每有一个socket连接,系统就要分配一个线程去对接。当出现大量连接时,会开辟大量线程,导致占用大量内存。
线程上下文切换成本高。
只适合连接数较少的场景。
什么是线程上下文切换?
一个CPU在同一个时刻是只能处理一个线程的,由于时间片耗尽或出现阻塞等情况,CPU 会转去执行另外一个线程,这个叫做线程上下文切换。
线程池技术
在这里插入图片描述
这种方法的弊端:
在阻塞模式下,线程只能处理一个连接。线程池中的线程获取任务,只有当任务完成/socket断开连接,才会去获取执行
下一个任务
只适合短链接的场景。
选择器(Selector)技术
为每个线程配合一个选择器,让选择器去管理多个channel。(注:FileChannel是阻塞式的,因此无法使用选择器。)
让选择器去管理多个工作在非阻塞式下的Channel,获取Channel上的事件,当一个Channel没有任务时,就转而去执行别的Channel上的任务。这种适合用在连接多,流量小的场景。
在这里插入图片描述
若事件未就绪,调用 selector 的 select() 方法会阻塞线程,直到 channel 发生了就绪事件。这些事件就绪后,select 方法就会返回这些事件交给 thread 来处理
ByteBuffer
属性
capacity:缓冲区的容量,不可变。(在netty中可变哦~)
limit:缓冲区的界限。limit之后的数据不允许读写
position:读写指针。position不可大于limit,且position不为负数。
mark:标记。记录当前position的值。position被改变后,可以通过调用reset() 方法恢复到mark的位置。
正确使用方法

  1. 向 buffer 写入数据,例如调用 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,例如调用 buffer.get()
  4. 调用 clear() 切换至写模式
  5. 重复 1~4 步骤
// 1. 输入输出流
try(FileChannel channel = new FileInputStream("D:/Java/netty/src/test/resources/data.txt").getChannel()) {// 2. 准备缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);while(true) {// 3. 从channel读取数据,读到buffer中去int len = channel.read(buffer);log.debug("读到的字节数 {}", len);if(len == -1) {break;}// 4. 切换buffer读模式,打印内容buffer.flip();while(buffer.hasRemaining()) {byte b = buffer.get();log.debug("实际字节 {}", (char)b);}// 切换回写模式buffer.clear();}
} catch (IOException e) {throw new RuntimeException(e);
}
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数:10
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数:4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数:-1

一开始
在这里插入图片描述
写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态
在这里插入图片描述
flip后变成读模式,position变成读取位置,limit变成读取限制。
在这里插入图片描述
读取四个字节后,状态
在这里插入图片描述
clear动作发生后,状态
在这里插入图片描述
compact方法,是把未读完的部分向前压缩,然后切换至写模式
在这里插入图片描述
常用方法
分配空间

// class java.nio.HeapByteBuffer    - java 堆内存,读写效果低,受到GC影响
System.out.println(ByteBuffer.allocate(16).getClass());
// class java.nio.DirectByteBuffer  - 直接内存,读写效率高(少一次拷贝),不会受GC影响。系统内存分配效率低,可能内存泄露
System.out.println(ByteBuffer.allocateDirect(16).getClass());

向buffer写入数据

int readBytes = channel.read(buf);
buf.put((byte)127);

向buffer读取数据

int writeBytes = channel.write(buf);
byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针
  • mark 和 reset(了解):
    • mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置
      字符串与 ByteBuffer 互转
// 1. 简单转换为ByteBuffer -> 写模式
ByteBuffer buffer0 = ByteBuffer.allocate(16);
buffer0.put("hello".getBytes());// 切换读模式
buffer0.flip();
String s0 = StandardCharsets.UTF_8.decode(buffer0).toString();
System.out.println(s0);// 2. encode -> 读模式
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("world");
String s1 = StandardCharsets.UTF_8.decode(buffer1).toString();
System.out.println(s1);// 3. warp
ByteBuffer buffer2 = ByteBuffer.wrap("hello".getBytes());
String s2 = StandardCharsets.UTF_8.decode(buffer2).toString();
System.out.println(s2);

分散读集中写
分散读:把一个Channel读取到三个Buffer当中去,减少数据的复制

String baseUrl = "D:/Java/netty/src/test/resources/word.txt";
try (FileChannel channel = new RandomAccessFile(baseUrl, "rw").getChannel()) {ByteBuffer a = ByteBuffer.allocate(3);ByteBuffer b = ByteBuffer.allocate(3);ByteBuffer c = ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{a, b, c});a.flip();b.flip();c.flip();debugAll(a);debugAll(b);debugAll(c);
} catch (IOException e) {
}

集中写:三个Buffer写到一个Channel里面去,减少数据的复制

String baseUrl = "D:/Java/netty/src/test/resources/word.txt";
try (FileChannel channel = new RandomAccessFile(baseUrl, "rw").getChannel()) {ByteBuffer d = ByteBuffer.allocate(4);ByteBuffer e = ByteBuffer.allocate(4);d.put(new byte[]{'f', 'o', 'u', 'r'});e.put(new byte[]{'f', 'i', 'v', 'e'});d.flip();e.flip();debugAll(d);debugAll(e);channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
}

粘包与半包
网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
Hello,world\n
I’m Nyima\n
How are you?\n
变成了下面的两个 byteBuffer (粘包,半包)
Hello,world\nI’m Nyima\nHo
w are you?\n
出现原因
粘包
发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去
半包
接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象
解决办法
通过get(index)方法遍历ByteBuffer,遇到分隔符时进行处理。
注意
:get(index)不会改变position的值
记录该段数据长度,以便于申请对应大小的缓冲区
将缓冲区的数据通过get()方法写入到target中
调用compact方法切换模式,因为缓冲区中可能还有未读的数据

public static void main(String[] args) {ByteBuffer source = ByteBuffer.allocate(32);source.put("Hello,world\nI'm zhangsan\nHo".getBytes());split(source);source.put("w are you?\nhaha!\n".getBytes());split(source);
}private static void split(ByteBuffer source) {source.flip();ByteBuffer target = ByteBuffer.allocate(15);for(int i = 0; i < source.limit(); i++) {if(source.get(i) == '\n') {// 长度处理很关键int length = i + 1 - source.position();for(int j = 0; j < length; j++) {target.put(source.get());}// 打印字符debugAll(target);target.clear();}}source.compact();
}

文件编程

FileChannel
只能工作在堵塞模式下
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法
获取
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  1. 通过 FileInputStream 获取的 channel 只能读
  2. 通过 FileOutputStream 获取的 channel 只能写
  3. 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
    读取
    会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾
int readBytes = channel.read(buffer);

写入
写入的正确如下,socketchannel

ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip();   // 切换读模式while(buffer.hasRemaining()) {channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

  • 关闭

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

  • 大小

使用 size 方法获取文件的大小

  • 强制写入
    操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
    两个 Channel 传输数据(有用)
    小文件
String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();FileChannel to = new FileOutputStream(TO).getChannel();
) {from.transferTo(0, from.size(), to);
} catch (IOException e) {e.printStackTrace();
}
long end = System.nanoTime();
System.out.println("transferTo 用时:" + (end - start) / 1000_000.0);

大文件

public static void main(String[] args) {try (FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("to.txt").getChannel();) {// 效率高,底层会利用操作系统的零拷贝进行优化long size = from.size();// left 变量代表还剩余多少字节for (long left = size; left > 0; ) {System.out.println("position:" + (size - left) + " left:" + left);left -= from.transferTo((size - left), left, to);}} catch (IOException e) {e.printStackTrace();}
}

在这里插入图片描述
Files
查找检查文件是否存在

Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));Copy

创建一级目录

Path path = Paths.get("helloword/d1");
Files.createDirectory(path);Copy

如果目录已存在,会抛异常 FileAlreadyExistsException
不能一次创建多级目录,否则会抛异常 NoSuchFileException
创建多级目录

Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);Copy

拷贝及移动

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");Files.copy(source, target);Copy

如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制

Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);Copy

移动文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);Copy

在这里插入图片描述

在这里插入图片描述
遍历文件夹

// 要遍历的文件夹
Path path = Paths.get("D:\\Java\\netty");
// 文件夹个数
AtomicInteger dirCount = new AtomicInteger();
// 文件个数
AtomicInteger fileCount = new AtomicInteger();
// 开始遍历
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){// 进入文件夹之前的操作@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println("====> " + dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}// 遍历到文件的操作@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);fileCount.incrementAndGet();return super.visitFile(file, attrs);}
});
System.out.println(dirCount);
System.out.println(fileCount);

网络编程

阻塞
阻塞模式下,相关方法都会导致线程暂停
ServerSocketChannel.accept 会在没有连接建立时让线程暂停
SocketChannel.read 会在通道中没有数据可读时让线程暂停
阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
在这里插入图片描述

但多线程下,有新的问题,体现在以下方面
32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
服务端代码

public class Server {public static void main(String[] args) {// 创建缓冲区ByteBuffer buffer = ByteBuffer.allocate(16);// 获得服务器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {// 为服务器通道绑定端口server.bind(new InetSocketAddress(8080));// 用户存放连接的集合ArrayList<SocketChannel> channels = new ArrayList<>();// 循环接收连接while (true) {System.out.println("before connecting...");// 没有连接时,会阻塞线程SocketChannel socketChannel = server.accept();System.out.println("after connecting...");channels.add(socketChannel);// 循环遍历集合中的连接for(SocketChannel channel : channels) {System.out.println("before reading");// 处理通道中的数据// 当通道中没有数据可读时,会阻塞线程channel.read(buffer);buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();System.out.println("after reading");}}} catch (IOException e) {e.printStackTrace();}}
}

客户端

public class Client {public static void main(String[] args) {try (SocketChannel socketChannel = SocketChannel.open()) {// 建立连接socketChannel.connect(new InetSocketAddress("localhost", 8080));System.out.println("waiting...");} catch (IOException e) {e.printStackTrace();}}
}

客户端-服务器建立连接前:服务器端因accept阻塞
在这里插入图片描述
客户端-服务器建立连接后,客户端发送消息前:服务器端因通道为空被阻塞
在这里插入图片描述
客户端发送数据后,服务器处理通道中的数据。再次进入循环时,再次被accept阻塞
在这里插入图片描述
前的客户端再次发送消息**,服务器端因为被accept阻塞**,无法处理之前客户端发送到通道中的信息
在这里插入图片描述
非阻塞
可以通过ServerSocketChannel的configureBlocking(false)方法将获得连接设置为非阻塞的。此时若没有连接,accept会返回null
可以通过SocketChannel的configureBlocking(false)方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read会返回-1
服务器代码如下

public class Server {public static void main(String[] args) {// 创建缓冲区ByteBuffer buffer = ByteBuffer.allocate(16);// 获得服务器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {// 为服务器通道绑定端口server.bind(new InetSocketAddress(8080));// 用户存放连接的集合ArrayList<SocketChannel> channels = new ArrayList<>();// 循环接收连接while (true) {// 设置为非阻塞模式,没有连接时返回null,不会阻塞线程server.configureBlocking(false);SocketChannel socketChannel = server.accept();// 通道不为空时才将连接放入到集合中if (socketChannel != null) {System.out.println("after connecting...");channels.add(socketChannel);}// 循环遍历集合中的连接for(SocketChannel channel : channels) {// 处理通道中的数据// 设置为非阻塞模式,若通道中没有数据,会返回0,不会阻塞线程channel.configureBlocking(false);int read = channel.read(buffer);if(read > 0) {buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();System.out.println("after reading");}}}} catch (IOException e) {e.printStackTrace();}}
}

这样写存在一个问题,因为设置为了非阻塞,会一直执行while(true)中的代码,CPU一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求。

Selector
多路复用
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用
多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用
如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
有可连接事件时才去连接
有可读事件才去读取
有可写事件才去写入
使用及Accpet事件
使用Selector实现多路复用,服务端代码如下改进

public class SelectServer {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(16);// 获得服务器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));// 创建选择器Selector selector = Selector.open();// 通道必须设置为非阻塞模式server.configureBlocking(false);// 将通道注册到选择器中,并设置感兴趣的事件server.register(selector, SelectionKey.OP_ACCEPT);while (true) {// 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转// 返回值为就绪的事件个数int ready = selector.select();System.out.println("selector ready counts : " + ready);// 获取所有事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍历事件Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判断key的类型if(key.isAcceptable()) {// 获得key对应的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();System.out.println("before accepting...");// 获取连接并处理,而且是必须处理,否则需要取消SocketChannel socketChannel = channel.accept();System.out.println("after accepting...");// 处理完毕后移除iterator.remove();}}}} catch (IOException e) {e.printStackTrace();}}
}

获得选择器Selector

Selector selector = Selector.open();

channel 必须工作在非阻塞模式
FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用

获得的事件类型
connect - 客户端连接成功时触发
accept - 服务器端成功接受连接时触发
read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

// 通道必须设置为非阻塞模式
server.configureBlocking(false);
// 将通道注册到选择器中,并设置感兴趣的实践
server.register(selector, SelectionKey.OP_ACCEPT);

Selector监听事件,并获得就绪的通道个数,若没有通道就绪,线程会被阻塞

int count = selector.select();

阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

int count = selector.selectNow();

获取就绪事件并得到对应的通道,然后进行处理

// 获取所有事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍历事件
Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判断key的类型,此处为Accept类型if(key.isAcceptable()) {// 获得key对应的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();// 获取连接并处理,而且是必须处理,否则需要取消SocketChannel socketChannel = channel.accept();// 处理完毕后移除iterator.remove();}
}

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发
加粗样式
在Accept事件中,若有客户端与服务器端建立了连接,需要将其对应的SocketChannel设置为非阻塞,并注册到选择其中
添加Read事件,触发后进行读取操作

public class SelectServer {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(16);// 获得服务器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));// 创建选择器Selector selector = Selector.open();// 通道必须设置为非阻塞模式server.configureBlocking(false);// 将通道注册到选择器中,并设置感兴趣的实践server.register(selector, SelectionKey.OP_ACCEPT);// 为serverKey设置感兴趣的事件while (true) {// 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转// 返回值为就绪的事件个数int ready = selector.select();System.out.println("selector ready counts : " + ready);// 获取所有事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍历事件Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判断key的类型if(key.isAcceptable()) {// 获得key对应的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();System.out.println("before accepting...");// 获取连接SocketChannel socketChannel = channel.accept();System.out.println("after accepting...");// 设置为非阻塞模式,同时将连接的通道也注册到选择其中socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);// 处理完毕后移除iterator.remove();} else if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();System.out.println("before reading...");channel.read(buffer);System.out.println("after reading...");buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();// 处理完毕后移除iterator.remove();}}}} catch (IOException e) {e.printStackTrace();}}
}

消息边界
不处理消息边界存在的问题:将缓冲区的大小设置为4个字节,发送2个汉字(你好),通过decode解码并打印时,会出现乱码
这是因为UTF-8字符集下,1个汉字占用3个字节,此时缓冲区大小为4个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好 的 好 字被拆分为了前半部分和后半部分发送,解码时就会出现问题
处理消息边界
在这里插入图片描述
附件与扩容
在这里插入图片描述
Channel的register方法还有第三个参数:附件,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的SelectionKey绑定,可以从SelectionKey获取到对应通道的附件

public final SelectionKey register(Selector sel, int ops, Object att)

可通过SelectionKey的attachment()方法获得附件

ByteBuffer buffer = (ByteBuffer) key.attachment();

我们需要在Accept事件发生后,将通道注册到Selector中时,对每个通道添加一个ByteBuffer附件,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题

// 设置为非阻塞模式,同时将连接的通道也注册到选择其中,同时设置附件
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 添加通道对应的Buffer附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);

当Channel中的数据大于缓冲区时,需要对缓冲区进行扩容操作。此代码中的扩容的判定方法:Channel调用compact方法后,的position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中

// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);// 将旧buffer中的内容放入新的buffer中ewBuffer.put(buffer);// 将新buffer作为附件放到key中key.attach(newBuffer);
}

改造后的服务器代码如下

public class SelectServer {public static void main(String[] args) {// 获得服务器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));// 创建选择器Selector selector = Selector.open();// 通道必须设置为非阻塞模式server.configureBlocking(false);// 将通道注册到选择器中,并设置感兴趣的事件server.register(selector, SelectionKey.OP_ACCEPT);// 为serverKey设置感兴趣的事件while (true) {// 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转// 返回值为就绪的事件个数int ready = selector.select();System.out.println("selector ready counts : " + ready);// 获取所有事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍历事件Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判断key的类型if(key.isAcceptable()) {// 获得key对应的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();System.out.println("before accepting...");// 获取连接SocketChannel socketChannel = channel.accept();System.out.println("after accepting...");// 设置为非阻塞模式,同时将连接的通道也注册到选择其中,同时设置附件socketChannel.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16);socketChannel.register(selector, SelectionKey.OP_READ, buffer);// 处理完毕后移除iterator.remove();} else if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();System.out.println("before reading...");// 通过key获得附件(buffer)ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer);if(read == -1) {key.cancel();channel.close();} else {// 通过分隔符来分隔buffer中的数据split(buffer);// 如果缓冲区太小,就进行扩容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);// 将旧buffer中的内容放入新的buffer中buffer.flip();newBuffer.put(buffer);// 将新buffer放到key中作为附件key.attach(newBuffer);}}System.out.println("after reading...");// 处理完毕后移除iterator.remove();}}}} catch (IOException e) {e.printStackTrace();}}private static void split(ByteBuffer buffer) {buffer.flip();for(int i = 0; i < buffer.limit(); i++) {// 遍历寻找分隔符// get(i)不会移动positionif (buffer.get(i) == '\n') {// 缓冲区长度int length = i+1-buffer.position();ByteBuffer target = ByteBuffer.allocate(length);// 将前面的内容写入target缓冲区for(int j = 0; j < length; j++) {// 将buffer中的数据写入target中target.put(buffer.get());}// 打印结果ByteBufferUtil.debugAll(target);}}// 切换为写模式,但是缓冲区可能未读完,这里需要使用compactbuffer.compact();}
}

在这里插入图片描述
Write事件
服务器通过Buffer向通道中写入数据时,可能因为通道容量小于Buffer中的数据大小,导致无法一次性将Buffer中的数据全部写入到Channel中,这时便需要分多次写入,具体步骤如下
执行一次写操作,向将buffer中的内容写入到SocketChannel中,然后判断Buffer中是否还有数据
若Buffer中还有数据,则需要将SockerChannel注册到Seletor中,并关注写事件,同时将未写完的Buffer作为附件一起放入到SelectionKey中

 int write = socket.write(buffer);
// 通道中可能无法放入缓冲区中的所有数据
if (buffer.hasRemaining()) {// 注册到Selector中,关注可写事件,并将buffer添加到key的附件中socket.configureBlocking(false);socket.register(selector, SelectionKey.OP_WRITE, buffer);
}

添加写事件的相关操作key.isWritable(),对Buffer再次进行写操作
每次写后需要判断Buffer中是否还有数据(是否写完)。若写完,需要移除SelecionKey中的Buffer附件,避免其占用过多内存,同时还需移除对写事件的关注

SocketChannel socket = (SocketChannel) key.channel();
// 获得buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 执行写操作
int write = socket.write(buffer);
System.out.println(write);
// 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣
if (!buffer.hasRemaining()) {key.attach(null);key.interestOps(0);
}

整体代码如下

public class WriteServer {public static void main(String[] args) {try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));server.configureBlocking(false);Selector selector = Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 处理后就移除事件iterator.remove();if (key.isAcceptable()) {// 获得客户端的通道SocketChannel socket = server.accept();// 写入数据StringBuilder builder = new StringBuilder();for(int i = 0; i < 500000000; i++) {builder.append("a");}ByteBuffer buffer = StandardCharsets.UTF_8.encode(builder.toString());// 先执行一次Buffer->Channel的写入,如果未写完,就添加一个可写事件int write = socket.write(buffer);System.out.println(write);// 通道中可能无法放入缓冲区中的所有数据if (buffer.hasRemaining()) {// 注册到Selector中,关注可写事件,并将buffer添加到key的附件中socket.configureBlocking(false);socket.register(selector, SelectionKey.OP_WRITE, buffer);}} else if (key.isWritable()) {SocketChannel socket = (SocketChannel) key.channel();// 获得bufferByteBuffer buffer = (ByteBuffer) key.attachment();// 执行写操作int write = socket.write(buffer);System.out.println(write);// 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣if (!buffer.hasRemaining()) {key.attach(null);key.interestOps(0);}}}}} catch (IOException e) {e.printStackTrace();}}
}

在这里插入图片描述
多线程优化
在这里插入图片描述
创建一个负责处理Accept事件的Boss线程,与多个负责处理Read事件的Worker线程
Boss线程执行的操作
接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,让Worker去处理Read事件,其中需要根据标识robin去判断将任务分配给哪个Worker

// 创建固定数量的Worker
Worker[] workers = new Worker[4];
// 用于负载均衡的原子整数
AtomicInteger robin = new AtomicInteger(0);
// 负载均衡,轮询分配Worker
workers[robin.getAndIncrement()% workers.length].register(socket);

register(SocketChannel socket)方法会通过同步队列完成Boss线程与Worker线程之间的通信,让SocketChannel的注册任务被Worker线程执行。添加任务后需要调用selector.wakeup()来唤醒被阻塞的Selector

public void register(final SocketChannel socket) throws IOException {// 只启动一次if (!started) {// 初始化操作}// 向同步队列中添加SocketChannel的注册事件// 在Worker线程中执行注册事件queue.add(new Runnable() {@Overridepublic void run() {try {socket.register(selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}});// 唤醒被阻塞的Selector// select类似LockSupport中的park,wakeup的原理类似LockSupport中的unparkselector.wakeup();
}

Worker线程执行的操作: 从同步队列中获取注册任务,并处理Read事件

public class ThreadsServer {public static void main(String[] args) {try (ServerSocketChannel server = ServerSocketChannel.open()) {// 当前线程为Boss线程Thread.currentThread().setName("Boss");server.bind(new InetSocketAddress(8080));// 负责轮询Accept事件的SelectorSelector boss = Selector.open();server.configureBlocking(false);server.register(boss, SelectionKey.OP_ACCEPT);// 创建固定数量的WorkerWorker[] workers = new Worker[4];// 用于负载均衡的原子整数AtomicInteger robin = new AtomicInteger(0);for(int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-"+i);}while (true) {boss.select();Set<SelectionKey> selectionKeys = boss.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// BossSelector负责Accept事件if (key.isAcceptable()) {// 建立连接SocketChannel socket = server.accept();System.out.println("connected...");socket.configureBlocking(false);// socket注册到Worker的Selector中System.out.println("before read...");// 负载均衡,轮询分配Workerworkers[robin.getAndIncrement()% workers.length].register(socket);System.out.println("after read...");}}}} catch (IOException e) {e.printStackTrace();}}static class Worker implements Runnable {private Thread thread;private volatile Selector selector;private String name;private volatile boolean started = false;/*** 同步队列,用于Boss线程与Worker线程之间的通信*/private ConcurrentLinkedQueue<Runnable> queue;public Worker(String name) {this.name = name;}public void register(final SocketChannel socket) throws IOException {// 只启动一次if (!started) {thread = new Thread(this, name);selector = Selector.open();queue = new ConcurrentLinkedQueue<>();thread.start();started = true;}// 向同步队列中添加SocketChannel的注册事件// 在Worker线程中执行注册事件queue.add(new Runnable() {@Overridepublic void run() {try {socket.register(selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}});// 唤醒被阻塞的Selector// select类似LockSupport中的park,wakeup的原理类似LockSupport中的unparkselector.wakeup();}@Overridepublic void run() {while (true) {try {selector.select();// 通过同步队列获得任务并运行Runnable task = queue.poll();if (task != null) {// 获得任务,执行注册操作task.run();}Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// Worker只负责Read事件if (key.isReadable()) {// 简化处理,省略细节SocketChannel socket = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);socket.read(buffer);buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}

socket.register(selector, SelectionKey.OP_READ);selector.wakeup(); 都是在boss线程内部执行,selector.select();(这个方法会阻塞线程的执行) 在work线程内部执行,如果work线程先执行,会堵塞线程直到selector被唤醒,然后从同步队列中获取注册任务,并处理Read事件。若是boss线程运行在work线程前面,selector.wakeup()会一直作用,selector.select()就不会堵塞了。
在这里插入图片描述

NIO与BIO

在这里插入图片描述
在这里插入图片描述

IO模型
在这里插入图片描述
零拷贝
传统 IO 问题

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);

在这里插入图片描述

  • Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java
    程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU。DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
  • 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA
  • 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝
  • 接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU
    在这里插入图片描述
    NIO优化
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    零拷贝指的是数据无需拷贝到 JVM 内存中,同时具有以下三个优点-
  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输
    AIO
    在这里插入图片描述
    简易聊天室1.0
    客户端
package chat1;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;public class ChatClient {private Selector selector;private SocketChannel socketChannel;private static final String HOST = "localhost";private static final int PORT = 8080;public ChatClient() {try {selector = Selector.open();socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);System.out.println("欢迎来到" + HOST + ":" + PORT+"的聊天室");} catch (IOException e) {e.printStackTrace();}}public void start(){new Thread(()->{try {while (true) {if (selector.select() > 0) {for (SelectionKey key : selector.selectedKeys()) {selector.selectedKeys().remove(key);if (key.isReadable()) {readMessage();}}}}}catch (IOException e) {throw new RuntimeException(e);}}).start();try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) {String input;while ((input = reader.readLine()) != null) {sendMessage(input);}} catch (IOException e) {e.printStackTrace();}}public void sendMessage(String message) throws IOException {if(message!=null && !message.trim().isEmpty()){ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());socketChannel.write(buffer);}}public void readMessage() throws IOException {ByteBuffer buffer = ByteBuffer.allocate(1024);int read= socketChannel.read(buffer);if(read>0){buffer.flip();String msg = new String(buffer.array(), 0, read);System.out.println(msg);}}public static void main(String[] args) {new ChatClient().start();}
}

服务器

package chat1;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ChatServer {private Selector selector;private ServerSocketChannel serverSocketChannel;private static final int PORT = 8080;public ChatServer() {try {selector= Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(PORT));serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("聊天室已经启动了"+PORT);} catch (IOException e) {e.printStackTrace();}}public  void start(){try {while (true) {if (selector.select() > 0) {Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();handleKey(key);}}}} catch (IOException e) {e.printStackTrace();}}public void handleKey(SelectionKey key) throws IOException {if (key.isAcceptable()) {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);System.out.println("客户端连上了"+socketChannel.getRemoteAddress());}else if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int read = socketChannel.read(byteBuffer);if (read > 0) {byteBuffer.flip();String msg = new String(byteBuffer.array(), 0, read);System.out.println("客户端说: " + msg);socketChannel.write(ByteBuffer.wrap(("服务端回复: " + msg).getBytes()));}}}public static void main(String[] args) {new ChatServer().start();}
}

简易聊天室2.0
客户端

package chatover;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class Client {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);socketChannel.connect(new InetSocketAddress("localhost", 8080));// 创建一个 SelectorSelector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_CONNECT);// 从控制台读取输入并发送给服务器端Thread sendMessageThread = new Thread(() -> {try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) {while (true) {System.out.println("输入客户端消息: ");String message = reader.readLine();if (socketChannel.isConnected()) {ByteBuffer buffer = ByteBuffer.wrap((message + "\n").getBytes());socketChannel.write(buffer);}}} catch (IOException e) {e.printStackTrace();}});sendMessageThread.start();while (true) {int readyChannels = selector.select();if (readyChannels == 0) {continue;}Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectedKeys.iterator();while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();if (key.isConnectable()) {// 连接到服务器socketChannel.finishConnect();socketChannel.register(selector, SelectionKey.OP_READ);System.out.println("已连接到服务器");} else if (key.isReadable()) {// 读取服务器端消息ByteBuffer buffer = ByteBuffer.allocate(1024);int bytesRead = socketChannel.read(buffer);if (bytesRead > 0) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String message = new String(bytes).trim();System.out.println("服务器端消息: " + message);}}keyIterator.remove();}}}
}

服务器端

package chatover;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;public class Server {public static void main(String[] args) throws IOException {// 创建一个 ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8080));// 创建一个 SelectorSelector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("聊天室服务端启动了");// 客户端连接AtomicReference<SocketChannel> clientRef = new AtomicReference<>();// 从控制台读取输入并发送给客户端Thread sendMessageThread = new Thread(() -> {try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) {while (true) {System.out.println("输入服务器端消息: ");String message = reader.readLine();SocketChannel client = clientRef.get();if (client != null && client.isConnected()) {ByteBuffer buffer = ByteBuffer.wrap((message + "\n").getBytes());client.write(buffer);}}} catch (IOException e) {e.printStackTrace();}});sendMessageThread.start();while (true) {int readyChannels = selector.select();if (readyChannels == 0) {continue;}Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectedKeys.iterator();while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();if (key.isAcceptable()) {// 接受客户端连接SocketChannel client = serverSocketChannel.accept();System.out.println("客户端已连接");client.configureBlocking(false);client.register(selector, SelectionKey.OP_READ);clientRef.set(client);} else if (key.isReadable()) {// 读取客户端消息SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int bytesRead = channel.read(buffer);if (bytesRead > 0) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String message = new String(bytes).trim();System.out.println("客户端消息: " + message);}}keyIterator.remove();}}}
}

下面这篇文章写的很不棒讲异步的
Java网络编程和NIO详解5:Java 非阻塞 IO 和异步 IO
通常,我们会有一个线程池用于执行异步任务,提交任务的线程将任务提交到线程池就可以立马返回,不必等到任务真正完成。如果想要知道任务的执行结果,通常是通过传递一个回调函数的方式,任务结束后去调用这个函数。
同样的原理,Java 中的异步 IO 也是一样的,都是由一个线程池来负责执行任务,然后使用回调或自己去查询结果。

异步聊天室
客户端
在这里插入图片描述

package Asynchronous;import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;public class AsynchronousClient {public static void main(String[] args) throws IOException {try {AsynchronousSocketChannel client = AsynchronousSocketChannel.open();Future<Void> connectResult = client.connect(new InetSocketAddress("localhost", 8080));connectResult.get(); //.get()会阻塞线程,等待连接完成String message = "沉默王二,在吗?";ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));//将字符串转为字节数组Future<Integer> writeResult = client.write(buffer);//异步返回的结果writeResult.get(); // 等待发送完成System.out.println("消息发送完毕");client.close();} catch (IOException | InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

在这里插入图片描述

服务端

package Asynchronous;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Future;public class AsynchronousServer {public static void main(String[] args) throws IOException, InterruptedException {AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();server.bind(new InetSocketAddress(8080));System.out.println("服务端启动");server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {@Overridepublic void completed(AsynchronousSocketChannel result, Object attachment) {server.accept(null, this);ByteBuffer buffer = ByteBuffer.allocate(1024);Future<Integer> read = result.read(buffer);try {read.get();buffer.flip();String message = new String(buffer.array(), 0, buffer.remaining());System.out.println("接收到的消息: " + message);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});Thread.currentThread().join();//由于服务器通常需要持续运行以接受新的连接,使用 join() 方法可以防止 Java 程序在 main 方法执行完毕后立即退出。}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/62184.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue3 el-table 默认选中 传入的数组

一、效果&#xff1a; 二、官网是VUE2 现更改为Vue3写法 <template><el-table:data"tableData"border striperow-key"id"ref"tableRef":cell-style"{ text-align: center }":header-cell-style"{background: #b7babd…

【Python】绝地求生PUBG等FPS游戏辅助压枪软件脚本编写

【Python】绝地求生PUBG等FPS游戏辅助压枪软件脚本编写 更新以gitee为准&#xff1a; https://gitee.com/Mike_Zhou_Group/PUBG_Key/开源的是辅助压枪软件 不开源的是将之前的地图测距和压枪辅助整合在一起的整合包 关于地图测距&#xff1a; https://gitee.com/Mike_Zhou_G…

深度学习使用LSTM实现时间序列预测

大家好&#xff0c;LSTM是一种特殊的循环神经网络&#xff08;RNN&#xff09;架构&#xff0c;它被设计用来解决传统RNN在处理长序列数据时的梯度消失和梯度爆炸问题&#xff0c;特别是在时间序列预测、自然语言处理和语音识别等领域中表现出色。LSTM的核心在于其独特的门控机…

kafka进阶_2.存储消息

文章目录 一、存储消息介绍二、副本同步2.1、数据一致性2.2、HW在副本之间的传递 如果想了解kafka基础架构和生产者架构可以参考 kafka基础和 Kafka进阶_1.生产消息。 一、存储消息介绍 数据已经由生产者Producer发送给Kafka集群&#xff0c;当Kafka接收到数据后&#xff0c…

HTML飞舞的爱心

目录 系列文章 写在前面 完整代码 代码分析 写在后面 系列文章 序号目录1HTML满屏跳动的爱心&#xff08;可写字&#xff09;2HTML五彩缤纷的爱心3HTML满屏漂浮爱心4HTML情人节快乐5HTML蓝色爱心射线6HTML跳动的爱心&#xff08;简易版&#xff09;7HTML粒子爱心8HTML蓝色…

leetcode 3206. 交替组 I 简单

给你一个整数数组 colors &#xff0c;它表示一个由红色和蓝色瓷砖组成的环&#xff0c;第 i 块瓷砖的颜色为 colors[i] &#xff1a; colors[i] 0 表示第 i 块瓷砖的颜色是 红色 。colors[i] 1 表示第 i 块瓷砖的颜色是 蓝色 。 环中连续 3 块瓷砖的颜色如果是 交替 颜色&…

拥抱极简主义前端开发:NoCss.js 引领无 CSS 编程潮流

在前端开发的世界里&#xff0c;我们总是在不断追寻更高效、更简洁的方式来构建令人惊艳的用户界面。而今天&#xff0c;我要向大家隆重介绍一款具有创新性的工具 ——NoCss.js&#xff0c;它将彻底颠覆你对传统前端开发的认知&#xff0c;引领我们进入一个全新的无 CSS 编程时…

基于QT实现贪吃蛇

0.项目展示 1.游戏大厅界面搭建 1.1 效果展示 1.2 背景添加 通过重写paintEvent事件来绘画界面 部分窗口大小&#xff0c;标题&#xff0c;图标的优化 1.3 开始按钮 使用CSS机制&#xff0c;添加样式&#xff0c;去掉边框 1.4 跳转游戏界面 1.5 问题&#xff1a;如何实现…

Navicat 预览变更sql

需求 用了Flyway&#xff08;数据库迁移工具&#xff09;后&#xff0c;需要记录变更sql&#xff0c;所以要知道变更sql。 查看方式 Navicat提供了预览变更sql功能&#xff0c;右击表---->设计表&#xff0c;比如修改字段后&#xff0c;点击SQL预览标签页&#xff0c; 顺…

数据结构——用数组实现栈和队列

目录 用数组实现栈和队列 一、数组实现栈 1.stack类 2.测试 二、数组实现队列 1.Queue类 2.测试 查询——数组&#xff1a;数组在内存中是连续空间 增删改——链表&#xff1a;链表的增删改处理更方便一些 满足数据先进后出的特点的就是栈&#xff0c;先进先出就是队列…

【8210A-TX2】Ubuntu18.04 + ROS_ Melodic + TM-16多线激光 雷达评测

简介&#xff1a;介绍 TM-16多线激光雷达 在8210A载板&#xff0c;TX2核心模块环境&#xff08;Ubuntu18.04&#xff09;下测试ROS驱动&#xff0c;打开使用RVIZ 查看点云数据&#xff0c;本文的前提条件是你的TX2里已经安装了ROS版本&#xff1a;Melodic。 大家好&#xff0c;…

安装QT6.8(MSVC MinGW)+QT webengine+QT5.15.2

本篇主要针对只使用过QT5的qmake&#xff0c;没有用过MSVC&#xff0c;VS的老同学。 建议一部分一部分安装&#xff0c;全部勾选安装遇到问题会中断&#xff0c;前功尽弃。 我自己需要的是QT5&#xff0c;编出的软件用在公司设备上。 QT6&#xff1a;建议也安装学习&#xf…

【我在CSDN成长】我的五周年创作纪念日

感叹 五年的时光匆匆而过&#xff0c; 像一阵风&#xff0c;拂过岁月的湖面&#xff0c; 泛起层层涟漪&#xff0c;又悄然离去。 曾经的欢笑与泪水&#xff0c; 那些奋斗的日夜&#xff0c; 如同电影般在脑海中放映&#xff0c; 却已成为遥远的回忆。 五年&#xff0c;说长不长…

使用 Docker Compose 来编排部署LMTNR项目

使用 Docker Compose 来部署一个包含 Linux、MySQL、Tomcat、Nginx 和 Redis 的完整项目的例子。假设我们要部署一个简单的 Java Web 应用&#xff0c;并且使用 Nginx 作为反向代理服务器。 项目目录结构 首先需要确保 Docker 和docker-compose已经安装并正在运行。docker --v…

如何搭建一个小程序:从零开始的详细指南

在当今数字化时代&#xff0c;小程序以其轻便、无需下载安装即可使用的特点&#xff0c;成为了连接用户与服务的重要桥梁。无论是零售、餐饮、教育还是娱乐行业&#xff0c;小程序都展现了巨大的潜力。如果你正考虑搭建一个小程序&#xff0c;本文将为你提供一个从零开始的详细…

Spring Boot教程之十: 使用 Spring Boot 实现从数据库动态下拉列表

使用 Spring Boot 实现从数据库动态下拉列表 动态下拉列表&#xff08;或依赖下拉列表&#xff09;的概念令人兴奋&#xff0c;但编写起来却颇具挑战性。动态下拉列表意味着一个下拉列表中的值依赖于前一个下拉列表中选择的值。一个简单的例子是三个下拉框&#xff0c;分别显示…

数据结构 【双向哨兵位循环链表】

链表的结构分为8中&#xff0c;其实搞懂了单链表和双向哨兵位循环链表&#xff0c;这部分的知识也就掌握的差不多了。双向哨兵位循环链表的结构如下&#xff1a; 下面我从0构建一个双向哨兵位循环链表。 1、准备工作 构建节点结构体&#xff0c;双向循环链表的每一个…

RabbitMQ的交换机总结

1.direct交换机 2.fanout交换机

MVC、EL、JSTL

1.MVC设计模式 三层&#xff1a; MVC&#xff1a; M&#xff08;Model&#xff09;模型&#xff1a;负责业务逻辑处理&#xff0c;数据库访问。 V&#xff08;View&#xff09;视图&#xff1a;负责与用户交互。 C&#xff08;Controller&#xff09;控制器&#xff1a;负责流程…

《Python基础》之函数的用法

一、简介 在 Python 中&#xff0c;函数是一段可重用的代码块&#xff0c;用于执行特定的任务。函数可以帮助你将代码模块化&#xff0c;提高代码的可读性和可维护性。 函数的用途 代码重用&#xff1a;通过函数&#xff0c;你可以将常用的代码块封装起来&#xff0c;避免重复…