异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty

文章目录

  • Netty概述
  • Netty中的一些概念
  • Netty的线程模型
    • Netty Server端
    • Netty Netty 端
  • TCP半包与粘包问题
  • 基于Netty与CompletableFuture实现RPC异步调用

在这里插入图片描述


Netty概述

Netty是一个异步、基于事件驱动的网络应用程序框架,其对Java NIO进行了封装,大大简化了TCP或者UDP服务器的网络编程开发。

Netty框架将网络编程逻辑与业务逻辑处理分离开来,其内部会自动处理好网络与异步处理逻辑,让我们专心写自己的业务处理逻辑。同时,Netty的异步非阻塞能力与CompletableFuture结合可以让我们轻松实现网络请求的异步调用。

Netty的应用还是比较广泛的,Apache Dubbo、Apache RocketMq、Zuul 2.0服务网关、Spring WebFlux、Sofa-Bolt底层网络通信等都是基于Netty来实现的。

Netty中的一些概念

  • Channel:是通道的意思。这是在JDK NIO类库里面提供的一个概念,JDK里面的通道是java.nio.channels.Channel,JDK中的实现类有客户端套接字通道java.nio.channels.SocketChannel和服务端监听套接字通道java.nio.channels.ServerSocketChannel。Channel的出现是为了支持异步IO操作。io.netty.channel.Channel是Netty框架自己定义的一个通道接口。Netty实现的客户端NIO套接字通道是io.netty.channel.socket.nio.NioSocketChannel,提供的服务器端NIO套接字通道是io.netty.channel.socket.nio.NioServerSocketChannel

  • NioSocketChannel:Netty中客户端套接字通道。内部管理了一个Java NIO中的java.nio.channels.SocketChannel实例,其被用来创建java.nio.channels.SocketChannel的实例和设置该实例的属性,并调用其connect方法向服务端发起TCP链接。

  • NioServerSocketChannel:服务器端监听套接字通道。内部管理了一个Java NIO中的java.nio.channels.ServerSocketChannel实例,用来创建ServerSocketChannel实例和设置该实例属性,并调用该实例的bind方法在指定端口监听客户端的链接。

  • EventLoopGroup:Netty之所以能提供高性能网络通信,其中一个原因是它使用Reactor线程模型。在Netty中,每个EventLoopGroup本身都是一个线程池,其中包含了自定义个数的NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop里面持有自己的NIO Selector选择器。在Netty中,客户端持有一个EventLoopGroup用来处理网络IO操作;在服务器端持有两个EventLoopGroup,其中boss组是专门用来接收客户端发来的TCP链接请求的,worker组是专门用来处理完成三次握手的链接套接字的网络IO请求的。

  • Channel与EventLoop的关系:在Netty中,NioEventLoop是EventLoop的一个实现,每个NioEventLoop中会管理自己的一个selector选择器和监控选择器就绪事件的线程;每个Channel在整个生命周期中固定关联到某一个NioEventLoop;但是,每个NioEventLoop中可以关联多个Channel。

  • ChannelPipeline:Netty中的ChannelPipeline类似于Tomcat容器中的Filter链,属于设计模式中的责任链模式,其中链上的每个节点就是一个ChannelHandler。在Netty中,每个Channel有属于自己的ChannelPipeline,管线中的处理器会对从Channel中读取或者要写入Channel中的数据进行依次处理。下图是Netty源码里面的一个图。
    在这里插入图片描述

【Netty框架数据流图】

如图所示,当有数据从连接套接字被读取后,数据会被依次传递到Channel Pipeline中的每个ChannelHandler进行处理;当通过Channel或者ChannelHandlerContext向连接套接字写入数据时,数据会先依次被ChannelPipeline中的每个Channel Handler处理,处理完毕后才会最终通过原生连接套接字写入TCP发送缓存。

需要注意的是,虽然每个Channel(更底层说是每个Socket)有自己的Channel Pipeline,但是每个ChannelPipeline里面可以复用同一个ChannelHandler的实例(当ChannelHandler使用@shared注解修饰时)。


Netty的线程模型

Netty的线程模型,因为其模型实现与Netty的异步处理能力紧密相关,其线程模型如下图所示

在这里插入图片描述
【Netty线程模型】

Netty Server端

图下侧所示为Netty Server端,

  • 当NettyServer启动时会创建两个NioEventLoop Group线程池组,其中boss组用来接收客户端发来的连接,worker组则负责对完成TCP三次握手的连接进行处理

  • 图中每个NioEventLoopGroup里面包含了多个Nio EventLoop,每个NioEventLoop中包含了一个NIO Selector、一个队列、一个线程;其中线程用来做轮询注册到Selector上的Channel的读写事件和对投递到队列里面的事件进行处理。

当NettyServer启动时会注册监听套接字通道NioServerSocketChannel到boss线程池组中的某一个NioEventLoop管理的Selector上,与其对应的线程会负责轮询该监听套接字上的连接请求;

当客户端发来一个连接请求时,boss线程池组中注册了监听套接字的NioEventLoop中的Selector会读取TCP三次握手的请求,然后创建对应的连接套接字通道NioSocketChannel,接着把其注册到worker线程池组的某一个NioEventLoop中管理的一个NIO Selector上,该连接套接字通道NioSocketChannel上的所有读写事件都由该NioEventLoop管理。

当客户端发来多个连接时,NettyServer端会创建多个NioSocketChannel,而worker线程池组中的NioEventLoop是有个数限制的,所以Netty有一定的策略把很多NioSocketChannel注册到不同的NioEventLoop上,也就是每个NioEventLoop中会管理好多客户端发来的连接,并通过循环轮询处理每个连接的读写事件。


Netty Netty 端

图上侧部分所示为Netty Client部分,当NettyClient启动时会创建一个NioEventLoopGroup,用来发起请求并对建立TCP三次连接的套接字的读写事件进行处理。当调用Bootstrap的connect方法发起连接请求后内部会创建一个NioSocketChannel用来代表该请求,并且会把该NioSocketChannel注册到NioSocketChannel管理的某个NioEventLoop的Selector上,该NioEventLoop的读写事件都由该NioEventLoop负责处理。

Netty之所以说是异步非阻塞网络框架,是因为通过NioSocketChannel的write系列方法向连接里面写入数据时是非阻塞的,是可以马上返回的(即使调用写入的线程是我们的业务线程)。这是Netty通过在ChannelPipeline中判断调用NioSocketChannel的write的调用线程是不是其对应的NioEventLoop中的线程来实现的:

private void write(Object msg, boolean flush, ChannelPromise promise) {...//1.如果调用线程是IO线程EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {//2.如果调用线程不是IO线程AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);}  else {task = WriteTask.newInstance(next, m, promise);}safeExecute(executor, task, promise, m);}
}

如上代码1所示,如果调用线程是IO线程,则会在IO线程上执行写入;

如代码2所示,如果发现调用线程不是IO线程,则会把写入请求封装为WriteTask并投递到与其对应的NioEventLoop中的队列里面,然后等其对应的NioEventLoop中的线程轮询连接套接字的读写事件时捎带从队列里面取出来并执行。

也就是说,与每个NioSocketChannel对应的读写事件都是在与其对应的NioEvent Loop管理的单线程内执行的,不存在并发,所以无须加锁处理。

另外当从NioSocketChannel中读取数据时,并不是使用业务线程来阻塞等待,而是等NioEventLoop中的IO轮询线程发现Selector上有数据就绪时,通过事件通知方式来通知我们业务数据已经就绪,可以来读取并处理了。

使用Netty框架进行网络通信时,当我们发起请求后请求会马上返回,而不会阻塞我们的业务调用线程;如果我们想要获取请求的响应结果,也不需要业务调用线程使用阻塞的方式来等待,而是当响应结果出来时使用IO线程异步通知业务,由此可知,在整个请求–响应过程中,业务线程不会由于阻塞等待而不能干其他事情。

下面我们讨论几个细节:

第一,完成TCP三次握手的套接字应该注册到worker线程池中的哪一个NioEventLoop的Selector上;

第二,如果NioEventLoop中的线程负责监听注册到Selector上的所有连接的读写事件和处理队列里面的消息,那么会不会导致由于处理队列里面任务耗时太长导致来不及处理连接的读写事件;

第三,多个套接字注册到同一个NioEventLoop的Selector上,使用单线程轮询处理每个套接字上的事件,如果某一个套接字网络请求比较频繁,轮询线程是不是会一直处理该套接字的请求,而使其他套接字请求得不到及时处理。

对于第一个问题,关于NioEventLoop的分配,Netty默认使用的是PowerOfTwoEvent ExecutorChooser,其代码如下:

private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {@Overridepublic EventExecutor next() {return children[childIndex.getAndIncrement() & children.length - 1];}
}

可知是采用轮询取模的方式来进行分配。

对于第二个问题,Netty默认是采用时间均分策略来避免某一方处于饥饿状态,可以参见NioEventLoop的run方法内的代码片段:

//1.记录开始处理时间
final long ioStartTime = System.nanoTime();
try {//1.1处理连接套接字的读写事件processSelectedKeys();
} finally {// 1.2计算连接套接字处理耗时,ioRatio默认为50final long ioTime = System.nanoTime() - ioStartTime;//1.3运行队列里面任务runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}

代码1.1处理所有注册到当前NioEventLoop的Selector上的所有连接套接字的读写事件,代码1.2用来统计其耗时,由于默认情况下ioRatio为50,所以代码1.3尝试使用与代码1.2执行相同的时间来运行队列里面的任务,也就是处理套接字读写事件与运行队列里面任务是使用时间片轮转方式轮询执行。

针对第三个问题,我们可以看NioEventLoop的processSelectedKeysOptimized方法,该方法内会轮询注册到自己的Selector上的所有连接套接字的读写事件:

private void processSelectedKeysOptimized() {//3轮询处理所有套接字的读写事件for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];selectedKeys.keys[i] = null;final Object a = k.attachment();//如果是AbstractNioChannel子类实例if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {selectedKeys.reset(i + 1);selectAgain();i = -1;}}
}

在上述代码中,processSelectedKeysOptimized内会轮询处理所有套接字的读写事件,具体是调用processSelectedKey处理每个NioSocketChannel的读写事件,其代码如下:

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();...//AbstractNioByteChannel的read方法if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

如上代码如果是读事件或者套接字接收事件则会调用AbstractNioByteChannel的read方法读取数据,这里我们只关心读事件,其代码如下:

public final void read() {...try {//4循环读取套接字中的数据do {byteBuf = allocHandle.allocate(allocator);allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {readPending = false;}break;}//4.1增加读取的包数量allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;//4.2判断是否继续读取} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {...}
}

代码4循环读取当前连接套接字中的数据,代码4.1表示每当从套接字读取一批数据就让读取的消息数量加一,代码如下:

public final void incMessagesRead(int amt) {totalMessages += amt;
}

代码4.2则判断是继续读取数据,还是退出读取循环,allocHandle的continueReading代码如下:

public boolean continueReading() {return continueReading(defaultMaybeMoreSupplier);
}public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {return config.isAutoRead() &&(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&totalMessages < maxMessagePerRead &&//最大读取消息个数?totalBytesRead > 0;
}

默认情况下maxMessagePerRead为16,所以对应NioEventLoop管理的每个NioSocketChannel中的数据,在一次事件循环内最多连续读取16次数据,并不会一直读取,这就有效避免了其他NioSocketChannel的请求事件得不到及时处理的情况。


TCP半包与粘包问题

大家都知道在客户端与服务端进行网络通信时,客户端会通过socket把需要发送的内容序列化为二进制流后发送出去,当二进制流通过网络流向服务器端后,服务端会接收该请求并解析该请求包,然后反序列化后对请求进行处理。

这看似是一个很简单的过程,但是细细想来却发现没有那么简单。使用TCP进行通信时客户端与服务端之间持有一个长连接,客户端多次发送请求都是复用该连接的,下图展示了客户端与服务端的交互流程。

在这里插入图片描述
【客户端与服务器交互图】

如图所示,在客户端发送数据时,实际是把数据写入TCP发送缓存里面的,如果发送的包的大小比TCP发送缓存的容量大,那么这个数据包就会被分成多个包,通过socket多次发送到服务端。而服务端获取数据是从接收缓存里面获取的,假设服务端第一次从接收缓存里面获取的数据是整个包的一部分,这时候就产生了半包现象,半包不是说只收到了全包的一半,而是说只收到了全包的一部分。

服务器读取到半包数据后,会对读取的二进制流进行解析,一般会把二进制流反序列化为对象,这里由于服务器只读取了客户端序列化对象后的一部分,所以反序列会报错。

同理,如果发送的数据包大小比TCP发送缓存容量小,并且假设TCP缓存可以存放多个包,那么客户端和服务端的一次通信就可能传递了多个包,这时候服务端从接收缓存就可能一下读取了多个包,出现粘包现象,由于服务端从接收缓存获取的二进制流是多个对象转换来的,所以在后续的反序列化时肯定也会出错。

其实出现粘包和半包的原因是TCP层不知道上层业务的包的概念,它只是简单地传递流,所以需要上层应用层协议来识别读取的数据是不是一个完整的包。

一般有三种常用的解决半包与粘包问题的方案:

在这里插入图片描述

【Dubbo协议帧】

  • 比较常见的方案是应用层设计协议时,把协议包分为header和body(比如Dubbo协议帧格式),header里面记录body长度,当服务端从接收缓冲区读取数据后,如果发现数据大小小于包的长度则说明出现了半包,这时候就回退读取缓存的指针,等待下次读事件到来时再次测试。如果发现数据大小大于包长度则看大小是否是包长度的整数倍,如果是则循环读取多个包,否则就是出现了多个整包+半包(粘包)。

  • 第二种方案是在多个包之间添加分隔符,使用分隔符来判断一个包的结束。
    在这里插入图片描述
    【帧分隔符】

如图 所示,每个包中间使用“|”作为分隔符,此时每个包的大小可以不固定,当服务器端读取时,若遇到分隔符就知道当前包结束了,但是包的消息体内不能含有分隔符,Netty中提供了DelimiterBasedFrameDecoder用来实现该功能。

  • 还有一种方案是包定长,就是每个包大小固定长度,如下图所示。
    在这里插入图片描述

【 包大小固定长度】

使用这种方案时每个包的大小必须一致,Netty中提供了FixedLengthFrameDecoder来实现该功能。

基于Netty与CompletableFuture实现RPC异步调用

我们来基于CompletableFuture与Netty来模拟下如何异步发起远程调用,为简化设计,这里我们将应用层协议帧格式定义为文本格式,如下图所示。

在这里插入图片描述

【 协议帧格式】

如图所示,

  • 帧格式的第一部分为消息体,也就是业务需要传递的内容;
  • 第二部分为“:”号;
  • 第三部分为请求id,这里使用“:”把消息体与请求id分开,以便服务端提取这两部分内容,需要注意消息体内不能含有“:”号;
  • 第四部分“|”标识一个协议帧的结束,因为本文使用Netty的DelimiterBasedFrameDecoder来解决半包粘包问题,所以需要注意消息体内不能含有“|”号。

首先我们基于Netty开发一个简单的demo来模拟RpcServer,也就是服务提供方程序,RpcServer的代码如下:

public final class RpcServer {public static void main(String[] args) throws Exception {// 0.配置创建两级线程池EventLoopGroup bossGroup = new NioEventLoopGroup(1);// bossEventLoopGroup workerGroup = new NioEventLoopGroup();// worker// 1.创建业务处理handerNettyServerHandler servrHandler = new NettyServerHandler();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 1.1设置帧分隔符解码器ByteBuf delimiter = Unpooled.copiedBuffer("|".getBytes());p.addLast(new DelimiterBasedFrameDecoder(1000, delimiter));// 1.2设置消息内容自动转换为String的解码器到管线p.addLast(new StringDecoder());// 1.3设置字符串消息自动进行编码的编码器到管线p.addLast(new StringEncoder());// 1.4添加业务hander到管线p.addLast(servrHandler);}});// 2.启动服务,并且在12800端口监听ChannelFuture f = b.bind(12800).sync();// 3. 等待服务监听套接字关闭f.channel().closeFuture().sync();} finally {// 4.优雅关闭两级线程池,以便释放线程bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

如上代码是一个典型的NettyServer启动程序,首先代码0创建了NettyServer的boss与worker线程池,然后代码1创建了业务NettyServerHandler,这个我们后面具体讲解。

代码1.1添加DelimiterBasedFrameDecoder解码器到链接channel的管道以便使用“|”分隔符来确定一个协议帧的边界(避免半包粘包问题);

代码1.2添加字符串解码器,它在服务端链接channel接收到客户端发来的消息后会自动把消息内容转换为字符串;代码1.3设置字符串编码器,它会在服务端链接channel向客户端写入数据时,对数据进行编码;代码1.3添加业务handler到管线。

代码2启动服务,并且在端口12800监听客户端发来的链接;代码3同步等待服务监听套接字关闭;代码4优雅关闭两级线程池,以便释放线程。

这里我们主要看下业务handler的实现,服务端在接收客户端消息,且消息内容经过代码1.1、代码1.2的hanlder处理后,流转到NettyServerHandler的就是一个完整的协议帧的字符串了。NettyServerHandler代码如下:

@Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {//5. 根据消息内容和请求id,拼接消息帧public String generatorFrame(String msg, String reqId) {return msg + ":" + reqId + "|";}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {//6.处理请求try {System.out.println(msg);// 6.1.获取消息体,并且解析出请求idString str = (String) msg;String reqId = str.split(":")[1];// 6.2.拼接结果,请求id,协议帧分隔符(模拟服务端执行服务产生结果)String resp =  generatorFrame("im jiaduo ", reqId);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}// 6.3.写回结果ctx.channel().writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));} catch (Exception e) {e.printStackTrace();}}...}

由上述代码可知,@Sharable注解是让服务端所有接收的链接对应的channel复用同一个NettyServerHandler实例,这里可以使用@Sharable方式是因为NettyServer Handler内的处理是无状态的,不会存在线程安全问题。

当数据流程到NettyServerHandler时,会调用其channelRead方法进行处理,这里msg已经是一个完整的本文的协议帧了。

异步任务内代码6.1首先获取消息体的内容,然后根据协议格式,从中截取出请求id,然后调用代码6.2拼接返回给客户端的协议帧,注意这里需要把请求id带回去;然后休眠2s模拟服务端任务处理,最后代码6.3把拼接好的协议帧写回客户端。

下面我们基于Netty开发一个简单的demo来模拟RpcClient,也就是服务消费方程序,RpcClient的代码如下:

public class RpcClient {// 连接通道private volatile Channel channel;// 请求id生成器private static final AtomicLong INVOKE_ID = new AtomicLong(0);// 启动器private Bootstrap b;public RpcClient() {// 1. 配置客户端.EventLoopGroup group = new NioEventLoopGroup();NettyClientHandler clientHandler = new NettyClientHandler();try {b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 1.1设置帧分隔符解码器ByteBuf delimiter = Unpooled.copiedBuffer("|".getBytes());p.addLast(new DelimiterBasedFrameDecoder(1000, delimiter));// 1.2设置消息内容自动转换为String的解码器到管线p.addLast(new StringDecoder());// 1.3设置字符串消息自动进行编码的编码器到管线p.addLast(new StringEncoder());// 1.4添加业务Handler到管线p.addLast(clientHandler);}});// 2.发起链接请求,并同步等待链接完成ChannelFuture f = b.connect("127.0.0.1", 12800).sync();if (f.isDone() && f.isSuccess()) {this.channel = f.channel();}} catch (Exception e) {e.printStackTrace();}}private void sendMsg(String msg) {channel.writeAndFlush(msg);}public void close() {if (null != b) {b.group().shutdownGracefully();}if (null != channel) {channel.close();}}// 根据消息内容和请求id,拼接消息帧private String generatorFrame(String msg, String reqId) {return msg + ":" + reqId + "|";}public CompletableFuture rpcAsyncCall(String msg) {// 1. 创建futureCompletableFuture<String> future = new CompletableFuture<>();// 2.创建消息idString reqId = INVOKE_ID.getAndIncrement() + "";// 3.根据消息,请求id创建协议帧msg = generatorFrame(msg, reqId);// 4.nio异步发起网络请求,马上返回this.sendMsg(msg);// 5.保存future对象FutureMapUtil.put(reqId, future);return future;}public String rpcSyncCall(String msg) throws InterruptedException, ExecutionException {// 1. 创建futureCompletableFuture<String> future = new CompletableFuture<>();// 2.创建消息idString reqId = INVOKE_ID.getAndIncrement()  + "";// 3.消息体后追加消息id和帧分隔符msg = generatorFrame(msg, reqId);// 4.nio异步发起网络请求,马上返回this.sendMsg(msg);// 5.保存futureFutureMapUtil.put(reqId, future);// 6.同步等待结果String result = future.get();return result;}
}

如上代码RpcClient的构造函数创建了一个NettyClient,其使用方法与NettyServer类似,这里不再赘述。需要注意的是,这里注册了业务的NettyClientHandler处理器到链接channel的管线里面,并且在与服务端完成TCP三次握手后把对应的channel对象保存了下来。

下面先来看rpcSyncCall方法,该方法意在模拟同步远程调用,其中代码1创建了一个CompletableFuture对象;代码2使用原子变量生成一个请求id,代码3则把业务传递的msg消息体和请求id组成协议帧;代码4则调用sendMsg方法通过保存的channel对象把协议帧异步发送出去,该方法是非阻塞的,会马上返回,所以不会阻塞业务线程;代码5把代码1创建的future对象保存到FutureMapUtil中管理并发缓存,其中key为请求id,value为创建的future。FutureMapUtil代码如下,可知就是管理并发缓存的一个工具类:

public class FutureMapUtil {// <请求id,对应的future>private static final ConcurrentHashMap<String, CompletableFuture> futureMap = new ConcurrentHashMap<String, CompletableFuture>();public static void put(String id, CompletableFuture future) {futureMap.put(id, future);}public static CompletableFuture remove(String id) {return futureMap.remove(id);}
}

然后代码6调用future的get()方法,同步等待future的complete()方法设置结果完成,调用get()方法会阻塞业务线程,直到future的结果被设置了。

现在我们再来看rpcAsyncCall异步调用,其代码实现与同步的rpcSyncCall类似,只不过其没有同步等待future有结果值,而是直接将future返回给调用方,然后就直接返回了,该方法不会阻塞业务线程。

到这里我们讲解了业务调用时发起远程调用,接下来我们看服务端写回结果到客户端后。关于客户端是如何把接入写回对应的future的,这里我们需要看注册的NettyClientHandler,其代码如下:

@Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 1.根据请求id,获取对应futureCompletableFuture future = FutureMapUtil.remove(((String) msg).split(":")[1]);// 2.如果存在,则设置future结果if (null != future) {future.complete(((String) msg).split(":")[0]);}}
...
}

如上代码所示,当NettyClientHandler的channelRead方法被调用时,其中msg已经是一个完整的本文的协议帧了(因为DelimiterBasedFrameDecoder与StringDecoder已经做过解析)。

异步任务内代码1首先根据协议帧格式,从消息msg内获取到请求id,然后从FutureMapUtil管理的缓存内获取请求id对应的future对象,并移除;如果存在,代码2则从协议帧内获取服务端写回的数据,并调用future的complete方法把结果设置到future,这时候由于调用future的get()方法而被阻塞的线程就返回结果了。

上面我们讲解了RpcClient与RpcServer的实现,下面我们从两个例子看如何使用,首先看TestModelAsyncRpc的代码:

public class TestModelAsyncRpc {private static final RpcClient rpcClient = new RpcClient();public static void main(String[] args) throws InterruptedException, ExecutionException {// 1.同步调用System.out.println(rpcClient.rpcSyncCall("who are you"));// 2.发起远程调用异步,并注册回调,马上返回CompletableFuture<String> future = rpcClient.rpcAsyncCall("who are you");future.whenComplete((v, t) -> {if (t != null) {t.printStackTrace();} else {System.out.println(v);}});System.out.println("---async rpc call over");}
}

如上main函数内首先创建了一个rpcClient对象,然后代码1同步调用了其rpcSyncCall方法,由于是同步调用,所以在服务端执行返回结果前,当前调用线程会被阻塞,直到服务端把结果写回客户端,并且客户端把结果写回到对应的future对象后才会返回。

代码2调用了异步方法rpcAsyncCall,其不会阻塞业务调用线程,而是马上返回一个CompletableFuture对象,然后我们在其上设置了一个回调函数,意在等future对象的结果被设置后进行回调,这个实现了真正意义上的异步。

我们再看一个使用实例,演示如何基于CompletableFuture的能力,并发发起多次调用,然后对返回的多个CompletableFuture进行运算,首先看TestModelAsyncRpc2类:

public class TestModelAsyncRpc2 {private static final RpcClient rpcClient = new RpcClient();public static void main(String[] args) throws InterruptedException, ExecutionException {// 1.发起远程调用异步,马上返回CompletableFuture<String> future1 = rpcClient.rpcAsyncCall("who are you");// 2.发起远程调用异步,马上返回CompletableFuture<String> future2 = rpcClient.rpcAsyncCall("who are you");// 3.等两个请求都返回结果时候,使用结果做些事情CompletableFuture<String> future = future1.thenCombine(future2, (u, v) -> {return u + v;});// 4.等待最终结果future.whenComplete((v, t) -> {if (t != null) {t.printStackTrace();} else {System.out.println(v);}});System.out.println("---async rpc call over---");// rpcClient.close();}}

代码1首先发起一次远程调用,该调用马上返回future1;然后代码2又发起一次远程调用,该调用也马上返回future2对象;代码3则基于CompletableFuture的能力,意在让future1和future2都有结果后再基于两者的结果做一件事情(这里是拼接两者结果返回),并返回一个获取回调结果的新的future。

代码4基于新的future,等其结果产生后,执行新的回调函数,进行结果打印或者异常打印。

最后我们看如何把异步调用改造为Reactive编程风格,这里基于RxJava让异步调用返回结果为Flowable,其实我们只需要把返回的CompletableFuture转换为Flowable即可,可以在RpcClient里面新增一个方法:

// 异步转反应式
public Flowable<String> rpcAsyncCallFlowable(String msg) {// 1.1 使用defer操作,当订阅时候在执行rpc调用return Flowable.defer(() -> {// 1.2创建含有一个元素的流final ReplayProcessor<String> flowable = ReplayProcessor.createWithSize(1);// 1.3具体执行RPC调用CompletableFuture<String> future = rpcAsyncCall(msg);// 1.4等rpc结果返回后设置结果到流对象future.whenComplete((v, t) -> {if (t != null) {// 1.4.1结果异常则发射错误信息flowable.onError(t);} else {// 1.4.2结果OK,则发射出rpc返回结果flowable.onNext(v);// 1.4.3结束流flowable.onComplete();}});return flowable;});
}

如上代码由于CompletableFuture是可以设置回调函数的,所以把其转换为Reactive风格编程很容易。

然后我们可以使用下面代码进行测试:

public class TestModelAsyncRpcReactive {// 1.创建rpc客户端private static final RpcClient rpcClient = new RpcClient();public static void main(String[] args) throws InterruptedException, ExecutionException {// 2.发起远程调用异步,并注册回调,马上返回Flowable<String> result = rpcClient.rpcAsyncCallFlowable("who are you");//3.订阅流对象result.subscribe(/* onNext */r -> {System.out.println(Thread.currentThread().getName() + ":" + r);}, /* onError */error -> {System.out.println(Thread.currentThread().getName() + "error:" + error.getLocalizedMessage());});System.out.println("---async rpc call over");}
}

如上代码,发起rpc调用后马上返回了一个Flowable流对象,但这时真正的rpc调用还没有发出去,等代码3订阅了流对象时才真正发起rpc调用。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72734.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云云主机免费试用三个月

试用链接如下&#xff1a; 阿里云云产品免费试用 云主机 费用试用三个月&#xff0c;每月750小时 实例规格 1核(vCPU) 2 GiB S6 系列机型 适用搭建网站等场景 网络带宽 1M 公网固定网络带宽 云盘40 GiB 真香&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&…

促科技创新:高德数据优化篇之OceanBase最佳实践

本文作者&#xff1a; 振飞&#xff08;高德地图总裁&#xff09; 炳蔚&#xff08;高德技术服务平台负责人&#xff09; 福辰&#xff08;高德服务端架构师&#xff09; 背景 高德成立于2002年&#xff0c;是中国领先的移动数字地图、导航及实时交通信息服务提供商&#xff0c…

04、javascript 修改对象中原有的属性值、修改对象中原有属性的名字(两种方式)、添加对象中新属性等的操作

1、修改对象中原有的属性值 其一、代码为&#xff1a; // 想将 obj 中的 flag 值&#xff0c;根据不同的值来变化(即&#xff1a;修改对象中原有的属性值)&#xff1b; let obj {"port": "port_0","desc": "desc_0","flag&quo…

qt作业day2

//widget.cpp#include "widget.h" #include "ui_widget.h"void Widget::usr_login() {if("admin" this->edit_acc->text()){if("123456" this->edit_psd->text()){speech->say("登录成功");emit jump_sig1…

2023开学礼《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书南京财经大学图书馆

2023开学礼《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书南京财经大学图书馆

Win11搭建 Elasticsearch 7 集群(一)

一&#xff1a; ES与JDK版本匹配一览表 elasticsearch从7.0开始默认安装了java运行环境&#xff0c;以便在没有安装java运行环境的机器上运行。如果配置了环境变量JAVA_HOME&#xff0c;则elasticsearh启动时会使用JAVA_HOME作为java路径&#xff0c;否则使用elasticsearch根目…

【录用案例】CCF-C类,1/2区SCIEI,3个月14天录用,30天见刊,11天检索

计算机科学类SCI&EI 【期刊简介】IF&#xff1a;5.5-6.0&#xff0c;JCR1/2区&#xff0c;中科院2区 【检索情况】SCI&EI 双检&#xff08;CCF-C类&#xff09; 【征稿领域】边缘计算、算法与机器学习的结合研究 录用案例&#xff1a;3个月14天录用&#xff0c;录用…

Android lint配置及使用

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、将 lint 配置为不显示警告3.1 在 A…

【TSN】(一)中英译文

【Two Stream Net】 一&#xff0c;双语翻译 文章目录 【Two Stream Net】Abstract1 Introduction1.1 Related work 2 Two-stream architecture for video recognition3 Optical flow ConvNets3.1 ConvNet input configurations3.2 Relation of the temporal ConvNet archite…

论文阅读 (100):Simple Black-box Adversarial Attacks (2019ICML)

文章目录 1 概述1.1 要点1.2 代码1.3 引用 2 背景2.1 目标与非目标攻击2.2 最小化损失2.3 白盒威胁模型2.4 黑盒威胁模型 3 简单黑盒攻击3.1 算法3.2 Cartesian基3.3 离散余弦基3.4 一般基3.5 学习率 ϵ \epsilon ϵ3.6 预算 1 概述 1.1 要点 题目&#xff1a;简单黑盒对抗攻…

Docker 概念构成

0 概述 构成原理 Docker 客户端(Client)Docker 客户端通过命令行或者其他工具使用 Docker SDK与 Docker 的守护进程通信。Docker 主机(Host)一个物理或者虚拟的机器用于执行 Docker 守护进程和容器。Docker Hub 提供了庞大的镜像集合供使用。一个 Docker Registry 中可以包含多…

E5071C是德科技网络分析仪

描述 E5071C网络分析仪提供同类产品中最高的RF性能和最快的速度&#xff0c;具有宽频率范围和多功能。E5071C是制造和R&D工程师评估频率范围高达20 GHz的RF元件和电路的理想解决方案。特点: 宽动态范围:测试端口的动态范围> 123 dB(典型值)快速测量速度:41毫秒全2端口…

YOLOV8从零搭建一套目标检测系统(修改model结构必看)附一份工业缺陷检测数据集

目录 1.YOLOV8介绍 2.YOLOV8安装 2.1环境配置 3.数据集准备 1.YOLOV8介绍 Yolov8结构图&#xff1a; YoloV8相对于YoloV5的改进点&#xff1a; Replace the C3 module with the C2f module. Replace the first 6x6 Conv with 3x3 Conv in the Backbone. Delete two Convs …

Linux socket网络编程实战(tcp)实现双方聊天

在上节已经系统介绍了大致的流程和相关的API&#xff0c;这节就开始写代码&#xff01; 回顾上节的流程&#xff1a; 创建一个NET文件夹 来存放网络编程相关的代码&#xff1a; tcp服务端代码初步实现--上 这部分先实现服务器的连接部分的代码并进行验证 server1.c&#xff…

接口自动化测试总结

一、什么项目适合做自动化测试&#xff1f; 软件需求变动不频繁 测试脚本的稳定性决定了自动化测试的维护成本。如果软件需求变动过于频繁&#xff0c;测试人员需要根据变动的需求来更新测试用例以及相关的测试脚本&#xff0c;而脚本的维护本身就是一个代码开发的过程&#x…

【二分答案 dp】 Bare Minimum Difference

分析&#xff1a; 首先我们能够得知这个优秀值具有单调性&#xff1a; 如果一个优秀值 x 1 x1 x1能够满足题目要求&#xff0c;那么任何 x ( x > x 1 ) x(x>x1) x(x>x1)显然都能符合要求 基于这一特性&#xff0c;我们想到二分答案 直接二分这个答案好像难以维护。 …

【计算机网络】 静态库与动态库

文章目录 静态库实践使用方法总结 动态库实践使用方法总结 静态库与动态库的优缺点静态库优点缺点 动态库缺点优点 库有两种&#xff1a;静态库&#xff08;.a、.lib&#xff09;和动态库&#xff08;.so、.dll&#xff09;。所谓静态、动态是指链接。静态库是将整个库文件都拷…

虚幻引擎4中关于设置关于体坐标系下的物体速度的相关问题

虚幻引擎4中关于设置关于体坐标系下的物体速度的相关问题 文章目录 虚幻引擎4中关于设置关于体坐标系下的物体速度的相关问题前言全局坐标系转体坐标系速度设置X轴方向的体坐标系速度设置Y轴方向的体坐标系速度XY轴体坐标系速度整合 Z轴速度的进一步设置解决办法 小结 前言 利…

[git] 如何克隆仓库,进行项目撰写,并绑定自己的远程仓库

摘要&#xff1a;删除.git文件&#xff0c;才可重新绑定远程仓库。 具体步骤&#xff1a; 文件夹右键&#xff0c;进入”Git Bash Here“执行命令 1. 执行 ”git clone 仓库地址“&#xff0c;克隆仓库 2. 在生成的仓库中&#xff0c;删除 .git 文件 3. git init 初始化仓库…

自造简易版音频进度条

最近在做音乐播放器页面, 积累了很多有趣的经验, 今天先分享播放进度条的开发过程. 效果 话不多说&#xff0c;先看效果 支持点击修改进度&#xff0c;拖拽修改进度&#xff0c;当然大家肯定都知道ui库里面有现成的&#xff0c;为何要自己造一个 首先著名的ui库中确实都要这…