从NIO到Netty开发

转载自  从NIO到Netty开发

1. 从传统BIO到NIO的升级

  1. Client/Server模型是网络编程的基本模型,服务端提供位置信息,客户端通过连接操作向服务端发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信。
  2. 传统的Socket编程是服务端一直处于accpet阻塞等待的状态,并且只有客户端发送了请求,服务才会从阻塞状态变成处理任务的状态,当任务处理完了,服务端接着进入阻塞状态,再此看来,服务端和客户端都是同步的情况。
  3. Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。

2. NIO新特性

  1. Java NIO: Channels and Buffers(通道和缓冲区)标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
  2. Java NIO: Non-blocking IO(非阻塞IO) Java NIO可以让你非阻塞的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。
  3. Java NIO: Selectors(选择器) Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。

 

3. NIO服务端实现

根据上图,发现服务端的事件有2个,一是接受连接事件,二是读取数据:

public class NIOServer {private ByteBuffer readBuffer;    private Selector selector;    private ServerSocket serverSocket;    public static void main(String[] args) {NIOServer server = new NIOServer();server.init();System.out.println("server started:8383");server.listener();}    public void init() {        //1. 创建临时缓冲区readBuffer = ByteBuffer.allocate(1024);        //2. 创建服务端Socket非阻塞通道ServerSocketChannel serverSocketChannel;        try {serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);            //3. 指定内部Socket绑定的服务端地址 并支持重用端口,因为有可能多个客户端同时访问同一端口serverSocket=serverSocketChannel.socket();serverSocket.setReuseAddress(true);serverSocket.bind(new InetSocketAddress(8383));            //4. 创建轮询器 并绑定到管道上,开始监听客户端请求selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (Exception e) {e.printStackTrace();}}    private void listener() {        while (true) {            try {                //5. 开始监听事件,不断取出事件的key,假如存在事件,则直接处理。selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();                while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();handleKey(key);}} catch (Exception e) {e.printStackTrace();}}}    private void handleKey(SelectionKey key) throws IOException {SocketChannel channel = null;        try {            //6. 如果有客户端要连接 这里则处理是否接受连接事件if (key.isAcceptable()) {ServerSocketChannel severChannel = (ServerSocketChannel) key.channel();channel = severChannel.accept();channel.configureBlocking(false);                // 告诉轮询器 接下来关心的是读取客户端数据这件事channel.register(selector, SelectionKey.OP_READ);} else if (key.isReadable()) { //7. 如果客户端发送数据,则这里读取数据。channel = (SocketChannel) key.channel();                // 清空缓冲区readBuffer.clear();                // 当客户端关闭channel后,会不断收到read事件,此刻read方法返回-1 所以对应的服务器端也需要关闭channelint readCount = channel.read(readBuffer);                if (readCount > 0) {readBuffer.flip();String question = CharsetHelper.decode(readBuffer).toString();System.out.println("server get the question:" + question);String answer = getAnswer(question);channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));} else {channel.close();}} } catch (Exception e) {e.printStackTrace();}finally {            //8. 断开连接通道if (channel!=null) {channel.close();}}}    public static String getAnswer(String question) {String answer = null;        switch (question) {        case "who":answer = "我是小娜\n";            break;        case "what":answer = "我是来帮你解闷的\n";            break;        case "where":answer = "我来自外太空\n";            break;        case "hi":answer = "hello\n";            break;        case "bye":answer = "88\n";            break;        default:answer = "请输入 who, 或者what, 或者where";}        return answer;}}

4. NIO客户端实现:

客户端的实现有3个步骤:1.请求连接。2.当连接成功,写数据。3.读取服务端结果。

public class NIOClient implements Runnable {private BlockingQueue<String> words;    private Random random;    public static void main(String[] args) {        // 多个线程发起Socket客户端连接请求for (int i = 0; i < 5; i++) {NIOClient c = new NIOClient();c.init();            new Thread(c).start();}}    //1. 初始化要发送的数据private void init() {words = new ArrayBlockingQueue<String>(5);random = new Random();        try {words.put("hi");words.put("who");words.put("what");words.put("where");words.put("bye");} catch (Exception e) {            // TODO: handle exception}}    //2. 启动子线程代码@Overridepublic void run() {SocketChannel channel = null;Selector selector = null;        try {            //3. 创建连接服务端的通道 并设置为阻塞方法,这里需要指定服务端的ip和端口号channel = SocketChannel.open();channel.configureBlocking(false);channel.connect(new InetSocketAddress("localhost", 8383));selector = Selector.open();            //4. 请求关心连接事件channel.register(selector, SelectionKey.OP_CONNECT);            boolean isOver = false;            while (!isOver) {selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();                while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();                    if (key.isConnectable()) { //5. 当通道连接准备完毕,发送请求并指定接收允许获取服务端返回信息if (channel.isConnectionPending()) {                            if (channel.finishConnect()) {key.interestOps(SelectionKey.OP_READ);channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));sleep();} else {key.cancel();}}} else if (key.isReadable()) {//6. 开始读取服务端返回数据ByteBuffer byteBuffer = ByteBuffer.allocate(512);channel.read(byteBuffer);byteBuffer.flip();String answer = CharsetHelper.decode(byteBuffer).toString();System.out.println("client get the answer:" + answer);String word = getWord();                        if (word != null) {channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));} else {isOver = true;}sleep();}}}} catch (Exception e) {e.printStackTrace();} finally {            //7. 关闭通道if (channel != null) {                try {channel.close();} catch (IOException e) {e.printStackTrace();}}}}    public String getWord() {        return words.poll();}    private void sleep() {        try {TimeUnit.SECONDS.sleep(random.nextInt(3));} catch (InterruptedException e) {e.printStackTrace();}}}
  •  

5.Netty开发简介

上面提到,NIO可以实现同步非阻塞的数据交互,但是对于NIO来说,一个普通的请求数据需要太多的开发步骤,不利于推广,这里主要介绍NIO的实现框架Netty.

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

 

6. Netty服务端实现:

public class EchoServer {private final int port;    public EchoServer(int port) {        this.port = port;}    public void start() throws Exception {        //1.创建线程组EventLoopGroup group=new NioEventLoopGroup();        try {            //2.创建服务端启动对象 装配线程组&交互通道&服务器端口&网络请求处理器链ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(group).channel(NioServerSocketChannel.class) .localAddress("localhost", port)    .childHandler(new ChannelInitializer<Channel>() { @Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new EchoOutHandler1());ch.pipeline().addLast(new EchoOutHandler2());ch.pipeline().addLast(new EchoInHandler1());ch.pipeline().addLast(new EchoInHandler2());}});            // 3.开始监听客户端请求ChannelFuture channelFuture = serverBootstrap.bind().sync();System.out.println("开始监听,端口号为:"+channelFuture.channel().localAddress());            // 4.等待所有请求执行完毕后,关闭通道;如请求还没执行完,这里为阻塞状态。channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally {            //5.停止所有线程组内部代码的执行group.shutdownGracefully().sync();}}    public static void main(String[] args) throws Exception {        new EchoServer(2000).start();}}

7.Netty客户端实现:

public class EchoClient {public static void main(String[] args) throws InterruptedException {        new EchoClient("localhost", 2000).start();}    private final String host;    private final int port;    public EchoClient(String host, int port) {        this.host = host;        this.port = port;}    private void start() throws InterruptedException {        //1.创建线程组EventLoopGroup group = new NioEventLoopGroup();        try {            //2. 创建客户端启动对象,同样需要装配线程组,通道,绑定远程地址,请求处理器链。Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port).handler(new ChannelInitializer<Channel>() {                        @Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new EchoClientHandler());}});            //3.开始请求连接ChannelFuture future = bootstrap.connect().sync();            //4.当请求操作结后,关闭通道。future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {            if (group != null) {group.shutdownGracefully().sync();}}}}

8.Netty处理器链

对于向服务端发送一个请求,并得到一个响应来说。如果使用Netty来说,需要实现两种不同的处理器,一个是读的一个是写的。他们共同组成一个链式调用,如下图:

 

对于服务端来说,上面我们创建了4个处理器,他们组成一条链,分别是:EchoInHandler1 -> EchoInHandler2 -> EchoOutHandler2 -> EchoOutHandler1.

public class EchoInHandler1 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("EchoInHandler1  channelRead...");        //将消息传递到新的链。。。ctx.fireChannelRead(msg);}    @Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}public class EchoInHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("EchoInHandler2  channelRead...");        // Object msg 为Netty的一种缓存对象ByteBuf buffer = (ByteBuf) msg;        byte[] req = new byte[buffer.readableBytes()];buffer.readBytes(req);String reqBody = new String(req, "UTF-8");System.out.println("获取到的客户端请求:" + reqBody);        // 往客户端写数据String date = new Date().toString();ByteBuf returnBuf = Unpooled.copiedBuffer(date.getBytes());ctx.write(returnBuf);}    @Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoOutHandler2 write...");ctx.write(msg);}}public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoOutHandler1 write...");System.out.println("write msg:" + msg);ctx.write(msg);ctx.flush();// 最后将数据刷新到客户端}}

客户端的处理器主要是当连接成功后,请求获取当前时间,并读取返回结果:

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{    //客户端连接服务器的时候调用@Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端连接服务器。。。。");        byte[] req = "QUERY TIME ORDER".getBytes();ByteBuf copiedBuffer = Unpooled.buffer(req.length);copiedBuffer.writeBytes(req);ctx.writeAndFlush(copiedBuffer);}    //读取服务端数据@Override    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf bytbuf) throws Exception {System.out.println("client get the server's data");        byte[] resp=new byte[bytbuf.readableBytes()];bytbuf.readBytes(resp);String respContent = new String(resp,"UTF-8");System.out.println("返回的数据:"+respContent);}    //强制关闭服务器的连接也会造成异常:远程主机强迫关闭了一个现有的连接。@Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getLocalizedMessage());ctx.close();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/324045.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Orleans入门例子

Orleans是微软开源的分布式actor模型框架.actor模型的原理网络上有很多文章.有许多理论性的文章,深刻地我都不知道怎么应用.在这里我就不赘述了.既然是博客,就说说自己的理解。 对于编程来说&#xff0c;不管是前台还是后台&#xff0c;在现在的计算机环境下&#xff0c;多线程…

JavaScript常用单词整理总结

第一章object对象undefined未定义变量boolean布尔类型sort()对数组排序charAt返回在指定位置的字符toLowerCase()把字符串转换为小写button按钮break结束循环toUpperCase()把字符串转换为大写split(str)将字符串分割为字符串数组length获取数组的长度continue结束当前循环&…

JWT 应用

文章目录JWT工具模块Token认证微服务JWT授权监测网关认证过滤消费端获取JWTJWT工具模块 如果要想在项目之中去使用JWT技术&#xff0c;那么就必须结合到已有的模块之中,最佳的做法就是将JWT的相关的处理 操作做为一个自动的starter组件进行接入 1、【microcloud项目】既然要开…

淘宝秒杀系统设计的几个注意点

转载自 淘宝秒杀系统设计的几个注意点 还记得2013年的小米秒杀吗&#xff1f;三款小米手机各11万台开卖&#xff0c;走的都是大秒系统&#xff0c;3分钟后成为双十一第一家也是最快破亿的旗舰店。经过日志统计&#xff0c;前端系统双11峰值有效请求约60w以上的QPS &#xff0…

.NET Core 2.0 开源Office组件 NPOI

前言 去年 12 月&#xff0c;我移植了大家所熟知 NPOI 到 .NET Core 版本&#xff0c;这里是当时发的博客&#xff0c;当时得到了很多同学的支持&#xff0c;社区反应也很好&#xff0c;在这里非常感谢当时推荐的朋友们。 去年的那个版本是针对于 .NET Core 1.0 的&#xff0…

老师们一直在……

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂。【随便写写】为了了解同学们在公司的情况&#xff0c;和佟老师上了的做了个在线问卷调查&#xff0c;把一些常见的问题设置在调查中&#xff0c;根据调查数据&#xff0c;然后挨个的去解决…

海量数据的分库分表技术演进,最佳实践

转载自 海量数据的分库分表技术演进&#xff0c;最佳实践 每个优秀的程序员和架构师都应该掌握分库分表&#xff0c;移动互联网时代&#xff0c;海量的用户每天产生海量的数量 用户表订单表交易流水表 以支付宝用户为例&#xff0c;8亿&#xff1b;微信用户更是10亿。订单表…

Orleans例子再进一步

步骤 现在我想再添加一个方法,到IGrains项目内,这个方法里面有个延迟3秒,然后返回一个Task<string>.就叫做DelayedMsg吧,如下图所示: 我调用了这个DelayedMsg,同时又调用了SayHello函数,看看效果:注意这个DelayedMsg的调用方法没有await. 虽然我的SayHello的调用时间紧随…

2018/7/12-纪中某C组题【jzoj4272,jzoj4273,jzoj4274】

前言 今天我的想法都是正解&#xff0c;也都写了&#xff0c;结果才160QwQ 今日分数 去掉了十分强大的纪中dalao 正题 T1&#xff1a;jzoj4272-序章-弗兰德的秘密【树形dp】 博客链接&#xff1a;https://blog.csdn.net/mr_wuyongcong/article/details/81021994 T2&#xf…

你,下周可否“报上有名”?

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号【雄雄的小课堂】。一周一次周测&#xff0c;一直在延续&#xff0c;一般情况下不会间断。以前我只要一说&#xff0c;同学们&#xff0c;咱们本周周五考试&#xff0c;下面的同学们就沸腾的不行了&#xff0c;有的说…

Redis的3个高级数据结构

转载自 Redis的3个高级数据结构 平常接触最多的是5个入门级数据结构&#xff1a;String&#xff0c;Hash&#xff0c;List&#xff0c;Set&#xff0c;Sorted Set&#xff0c;本文介绍3个高级数据结构&#xff1a;Bitmaps&#xff0c;Hyperloglogs&#xff0c;GEO。 Bitmap…

SpringCloudConfig整合Nacos

SpringCloudConfig 的作用是可以进行配置的更新处理&#xff0c;这个的确是很好&#xff0c;但是原始的SpringCloudNetflix 架构所提供的动态的抓取配置实在是太繁琐了&#xff0c;包括还要使用到SpringCloudBus进行Actuator处理 SpringCloudAlibaba套件之中是基于Nacos 实现的…

使用VS Code开发调试.NET Core 2.0

使用VS Code 从零开始开发调试.NET Core 2.0。无需安装VS 2017 15.3即可开发调试.NET Core 2.0应用。 VS Code 全称是 Visual Studio Code&#xff0c;Visual Studio Code是一个轻量级的跨平台Web集成开发环境&#xff0c;可以运行在 Linux&#xff0c;Mac 和Windows下&#x…

不管什么事,只要用心做,总不会太差!

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号【雄雄的小课堂】。KTV项目基本已经做完&#xff0c;于是自上周五以来就开始挨个小组的进行试讲。上周五第一次试讲&#xff0c;只讲了三个小组&#xff0c;整体来说讲的都不行。组员与组长之间的协调不统一&#xff…

学生自定义的键盘,功能强大齐全!!!

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注公众号&#xff1a;雄雄的小课堂。今天给大家介绍一款键盘&#xff0c;本款键盘属于私人订制&#xff0c;“专业性”很强&#xff0c;且功能齐全&#xff0c;在使用时可以一人使用&#xff0c;在某种特定的场合下&#xff0c;两…

度量.net framework 迁移到.net core的工作量

把现有的.net framework程序迁移到.net core上&#xff0c;是一个非常复杂的工作&#xff0c;特别是一些API在两个平台上还不能同时支持。两个类库的差异性,通过人工很难识别全。好在微软的工程师们考虑到了我们顾虑&#xff0c;为我们提前设计了一个工具&#xff1a;.NET Port…

深入理解Java ClassLoader及在 JavaAgent 中的应用

转载自 深入理解Java ClassLoader及在 JavaAgent 中的应用 背景 众所周知, Java 或者其他运行在 JVM(java 虚拟机)上面的程序都需要最终便以为字节码,然后被 JVM加载运行,那么这个加载到虚拟机的过程就是 classloader 类加载器所干的事情.直白一点,就是 通过一个类的全限定…

如何从Gitee中拉取项目到HBuilder中?

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂。今天&#xff0c;给大家分享一下&#xff0c;如何使用HBuilder连接gitee&#xff0c;进行代码的提交&#xff08;明天在看&#xff09;与拉取。1首先&#xff0c;在HBuilder中下载Git的插件…

ASP.NET Core Razor页面 vs MVC

作为.NET Core 2.0发行版的一部分&#xff0c;还有一些ASP.NET的更新。其中之一是添加了一个新的Web框架来创建“页面”&#xff0c;而不需要复杂的ASP.NET MVC。新的Razor页面是一个比较简单的MVC框架版本&#xff0c;在某些方面是老的“.aspx” WebForms的演变。 在本文中&a…

Gradle 简单使用

文章目录创建Gradle项目dependencies.gradlegradle.propertiesbuild.gradleGradle配置文件详解dependency-management 插件SpringBootPlugin 插件多模块热部署创建Gradle项目 dependencies.gradle ext.versions [ // 定义所有要使用的版本号springboot: 2.4.1 // Spri…