编写Netty程序

编写代码

1.创建线程池

一般来说,我们会声明两个线程池,一个线程池用来处理Accept事件,一个是用于处理消息的读写事件。

// 用来处理Accept事件
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 用来处理消息的读写时间
EventLoopGroup workerGroup = new NioEventLoopGroup();

一般我们只需要监听一个端口,所以bossGroup一般声明为1个线程,bossGroup接收到新的连接后会交给workerGroup处理,workerGroup负责监听这些连接的读写事件。

2.创建启动类

启动类有两个,客户端启动类Bootstrap,服务端启动类ServerBootstrap,启动类负责整个Nett 程序的正常运行。

以服务端为例

ServerBootstrap serverBootstrap = new ServerBootstrap();

3.设置线程池

把第一步声明的线程池设置到ServerBootstrap中, bossGroup负责监听Accept连接,workerGroup负责监听连接的读写数据。

serverBootstrap.group(bossGroup, workerGroup)

4.设置ServerSocketChannel

设置Netty程序以什么样的IO模型运行,以NioServerSocketChannel为例

serverBootstrap.channel(NioServerSocketChannel.class);

如果程序运行在Linux系统上,可以使用EpollServerSocketChannel,它使用的是Linux系统上的epoll模型,比select 模型更高效。

这里提到的IO模型是指IO多路复用模型,一共有三个类型:selectpollepoll

select 实现多路复用的方式是,将已连接的Socket都放到一个文件描述符集合,然后调用select函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。

所以,对于 select 这种方式,需要进行 2 次「遍历」文件描述符集合,一次是在内核态里,一个次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。

select 使用固定长度的 BitsMap,表示文件描述符集合,而且所支持的文件描述符的个数是有限制的,在 Linux 系统中,由内核中的 FD_SETSIZE 限制, 默认最大值为 1024,只能监听 0~1023 的文件描述符。

poll 不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限制,当然还会受到系统文件描述符限制。

但是 poll 和 select 并没有太大的本质区别,都是使用「线性结构」存储进程关注的 Socket 集合,因此都需要遍历文件描述符集合来找到可读或可写的 Socket,时间复杂度为 O(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能的损耗会呈指数级增长。

epoll 通过两个方面,很好解决了 select/poll 的问题。

第一点,epoll 在内核里使用红黑树来跟踪进程所有待检测的文件描述字,把需要监控的 socket 通过 epoll_ctl() 函数加入内核中的红黑树里,红黑树是个高效的数据结构,增删查一般时间复杂度是 O(logn),通过对这棵黑红树进行操作,这样就不需要像 select/poll 每次操作时都传入整个 socket 集合,只需要传入一个待检测的 socket,减少了内核和用户空间大量的数据拷贝和内存分配。

第二点, epoll 使用事件驱动的机制,内核里维护了一个链表来记录就绪事件,当某个 socket 有事件发生时,通过回调函数内核会将其加入到这个就绪事件列表中,当用户调用 epoll_wait() 函数时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个 socket 集合,大大提高了检测的效率。

原文链接:https://blog.csdn.net/qq_34827674/article/details/115619261

5.设置可选参数

设置Netty中可以使用的参数,这些参数都在ChannelOption及其子类中。

这里设置了一个SO_BACKLOG系统参数,它表示的是最大等待连接数量。

serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);

6.设置可选Handler

只能设置一个,它会在SocketChannel建立起来之前执行,后面再分析它的执行时机。

serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))

7.编写并设置子Handler

Netty中的Handler分成两种,一种叫做Inbound,一种叫做Outbound

这里简单地写一个Inbound类型的Handler,它把接收到数据直接写回给客户端。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 读取数据后写回客户端ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

设置子Handler 设置的是SocketChannel对应的Handler,也是只能设置一个,它用于处理SocketChannel的事件。

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 可以添加多个子Handlerp.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(new EchoServerHandler());}
});

虽然只能设置一个子Handler,但是Netty提供了一种可以设置多个Handler的途径,即使用ChannelInitializer方式,第六步设置 Handler也可以使用这种方式设置多个Handler

这里,我们设置了一个打印日志的LoggingHandler和一个自定义的EchoServerHandler

8.绑定端口

ChannelFuture f = serverBootstrap.bind(PORT).sync();

9.等待服务端端口关闭

等待服务端监听端口关闭,sync () 会阻塞主线程,内部调用的是 Object 的 wait () 方法。

f.channel().closeFuture().sync();

10.关闭线程池

bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

上面设置了ServerSocketChannel的类型,而没有设置SocketChannel的类型。

是因为SocketChannelServerSocketChannel在接受连接之后创建出来的,所以,并不需要单独再设置它的类型,比如,NioServerSocketChannel创建出来的肯定是NioSocketChannel,而EpollServerSocketChannel创建出来的肯定是 EpollSocketChannel

完整代码

服务端代码

/*** 1.打开命令行窗口: telnet localhost 8001* 2.进入发送消息模式: Ctrl + ]* 3.使用send命令发送消息: send hello*/
public class EchoServer {static final int PORT = 8001;public static void main(String[] args) {// 1. 声明线程池EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 2. 服务端引导器ServerBootstrap serverBootstrap = new ServerBootstrap();// 3. 设置线程池serverBootstrap.group(bossGroup, workerGroup)// 4. 设置ServerSocketChannel的类型.channel(NioServerSocketChannel.class)// 5. 设置参数.option(ChannelOption.SO_BACKLOG, 100)// 6. 设置ServerSocketChannel对应的Handler,只能设置一个.handler(new LoggingHandler(LogLevel.INFO))// 7. 设置SocketChannel对应的Handler.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 可以添加多个子Handler// p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(new EchoServerHandler());}});// 8. 绑定端口ChannelFuture f = serverBootstrap.bind(PORT).sync();// 9. 等待服务端监听端口关闭,这里会阻塞主线程f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 10. 优雅地关闭两个线程池bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

EchoServerHandler

public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 读取数据后写回客户端ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

客户端代码

public class NettyClient {static final int PORT = 8001;public static void main(String[] args) throws Exception {// 工作线程池NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// pipeline.addLast(new LoggingHandler(LogLevel.INFO));pipeline.addLast(new NettyClientHandler());}});// 连接到服务端ChannelFuture future = bootstrap.connect(new InetSocketAddress(PORT)).sync();System.out.println("connect to server success");// 调用后这里会阻塞// future.channel().closeFuture().sync();// 这里实现可以在控制台输入发送信息Channel channel = future.channel();Scanner scanner = new Scanner(System.in);while (true) {String msg = scanner.nextLine();channel.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));if ("quit".equals(msg)) {channel.close();break;}}} finally {workerGroup.shutdownGracefully();}}
}

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(LocalDateTime.now().toString() + " " + ctx.channel() + " " + ((ByteBuf)msg).toString(CharsetUtil.UTF_8));}
}

Netty实现群聊

NettyChatHolder

public class NettyChatHolder {static final Map<SocketChannel, Long> userMap = new ConcurrentHashMap<>();private static AtomicLong userIdList = new AtomicLong(10000);static void join(SocketChannel socketChannel) {// 有人加入就给他分配一个idLong userId = userIdList.addAndGet(1);send(socketChannel, "userId:" + userId);for (SocketChannel channel : userMap.keySet()) {send(channel, userId + " 加入了群聊");}// 将当前用户加入到map中userMap.put(socketChannel, userId);}private static void send(SocketChannel socketChannel, String msg) {try {ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length);writeBuffer.writeCharSequence(msg, Charset.defaultCharset());socketChannel.writeAndFlush(writeBuffer);} catch (Exception e) {e.printStackTrace();}}static void quit(SocketChannel socketChannel) {Long userId = userMap.get(socketChannel);send(socketChannel, "您退出了群聊");userMap.remove(socketChannel);for (SocketChannel channel : userMap.keySet()) {if (channel != socketChannel) {send(channel, userId + " 退出了群聊");}}}public static void propagate(SocketChannel socketChannel, String content) {Long userId = userMap.get(socketChannel);for (SocketChannel channel : userMap.keySet()) {if (channel != socketChannel) {send(channel, userId + ":" + content);}}}
}

NettyChatHandler

public class NettyChatHandler extends SimpleChannelInboundHandler<ByteBuf> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel socketChannel = (SocketChannel)ctx.channel();System.out.println("one conn active: " + socketChannel);// socketChannel是在ServerBootstrapAcceptor中放到EventLoopGroup中的NettyChatHolder.join(socketChannel);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {/**** 如果这里有耗时操作的话, 可以自定义一个线程池来处理这下面的逻辑*/byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes, StandardCharsets.UTF_8);System.out.println(ctx.channel() + ":" + content);if (content.equals("quit")) {ctx.channel().close();} else {NettyChatHolder.propagate((SocketChannel) ctx.channel(), content);}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {SocketChannel socketChannel = (SocketChannel)ctx.channel();System.out.println("one conn inactive: " + socketChannel);NettyChatHolder.quit((SocketChannel) ctx.channel());}
}

NettyChatServer

public class NettyChatServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8001"));public static void main(String[] args) {// 1. 声明线程池EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 2. 服务端引导器ServerBootstrap serverBootstrap = new ServerBootstrap();// 3. 设置线程池serverBootstrap.group(bossGroup, workerGroup)// 4. 设置ServerSocketChannel的类型.channel(NioServerSocketChannel.class)// 5. 设置参数.option(ChannelOption.SO_BACKLOG, 100)// 6. 设置ServerSocketChannel对应的Handler,只能设置一个.handler(new LoggingHandler(LogLevel.INFO))// 7. 设置SocketChannel对应的Handler.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 可以添加多个子Handlerp.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(new NettyChatHandler());}});// 8. 绑定端口ChannelFuture f = serverBootstrap.bind(PORT).sync();// 9. 等待服务端监听端口关闭,这里会阻塞主线程f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 10. 优雅地关闭两个线程池bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

Netty版本

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.63.Final</version>
</dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/642671.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1986-Minimum error thresholding

1 论文简介 《Minimum error thresholding》是由 Kittler 和 Illingworth 于 1986 年发布在 Pattern Recognition 上的一篇论文。该论文假设原始图像中待分割的目标和背景的分布服从高斯分布&#xff0c;然后根据最小误差思想构建最小误差目标函数&#xff0c;最后取目标函数最…

A - Streets of Working Lanterns - 2

警察阿纳托利再次监视一个无组织犯罪集团散布被禁止的亚洲绘画的巢穴。目前&#xff0c;犯罪分子还共享无线互联网&#xff0c;任何人都可以匿名使用。巢穴仍然只有一个入口&#xff0c;也是一个出口。当有人进入巢穴时&#xff0c;阿纳托利在他的记事本上写一个开口圆括号&…

JAVAEE初阶 网络编程(三)

TCP回显服务器 一. TCP的API二. TCP回显服务器的代码分析三. TCP回显服务器代码中存在的问题四. TCP回显服务器代码五. TCP客户端的代码六.TCP为基准的回显服务器的执行流程 一. TCP的API 二. TCP回显服务器的代码分析 这的clientSocket并不是表示用户端的层面东西&#xff0c;…

kubernets集群搭建

集群搭建 1.准备工作(所有节点都执行)1.1配置/etc/hosts文件1.2关闭防火墙1.3关闭selinux1.4关闭交换分区&#xff0c;提升性能1.5修改机器内核参数1.6配置时间同步1.7配置阿里云镜像源 2.安装docker服务(所有节点都执行)2.1安装docker服务2.2配置docker镜像加速和驱动 3.安装配…

【分布式技术】消息队列Kafka

目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群 步骤一&#xff1a;在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二&#xff1a…

如何在conda中的创建查询删除虚拟环境等

最近发现conda环境中有太多的虚拟环境&#xff0c;想要删除&#xff0c;重新创建管理。因此&#xff0c;查找资料后&#xff0c;记录如下&#xff1a; 一.创建虚拟环境 打开终端或命令提示符&#xff0c;并执行以下命令&#xff1a; bash conda create --name your_environ…

【GitHub项目推荐--微软开源的课程(Web开发课程/机器学习课程/物联网课程/数据科学课程)】【转载】

微软在 GitHub 开源了四大课程&#xff0c;面向计算机专业或者入门编程的同学。分别是 Web 开发课程、机器学习课程、物联网课程和数据分析课程。 四大课程在 GitHub 上共斩获 90K 的Star&#xff0c;每一课程包含 20 多小节&#xff0c;完成课程大约需要 12 周。每小节除了视…

中文自然语言处理(NLP)的命名实体识别(NER)任务常见序列标注方法

中文NLP的NER任务中的数据集序列标注方法&#xff0c;主要有以下几种常用的标注方案&#xff1a; BIO标注法&#xff08;Begin-Inside-Outside&#xff09;&#xff1a; B&#xff08;Begin&#xff09;表示实体的开始部分。I&#xff08;Inside&#xff09;表示实体的中间部分…

如何解决Xshell 连接不上虚拟机Ubuntu?

一、 在终端输入 sudo apt-get install openssh-server 二、 执行如下命令 sudo apt-get install ssh 三、 开启 ssh-server&#xff0c;输入密码 service ssh start 四、 验证&#xff0c;输入 ps -e|grep ssh&#xff0c;看到sshd成功 ps -e|grep ssh五、 连接

【Linux编辑器-vim使用】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言 一、vim的基本概念 二、vim的基本操作 分屏操作&#xff1a; 三、vim正常&#xff08;命令&#xff09;模式命令集 四、vim末行&#xff08;底行&#xff09;模…

【经验分享】MAC系统安装R和Rstudio(保姆级教程)安装下载只需5min

最近换了Macbook的Air电脑&#xff0c;自然要换很多新软件啦&#xff0c;首先需要安装的就是R和Rstudio啦&#xff0c;网上的教程很多很繁琐&#xff0c;为此我特意总结了最简单实用的安装方式: 一、先R后Rstudio 二、R下载 下载网址&#xff1a;https://cran.r-project.org …

shell脚本基础演练

简介 Shell脚本是一种用于自动化执行一系列命令的脚本语言。在Unix和类Unix系统中&#xff0c;常见的Shell包括Bash、Zsh、Sh等。下面我将简要讲解Shell脚本的基本结构和一些常用写法&#xff0c;并附上一些标准的例子。 基础示例 基本结构 #!/bin/bash # 注释: 这是一个简…

什么是ORM思想?

1. ORM概念 ORM&#xff08;Object Relational Mapping&#xff09;对象关系映射模式&#xff0c;是一种技术&#xff0c;解决了面向对象与关系型数据库存互不匹配的现象。 ORM在业务逻辑层和数据库层之间充当了桥梁的作用。 2. ORM由来 在软件开发的过程中&#xff0c;通常…

python基础教程九 抽象三(函数参数续)

1. 关键字参数和默认值 前面使用的都是位置参数&#xff0c;因为它们的位置至关重要。本节介绍的技巧让你能完全的忽略位置。要熟悉这种技巧需要一段时间&#xff0c;但随着程序规模的增大&#xff0c;你很快就发现它很有用。 >>> def hello_1(greeting,name): ... …

力扣279. 完全平方数

动态规划 思路&#xff1a; 假设 dp[i] 为最少组成数 i 的平方数个数&#xff1b;则其上一个状态为 dp[i - j^2] 1&#xff0c;1 为 j^2&#xff1a; 即 i 的最少完全平方数 i - j^2 的最少完全平方数 1&#xff0c;其中 j^2 < i 为最接近 i 的平方数&#xff1b;初始值…

云计算管理-linux

1.权限 基本权限与归属 访问权限 读取&#xff1a;允许查看内容-read r 写入&#xff1a;允许修改内容-write w 可执行&#xff1a;允许运行和切换-excute x 对于文本文件&#xff1a; r读取权限&#xff1a;cat、less、grep、head、tail w写入权…

Ansible手册

Ansible常用命令 ansible ansible-config ansible-config list ansible-config dump ansible-config viewansible-connection ansible-console ansible-doc ansible-galaxy ansible-inventory ansible-playbook ansible-pull ansible-vault ansible myhost -m setup# 查看当…

第31关 代码仓库Gitlab的升级之路

------> 课程视频同步分享在今日头条和B站 大家好&#xff0c;我是博哥爱运维。 在2021年&#xff0c;博哥分享了完整的一套K8S架构师课程&#xff0c;链接在此&#xff0c;最后的CI/CD自动化流水线就是基于gitlab来完成的。那么时间过去2年多了&#xff0c;gitlab版本也更…

递归实现n的k次方

题目&#xff1a; 编写一个函数实现n的k次方&#xff0c;使用递归实现。 代码实现&#xff1a; #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h>int Pow(int x,int n) {if (n 1)return x;elsereturn x * Pow(x, n - 1); }int main() {int x, n;scanf("%d …

在C#中调用C++函数并返回const char*类型的值

在C#中调用C函数并返回const char*类型的值&#xff0c;可以使用Interop服务来实现。以下是一个示例代码&#xff1a; C代码&#xff08;generate_project_code.cpp&#xff09;&#xff1a; const char* generateProjectCode() {const char* code "Generated code&quo…