Netty入门篇-从双向通信开始

百度百科描述

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 TCP 和 UDP 的 socket 服务开发。

如上摘录自百度百科的描述。


Netty 算是目前最为主流的 NIO 框架了,目前我们也在用 NIO。在 Netty 之前还有另外一个 NIO 框架—Mina,Mina 算是早起的作品,Netty 的基础架构跟Mina非常相似,使用时的思想也差不多,两者还有一些微妙的关系,类似于log4j 跟 logback,Netty 和 Mina 均出自 Trustin Lee 之手。

关于Mina跟Netty的区别不是本文重点,我们继续回到Netty上。

需求场景描述

完成对红酒窖的室内温度采集及监控功能。由本地应用程序+温度传感器定时采集室内温度上报至服务器,如果温度 >20 °C 则由服务器下发重启空调指令,如果本地应用长时间不上传温度给服务器,则给户主手机发送一条预警短信。

需求是瞎编的,但分析还是要分析的,在没有接触socker网络编程之前我们可能会这么做:你本地写一个定时器,然后将采集到的温度数据调一下服务器上的某个接口,服务器拿到数据判断一下,如果过高则返回一个带有重启空调的字段,至于本地断线的情况,在数据库维护一个时间字段,如果长时间没有被更新则调用短信发送接口。

这样分析起来,感觉也没啥问题,就是觉得怪怪的,也不能说不行,就是本地设备多了对服务器负载是个问题。

咱先不采用这种方式,上边描述的场景主要是想模拟双方通信,实现双全工操作「客户端发送数据给服务端,服务端下发指令给客户端」,一提到双方通信,我们首先先到的就是Socket吧,来吧,简单回顾一下Socker通信。

服务端

/*** 服务端*/
public class Server {public static void main(String[] args) {InputStreamReader isr;BufferedReader br;OutputStreamWriter osw;BufferedWriter bw;String str;Scanner in = new Scanner(System.in);try {/* 在本机的 8899 端口开放Server */ServerSocket server = new ServerSocket(8899);/* 只要产生连接,socket便可以代表所连接的那个物体,同时这个server.accept()只有产生了连接才会进行下一步操作。*/Socket socket = server.accept();/* 输出连接者的IP。*/System.out.println(socket.getInetAddress());System.out.println("建立了一个连接!");while (true) {isr = new InputStreamReader(socket.getInputStream());br = new BufferedReader(isr);System.out.println("客户端回复:" + br.readLine());osw = new OutputStreamWriter(socket.getOutputStream());bw = new BufferedWriter(osw);System.out.print("服务端回复:");str = in.nextLine();bw.write(str + "\n");bw.flush();}} catch (IOException e) {e.printStackTrace();}}
}

简单看一下 Server 端流程,首先创建了一个ServerSocket来监听 8899 端口,然后调用阻塞方法 accept();获取新的连接,当获取到新的连接之后,然后进入了while循环体,从该连接中读取数据,读取数据是以字节流的方式。

客户端

public class Client {public static void main(String[] args) {InputStreamReader isr;BufferedReader br;OutputStreamWriter osw;BufferedWriter bw;String str;Scanner in = new Scanner(System.in);try {Socket socket = new Socket("127.0.0.1", 8899);System.out.println("成功连接服务器");while (true) {osw = new OutputStreamWriter(socket.getOutputStream());bw = new BufferedWriter(osw);System.out.print("客户端发送:");str = in.nextLine();bw.write(str + "\n");bw.flush();isr = new InputStreamReader(socket.getInputStream());br = new BufferedReader(isr);System.out.println("服务端回复:" + br.readLine());}} catch (IOException e) {e.printStackTrace();}}
}

客户端连接上服务端 8899 端口之后,进入 while 循环体,从连接中读取数据,如下是效果图:

上方代码为了省事直接在主线程操作了,但即便是将代码移植到子线程中处理,还是存在大量问题,尤其是 while 死循环。

如果将代码放在子线程完成,那么一个连接需要一个线程来维护,一个线程包含一个死循环,一万个线程就包含一万个死循环… 再就是这些 while 循环并不是每一个都能读出数据来的,所以就会造成资源浪费,消耗性能。

总之,Socket 就是典型的传统 IO 模型操作,IO 读写是面向流的,一次性只能从流中读取一个或者多个字节,并且读完之后流无法再读取,你需要自己缓存数据。 而 NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可,关于 IO 与 NIO 的区别请移步:NIO与IO的区别

Netty实现双向通信

既然是写netty,自然是要拿netty来实现上方红酒窖的案例了,注:简单的demo,这次咱先实现双向通信,后续再根据这个系列不断完善,今天就当入个门了。

开发环境

  • IDEA
  • maven
  • netty版本:4.1.6

首先导入如下 Maven 依赖:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version>
</dependency>

服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {private static NioEventLoopGroup bossGroup = new NioEventLoopGroup();private static NioEventLoopGroup workerGroup = new NioEventLoopGroup();public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)// 指定Channel.channel(NioServerSocketChannel.class)//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 1024)//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文.childOption(ChannelOption.SO_KEEPALIVE, true)//将小的数据包包装成更大的帧进行传送,提高网络的负载.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new NettyServerHandler());}});serverBootstrap.bind(8070);}@PreDestroypublic void destory() throws InterruptedException {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}

简单说一下,首先创建了两个NioEventLoopGroup对象,我们可以把它看做传统IO模型中的两大线程组,bossGroup主要用来负责创建新连接「监听端口,接收新连接的线程组」,workerGroup主要用于读取数据以及业务逻辑处理「处理每一条连接的数据读写的线程组」,再生动一点就是:一个是对外的销售员,一个是负责单子落地的工人。

然后我们创建了ServerBootstrap,这个类是用来引导我们进行服务端的启动工作,接收两个 NioEventLoopGroup 对象,把干活的两个安排的明明白白。

通过.channel(NioServerSocketChannel.class)来指定 IO 模型,NioServerSocketChannel.class 表示指定的是 NIO,可供的 IO模型 选择无非就 NIO,BIO,BIO肯定是不能选择的了。

通过.childOption()可以给每条连接设置一些TCP底层相关的属性,比如上面,我们设置了两种TCP属性,其中

  • ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true为开启
  • ChannelOption.TCP_NODELAY表示是否开启Nagle算法,true表示关闭,false表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启。

接着,我们调用childHandler()方法,给这个引导类创建一个ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑,不理解没关系,在后面我们会详细分析。ChannelInitializer这个类中,我们注意到有一个泛型参数NioSocketChannel,这个类呢,就是 Netty 对 NIO 类型的连接的抽象,而我们前面NioServerSocketChannel也是对 NIO 类型的连接的抽象,NioServerSocketChannelNioSocketChannel的概念可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。还没完,我们需要在ChannelInitializer 中的initChannel() 方法里面给客户端添加一个逻辑处理器,这个处理器的作用就是负责向服务端写数据,也就是代码中的如下部分:

@Override
protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new NettyServerHandler());
}

我们简单看一下这段代码,其中ch.pipeline() 返回的是和这条连接相关的逻辑处理链,采用了责任链模式,类似于之前文章中提到的Spring Security过滤器链一样,这里不理解没关系,后面再细说。

然后再调用 addLast() 方法 添加一个逻辑处理器,这个逻辑处理器为的就是在客户端建立连接成功之后,向服务端写数据,下面是这个逻辑处理器相关的代码:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 1. 获取数据ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));System.out.println(new Date() + ": 服务端写出数据");// 2. 写数据ByteBuf out = getByteBuf(ctx);ctx.channel().writeAndFlush(out);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {byte[] bytes = "我是发送给客户端的数据:请重启冰箱!".getBytes(Charset.forName("utf-8"));ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(bytes);return buffer;}}

继续如上这段代码,这个逻辑处理器继承自 ChannelInboundHandlerAdapter,然后覆盖了 channelRead()方法,这个方法在接收到客户端发来的数据之后被回调。

这里的 msg 参数指的就是 Netty 里面数据读写的载体,然后需要我们强转一下为ByteBuf类型,然后调用 byteBuf.toString() 就能够拿到我们客户端发过来的字符串数据。

ok,至此服务端创建完了,我们再看客户端。

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Date;
import java.util.concurrent.TimeUnit;public class NettyClient {private static String host = "127.0.0.1";private static int MAX_RETRY = 5;public static void main(String[] args) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap// 1.指定线程模型.group(workerGroup)// 2.指定 IO 类型为 NIO.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)// 3.IO 处理逻辑.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)).addLast(new StringDecoder()).addLast(new StringEncoder()).addLast(new NettyClientHandler());}});// 4.建立连接bootstrap.connect(host, 8070).addListener(future -> {if (future.isSuccess()) {System.out.println("连接成功!");} else {System.err.println("连接失败!");connect(bootstrap, host, 80, MAX_RETRY);}});}/*** 用于失败重连*/private static void connect(Bootstrap bootstrap, String host, int port, int retry) {bootstrap.connect(host, port).addListener(future -> {if (future.isSuccess()) {System.out.println("连接成功!");} else if (retry == 0) {System.err.println("重试次数已用完,放弃连接!");} else {// 第几次重连int order = (MAX_RETRY - retry) + 1;// 本次重连的间隔int delay = 1 << order;System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);}});}}

我们可以看到客户端的引导类不再是ServerBootstrap了,而是换成了Bootstrap,这个类负责客户端以及连接服务端,跟服务端属性大差不差,channel、option、handle等,然后同样指定了IO模型,同时还增加了连接监听 bootstrap.connect(host, 8070).addListener,其中future.isSuccess()属性值表示了连接结果,如果连接失败则跳入 connect 进行重连,重连尝试5次之后不再进行尝试,这块我们后面文章再细讲「其中包含客户端、服务端的长连接,断线重试等」,我们先来看看客户端指定的业务处理类:NettyClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println(new Date() + ": 客户端写出数据");// 1. 获取数据ByteBuf buffer = getByteBuf(ctx);// 2. 写数据ctx.channel().writeAndFlush(buffer);}/*** 数据解析*/private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 获取二进制抽象 ByteBufByteBuf buffer = ctx.alloc().buffer();Random random = new Random();double value = random.nextDouble() * 14 + 8;String temp = "获取室内温度:" + value;// 2. 准备数据,指定字符串的字符集为 utf-8byte[] bytes = temp.getBytes(Charset.forName("utf-8"));// 3. 填充数据到 ByteBufbuffer.writeBytes(bytes);return buffer;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(new Date() + ": 客户端读到数据 -> " + msg.toString());}}

这个逻辑处理器同样继承自 ChannelInboundHandlerAdapter,不同于服务端,客户端逻辑处理器这次使用到了 channelActive()方法,这个方法会在客户端连接建立成功之后被调用,所以我们在这个方法里完成写数据的操作「读取室内温度」。

我们简单看一下这个channelActive()方法,首先获取传递的 ByteBuf 对象,这个对象怎么来的呢,我们进入 getByteBuf() 方法,我们可以看到通过调用ctx.alloc() 获取到一个 ByteBuf ,然后我们把字符串的二进制数据填充进了 ByteBuf,这样我们就获取到了 Netty 需要的一个数据格式,最后我们调用 ctx.channel().writeAndFlush() 把数据写到服务端,至此整个客户端的写操作就完成了。

接下来就是读数据量,channelRead() 方法,这个方法我们在服务端代码中已经了解过了就不再阐述了。


测试服务端客户端代码

首先运行服务端 main() 方法,然后再运行客户端 main() 方法,执行效果如下:

客户端

服务端

至此,通过这个小demo,客户端与服务端可以完成双向通信了。还不急着技术文章,但毕竟是局域网吗,如果服务端的代码部署在外网服务端效果会怎样呢?

测试服务端在外网服务器

我们把服务端代码部署在外网环境中试一下看看效果会怎样。

首先我们修改一下客户端的host地址为外网ip地址,然后本地起一下客户端试试:

我们可以看到返回结果没问题,那说明服务端也是没问题的:

至此,外网服务端与局域网客户端的双向通信时没问题了,测试具体细节就不展示了,后面章节我会一步一步将代码迁移到SpringBoot Web项目中的,但是眼下这代码还是有点问题,我们先在本地继续完善一下。

本文首发于博客园:https://www.cnblogs.com/niceyoo/p/13269756.html

我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/413970.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

asp.net 页面静态化

页面静态化,有三种方式 伪静态 真静态,折中法 现在我做的是折中发 创建一个asp.net 页面, 连接跳转到还未生成的页面 创建HttpHandle类 using System;using System.Collections.Generic;using System.Linq;using System.Web;using System.IO;/// <summary>/// HttpHa…

[js] 内存泄漏和内存溢出有什么区别

[js] 内存泄漏和内存溢出有什么区别 内存泄露:用动态储存分配函数内存空间&#xff0c;在使用完毕后未释放&#xff0c;导致一直占据该内存单元,直到程序结束。内存溢出:不顾堆栈分配的局部数据块大小,向数据块中写入过多数据&#xff0c;导致数据越界,结果覆盖了别的数据。常…

14 异常

第15节 异常 作业解答 # 根据课堂上给出的上下文管理器&#xff0c;判断IO操作和文件操作那个速度快 ​ from datetime import datetime import io ​ class RunTime(object):def __enter__(self):self.start_time datetime.now()print(self.start_time)return self.start_tim…

[js] 写一个方法把科学计数法转换成数字或者字符串

[js] 写一个方法把科学计数法转换成数字或者字符串 function c(a) { return a.replace(/^(\d)(?:.(\d))*eE(\d)/,(_,a,a1,p,n)>{ a1a1|| if(p-&&n>0) { return 0.0.repeat(n-1)aa1 }else{ return a(a1.length>n? a1.substr(0, n).a1.substr(n): a10.repeat…

Netty心跳机制-长连接

前文需求回顾 完成对红酒窖的室内温度采集及监控功能。由本地应用程序温度传感器定时采集室内温度上报至服务器&#xff0c;如果温度 >20 C 则由服务器下发重启空调指令&#xff0c;如果本地应用长时间不上传温度给服务器&#xff0c;则给户主手机发送一条预警短信。 Netty…

带你反编译APP然后重新打包「MacOS」

最近有小伙伴留言&#xff0c;怎么把一款APP改成自己的信息呀&#xff0c;咳咳&#xff0c;这又来送题材了&#xff0c;今天水一把APP反编译回编译&#xff0c;文中会针对一款APP进行简单的修改信息&#xff0c;问问题的小伙伴还不火速右上角支持一下。 MacOS跟Windows我会分开…

Hadoop学习之pig

首先明确pig是解决什么问题而出现的&#xff0c;pig是为了简化mapreduce编程而设计的&#xff0c;并且有自己的一套脚本语言。其基本由命令和操作符来定义的&#xff0c;如load&#xff0c;store&#xff0c;它的功能很明确&#xff0c;用来大规模处理数据。其脚本形如script.p…

[js] js怎样避免原型链上的对象共享?

[js] js怎样避免原型链上的对象共享&#xff1f; 组合继承 优势 公有的写在原型 私有的卸载构造函数 可以向父类传递参数劣势 需要手动绑定constructor 封装性一般 重复调用父类性能损耗&#x1f330; function Parent (name, friends) {// 私有的部分this.name name;this…

Codeforces 552C Vanya and Scales(进制转换+思维)

题目链接&#xff1a;http://codeforces.com/problemset/problem/552/C 题目大意&#xff1a;有101个砝码重量为w^0,w^1&#xff0c;....&#xff0c;w^100和一个重量为m的物体&#xff0c;问能否在天平两边放物品和砝码使其平衡。解题思路&#xff1a;将m化为w进制的数&#x…

反编译一款APP然后重新打包(Windows环境)

最近有小伙伴私信我&#xff0c;怎么把一款APP改成自己的信息呀&#xff0c;咳咳&#xff0c;这又来送题材了&#xff0c;今天水一把APP反编译回编译&#xff0c;文中会针对一款APP进行简单的修改信息&#xff0c;问问题的小伙伴还不火速右上角支持一下。 MacOS跟Windows我是分…

[js] 写一个方法,实时验证input输入的值是否满足金额如:3.56(最多只有两位小数且只能数字和小数点)的格式,其它特殊字符禁止输入

[js] 写一个方法&#xff0c;实时验证input输入的值是否满足金额如&#xff1a;3.56(最多只有两位小数且只能数字和小数点)的格式&#xff0c;其它特殊字符禁止输入 <body><input type"text" id"amount"><em id"message"><…

Vmware下CentOs7 桥接模式下配置固定IP

1.安装完CentOS7后 修改虚拟机网络适配器配置&#xff1a;改成桥接模式 2.设置Vmware的 编辑->虚拟网络编辑器 3.重启CentOs7 , 查看网络 # ip addr interface是&#xff1a; ens33 4.查看本地真实机ip&#xff0c;然后配置虚拟机固定ip windows 进入命令行模式cmd&#xf…

[js] 使用delete删除数组,其长度会改变吗

[js] 使用delete删除数组&#xff0c;其长度会改变吗 使用delete删除数组元素&#xff0c;其长度会改变吗&#xff1f;咱来写个案例&#x1f330;看看就知道了&#xff1a;var arr [1, 2, 3] delete arr[1] console.log(arr) console.log(arr.length)结果如下&#xff1a;通过…

MacOS svn:E230001 Can‘t use Subversion command line client: svn The path to the Subversion executabl

注意&#xff1a;本文仅针对于 MacOS 系统。 错误信息如下&#xff1a; Cant use Subversion command line client: svn The path to the Subversion executable is probably wrong. Fix it.好家伙&#xff0c;今天发现 IDEA 中的 SVN 突然不能用了… 因为之前的 SVN 是使用 …

Seurat | 单细胞分析工具

Seurat是一个老牌的单细胞分析工具了&#xff08;satija的力作&#xff09;&#xff0c;我之前测试过&#xff0c;但是没怎么用。 最近发现这个工具又publish在了NBT上&#xff0c;所以很有必要看一下这篇文章。 Integrating single-cell transcriptomic data across different…

[js] 代码中如果遇到未定义的变量,会抛出异常吗?程序还会不会继续往下走?

[js] 代码中如果遇到未定义的变量&#xff0c;会抛出异常吗&#xff1f;程序还会不会继续往下走&#xff1f; 在浏览器环境下JS 解析器解析到未定义变量时&#xff0c;会抛出 Uncaught ReferenceError 错误&#xff0c;JS 引擎会停止解析后面的代码&#xff0c;但之前的代码不…

创建线程都有哪些方式?— Callable篇

今天我们来看一道面试题引发的思考 问&#xff1a; 创建线程都有哪些方式&#xff1f; 答&#xff1a; 我了解的有四种创建方式&#xff1a; 继承Thread类创建线程类通过Runnable接口创建线程类通过Callable和Future创建线程通过线程池创建 相信大家回答这个问题没什么难度吧…

[js] 说说你对JSBridge的理解

[js] 说说你对JSBridge的理解 js和原生应用之间交互的桥梁个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

ASP.NET Core 网站发布到Linux服务器

长期以来&#xff0c;使用.NET开发的应用只能运行在Windows平台上面&#xff0c;而目前国内蓬勃发展的互联网公司由于成本的考虑&#xff0c;大量使用免费的Linux平台&#xff0c;这就使得.NET空有一身绝技但无法得到广大的施展空间&#xff0c;.NET平台被认为只适合开发企业内…

如何暂停一个正在运行的线程?

今天把小伙伴问懵了&#xff0c;小刚&#xff0c;你知道怎么停止一个线程吗&#xff1f; 这…&#xff0c;这…&#xff0c;stop&#xff1f; 原来平时小刚这小子只知道创建线程&#xff0c;不知道怎么暂停线程呀~[狗头] 停止线程是在多线程开发中很重要的技术点&#xff0c;…