5.Neety入门
什么是Netty
Netty
是一个基于Java NIO
的异步事件驱动的网络应用框架
。它被广泛用于开发高性能
、高可靠性的网络通信程序
,特别是服务器端和客户端程序。Netty提供了简洁而强大的API,使得开发者能够轻松地构建各种网络应用,包括实时通信系统、游戏服务器、分布式系统等。Netty的主要特点包括:
- 异步和事件驱动:
Netty基于事件驱动模型
,使用异步的方式处理网络IO操作
,可以处理大量的并发连接而不需要大量的线程。- 高性能:Netty采用了高效的NIO模型,以及优化的数据结构和算法,能够实现出色的性能表现。
- 可定制性:Netty提供了丰富的组件和可定制的API,使得开发者可以根据自己的需求对网络通信进行灵活的定制和扩展。
- 多协议支持:Netty支持多种常见的网络协议,包括
HTTP
、WebSocket
、TCP
、UDP
等,使得开发者可以轻松地实现各种类型的网络应用。
Netty的地位
以下是一些使用Netty框架的知名项目和框架:
- Apache Kafka:Kafka是一个分布式流处理平台,它使用Netty来处理网络通信,实现高效的消息传输。
- Elasticsearch:Elasticsearch是一个分布式搜索引擎,它使用Netty作为其节点之间通信的底层框架。
- gRPC:gRPC是一个高性能的远程过程调用框架,它使用Netty来实现底层的网络通信。
- Apache Dubbo:Dubbo是一个高性能的分布式服务框架,它使用Netty作为其网络通信的实现方式。
- RocketMQ:RocketMQ是一个开源的分布式消息队列,它使用Netty来实现消息的传输和通信。
- Spring 5:Spring 5 引入了对反应式编程的支持,使得开发人员可以更轻松地构建异步和非阻塞的应用程序,而 Netty 作为其底层的网络通信实现之一。Flux API完全抛弃Tomcat 使用Netty作为服务端
- Zookeeper:Zookeeper 是一个开源的分布式协调服务,用于管理和协调分布式系统中的各种信息,比如配置管理、命名服务、分布式锁等。
Netty的优势
Netty相对于Java NIO具有以下优势:
- 简化的编程模型:Netty提供了更高级别的抽象,使得开发者可以更轻松地编写网络应用程序。Netty隐藏了许多底层细节,提供了易于理解和使用的API,使得开发者能够更专注于业务逻辑的实现。
- 丰富的功能和组件:Netty提供了许多预置的编解码器、处理器和插件,涵盖了各种常见的网络通信场景,例如HTTP、WebSocket、UDP等。开发者可以直接使用这些组件,而无需重复实现相同的功能,大大加速了开发过程。
- 高性能和可靠性:Netty在底层实现了高效的事件驱动模型和异步IO机制,能够实现更高的吞吐量和更低的延迟。开发者可以借助Netty的优秀性能,构建高性能、高可靠性的网络应用程序。
- 可扩展性和灵活性:Netty的组件化设计使得它具有很好的可扩展性和灵活性。开发者可以根据自己的需求选择合适的组件和插件,或者自行扩展和定制特定的功能,从而更好地满足业务需求。
- 文档和社区支持:Netty拥有丰富的文档和活跃的社区支持,提供了大量的教程、示例代码和问题解答,帮助开发者快速上手并解决问题。开发者可以通过文档和社区获取到丰富的资源和支持,加速开发过程。
5.1.Hello World
5.1.1.目标
开发一个简单的服务器端 和 客户端
- 客户端向服务器端发送 Hello,World
- 服务器仅接收,不返回
implementation 'io.netty:netty-codec-http:4.1.106.Final'
5.1.2.服务器端的代码实现
public class HelloServer {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// ServerBootstrap 是一个启动 NIO 服务的辅助启动类// 1.负责组装Netty组装组件,启动服务器new ServerBootstrap()// 2。配置两个NioEventLoopGroup,一个用于接收客户端连接,另一个用于处理客户端读写// NioEventLoopGroup 是一个处理I/O操作的多线程事件循环 NioEventLoopGroup(selector,thread)(包含 选择器 和 线程 ).group(new NioEventLoopGroup())// 3.Netty 支持多种协议,这里是 NIO 的实现 还有OIO BIO ,EPOLL 等.channel(NioServerSocketChannel.class)// 4.配置ServerSocketChannel的选项 将来处理褚时间的一个分工,决定了worker(child)能执行哪些操作(handler).childHandler(// 5.channel代表和客户进行数据读写的通道 Initializer代表初始化器 负责添加别的handlernew ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler// 6.1.添加StringDecoder解码器 数据传输过来的时候,会先经过这个解码器(数据传输的时候都是ByteBuf) 解码成Stringch.pipeline().addLast(new StringDecoder());// 6.2.添加ChannelInboundHandlerAdapter处理器(自定义的业务处理器)ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 读时间处理器logger.info("server receive msg: {}", msg);}});}// 7.绑定的监听端口}).bind(8080);}
}
5.1.3.客户端代码实现
public class HelloClient {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建启动器try {new Bootstrap()// 2.指定线程模型 一个用于接收客户端连接,另一个用于处理客户端读写.group(new NioEventLoopGroup())// 3.选择客户端的Channel的实现.channel(NioSocketChannel.class)// 4.添加处理器.handler(new ChannelInitializer<NioSocketChannel>() {// 5.初始化处理器@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler 客户端是需要一个编码器ch.pipeline().addLast(new StringEncoder());}})// 7.连接到服务器.connect(new InetSocketAddress("localhost", 8080)).sync().channel()// 8.向服务器发送数据.writeAndFlush("hello, world");} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
5.2.组件
5.2.1.EventLoop
事件循环对象
EventLoop
本质是一个单线程执行器
(同时维护了一个Selector
)里面 run
方法处理Channel
上远远不断的事件
-
继承关系
-
一条线是继承
ScheduledExecutorService
因此包含了线程池中所有方法-
public interface EventLoop extends EventLoopGroup public interface EventLoopGroup extends EventExecutorGroup public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>
-
-
另一条线继承了netty自己的
OrderedEventExecutor
-
public interface EventLoop extends OrderedEventExecutorpublic interface OrderedEventExecutor extends EventExecutor public interface EventExecutor extends EventExecutorGroup public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>
boolean inEventLoop(Thread var1);
用于判断一个线程是否属于EventLoop
EventExecutorGroup parent();
用于判断自己属于哪一个EventLoopGroup
-
-
事件循环组
EventLoopGroup
是一组EventLoop
,Channel
一般会调用EventLoopGroup
的register
方法来绑定其中一个EventLoop
,后续这个Channel
上的IO事件
都由此EventLoop
来处理(保证了io事件处理时的线程安全)
- 继承
Netty
自己的EventLoopGrouy
- 实现了
Iterable
接口提供遍历EventLoop
的能力 next()
方法 用于获取集合中的下一个EventLoop
- 实现了
5.2.1.1. 普通-定时任务
/*** @author 13723* @version 1.0* 2024/2/24 19:34*/
public class TestEventLoop {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建事件循环粗// NioEventLoopGroup 比较全面,因为可以向他提交 IO事件 普通任务 定时任务// DefaultEventLoop 主要处理普通任务 和 定时任务// 如果不传线程 那么会自动分配// private static final int DEFAULT_EVENT_LOOP_THREADS =// Math.max(1,
// SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)
// ); 1 和你当前 cpu核心数 * 2进行比较 选最大哪个// protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {// super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);// }EventLoopGroup group = new NioEventLoopGroup(2);// 2,获取下一个事件的循环对象(本身就设置了两个)logger.error("获取下一个事件的循环对象:{}", group.next());logger.error("获取下一个事件的循环对象:{}", group.next());logger.error("获取下一个事件的循环对象:{}", group.next());// 3.执行普通任务group.next().submit(()->{// 会将任务提交事件组中的某一个事件对象去执行// 类似于线程池的submit,只不过这个是提交到事件循环组中logger.error("普通任务");},"t1");// 4.执行定时任务 让你延迟执行一度任务 或者 周期性执行任务// 参数1.任务 2.延迟时间 3.间隔时间 4.时间单位// 每隔1s打印一次 定时任务group.next().scheduleAtFixedRate(()->{logger.error("定时任务");},0,1, TimeUnit.SECONDS);}}
5.2.1.2. IO任务
依次
轮流分配
,因为分配给EnventLoopGroup
是两个线程
所以 每次Channel
进来的时候,会依次和 对应的EventLoop
进行绑定,这样下次就是再执行对应的Channel
是 还是由绑定的EventLoop
去执行
5.2.1.2.1.服务端
/*** @author 13723* @version 1.0* 2024/2/24 18:23*/
public class EventLoopClient {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建启动器try {Channel channel = new Bootstrap()// netty职责进行划分 boos 和 worker// .group(new NioEventLoopGroup())// 第一个参数就是Boos 只负责处理ServerSocketChannel Accept事件// 第二个参数是worker 负责处理读SocketChannel 读写事件.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))// 3.选择客户端的Channel的实现.channel(NioSocketChannel.class)// 4.添加处理器.handler(new ChannelInitializer<NioSocketChannel>() {// 5.初始化处理器@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler 客户端是需要一个编码器ch.pipeline().addLast(new StringEncoder());}})// 7.连接到服务器.connect(new InetSocketAddress("localhost", 8080)).sync() // 阻塞方法 知道连接建立.channel();// 代表客户端和服务端的连接// TODO 在这里通过DEBUG 模式 发送多次数据System.out.println("发送数据 结束!");} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
5.2.1.2.2.客户端
/*** @author 13723* @version 1.0* 2024/2/24 18:23*/
public class EventLoopClient {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建启动器try {Channel channel = new Bootstrap()// 2.指定线程模型 一个用于接收客户端连接,另一个用于处理客户端读写.group(new NioEventLoopGroup())// 3.选择客户端的Channel的实现.channel(NioSocketChannel.class)// 4.添加处理器.handler(new ChannelInitializer<NioSocketChannel>() {// 5.初始化处理器@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler 客户端是需要一个编码器ch.pipeline().addLast(new StringEncoder());}})// 7.连接到服务器.connect(new InetSocketAddress("localhost", 8080)).sync() // 阻塞方法 知道连接建立.channel();// 代表客户端和服务端的连接// TODO 在这里通过DEBUG 模式 发送多次数据System.out.println("发送数据 结束!");} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
上面是没有进行职责划分时场景,下面对职责进行划分了
// netty职责进行划分 boos 和 worker
// .group(new NioEventLoopGroup())
// 第一个参数就是Boos 只负责处理ServerSocketChannel Accept事件
// 第二个参数是worker 负责处理读SocketChannel 读写事件
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
如果单个handler执行时间过长 可以再单独划分出来一个EventLoopGoop去处理,这样就不会影响其他IO线程
public class EventLoopServer {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 细分2:创建一个独立的EventLoopGroup// 1.创建一个独立的EventLoopGroup 用于处理耗时较长的handlerEventLoopGroup group = new DefaultEventLoop(2);new ServerBootstrap()// 细分1// netty职责进行划分 boos 和 worker// .group(new NioEventLoopGroup())// 第一个参数就是Boos 只负责处理ServerSocketChannel Accept事件// 第二个参数是worker 负责处理读SocketChannel 读写事件 将来的worker线程有两个.group(new NioEventLoopGroup(),new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>(){@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){/*** 读事件* @param ctx 上下文* @param msg 读到的消息 ByteBuf事件* @throws Exception 异常*/@Overridepublic void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;String message = buf.toString(Charset.defaultCharset());logger.error("接收到消息:{}",message);// 处理完毕之后 交给交给handler2进行处理// 将消息传递给下一个handlerctx.fireChannelRead(msg);}// TODO 注意 这里处理较长时间的handler 会导致boos线程组的线程被阻塞 从而影响新的连接的接入 所以下面分配了,一个独立的EventLoopGroup 用于处理耗时较长的handler}).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){/*** 读事件* @param ctx 上下文* @param msg 读到的消息 ByteBuf事件* @throws Exception 异常*/@Overridepublic void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;String message = buf.toString(Charset.defaultCharset());logger.error("接收到消息:{}",message);}});}}).bind(8080);}}