理解io/nio/netty

一、io

io即input/output,输入和输出

1.1 分类

输入流、输出流(按数据流向)
字节流(InputStream/OutputStream(细分File/Buffered))、字符流(Reader/Writer(细分File/Buffered/put))(按数据处理方式)
字节缓存流:避免频繁的io操作,缓冲区的大小默认为 8192 字节

二、字节

  • 字节:存储数据的单元
    1byte=8bit
    一个英文字母=1byte,一个汉字=2byte
  • 字符:1字符=2byte

三、nio

3.1 基本概念

  • 同步:当前任务完成前,不能做其他操作(单线程)
  • 异步:当前任务完成前,可以做其他操作(多线程)
  • 阻塞:当前任务挂起,不能做其他操作的状态(等待)
  • 非阻塞:当前任务进行中,无需挂起,可以做其他操作的状态(一心二用)

3.2 定义

bio为同步阻塞模式
nio为同步非阻塞模式,一个线程管理多个输入输出通道,涉及轮询、多路复用(一个线程不断轮询多个socket的状态,当socket有读写事件时调用io事件)

核心:channel(双向)、buffer、selector(监听通道事件)

3.3 流程

服务器端(pool)
属性:线程池、选择器selector

  • 创建一个PoolServer,
  • 初始化,并指定端口
    开通渠道ServerSocketChannel
    设置非阻塞
    绑定端口
    开通选择器
    将渠道注册到选择器
  • 监听事件
    轮询访问选择器
    处理对应的通道事件
    如果事件key状态为可接收:注册通道到选择器,设置状态为可读
    如果事件key状态为可读:将key对应通道设置为可读,线程池执行key对应的继承Thread的handler方法,重写run方法(通过key拿到通道;分配缓冲区,分配输出流;将通道读取的缓冲区内容写入输出流;将服务端回执写入通道;将通道设置可读;唤醒选择器)

3.4 应用

客户端:

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;public class Client {public static void main(String[] args) throws IOException {Socket s = new Socket("127.0.0.1", 8888);s.getOutputStream().write("HelloServer".getBytes());s.getOutputStream().flush();System.out.println("write over, waiting for msg back...");byte[] bytes = new byte[1024];int len = s.getInputStream().read(bytes);System.out.println(new String(bytes, 0, len));s.close();}
}

服务端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class Server {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888));ssc.configureBlocking(false);System.out.println("server started, listening on :" + ssc.getLocalAddress());Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);while(true) {selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while(it.hasNext()) {SelectionKey key = it.next();it.remove();handle(key);}}}private static void handle(SelectionKey key) {if(key.isAcceptable()) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept();sc.configureBlocking(false);sc.register(key.selector(), SelectionKey.OP_READ );} catch (IOException e) {e.printStackTrace();} finally {}} else if (key.isReadable()) { //flipSocketChannel sc = null;try {sc = (SocketChannel)key.channel();ByteBuffer buffer = ByteBuffer.allocate(512);buffer.clear();int len = sc.read(buffer);if(len != -1) {System.out.println(new String(buffer.array(), 0, len));}ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes());sc.write(bufferToWrite);} catch (IOException e) {e.printStackTrace();} finally {if(sc != null) {try {sc.close();} catch (IOException e) {e.printStackTrace();}}}}}
}

服务端:pool

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class PoolServer {ExecutorService pool = Executors.newFixedThreadPool(50);private Selector selector;/**** @throws IOException*/public static void main(String[] args) throws IOException {PoolServer server = new PoolServer();server.initServer(8000);server.listen();}/**** @param port* @throws IOException*/public void initServer(int port) throws IOException {//ServerSocketChannel serverChannel = ServerSocketChannel.open();//serverChannel.configureBlocking(false);//serverChannel.socket().bind(new InetSocketAddress(port));//this.selector = Selector.open();serverChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务端启动成功!");}/**** @throws IOException*/@SuppressWarnings("unchecked")public void listen() throws IOException {// 轮询访问selector  while (true) {//selector.select();//Iterator ite = this.selector.selectedKeys().iterator();while (ite.hasNext()) {SelectionKey key = (SelectionKey) ite.next();//ite.remove();//if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();//SocketChannel channel = server.accept();//channel.configureBlocking(false);//channel.register(this.selector, SelectionKey.OP_READ);//} else if (key.isReadable()) {//key.interestOps(key.interestOps()&(~SelectionKey.OP_READ));//pool.execute(new ThreadHandlerChannel(key));}}}}
}/**** @param* @throws IOException*/
class ThreadHandlerChannel extends Thread{private SelectionKey key;ThreadHandlerChannel(SelectionKey key){this.key=key;}@Overridepublic void run() {//SocketChannel channel = (SocketChannel) key.channel();//ByteBuffer buffer = ByteBuffer.allocate(1024);//ByteArrayOutputStream baos = new ByteArrayOutputStream();try {int size = 0;while ((size = channel.read(buffer)) > 0) {buffer.flip();baos.write(buffer.array(),0,size);buffer.clear();}baos.close();//byte[] content=baos.toByteArray();ByteBuffer writeBuf = ByteBuffer.allocate(content.length);writeBuf.put(content);writeBuf.flip();channel.write(writeBuf);//if(size==-1){channel.close();}else{//key.interestOps(key.interestOps()|SelectionKey.OP_READ);key.selector().wakeup();}}catch (Exception e) {System.out.println(e.getMessage());}}
}

四、netty

netty是JBoss提供的开源网络编程框架,提供异步的、基于事件驱动的网络应用程序框架和工具。

架构
三层网络架构,Reactor 通信调度层 -> 职责链 PipeLine -> 业务逻辑处理层

为什么选择netty

  • API使用简单,开发门槛低
  • 功能强大,预置了多种编解码功能,支持多种主流协议
  • 定制能力强,可以通过ChannelHandler对通信框架进行灵活的扩展
  • 性能高,通过与其他业界主流的nio框架对比,netty的综合性能最优
  • 成熟、稳定,netty修复了已经发现的所有的JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼
  • 社区活跃,版本迭代周期短,发现的BUGkey倍及时修复,同时更多的新功能会被加入
  • 经历了大规模的商业应用考验,质量已经得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它可以完全满足不同行业的商业应用。

4.1 流程

在这里插入图片描述
netty的接收和发送ByteBuffer采用direct buffers,使用堆外直接内存进行socket读写,不需要进行字节缓冲区的二次拷贝。(如果使用传统的堆内存进行socket读写,JVM会将堆内存buffer拷贝一份到直接内存中,然后才写入socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。)

  • 服务端:

创建服务端并指定端口,启动服务端
创建boss和worker事件组
绑定事件组到通道,并指定子处理器,初始化通道,将处理器(继承ChannelHandlerContext,重写读方法(获取信息,将信息写入上下文,关闭上下文)以及异常捕获方法(关闭上下文))加到管道的最后
绑定端口获取future

  • 客户端:

创建客户端,启动客户端
创建workers事件组
绑定事件组到通道,并指定处理器,初始化通道,将定义的客户端处理器(继承ChannelInboundHandlerAdapter,重写通道激活方法(将信息写入上下文,获取future,添加监听器,当服务端收到信息时输出提示信息)以及读方法(读取信息,最后释放信息))添加到管道的后面
绑定端口,获取future

4.2 应用

服务端

import com.mashibing.io.aio.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;public class HelloNetty {public static void main(String[] args) {new NettyServer(8888).serverStart();}
}class NettyServer {int port = 8888;public NettyServer(int port) {this.port = port;}public void serverStart() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new Handler());}});try {ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}class Handler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//super.channelRead(ctx, msg);System.out.println("server: channel read");// ByteBuf是netty的一个字节容器ByteBuf buf = (ByteBuf)msg;System.out.println(buf.toString(CharsetUtil.UTF_8));ctx.writeAndFlush(msg);ctx.close();//buf.release();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//super.exceptionCaught(ctx, cause);cause.printStackTrace();ctx.close();}
}

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;public class Client {public static void main(String[] args) {new Client().clientStart();}private void clientStart() {EventLoopGroup workers = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(workers).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("channel initialized!");ch.pipeline().addLast(new ClientHandler());}});try {System.out.println("start to connect...");ChannelFuture f = b.connect("127.0.0.1", 8888).sync();f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workers.shutdownGracefully();}}}class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel is activated.");final ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer("HelloNetty".getBytes()));f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {System.out.println("msg send!");//ctx.close();}});}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {ByteBuf buf = (ByteBuf)msg;System.out.println(buf.toString());} finally {ReferenceCountUtil.release(msg);}}
}

场景

  • 构建高性能、低时延的各种Java中间件,
    例如MQ、分布式服务框架、ESB消息总线,netty主要作为基础框架提供高性能、低时延的通信服务
  • 共有或者私有协议栈的基础通信框架,
    例如可以基于netty构建异步、高性能的websocket协议栈
  • 各领域应用,netty作为高性能的通信框架用于内部各模块的数据分发、传输和汇总等,实现模块之间高性能通信
    例如大数据、游戏等

4.3 拆包器

TCP拆包粘包

发送的数据出现断开接收或者多个包数据发生粘连

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包
  • 待发送数据大于MSS最大报文长度,TCP在传输前将进行拆包
  • 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包
  • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包

解决方法

  • 发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度 了
  • 发送端将每个数据包封装为固定长度,这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开
  • 可以在数据包之间设置边界,如添加特殊符号,这样接收端通过这个边界就可以将不同的数据包拆分开

netty提供了封装的拆包器:

  • 固定长度
  • 分隔符
  • 基于长度域(最通用)

4.4 零拷贝

传统拷贝:需要4次数据拷贝和4次上下文切换
磁盘->内核缓冲区的read buffer->用户缓冲区->内核的socket buffer->网卡接口(硬件)的缓冲区

零拷贝:省略中间的2步,不需要CPU的参与
磁盘->内核缓冲区的read buffer->网卡接口(硬件)的缓冲区
零拷贝是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间,而直接在内核空间中传输到网络的方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579075.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能优化算法应用:基于卷积优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于卷积优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于卷积优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.卷积优化算法4.实验参数设定5.算法结果6.…

Duboo-入门到学废【上篇】

目录 1&#x1f95e;.什么是duboo 2&#x1f32d;.架构图 3.&#x1f37f;快速入门 4.&#x1f9c7;浅浅理解 1.什么是duboo&#x1f936;&#x1f936;&#x1f936; Dubbo是一个由阿里巴巴开发的基于Java的开源RPC框架。它提供了高性能、透明化的远程方法调用&#xff0…

操作系统 面试第一弹

1. 进程和线程的区别 进程&#xff08;Process&#xff09;和线程&#xff08;Thread&#xff09;是操作系统中的重要概念&#xff0c;它们表示执行中的程序的不同执行单元。下面是它们的区别&#xff1a; 定义&#xff1a;进程是一个独立的执行环境&#xff0c;具有独立的内存…

【深度学习】DataComp论文,数据集介绍,大数据模型的数据集介绍

参考&#xff1a; https://laion.ai/blog/datacomp/ 论文&#xff1a;https://arxiv.org/abs/2304.14108 文章目录 论文报告的一些内容datacomp-1B 数据质量比lainon2B要好不同规模数据有多少数据数据处理数据来源 论文报告的一些内容 摘要 多模态数据集是近期如CLIP、Stable …

TCP服务器的演变过程:IO多路复用机制select实现TCP服务器

IO多路复用机制select实现TCP服务器 一、前言二、新增使用API函数2.1、select()函数2.2、FD_*系列函数 三、实现步骤四、完整代码五、TCP客户端5.1、自己实现一个TCP客户端5.2、Windows下可以使用NetAssist的网络助手工具 小结 一、前言 手把手教你从0开始编写TCP服务器程序&a…

洛谷——【数据结构1-2】二叉树

文章目录 题目【深基16.例1】淘汰赛题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1基本思路&#xff1a;代码 【深基16.例3】二叉树深度题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1基本思路&#xff1a;代码 [USACO3.4] 美国血统 American Heritage题目描…

算符优先语法分析设计原理与实现

前言&#xff1a; 作者的词法分析程序以及算符优先语法分析设计程序仓库链接 1、目标任务 **[实验项目] **以专题 1 词法分析程序的输出为语法分析的输入&#xff0c;实现算符优先分析算法&#xff0c;完成以下描述算术表达式的算符优先文法的算符优先分析过程。 G[E]:E→E…

Spark编程实验三:Spark SQL编程

目录 一、目的与要求 二、实验内容 三、实验步骤 1、Spark SQL基本操作 2、编程实现将RDD转换为DataFrame 3、编程实现利用DataFrame读写MySQL的数据 四、结果分析与实验体会 一、目的与要求 1、通过实验掌握Spark SQL的基本编程方法&#xff1b; 2、熟悉RDD到DataFram…

2024免费的数据恢复软件EasyRecovery14自己操作就能恢复的方法

而今天小编为大家还是带来了同系列软件easyrecovery14&#xff0c;这是easyrecovery数据恢复软件中的技术员版本&#xff0c;不仅包含家庭版和专业版的所有功能&#xff0c;而且还旨在简化技术人员的数据恢复过程。软件拥有强大的数据恢复功能&#xff0c;支持使用的恢复场景有…

KNN与KD树博客总结

目录 总结小结&#xff1a; 总结 原始篇&#xff1a;KNN算法及其优缺点算法思想改进篇&#xff1a;KD树&#xff08;KNN的plus版算法实现第一篇&#xff1a;平衡二叉树的构建&#xff08;递归算法实现第二篇&#xff1a;KD树的构建&#xff08;递归算法实现第三篇&#xff1a;…

CentOS 7 设置网络

CentOS 7 设置网络 正常情况 ①登陆进去之后使用下面的命令修改文件 echo ONBOOTyes >> /etc/sysconfig/network-scripts/ifcfg-ens33②如果是虚拟机重启后使用如下命令进行查看IP地址 ip addr注&#xff1a;到这里如果显示有两部分&#xff0c;则代表网络设置成功&a…

华为设备VRP系统管理

为了满足企业业务对网络的需求&#xff0c;网络设备中的系统文件需要不断进行升级。另外&#xff0c;网络设备中的配置文件也需要时常进行备份&#xff0c;以防设备故障或其他灾害给业务带来损害。在升级和备份系统文件或配置文件时&#xff0c;经常会使用FTP和TFTP来传输文件。…

服务器系统时间不同步如何处理

在分布式计算环境中&#xff0c;服务器系统时间的同步至关重要。然而&#xff0c;由于各种原因&#xff0c;服务器系统时间不同步的问题时有发生,这可能会导致严重的问题&#xff0c;如日志不准确、证书验证失败等。下面我们可以一起探讨下造成服务器系统时间不同的原因以及解决…

【Vue2+3入门到实战】(5)Vue基础之Computed计算属性 详细示例

目录 一、今日学习目标1.computed计算属性 二、computed计算属性1.概念2.语法3.注意4.案例5.代码准备 三、computed计算属性 VS methods方法1.computed计算属性2.methods计算属性3.计算属性的优势4.总结 四、计算属性的完整写法五、综合案例-成绩案例六、Computed计算属性总结 …

揭秘Pod状态与生命周期管理的秘密(中)

上一篇文章中主要介绍了Pod的基础概念与使用、删除。本文将带你一起学习Pod的几种容器(Init、Pause) 点击 这里 可以查看所有相关文章。 Init 容器 本文讲解 Init 容器的基本概念&#xff0c;这是一种专用的容器&#xff0c;在应用程序容器启动之前运行&#xff0c;用来包含…

住宅代理妙用:网络抓取的必备工具

什么是住宅代理&#xff1f; 要准确理解什么是住宅代理&#xff0c;首先需要了解什么是住宅IP。IP 地址是连接到网络时分配给单个设备的唯一标识符。这允许设备或端点直接相互通信&#xff0c;而无需跨线。 住宅IP是指分配给特定设备&#xff08;例如计算机、手机、平板电脑等…

新版IDEA中Git的使用(二)

说明&#xff1a;前面介绍了在新版IDEA中Git的基本操作&#xff0c;本文介绍关于分支合并、拉取等操作&#xff1b; 例如&#xff0c;现在有一个项目&#xff0c;分支如下&#xff1a; main&#xff1a;主分支&#xff1b; dev&#xff1a;开发分支&#xff1b; test&#x…

CNVD原创漏洞审核和处理流程

一、CNVD原创漏洞审核归档和发布主流程 &#xff08;一&#xff09;审核和归档流程 审核流程分为一级、二级、三级审核&#xff0c;其中一级审核主要对提交的漏洞信息完整性进行审核&#xff0c;漏洞符合可验证&#xff08;通用型漏洞有验证代码信息或多个互联网实例、事件型…

k8s的二进制部署1

k8s的二进制部署&#xff1a;源码包部署 k8smaster01&#xff1a;192.168.176.61 kube-apiserver kube-controller-manager kube-scheduler etcd k8smaster01&#xff1a;192.168.176.62 kube-apiserver kube-controller-manager kube-scheduler node节点01&#xff1a;192.…

promise的使用和实例方法

前言 异步,是任何编程都无法回避的话题。在promise出现之前,js中也有处理异步的方案,不过还没有专门的api能去处理链式的异步操作。所以,当大量的异步任务逐个执行,就变成了传说中的回调地狱。 function asyncFn(fn1, fn2, fn3) {setTimeout(() > {//处理第一个异步任务fn1…