Netty系列教程之NIO基础知识

近30集的孙哥视频课程,看完一集整理一集来的,内容有点多,请大家放心食用~

1. 网络通讯的演变

1.1 多线程版网络通讯

在传统的开发模式中,客户端发起一个 HTTP 请求的过程就是建立一个 socket 通信的过程,服务端在建立连接之后,会创建单独的线程来处理当前请求。如下图所示:
在这里插入图片描述
其中,客户端示例代码如下:

Socket socket = new Socket("127.0.0.1",8080);
PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
printWriter.write("send data to server");

服务端示例代码如下:

ServerSocket serverSocket = new ServerSocket(8080);
Socket socket = null;
while (true) {socket = serverSocket.accept();// 每一个消息都单独创建一个线程去处理new Thread(new MsgServerHandler(socket)).start();
}

随着越来越多的请求发起,按上述模式,服务端会 对每一个请求单独创建线程 处理:
在这里插入图片描述
在这种模式下,会存在以下几个问题:

  1. 线程创建开销:线程是通过 JVM 调用操作系统来创建;
  2. 内存占用高:线程是占用存储资源的;
  3. CPU使用率高:(CPU轮转)线程之间上下文切换;

1.2 线程池版网络通讯

为了解决传统网络通讯开发所带来的问题,可通过在服务端 创建线程池 的方式来使得线程的创建可控(不能来一个请求就创建一个线程去处理);

在这里插入图片描述
服务端示例代码如下:

// 使用线程池,预先创建线程
static{threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 20, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
}ServerSocket serverSocket = new ServerSocket(8080);
Socket socket;
while (true){socket = serverSocket.accept();// 通过提交到线程池处理threadPoolExecutor.execute(new MsgServerHandler(socket));
}

这样一来就解决了传统网络开发中的3个问题,但是又带来了新的问题:

  • 当连接池中的线程被占用(由于客户端等待输入或其它操作导致)完,新的请求不能获取到线程,需要进入到队列中进行 等待 ~

1.3 NIO非阻塞网络通讯

可使用 NIO 来解决上述阻塞的问题,在整个数据传输过程中,NIO 与上述两种通讯方式存在以下区别:

  1. 传统的数据传输方式采用 流(inputStream、outputStream)NIO 采用 管道(channel) 来进行数据传输;
  2. NIO服务端 除了使用 ServerSocket 外,在网络编程中NIO 还引入了 选择器(selector)

在这里插入图片描述

在引入 selector 之后,服务端能对客户端的 Channel 进行监控,如果能正常读写,则分配线程处理,反之发现某些客户端 阻塞 之后,selector 可以释放已分配给当前 Channel 的线程供其它 Channel 使用;

2. NIO的两个核心

2.1 Channel简介

ChannelIO 的通讯管道,类似于 InputStreamOutputStream ,但没有方向性(流有方向性);

在这里插入图片描述

常见的Channel

  • 文件操作:
    • FileChannel :读写文件中的数据;
  • 网络操作:
    • SocketChannel :通过TCP读写网络中的数据;
    • ServerSocketChannel :监听新进来的 TCP 连接,并对每一个连接都创建 SocketChannel
    • DatagramChannel :通过 UDP 读写网络中的数据;

2.2 Buffer简介

Channel 读取或写入的数据,都要写到Buffer中,才可以被程序操作!
在这里插入图片描述

在文件读取的过程中,由于 Channel 没有方向性,所以 Buffer 为了区分读写,引入了 读模式写模式 进行区分(均站在程序的角度来看),文件通过管道( Channel )将数据存入缓存( Buffer )中供程序操作使用!

  • 读模式:将文件数据读取到程序(flip());
  • 写模式:将程序中的数据保存到文件(新创建,clear(),compact);

读、写模式只能存在一个,默认新创建为写模式!

常见的Buffer

  • ByteBuffer:应用最广泛的Buffer;
  • CharBuffer:
  • DoubleBuffer:
  • FloatBuffer:
  • IntBuffer:
  • LongBuffer:
  • ShortBuffer:
  • MapperByteBuffer:ByteBuffer的子类,用于直接内存操作!

3. 初识NIO程序

此处我们通过读取一个文件数据的程序来加深理解 ChannelBuffer ~

准备好我们的测试文件 data.txt ,并写入以下的测试数据:

1234567890

创建 Channel 的方式有以下几种

  1. FileInputStream
  2. RandomAccessFile
  3. FileChannel.open()

3.1 FileInputStream实现

创建并运行以下测试代码:

public class NIOTest {public static void main(String[] args) throws IOException {// 1. 创建Channel FileChannelFileChannel channel = new FileInputStream("D:\\Rivamed\\awesome\\message\\data.txt").getChannel();// 2. 创建缓冲区,此处分配了10字节ByteBuffer byteBuffer = ByteBuffer.allocate(10);// 3. 将读取的数据放入缓冲区channel.read(byteBuffer);// 4. 通过程序读取Buffer中的内容,设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while (byteBuffer.hasRemaining()){byte b = byteBuffer.get();System.out.println("(char)b = " + (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}
}

上述代码能正常运行并读取打印 data.txt 文件中的内容,但如果 data.txt 中的内容超过10位,如:

1234567890abc

则上述代码将不能打印出 abc 这三个字符,原因是我们设置的 ByteBuffer 缓冲区大小为 10 个字节,程序在第一次读取完之后就正常结束了!显然不符合预期~我们需要改造成以下代码来循环读取!

public class NIOTest {public static void main(String[] args) throws IOException {// 1. 创建Channel FileChannelFileChannel channel = new FileInputStream("D:\\Rivamed\\awesome\\message\\data.txt").getChannel();// 2. 创建缓冲区,此处分配了10字节ByteBuffer byteBuffer = ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区,read为实际读取的字节数,如果没有内容则返回 -1int read = channel.read(byteBuffer);if(read == -1) break;// 4. 通过程序读取Buffer中的内容,设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while (byteBuffer.hasRemaining()){byte b = byteBuffer.get();System.out.println("(char)b = " + (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}}
}

3.2 RandomAccessFile实现

使用 RandomAccessFile 实现文件读取的代码如下所示,并添加异常处理等:

public class NIOTest2 {public static void main(String[] args) {FileChannel fileChannel = null;try {// 1. 创建Channel FileChannelfileChannel = new RandomAccessFile("D:\\Rivamed\\awesome\\message\\data.txt","rw").getChannel();// 2. 创建缓冲区,此处分配了10字节ByteBuffer byteBuffer = ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区,read为实际读取的字节数,如果没有内容则返回 -1int read = fileChannel.read(byteBuffer);if(read == -1) break;// 4. 通过程序读取Buffer中的内容,设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while(byteBuffer.hasRemaining()){byte b = byteBuffer.get();System.out.println("(char)b = " + (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}}catch (Exception e){e.printStackTrace();}finally {// 7. 关闭通道if(fileChannel!=null){try {fileChannel.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}

3.3 FileChannel.open()实现

使用 FileChannel.open() 实现文件读取的代码如下所示:

public class NIOTest3 {public static void main(String[] args) {FileChannel fileChannel = null;try {// 1. 创建Channel FileChannelfileChannel = FileChannel.open(Paths.get("D:\\Rivamed\\awesome\\message\\data.txt"), StandardOpenOption.READ);// 2. 创建缓冲区,此处分配了10字节ByteBuffer byteBuffer = ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区,read为实际读取的字节数,如果没有内容则返回 -1int read = fileChannel.read(byteBuffer);if(read == -1) break;// 4. 通过程序读取Buffer中的内容,设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while(byteBuffer.hasRemaining()){byte b = byteBuffer.get();System.out.println("(char)b = " + (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}}catch (Exception e){e.printStackTrace();}finally {// 7. 关闭通道if(fileChannel!=null){try {fileChannel.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}

3.4 使用try-resource重构

JDK1.7 之后引入了 try-resource 的机制,它能帮我们自动完成在 finally 块中对资源的关闭操作,如下为改造之后的代码示例:

public class NIOTest4 {public static void main(String[] args) {// 1. 创建Channel FileChanneltry(FileChannel fileChannel = FileChannel.open(Paths.get("D:\\Rivamed\\awesome\\message\\data.txt"), StandardOpenOption.READ)) {// 2. 创建缓冲区,此处分配了10字节ByteBuffer byteBuffer = ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区,read为实际读取的字节数,如果没有内容则返回 -1int read = fileChannel.read(byteBuffer);if(read == -1) break;// 4. 通过程序读取Buffer中的内容,设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while(byteBuffer.hasRemaining()){byte b = byteBuffer.get();System.out.println("(char)b = " + (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}} catch (IOException e) {throw new RuntimeException(e);}}
}

注意

 ByteBuffer byteBuffer = ByteBuffer.allocate(10);

ByteBuffer 一旦定义就不能动态调整

4. ByteBuffer详解

4.1 ByteBuffer 主要实现类

ByteBuffer抽象类 ,它的主要实现类为:

  1. HeadByteBuffer:用的是 JVM 内的堆内存,受 GC (堆内存不够时)的影响,在 IO 操作中效率不高;

    在这里插入图片描述

  2. MappedByteBuffer(DirectByteBuffer):用的操作系统上的内存,一步操作文件系统(不会受到 GC 影响,但可能造成内存泄漏);

    在这里插入图片描述

内存泄漏和内存溢出的区别

  • 内存泄漏 :已分配的内存 没有正常释放 或者存在 内存碎片 ,导致后续处理过程中出现所需内存不足的情况;

    在这里插入图片描述

  • 内存溢出 :程序运行或者处理时需要用到的内存大于能提供的最大内存的情况;

    在这里插入图片描述

4.2 ByteBuffer 核心结构

ByteBuffer 是一个类似数组的结构,在整个结构中包括三个主要的状态:

  1. Capacity :缓存的容量,类似于数组中的size;
  2. Position :当前缓存的下标,在读取时记录当前读取的位置,在写操作的时候记录写的位置,从0开始,每读取一次,下标+1;
  3. Limit :读写限制,在读写操作时,帮我们限制了能读多少数据和还能写多少数据;

读写的本质就是 Position 和 Limit 的相互作用,如下如所示:

在这里插入图片描述

不同写模式设置的区别

  • 调用 byteBuffer.clear() 设置写模式:
    在这里插入图片描述

  • 调用 compact() 设置写模式:

    在这里插入图片描述

4.3 ByteBuffer 核心API

使用 ByteBuffer 无非就是数据的存取,即往 buffer 中写和从 buffer 中读;

  • 写入数据(创建ByteBuffer、clear()、compact())的方法包含:

    1. channel 的 read 方法:从文件、IO中往buffer中写数据;1. channel.read(buffer)
    2. buffer 的 put() 方法:直接写入byte数据;1. buffer.put(byte)2. buffer.put(byte[])
    
  • 读取数据(flip())

    1. channel 的 write 方法(从buffer中读数据并往文件写,与上述read相反)
    2. buffer 的 get() 方法,每调用一次会影响 Position 的位置;
    3. rewind() 方法,可以将Position重置为0,用于复读数据;
    4. mark()&reset() 方法,通过mark标记Position,通过reset方法调回标记,从新执行;
    5. get(i) 方法,获取特定Position位置上的数据,但是不会对Position位置产生影响且不受读写模式的影响;
    

4.4 ByteBuffer 字符串操作

  • 字符串存储到buffer中:

    public static void main(String[] args) {ByteBuffer byteBuffer = ByteBuffer.allocate(10);// 调用 string 的 getBytes() 方法即可;byteBuffer.put("Lannis".getBytes());// 设置读模式byteBuffer.flip();while (byteBuffer.hasRemaining()){System.out.println("byteBuffer = " + (char)byteBuffer.get());}byteBuffer.clear();
    }
    

    也可使用字符集编码处理:

    // 将字符串按指定字符集编码之后存储到 ByteBuffer 中
    public static void main(String[] args) {//使用encode方法创建ByteBufferByteBuffer byteBuffer = StandardCharsets.UTF_8.("lannis");// 如果使用encode方法时,已经自动设置了读模式,需要省略flip();// 如果此处加上flip(),limit会设置为上一次的position位置,而上一次position为0,进而导致数据获取不到;// byteBuffer.flip();while (byteBuffer.hasRemaining()){byte b = byteBuffer.get();System.out.println("byteBuffer = " + (char)byteBuffer.get());}byteBuffer.clear();
    }
    

    或者使用 ByteBuffer.wrap() 方法:

    public static void main(String[] args) {ByteBuffer byteBuffer = ByteBuffer.wrap("lannis".getBytes());// 在使用wrap方法时,已经自动设置了读模式,此处需要省略flip()// 如果此处加上flip(),limit会设置为上一次的position位置,而上一次position为0// byteBuffer.flip();while (byteBuffer.hasRemaining()){System.out.println("byteBuffer = " + (char)byteBuffer.get());}byteBuffer.clear();
    }
    
  • Buffer中的数据转为字符串

    public static void main(String[] args) {// 使用encode方法创建ByteBufferByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("文明和谐");// 使用decode方法解码CharBuffer decode = StandardCharsets.UTF_8.decode(byteBuffer);System.out.println("decode = " + decode);
    }
    

粘包和半包

  • 粘包:当前接受的数据包含下一次数据的内容;

  • 半包:当前接受的数据不完整;

5. NIO的开发使用

5.1 文件读取

读取文件的代码在上面第三节已经演示过,此处不再复述;

5.2 文件写入

以下为将数据写入文件的代码示例:

public static void main(String[] args) throws IOException {// 1. 获得Channel,可通过 FileOutputStream/RandomAccessFile 获得FileChannel data = new FileOutputStream("data").getChannel();// 2. 获得bufferByteBuffer lannis = StandardCharsets.UTF_8.encode("lannis");// 3. 写文件data.write(lannis);
}

5.3 文件复制

  • 使用输入输出流实现:

    public static void main(String[] args) throws IOException {//data1 -> data2FileInputStream fileInputStream = new FileInputStream("data1");FileOutputStream fileOutputStream = new FileOutputStream("data2");byte[] buffer = new byte[1024];while (true){int read = fileInputStream.read(buffer);if(read == -1)break;fileOutputStream.write(buffer,0,read);}
    }
    
  • 使用commons-io实现:

    // 这里引入commons-io依赖
    public static void main(String[] args) throws IOException {//data1 -> data2FileInputStream fileInputStream = new FileInputStream("data1");FileOutputStream fileOutputStream = new FileOutputStream("data2");IOUtils.copy(fileInputStream,fileOutputStream)
    }
    
  • 使用NIO方式实现(零拷贝~效率高):

    public static void main(String[] args) throws IOException {FileChannel from = new FileInputStream("data1").getChannel();FileChannel to = new FileOutputStream("data2").getChannel();from.transferTo(0,from.size(),to);
    }
    

注意:
需要注意的是,NIO传输存在文件大小上限,最大支持 2G-1kb ,当实际文件大小超过 2GB 之后,只能进行分段拷贝:

  public static void main(String[] args) throws IOException {FileChannel from = new FileInputStream("data1").getChannel();FileChannel to = new FileOutputStream("data2").getChannel();// 还剩多少没有拷贝long left = from.size();while (left > 0){left = left - from.transferTo(from.size()-left,left,to);}
}

6. NIO网络编程

6.1 代码示例

NIO 网络编程中,服务端 中用于接受请求的是 ServerSocketChannel ,进行实际通信的是 SocketChannel ,以下为创建服务端和客户端的相关代码:

/*创建服务端*/
public class NIOServer {public static void main(String[] args) throws IOException {// 创建 ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 设置服务端的监听端口(客户端通过网络进行访问的时候需要IP和端口)serverSocketChannel.bind(new InetSocketAddress(8000));// 用于保存建立的连接List<SocketChannel> socketChannels = new ArrayList<>();ByteBuffer buffer = ByteBuffer.allocate(20);// 接受客户端的连接while (true){System.out.println("等待客户端连接... ");// socketChannel 代表服务端和客户端连接的一个通道【连接阻塞】SocketChannel socketChannel = serverSocketChannel.accept();System.out.println("已于客户端建立连接...:"+socketChannel);socketChannels.add(socketChannel);// 客户端与服务端通信过程for (SocketChannel channel : socketChannels) {System.out.println("开始接受处理客户端数据...");// 读取客户端提交的数据【IO阻塞】channel.read(buffer);// 设置读模式buffer.flip();// 打印输出接收到的消息System.out.println("客户端消息: " + StandardCharsets.UTF_8.decode(buffer));// 设置写模式buffer.clear();System.out.println("通信已结束...");}}}
}
/*NIO客户端*/
public class NIOClient {public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel = SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(StandardCharsets.UTF_8.encode("Test"));// 此处断点,服务端会出现IO阻塞的情况System.out.println("--------------------------------------------------------");}
}

6.2 阻塞问题

在上述代码运行过程中,服务端存在以下两个阻塞的情况:

  • ServerSocketChannel 阻塞:服务端等待客户端连接,accept() 方法存在阻塞;

    // 可通过设置 ServerSocketChannel 为非阻塞
    serverSocketChannel.configureBlocking(false);
    

    设置完之后,serverSocketChannel.accept() 在没有客户端连接的时候,返回值为 null :
    在这里插入图片描述

    同时,如果 socketChannel 不为 null 的时候,放入上述 list 中才有意义,需要进行判断:

    ...
    // 只有不为空的时候才添加客户端
    if(socketChannel!=null){socketChannels.add(socketChannel);    
    }
    ...
    
  • SocketChannel 阻塞:客户端IO通信的阻塞,channel.read() 方法存在阻塞;

     // 设置 socketChannel 非阻塞
    socketChannel.configureBlocking(false);
    

修改调整过后的代码如下(主要在服务端修改):

public class NIOServer {public static void main(String[] args) throws IOException {// 创建 ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口(客户端通过网络进行访问的时候需要IP和端口)serverSocketChannel.bind(new InetSocketAddress(8000));// 用于保存建立的连接List<SocketChannel> socketChannels = new ArrayList<>();ByteBuffer buffer = ByteBuffer.allocate(20);// 接受客户端的连接while (true){System.out.println("等待客户端连接... ");// socketChannel 代表服务端和客户端连接的一个通道【连接阻塞】SocketChannel socketChannel = serverSocketChannel.accept();System.out.println("已于客户端建立连接...:"+socketChannel);if(socketChannel!=null){// 设置 socketChannel 非阻塞socketChannel.configureBlocking(false);socketChannels.add(socketChannel);}// 客户端与服务端通信过程for (SocketChannel channel : socketChannels) {System.out.println("开始接受处理客户端数据...");// 读取客户端提交的数据【网络通讯IO阻塞】channel.read(buffer);// 设置读模式buffer.flip();// 打印输出接收到的消息System.out.println("客户端消息: " + StandardCharsets.UTF_8.decode(buffer));// 设置写模式buffer.clear();System.out.println("通信已结束...");}}}
}

::: tip 存在的问题

上述代码虽然解决的 ServerSocketChannelSocketChannel 阻塞的问题,但是存在 空转、死循环 ,会进一步导致CPU占用过高的问题;

故需要引入一个类似于 监管者 的角色(也就是后文的 selector),用来监管连接的创建和IO的通讯,即 ServerSocketChannelSocketChannel

:::

7. Selector

7.1 基础介绍

在引入 selector 之前,需要对它有一个大概的了解。

selector 并不会实时监管所有的 ServerSocketChannelSocketChannel ,而是在以下(常用)的特定场景(状态)下才会被监管:

  • accept():ServerSocketChannel 的连接建立;
  • read():SocketChannel 中的读操作;
  • write():SocketChannel 中的写操作;
  • connect():主要用于客户端中;

并且在实际使用时, selector 只有在非阻塞的情况下才生效,也就是需要添加以下配置才生效:

// 设置 ServerSocketChannel 为非阻塞
serverSocketChannel.configureBlocking(false);
// 设置 SocketChannel 为非阻塞
socketChannel.configureBlocking(false);

另外,还需要了解在 selector 中的两个重要属性:

  • keys :将需要监控的所有的 Channel 都注册到这个 keys 属性中;

    通过 channel.register() 配置 selector;
    通过 interestOps 配置需要监控的状态;
    
  • selectionKeys :存储的是实际发生以上监控状态的 Channel

    通过 selector.select() 去监听发生的特定状态的Channel;
    当监听到特定事件之后,会将 keys 中的 Channel 移动到 selectionKeys 中。
    后续就可以通过 selectedKeys() 方法获取并处理特定 Channel 事件;
    

由于 SelectionKey 中存在的 Channel 可能是 ServerSocketChannel 或者 SocketChannel,故在后续业务处理中,需要使用以下方法进行区分:

  • key.isAcceptable():如果返回true,则表示当前 selectKey 缓存的是 ServerSocketChannel 对象;

  • key.isReadable():如果返回true,则表示当前 selectKey 缓存的是 SocketChannel 对象;

  • key.isWritable():如果返回true,则表示当前 selectKey 缓存的是 SocketChannel 对象;

7.2 创建连接代码示例

为了进一步说明理解 selector ,接下来,我们将一步一步的结合代码进行测试演示,首先创建服务端的代码,并设置为非阻塞模式:

/*服务端代码*/
public class NIOServer {public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口(客户端通过网络进行访问的时候需要IP和端口)serverSocketChannel.bind(new InetSocketAddress(8000));}}
}

接着,引入 selector :

/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口(客户端通过网络进行访问的时候需要IP和端口)serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector = Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中,返回 selectKeySelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控,只有监控到有实际连接或读写操作才会处理selector.select();}}
}

此时的状态

运行上述服务端代码之后,在上述 12 行代码前后进行断点,执行注册代码前:

在这里插入图片描述

执行注册代码后:

在这里插入图片描述

接着代码正常运行,会在17行阻塞住,一直等待客户端的连接:
在这里插入图片描述

完成上述配置操作后,我们可以启动客户端进行连接,上述 selector.select() 方法在没有客户端连接发生的时候,会一直处于等待的状态,一但有连接发生,它就会将 keys 中监控的当前连接 Channel 复制到 selectionKeys 中,:

在这里插入图片描述
接下来,添加对 selectedKeys 中的数据进行处理的方法:

/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口(客户端通过网络进行访问的时候需要IP和端口)serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector = Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中,返回 selectKeyselector = {WindowsSelectorImpl@910}SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控,只有监控到有实际连接或读写操作才会处理selector.select();// 获取所有有效的SelectionKey(需要使用iterator遍历,因为后续会删除,不能使用for循环,for循环不能删除)Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();// 只有在确认有有效状态的情况下,才会进行以下循环,避免了空转和死循环的问题while (iterator.hasNext()) {SelectionKey key = iterator.next();// 用完之后务必删除,否则会出现空指针iterator.remove();// 获取对应的 Channelif (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();// 或者直接使用上述创建好的 serverSocketChannel// SocketChannel socketChannel = serverSocketChannel.accept();System.out.println("channel = " + socketChannel);}}}}
}

启动客户端之后,服务端正常输出连接信息:
在这里插入图片描述

7.3 服务端读消息代码示例

针对客户端的写事件,需要在连接之后进行创建:

/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口(客户端通过网络进行访问的时候需要IP和端口)serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector = Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中,返回 selectKeyselector = {WindowsSelectorImpl@910}SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控,只有监控到有实际连接或读写操作才会处理selector.select();// 获取所有有效的SelectionKey(需要使用iterator遍历,因为后续会删除,不能使用for循环,for循环不能删除)Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();// 只有在确认有有效状态的情况下,才会进行以下循环,避免了空转和死循环的问题while (iterator.hasNext()) {SelectionKey key = iterator.next();// 用完之后务必删除,否则会出现空指针iterator.remove();// 获取对应的 Channelif (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();// 或者直接只有上述创建好的 serverSocketChannel// SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey register = socketChannel.register(selector, 0, null);register.interestOps(SelectionKey.OP_READ);System.out.println("channel = " + socketChannel);}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(20);socketChannel.read(byteBuffer);// 设置读模式byteBuffer.flip();System.out.println("msg = " + StandardCharsets.UTF_8.decode(byteBuffer));}}}}
}

正常运行客户端服务端之后,服务端打印输出如下:
在这里插入图片描述

客户端示例代码如下:

/*NIO客户端*/
public class NIOClient {public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel = SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(StandardCharsets.UTF_8.encode("Test"));System.out.println("--------------------------------------------------------");}
}

注意,当客户端发送的数据长度大于服务端Buffer的长度时:

  1. 客户端只发送一次数据;
  2. 服务端会多次调用 select() 方法多次处理,直到当前消息处理完毕之后,整个流程才算结束;

在某些特殊操作下,服务器端无法处理,select() 方法就会频繁调用(如在客户端非正常关闭会发送-1的状态,服务端处理不了会一直进行 select() 方法的调用),可通过调用 selectKey.cancel() 来调用,修改调整以下代码:

...
}else if(key.isReadable()){try{// 读 SocketChannelSocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(20);int read = socketChannel.read(byteBuffer);if(read == -1){key.cancel();}else{// 设置读模式byteBuffer.flip();System.out.println("msg = " + StandardCharsets.UTF_8.decode(byteBuffer));}}catch (Exception e){e.printStackTrace();key.cancel();}
}
...

7.4 半包和粘包

一旦buffer缓冲区设置不合理,就会出现半包和粘包的问题(第6章节的代码亦是),例如以下客户端像服务端发生Hello World消息的代码:

/*客户端代码*/
public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel = SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));// 发送数据socketChannel.write(StandardCharsets.UTF_8.encode("Hello World"));socketChannel.close();
}

为了能演示出效果,此时服务端的代码如下所示:

/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口(客户端通过网络进行访问的时候需要IP和端口)serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector = Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中,返回 selectKeyselector = {WindowsSelectorImpl@910}SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控,只有监控到有实际连接或读写操作才会处理selector.select();// 获取所有有效的SelectionKey(需要使用iterator遍历,因为后续会删除,不能使用for循环,for循环不能删除)Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();// 只有在确认有有效状态的情况下,才会进行以下循环,避免了空转和死循环的问题while (iterator.hasNext()) {SelectionKey key = iterator.next();// 用完之后务必删除,否则会出现空指针iterator.remove();// 获取对应的 Channelif (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();// 或者直接只有上述创建好的 serverSocketChannel// SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey register = socketChannel.register(selector, 0, null);register.interestOps(SelectionKey.OP_READ);System.out.println("channel = " + socketChannel);}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(10);int read = socketChannel.read(byteBuffer);if(read == -1){key.cancel();}else{// 设置读模式byteBuffer.flip();System.out.println("msg = " + StandardCharsets.UTF_8.decode(byteBuffer));}}}}}
}

注意上述第 39 行代码,我们分配的 ByteBuffer 大小为 10 个字节,此时运行服务端和客户端之后,服务端会打印输出以下内容:

在这里插入图片描述

从运行结果分析可以发现,客户端明明只发了一次消息,但是服务端却打印出两条消息,这显然是不符合业务要求的。

Hello World 为11个字节长度,此处我们有两种解决方式可选:

  1. 修改 ByteBuffer 的大小为11,保证能接收到客户端发送的消息;( ByteBuffer 一旦定义就不可修改,故此方法不可靠)
  2. 通过分割符来甄别一条完整的消息,以此解决半包和粘包的问题;(以下为详细的解决办法)

为了能完整获取客户端发送的数据,需要进行一些数据处理,例如添加分隔符(分隔符的目的是为了甄别一条完整的信息),引入以下方法:
/*解决半包粘包问题*/
private static void doLineSplit(ByteBuffer byteBuffer) {// 设置读模式byteBuffer.flip();for (int i = 0; i < byteBuffer.limit(); i++) {if ('\n' == byteBuffer.get(i)) {int length = i + 1 - byteBuffer.position();ByteBuffer target = ByteBuffer.allocate(length);// 取数据for (int j = 0; j < length; j++) {target.put(byteBuffer.get());}// 设置读模式target.flip();System.out.println("StandardCharsets.UTF_8.decode(target) = " + StandardCharsets.UTF_8.decode(target));}}byteBuffer.compact();
}

接着修改服务端读取客户端消息部分的方法:

...
}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(10);int read = socketChannel.read(byteBuffer);if(read == -1){key.cancel();}else{doLineSplit(byteBuffer);}
}
...

修改客户端发送代码,在 Hello World 后面增加 \n 如下所示:

public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel = SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));// 发送数据socketChannel.write(StandardCharsets.UTF_8.encode("Hello World\n"));socketChannel.close();
}

运行修改过后的代码,服务端输出结果如下:
在这里插入图片描述


在这里插入图片描述

为什么输出的只有 d ,前面的内容哪里去了?别慌,请听我狡辩:

在上述的 doLineSplit() 方法中,确实是能通过分隔符 \n 来获取完整的消息的,但是有一个前提就是 ByteBuffer 必须是同一个。
但是在 select() 事件监听并处理的代码中,每一次都是一个新的 ByteBuffer,还记得下面的代码吗?
在每次进入到 key.isReadable() 条件成立的方法后,我们会新建 ByteBuffer:ByteBuffer byteBuffer = ByteBuffer.allocate(10);
这样一来,就会导致select()方法两次调用处理的ByteBuffer没有关联上,第一次不会打印是因为没有读取到 \n 分隔符,在第二次读取的时候亦没有获取到前一次读取的结果,故只读取并打印到 d\n 字符;

当然,这也有解决办法,那就是将 ByteBufferChannel 绑定在一起,保证一个 Channel 多次操作中 ByteBuffer 为同一个;

还记得 SelectionKey.register(sql,ops,att) 方法吗?这个方法中有三个参数:

  1. sql:注册Channel的选择器;
  2. ops:设置要监听的状态;
  3. att:需要绑定的附件,可以为空;

我们可以通过如下设置 att 属性来给每一个 Channel 绑定一个 Channel 共享的 ByteBuffer ,修改以下服务端代码:

...if (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);// 创建共享 ByteBufferByteBuffer byteBuffer = ByteBuffer.allocate(20);SelectionKey register = socketChannel.register(selector, 0, byteBuffer);register.interestOps(SelectionKey.OP_READ);System.out.println("channel = " + socketChannel);}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel = (SocketChannel) key.channel();// 获取共享 ByteBufferByteBuffer attachment = (ByteBuffer) key.attachment();int read = socketChannel.read(attachment);if(read == -1){key.cancel();}else{doLineSplit(attachment);}}
...

存在的问题:在上述代码调整中,我们创建了共享 ByteBuffer 来保证一个 Channel 中多次操作使用同一个 ByteBuffer ,以此确保消息能够完整的处理;

但是注意,为了避免使用 compact() 方法之后ByteBuffer 中的内容超过容量大小的问题,此处我是修改了 ByteBuffer 的容量大小哦

这样一来又会带来另外一个问题,我不能动态修改 ByteBuffer 的容量大小,如果传入的消息过长怎么办?

那就需要找一个办法去扩容 ByteBuffer ~~~

7.5 ByteBuffer扩容

在这里插入图片描述

当我们调用上述 doLineSplit() 方法对客户端的消息处理完之后,需要判断 PositionLimit 的值,如果相等,则说明当前 ByteBuffer 容量不够,需要进行扩容处理,反之则跳过,示例代码如下:

...
}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(10);int read = socketChannel.read(byteBuffer);if(read == -1){key.cancel();}else{doLineSplit(byteBuffer);if(byteBuffer.position() == byteBuffer.limit()){//此时说明容量不够了,需要进行扩容ByteBuffer newByteBuffer = ByteBuffer.allocate(byteBuffer.capacity() * 2);// 将原始的ByteBuffer中的数据复制到新Buffer中newByteBuffer.put(byteBuffer);// 重新绑定新ByteBufferkey.attach(newByteBuffer);}}
}
...

待优化和考虑的地方:

  1. ByteBuffer 容量不够的时候,我们进行了扩容处理,但是在后续请求中,可能接受的数据长度远远小于扩容后的大小,在多线程请求中,会造成内存浪费!除了扩容之外,还需要考虑缩容

  2. ByteBuffer 扩容时,旧 Buffer 中的数据往新 Buffer 中的数据写时,效率很低(可通过零拷贝方式解决);

  3. 为了避免频繁检索上述代码中的 \n 分隔符,可以通过头体分离的方式来保证信息完整性:

    在这里插入图片描述

7.6 服务端写消息代码示例

上述代码已经完成了服务端创建连接并读取客户端发送的数据的代码示例,接下来将继续完善服务端向客户端发送数据的功能;

此处我们在服务端和客户端连接建立之后,随即向客户端发送数据,代码如下所示:

/*NIO服务端*/
public class NIOServer {public static void main(String[] args) throws IOException {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();iterator.remove();if (selectionKey.isAcceptable()) {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey skey = socketChannel.register(selector, SelectionKey.OP_READ);//准备数据StringBuilder sb = new StringBuilder();for (int i = 0; i < 2000000; i++) {sb.append("abcdabcd");}ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());while (buffer.hasRemaining()){int write = socketChannel.write(buffer);System.out.println("write = " + write);}}}}}
}
/*NIO客户端*/
public class NIOClient {public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接try (SocketChannel socketChannel = SocketChannel.open()) {// 连接服务端socketChannel.connect(new InetSocketAddress(8000));// 接受服务端数据ByteBuffer buffer = ByteBuffer.allocate(1024);int read = 0;while (true) {read += socketChannel.read(buffer);System.out.println("read = " + read);buffer.clear();}}}
}

上述代码运行之后,服务端和客户端控制台打印输出的结果如下图所示:
在这里插入图片描述

通过上面的运行结果发现,服务端发送了很多空数据,这是因为受到了发生速率的限制,为了解决这个问题,这个时候我们就可使用 isWriteable() 方法来监听 write 的状态:

...
if (selectionKey.isAcceptable()) {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey skey = socketChannel.register(selector, SelectionKey.OP_READ);//准备数据StringBuilder sb = new StringBuilder();for (int i = 0; i < 2000000; i++) {sb.append("abcdabcd");}ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());// 先写一次int write = socketChannel.write(buffer);System.out.println("write = " + write);// 判断是否写完if (buffer.hasRemaining()) {//说明么有写完,为当前的 SocketChannel 增加 write 的监听// READ 和 Writeskey.interestOps(skey.interestOps() + SelectionKey.OP_WRITE);// 把当前操作传给下一个操作skey.attach(buffer);}
} else if (selectionKey.isWritable()) {// 获取客户端 ChannelSocketChannel socketChannel = (SocketChannel) selectionKey.channel();// 获取 BufferByteBuffer buffer = (ByteBuffer) selectionKey.attachment();// 写操作int write = socketChannel.write(buffer);System.out.println("write = " + write);if (!buffer.hasRemaining()) {//写完了selectionKey.attach(null);selectionKey.interestOps(selectionKey.interestOps() - SelectionKey.OP_WRITE);}
}
...

这样一来,运行改动过后的代码,服务端就不会发生过多的空数据,进而提高了服务端的处理消息的能力,服务端输出结果如下:
在这里插入图片描述

8. Reactor 模式

8.1 单线程模式

在这里插入图片描述
在单线程模式中,客户端的连接以及后续的读写操作都是由一个线程来完成的,存在效率低的问题;

8.2 主从多线程模式

在这里插入图片描述
在这种模式下,将客户端连接相关的交由一个独立的(图中Boss)线程处理,后续读写操作交由其它(图中Worker)线程处理;

8.3 代码实现

参照上述主从多线程模式的图例,我们需要将IO的读写操作用单个 Worker 线程来处理,故我们首先需要创建 Worker 线程类:

// Worker 线程类
public class Worker implements Runnable {private final String name;private Selector selector;// 多线程环境下的状态需要增加 volatileprivate volatile boolean created;// 为了传递线程间的变量private final ConcurrentLinkedDeque<Runnable> concurrentLinkedDeque = new ConcurrentLinkedDeque<>();public Worker(String name) {this.name = name;}public void register(SocketChannel sc) throws IOException {if (!created){// 每个 Worker 创建一个线程Thread thread = new Thread(this, name);selector = Selector.open();thread.start();created = true;}// 放到一个线程中保证有序执行concurrentLinkedDeque.add(()->{try {sc.register(selector, SelectionKey.OP_READ + SelectionKey.OP_WRITE);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});// 唤醒阻塞的select.select()selector.wakeup();}@Overridepublic void run() {while (true) {try {selector.select();Runnable poll = concurrentLinkedDeque.poll();if(poll!=null){poll.run();}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey scKey = iterator.next();iterator.remove();if (scKey.isReadable()) {SocketChannel socketChannel = (SocketChannel) scKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(30);socketChannel.configureBlocking(false);socketChannel.read(byteBuffer);byteBuffer.flip();System.out.println("Message = " + StandardCharsets.UTF_8.decode(byteBuffer));byteBuffer.clear();}}} catch (IOException e) {throw new RuntimeException(e);}}}
}

接着修改服务端的代码:

// 服务端代码
public class ReactorBossServer {public static void main(String[] args) throws IOException, InterruptedException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8000));Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);// 模拟多线程,此处示例为2个Worker[] workers = new Worker[2];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker"+i);}AtomicInteger index = new AtomicInteger();while (true) {// 监控连接selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();iterator.remove();if (selectionKey.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);// hash取模 x%2 结果0或1workers[index.getAndIncrement()%workers.length].register(socketChannel);}}}}
}

最后运行,当多个客户端连接之后,服务端轮转进行处理!(此处自行进行代码测试)

9. 零拷贝

在没有任何优化操作前,以读取文件数据在到写数据的流程为例进行数据拷贝的分析,如下图所示:

在这里插入图片描述

在调用 Read() 方法之后,JVM 会通知操作系统,由操作系统调用操作文件相关的 API 来读取硬盘上的数据,随后将数据存储在操作系统的内存 (高速页缓存/内核缓冲区)中,进而过渡传递到 JVM 中的应用缓存【做了2次数据的拷贝】;同理,在调用 write() 写数据时也发生了两次数据拷贝,整个操作下来发生了【4次数据拷贝】,故此效率偏低;

9.1 内存映射

NIO 中有个 内存映射 的概念,通过内存映射可以将 高速页缓存 中的数据 共享应用缓存 ,同时减少了数据拷贝的次数,示例图如下:

在这里插入图片描述
在代码中可使用以下代码创建直接缓冲区:

ByteBuffer.allocateDirect(10);

内存映射 主要用于文件的操作;
使用直接内存的好处如上图所示,就是减少了数据拷贝的次数,但是带来的问题就是需要手动进行内存析构,否则会造成内存浪费

9.2 零拷贝

零拷贝:不涉及到虚拟机内存的拷贝;

Linux2.1Linux2.4 内核中,存在 sendFile() 方法,其两者的拷贝区别如下:

在这里插入图片描述
可以看出在 Linux2.4 的内核中,拷贝次数比 Linux2.1 又少了1次,效率又提高了;

Java 中使用 file.transferTo()file.transferFrom() 方法即可调用 sendFile() 方法;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/109519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VR全景图片如何拍摄制作,拍摄制作过程中要注意什么?

引言&#xff1a; VR全景图片就是通过专业的相机设备捕捉到的一个空间的高清图像&#xff0c;再经过专业工具进行拼合&#xff0c;呈现出一种环绕式的视觉效果。想象一下&#xff0c;当你站在一个完全真实的环境中&#xff0c;可以自由地转动视角&#xff0c;看到四周的景色&a…

高数定理集合啦

haha~ 最近在准备数学竞赛&#xff0c;好久没有发布笔记啦&#xff0c;今天就来一波高数里常用的定理吧&#xff0c;不全面的话后续会更新哒~ 费马定理&#xff1a;对于一个函数的局部极值点&#xff0c;如果导数存在&#xff0c;那么导数在该点处必须为零&#xff0c;即 f(x)0…

SQL数据库管理工具RazorSQL mac中文版特点与功能

RazorSQL mac是一款功能强大的SQL数据库管理工具&#xff0c;它支持多种数据库&#xff0c;包括MySQL、Oracle、Microsoft SQL Server、SQLite、PostgreSQL等。 RazorSQL mac 软件特点和功能 多种数据库支持&#xff1a;RazorSQL支持多种数据库&#xff0c;用户可以通过一个工…

基于R语言的Meta分析【全流程、不确定性分析】方法与Meta机器学习高级应用

查看原文>>>【案例教程】基于R语言的Meta分析【全流程、不确定性分析】方法与Meta机器学习高级应用 Meta分析是针对某一科研问题&#xff0c;根据明确的搜索策略、选择筛选文献标准、采用严格的评价方法&#xff0c;对来源不同的研究成果进行收集、合并及定量统计分析…

安装Elasticsearch步骤(包含遇到的问题及解决方案)

注&#xff1a;笔者是在centos云服务器环境下安装的Elasticsearch 目录 1.安装前准备 2.下载Elasticsearch 3.启动Elasticsearch 非常容易出问题 第一次运行时&#xff0c;可能出现如下错误&#xff1a; 一、内存不足原因启动失败 二、使用root用户启动问题 三、启动ES自…

uniapp使用uQRCode绘制二维码,下载到本地,调起微信扫一扫二维码核销

1.效果 2.在utils文件夹下创建uqrcode.js // uqrcode.js //--------------------------------------------------------------------- // github https://github.com/Sansnn/uQRCode //---------------------------------------------------------------------let uQRCode {…

第二证券:A股三季报披露全面启动 多领域上市公司业绩表现亮点纷呈

A股上市公司三季报宣告全面发动。Wind数据闪现&#xff0c;到10月17日记者发稿&#xff0c;来自沪深北三大交易所近80家上市公司首要晒出了最新运营效果体现的“效果单”。本周&#xff0c;相关财报宣告家数也将增至270家左右。与此同时&#xff0c;10月以来&#xff0c;亦有不…

怎么把图片改成jpg格式?

怎么把图片改成jpg格式&#xff1f;大家都知道&#xff0c;随着计算机被发明到现在已经存在了很多年&#xff0c;在这么多的的技术发展过程中&#xff0c;也形成了种类非常多的图片文件格式&#xff0c;例如平时我们能接触到的图片格式有jpg、png、gif、bmp、heic、tiff、jfif、…

vcpkg manifest 的使用

最近项目上要使用 CMakeLists 管理&#xff0c;由于 Windows 版本有依赖到 vcpkg 提供的库&#xff0c;所以需要使用 vcpkg manifest 来统一设置库的版本&#xff0c;方便后续维护 推荐一个文章&#xff0c;介绍的可以说是非常全面了 VCPKG 特性 - Versioning 不过里面也有一些…

C++标准库算法整理

目录 1、数值操作 1.1、std::accumulate 1.2、std::inner_product 1.3、std::partial_sum 1.4、std::exclusive_scan 1.5、std::inclusive_scan 1.6、std::reduce 2、相邻元素 2.1、std::adjacent_difference 2.2、std::adjacent_find 2.3、std::unique 2.4、std::u…

Apache HTTPD 换行解析漏洞(CVE-2017-15715)

Apache HTTPD是一款HTTP服务器&#xff0c;它可以通过mod_php来运行PHP网页。其2.4.0~2.4.29版本中存在 一个解析漏洞&#xff0c;在解析PHP时&#xff0c;1.php\x0a将被按照PHP后缀进行解析&#xff0c;导致绕过一些服务器的安全策略。 影响范围 apache &#xff1a;2.4.0~2.…

易点易动设备管理系统帮助生产企业提升设备巡检效率

在现代制造业中&#xff0c;设备的正常运行对于生产企业的成功至关重要。然而&#xff0c;设备巡检是确保设备安全性和可靠性的关键环节&#xff0c;但却常常耗费大量时间和资源。为了解决这个问题&#xff0c;许多企业采用了现代化的设备管理系统&#xff0c;其中易点易动设备…

山西电力市场日前价格预测【2023-10-19】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-10-19&#xff09;山西电力市场全天平均日前电价为210.83元/MWh。其中&#xff0c;最高日前电价为337.00元/MWh&#xff0c;预计出现在18: 30。最低日前电价为0.00元/MWh&#xff0c;预计出…

在Maven中配置代理服务器的详细教程

在Maven中配置代理服务器的详细教程如下&#xff1a; 首先&#xff0c;确保您已经安装了Maven。创建一个新的Maven项目。在命令行中输入以下命令&#xff1a; mvn archetype:generate -DgroupIdcom.example -DartifactIdmy-app -DarchetypeArtifactIdmaven-archetype-quickst…

【数据结构】Java对象的比较

作者主页&#xff1a;paper jie_博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文录入于《JAVA数据结构》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精力…

Spring framework Day21:Spring AOP 注解结合配置类示例

前言 Spring AOP是一种强大的面向切面编程工具&#xff0c;它能够帮助我们更好地处理与核心业务逻辑无关的横切关注点。通过使用注解和配置类的方式&#xff0c;我们可以更加简洁和灵活地实现AOP。 本文将以一个示例来介绍如何结合注解和配置类来使用Spring AOP。通过这个示例…

Windows 钉钉多开 dingtalkRC版

亲测可用 下载链接&#xff1a; https://dtapp-pub.dingtalk.com/dingtalk-desktop/win_installer/RC/DingTalk_v6.5.20-RC.7229101.exe

Redis消息队列

Redis是一个支持消息队列功能的内存数据库&#xff0c;可以作为消息队列系统使用。 基于List结构模拟消息队列 消息队列&#xff08;Message Queue&#xff09;&#xff0c;字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表&#xff0c;很容易模拟出队列效果…

徐建鸿:深耕中医康养的“托钵行者”

为什么是“庄人堂”&#xff1f;杭州“庄人堂”医药科技公司董事长徐建鸿很乐意和别人分享这个名称的由来&#xff0c;一方面是庄子首先提出“养生”这个概念&#xff0c;接近上工治未病的上医&#xff0c;取名“庄人堂”代表庄子门生&#xff0c;向古哲先贤致敬&#xff01;另…

Go 泛型解密:从基础到实战的全方位解析

Go泛型解密 一、概述 泛型编程是计算机科学中一个相当重要的概念&#xff0c;广泛应用于各种编程语言和框架中。在Go语言中&#xff0c;泛型的讨论和实现也走了一段相对漫长的路。这一路上既有激烈的讨论&#xff0c;也有种种的尝试和迭代。本节将对泛型的基础概念进行深入分析…