GRPC协议的相关原理

      GRPC的Client与Server,均通过Netty Channel作为数据通信,序列化、反序列化则使用Protobuf,每个请求都将被封装成HTTP2的Stream,在整个生命周期中,客户端Channel应该保持长连接,而不是每次调用重新创建Channel、响应结束后关闭Channel(即短连接、交互式的RPC),目的就是达到链接的复用,进而提高交互效率。

    1、Server端

    我们通常使用NettyServerBuilder,即IO处理模型基于Netty,将来可能会支持其他的IO模型。Netty Server的IO模型简析:

    1)创建ServerBootstrap,设定BossGroup与workerGroup线程池

    2)注册childHandler,用来处理客户端链接中的请求成帧

    3)bind到指定的port,即内部初始化ServerSocketChannel等,开始侦听和接受客户端链接。

    4)BossGroup中的线程用于accept客户端链接,并转发(轮训)给workerGroup中的线程。

    5)workerGroup中的特定线程用于初始化客户端链接,初始化pipeline和handler,并将其注册到worker线程的selector上(每个worker线程持有一个selector,不共享)

    6)selector上发生读写事件后,获取事件所属的链接句柄,然后执行handler(inbound),同时进行拆封package,handler执行完毕后,数据写入通过,由outbound handler处理(封包)通过链接发出。    注意每个worker线程上的数据请求是队列化的。

    GRPC而言,只是对Netty Server的简单封装,底层使用了PlaintextHandler、Http2ConnectionHandler的相关封装等。具体Framer、Stream方式请参考Http2相关文档。

    1)bossEventLoopGroup:如果没指定,默认为一个static共享的对象,即JVM内所有的NettyServer都使用同一个Group,默认线程池大小为1。

    2)workerEventLoopGroup:如果没指定,默认为一个static共享的对象,线程池大小为coreSize * 2。这两个对象采用默认值并不会带来问题;通常情况下,即使你的application中有多个GRPC Server,默认值也一样能够带来收益。不合适的线程池大小,有可能会是性能受限。

    3)channelType:默认为NioServerSocketChannel,通常我们采用默认值;当然你也可以开发自己的类。如果此值为NioServerSocketChannel,则开启keepalive,同时设定SO_BACKLOG为128;BACKLOG就是系统底层已经建立引入链接但是尚未被accept的Socket队列的大小,在链接密集型(特别是短连接)时,如果队列超过此值,新的创建链接请求将会被拒绝(有可能你在压力测试时,会遇到这样的问题),keepalive和BACKLOG特性目前无法直接修改。

Java代码  
  1. [root@sh149 ~]# sysctl -a|grep tcp_keepalive  
  2. net.ipv4.tcp_keepalive_time = 60  ##单位:秒  
  3. net.ipv4.tcp_keepalive_probes = 9  
  4. net.ipv4.tcp_keepalive_intvl = 75 ##单位:秒  
  5. ##可以在/etc/sysctl.conf查看和修改相关值  
  6. ##tcp_keepalive_time:最后一个实际数据包发送完毕后,首个keepalive探测包发送的时间。  
  7. ##如果首个keepalive包探测成功,那么链接会被标记为keepalive(首先TCP开启了keepalive)  
  8. ##此后此参数将不再生效,而是使用下述的2个参数继续探测  
  9. ##tcp_keepalive_intvl:此后,无论通道上是否发生数据交换,keepalive探测包发送的时间间隔  
  10. ##tcp_keepalive_probes:在断定链接失效之前,尝试发送探测包的次数;  
  11. ##如果都失败,则断定链接已关闭。  

    对于Server端,我们需要关注上述keepalive的一些设置;如果Netty Client在空闲一段时间后,Server端会主动关闭链接,有可能Client仍然保持链接的句柄,将会导致RPC调用时发生异常。这也会导致GRPC客户端调用时偶尔发生错误的原因之一。

    4)followControlWindow:流量控制的窗口大小,单位:字节,默认值为1M,HTTP2中的“Flow Control”特性;连接上,已经发送尚未ACK的数据帧大小,比如window大小为100K,且winow已满,每次向Client发送消息时,如果客户端反馈ACK(携带此次ACK数据的大小),window将会减掉此大小;每次向window中添加亟待发送的数据时,window增加;如果window中的数据已达到限定值,它将不能继续添加数据,只能等待Client端ACK。

    5)maxConcurrentCallPerConnection:每个connection允许的最大并发请求数,默认值为Integer.MAX_VALUE;如果此连接上已经接受但尚未响应的streams个数达到此值,新的请求将会被拒绝。为了避免TCP通道的过度拥堵,我们可以适度调整此值,以便Server端平稳处理,毕竟buffer太多的streams会对server的内存造成巨大压力。

    6)maxMessageSize:每次调用允许发送的最大数据量,默认为100M。

    7)maxHeaderListSize:每次调用允许发送的header的最大条数,GRPC中默认为8192。

    对于其他的比如SSL/TSL等,可以参考其他文档。

    GRPC Server端,还有一个最终要的方法:addService。【如下文service代理模式】

    在此之前,我们需要介绍一下bindService方法,每个GRPC生成的service代码中都有此方法,它以硬编码的方式遍历此service的方法列表,将每个方法的调用过程都与“被代理实例”绑定,这个模式有点类似于静态代理,比如调用sayHello方法时,其实内部直接调用“被代理实例”的sayHello方法(参见MethodHandler.invoke方法,每个方法都有一个唯一的index,通过硬编码方式执行);bindService方法的最终目的是创建一个ServerServiceDefinition对象,这个对象内部位置一个map,key为此Service的方法的全名(fullname,{package}.{service}.{method}),value就是此方法的GRPC封装类(ServerMethodDefinition)。

Java代码 
  1. private static final int METHODID_SAY_HELLO = 0;  
  2. private static class MethodHandlers<Req, Resp> implements  
  3.       ... {  
  4.     private final TestRpcService serviceImpl;//实际被代理实例  
  5.     private final int methodId;  
  6.   
  7.     public MethodHandlers(TestRpcService serviceImpl, int methodId) {  
  8.       this.serviceImpl = serviceImpl;  
  9.       this.methodId = methodId;  
  10.     }  
  11.   
  12.     @java.lang.SuppressWarnings("unchecked")  
  13.     public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {  
  14.       switch (methodId) {  
  15.         case METHODID_SAY_HELLO:        //通过方法的index来判定具体需要代理那个方法  
  16.           serviceImpl.sayHello((com.test.grpc.service.model.TestModel.TestRequest) request,  
  17.               (io.grpc.stub.StreamObserver<com.test.grpc.service.model.TestModel.TestResponse>) responseObserver);  
  18.           break;  
  19.         default:  
  20.           throw new AssertionError();  
  21.       }  
  22.     }  
  23.     ....  
  24.   }  
  25.   
  26.   public static io.grpc.ServerServiceDefinition bindService(  
  27.       final TestRpcService serviceImpl) {  
  28.     return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)  
  29.         .addMethod(  
  30.           METHOD_SAY_HELLO,  
  31.           asyncUnaryCall(  
  32.             new MethodHandlers<  
  33.               com.test.grpc.service.model.TestModel.TestRequest,  
  34.               com.test.grpc.service.model.TestModel.TestResponse>(  
  35.                 serviceImpl, METHODID_SAY_HELLO)))  
  36.         .build();  
  37.   }  

    addService方法可以添加多个Service,即一个Netty Server可以为多个service服务,这并不违背设计模式和架构模式。addService方法将会把service保存在内部的一个map中,key为serviceName(即{package}.{service}),value就是上述bindService生成的对象。

    那么究竟Server端是如何解析RPC过程的?Client在调用时会将调用的service名称 + method信息保存在一个GRPC“保留”的header中,那么Server端即可通过获取这个特定的header信息,就可以得知此stream需要请求的service、以及其method,那么接下来只需要从上述提到的map中找到service,然后找到此method,直接代理调用即可。执行结果在Encoder之后发送给Client。(参见:NettyServerHandler)

    因为是map存储,所以我们需要在定义.proto文件时,尽可能的指定package信息,以避免因为service过多导致名称可能重复的问题。

    2、Client端

    我们使用ManagedChannelBuilder来创建客户端channel,ManagedChannelBuilder使用了provider机制,具体是创建了哪种channel有provider决定,可以参看META-INF下同类名的文件中的注册信息。当前Channel有2种:NettyChannelBuilder与OkHttpChannelBuilder。本人的当前版本中为NettyChannelBuilder;我们可以直接使用NettyChannelBuilder来构建channel。如下描述则针对NettyChannelBuilder:

    配置参数与NettyServerBuilder基本类似,再次不再赘言。默认情况下,Client端默认的eventLoopGroup线程池也是static的,全局共享的,默认线程个数为coreSize * 2。合理的线程池个数可以提高客户端的吞吐能力。

    ManagedChannel是客户端最核心的类,它表示逻辑上的一个channel;底层持有一个物理的transport(TCP通道,参见NettyClientTransport),并负责维护此transport的活性;即在RPC调用的任何时机,如果检测到底层transport处于关闭状态(terminated),将会尝试重建transport。(参见TransportSet.obtainActiveTransport())

    通常情况下,我们不需要在RPC调用结束后就关闭Channel,Channel可以被一直重用,直到Client不再需要请求位置或者Channel无法真的异常中断而无法继续使用。当然,为了提高Client端application的整体并发能力,我们可以使用连接池模式,即创建多个ManagedChannel,然后使用轮训、随机等算法,在每次RPC请求时选择一个Channel即可。(备注,连接池特性,目前GRPC尚未提供,需要额外的开发)

    每个Service客户端,都生成了2种stub:BlockingStub和FutureStub;这两个Stub内部调用过程几乎一样,唯一不同的是BlockingStub的方法直接返回Response Model,而FutureStub返回一个Future对象。BlockingStub内部也是基于Future机制,只是封装了阻塞等待的过程:

Java代码 
  1. try {  
  2.         //也是基于Future  
  3.       ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);  
  4.       //阻塞过程  
  5.       while (!responseFuture.isDone()) {  
  6.         try {  
  7.           executor.waitAndDrain();  
  8.         } catch (InterruptedException e) {  
  9.           Thread.currentThread().interrupt();  
  10.           throw Status.CANCELLED.withCause(e).asRuntimeException();  
  11.         }  
  12.       }  
  13.       return getUnchecked(responseFuture);  
  14.     } catch (Throwable t) {  
  15.       call.cancel();  
  16.       throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);  
  17. }  

    创建一个Stub的成本是非常低的,我们可以在每次请求时都通过channel创建新的stub,这并不会带来任何问题(只不过是创建了大量对象);其实更好的方式是,我们应该使用一个Stub发送多次请求,即Stub也是可以重用的;直到Stub上的状态异常而无法使用。最常见的异常,就是“io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED”,即表示DEADLINE时间过期,我们可以为每个Stub配置deadline时间,那么如果此stub被使用的时长超过此值(不是空闲的时间),将不能再发送请求,此时我们应该创建新的Stub。很多人想尽办法来使用“withDeadlineAfter”方法来实现一些奇怪的事情,此参数的主要目的就是表明:此stub只能被使用X时长,此后将不能再进行请求,应该被释放。所以,它并不能实现类似于“keepAlive”的语义,即使我们需要keepAlive,也应该在Channel级别,而不是在一个Stub上。

    如果你使用了连接池,那么其实连接池不应该关注DEADLINE的错误,只要Channel本身没有terminated即可;就把这个问题交给调用者处理。如果你也对Stub使用了对象池,那么你就可能需要关注这个情况了,你不应该向调用者返回一个“DEADLINE”的stub,或者如果调用者发现了DEADLINE,你的对象池应该能够移除它。

    1)实例化ManagedChannel,此channel可以被任意多个Stub实例引用;如上文说述,我们可以通过创建Channel池,来提高application整体的吞吐能力。此Channel实例,不应该被shutdown,直到Client端停止服务;在任何时候,特别是创建Stub时,我们应该判定Channel的状态。

Java代码  收藏代码
  1. synchronized (this) {  
  2.     if (channel.isShutdown() || channel.isTerminated()) {  
  3.         channel = ManagedChannelBuilder.forAddress(poolConfig.host, poolConfig.port).usePlaintext(true).build();  
  4.     }  
  5.     //new Stub  
  6. }  
  7.   
  8. //或者  
  9. ManagedChannel channel = (ManagedChannel)client.getChannel();  
  10. if(channel.isShutdown() || channel.isTerminated()) {  
  11.     client = createBlockStub();  
  12. }  
  13. client.sayHello(...)  

    因为Channel是可以多路复用,所以我们用Pool机制(比如commons-pool)也可以实现连接池,只是这种池并非完全符合GRPC/HTTP2的设计语义,因为GRPC允许一个Channel上连续发送对个Requests(然后一次性接收多个Responses),而不是“交互式”的Request-Response模式,当然这么使用并不会有任何问题。

    2)对于批量调用的场景,我们可以使用FutureStub,对于普通的业务类型RPC,我们应该使用BlockingStub。

    3)每个RPC方法的调用,比如sayHello,调用开始后,将会为每个调用请求创建一个ClientCall实例,其内部封装了调用的方法、配置选项(headers)等。此后将会创建Stream对象,每个Stream都持有唯一的streamId,它是Transport用于分拣Response的凭证。最终调用的所有参数都会被封装在Stream中。

    4)检测DEADLINE,是否已经过期,如果过期,将使用FailingClientStream对象来模拟整个RPC过程,当然请求不会通过通道发出,直接经过异常流处理过程。

    5)然后获取transport,如果此时检测到transport已经中断,则重建transport。(自动重练机制,ClientCallImpl.start()方法)

    6)发送请求参数,即我们Request实例。一次RPC调用,数据是分多次发送,但是ClientCall在创建时已经绑定到了指定的线程上,所以数据发送总是通过一个线程进行(不会乱序)。

    7)将ClientCall实例置为halfClose,即半关闭,并不是将底层Channel或者Transport半关闭,只是逻辑上限定此ClientCall实例上将不能继续发送任何stream信息,而是等待Response。

    8)Netty底层IO将会对reponse数据流进行解包(Http2ConnectionDecoder),并根据streamId分拣Response,同时唤醒响应的ClientCalls阻塞。(参见ClientCalls,GrpcFuture)

    9)如果是BlockingStub,则请求返回,如果响应中包含应用异常,则封装后抛出;如果是网络异常,则可能触发Channel重建、Stream重置等。

 

转载自:http://shift-alt-ctrl.iteye.com/blog/2292862

转载于:https://www.cnblogs.com/junjiang3/p/9164513.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/251597.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Echarts --- 各个省份的坐标

纯手打…效果如下 1.新疆: [86.61 , 40.79] 2.西藏:[89.13 , 30.66] 3.黑龙江:[128.34 , 47.05] 4.吉林:[126.32 , 43.38] 5.辽宁:[123.42 , 41.29] 6.内蒙古:[112.17 , 42.81] 7.北京:[116.40 , 40.40 ] 8.宁夏:[106.27 , 36.76] 9.山西:[111.95,37.65] 10.河北:[115.21 , 38.…

C++标准输入流、输出流以及文件流

1、流的控制 iomanip 在使用格式化I/O时应包含此头文件。 stdiostream 用于混合使用C和C 的I/O机制时&#xff0c;例如想将C程序转变为C程序 2、类继承关系 ios是抽象基类&#xff0c;由它派生出istream类和ostream类&#xff0c; iostream类支持输入输出操作&…

Hadoop学习笔记—8.Combiner与自定义Combiner

一、Combiner的出现背景 1.1 回顾Map阶段五大步骤 在第四篇博文《初识MapReduce》中&#xff0c;我们认识了MapReduce的八大步凑&#xff0c;其中在Map阶段总共五个步骤&#xff0c;如下图所示&#xff1a; 其中&#xff0c;step1.5是一个可选步骤&#xff0c;它就是我们今天需…

css --- 行内框和内容区

css规定font-size的大小实际上是字体的高度 可以将内容区理解为font-size的大小. 行内高可以理解为 ( (line-height) - (font-size) ) /2 然后再font-size 的上下加上前面的值 看下面的例子 <p style"font-size:12px;line-height:12px;">this is text, <em&…

DotNetTextBox V3.0 所见即所得编辑器控件 For Asp.Net2.0(ver 3.0.7Beta) 增加多语言!

英文名&#xff1a;DotNetTextBox V3.0 WYSWYG Web Control For Asp.Net2.0 中文名&#xff1a;DotNetTextBox V3.0 所见即所得编辑器控件 For Asp.Net2.0 类型: 免费控件(保留版权) 作者: 小宝.NET 2.0(Terry Deng) 主页&#xff1a;http://www.aspxcn.com.cn 控件演示页面: h…

css --- 浮动元素与 块框/行内框重叠时的细节

块框,可以认为是块级元素(如div、h1)的内容区 内边距 行内框可以认为是行内元素(如span)的内容区 内边距 当 块级框/行内框 和一个浮动元素重叠时&#xff0c;行内框的边框、背景和内容都在幅度元素之上&#xff0c;块级框的边框和背景都在浮动元素的下面&#xff0c;但内容在…

npm --- 包的发布与导入

安装好NODE后,下面演示如何编写一个包,并将其发布到NPM仓库中,最后通过NPM安装回本地. 以下例子是在windows*64环境下运行的. 1.编写模块 exports.sayHello function(){return Hello World; }将上述代码保存在hello.js中 2.初始化包描述文件: 使用npm init指令,快速生成包…

XPath 的使用

XPath 的使用 XPath&#xff0c;全称XML Path Language&#xff0c;即XML路径语言&#xff0c;它是一门在XML文档中查找信息的语言&#xff0c;最初用于搜寻XML文档&#xff0c;但是也同样适用于HTML文档的搜索。前面我们在解析或抽取网页信息时&#xff0c;使用的是正则表达式…

WinCC归档数据报表控件

1、背景 WinCC实现报表历来是老大难&#xff0c;自带的报表功能不好使&#xff0c;又没有好用的第三方控件。虽然网上也有很多实现报表的方法&#xff0c;但是毫无例外的要求使用者具有脚本编程功底&#xff0c;HwDataReport的出现将终结这一现象。您无需一行脚本即可完成…

第三次实验报告

项目一 项目分析 本项目目的就是考察我们调用searching类和sorting类中的方法&#xff0c;通过提交测试用例设计情况&#xff08;正常&#xff0c;异常&#xff0c;边界&#xff0c;正序&#xff0c;逆序&#xff09;确保类的可行性 代码截图 排序截图 运行截图 项目二 代码截…

Node --- 构建一个HTTP服务

代码如下: var http require(http); http.createServer(function (req, res){res.writeHead(200,{Content-Type: text/plain});res.end(Hello World\n); }).listen(1337, 127.0.0.1); console.log(Server running at http://127.0.0.1:1337);运行如下&#xff1a; 详情见《深…

DDR3和eMMC区别

DDR3内存条和eMMC存储器区别&#xff1a; 1. 存储性质不同&#xff1b;2. 存储容量不同&#xff1b;3. 运行速度不同&#xff1b;4. 用途不同。 具体区别如下&#xff1a; 1、存储性质不同&#xff1a;eMMC是非易失性存储器&#xff0c;不论在通电或断电状态下&#xff0c;数据…

17秋 软件工程 团队第五次作业 Alpha Scrum3

17秋 软件工程 团队第五次作业 Alpha Scrum3 今日完成的任务 杰麟&#xff1a;java后端学习&#xff1b;世强&#xff1a;Android的部门基础信息模块的信息显示和对接后台&#xff1b;港晨&#xff1a;后台管理登陆界面ui设计&#xff1b;树民&#xff1a;超级管理员Web后端数据…

event.target【转载】

[转载] 1.this和event.target的区别&#xff1a; js中事件是会冒泡的&#xff0c;所以this是可以变化的&#xff0c;但event.target不会变化&#xff0c;它永远是直接接受事件的目标DOM元素&#xff1b; 2.this和event.target都是dom对象&#xff0c;如果要使用jquey中的方法可…

使用DataTable作为存储过程的参数

最近工作中写了几个存储过程&#xff0c;需要向存 储过程中传递字符串&#xff0c;因为SQL Server 2000中没有内置类似于 split 的函数&#xff0c;只好自己处理&#xff0c;将前台数据集中的一列用逗号拆分存到一个List中&#xff0c;再转化为字符串传给存储过程&#xff0c;很…

进制转换详细解说

进制的由来&#xff1a;任何数据在计算机中都是以二进制的形式存在的。二进制早起由电信号开关演变而来。 一个整数在内存中一样也是二进制的&#xff0c;但是使用一大串的1或者0组成的数值进行使用很麻烦 所以就想把一大串缩短点&#xff0c;讲二进制中的三位用一位表示。 这三…

6.2 常见多媒体标准及压缩技术

MPEG-1是视频的压缩标准.这个标准是在1993年8月份发布的.标准就规定了视频文件以每秒钟1.5MB的速率来传输数字媒体它的运动图像以及伴音的编码.这个标准它包括了五个部分. MPEG-2它是1994年推出来的一个压缩标准&#xff0c;也是用于视频的。MPEG-2、MPEG-4、MPEG-7、MPEG-21它…

Single Number II

2018-06-17 14:04:27 问题描述&#xff1a; 问题求解&#xff1a; 方法一、如果对空间复杂度没有要求&#xff0c;那么直接使用HashMap对每个数字出现次数进行计数&#xff0c;最后对HashMap遍历一遍即可&#xff0c;总的时间复杂度为O(n)&#xff0c;空间开销较大。 方法二、对…

打造自己Django博客日记

本教程使用的开发环境 本教程写作时开发环境的系统平台为 Windows 10 &#xff08;64 位&#xff09;&#xff0c;Python 版本为 3.5.2 &#xff08;64 位&#xff09;&#xff0c;Django 版本为 1.10.6。 建议尽可能地与教程的开发环境保持一致&#xff08;尤其是 Python 与 D…

Controller上使用@CrossOrigin注解

本文首次发布于My Blog,作者Ian,转载请保留原文链接。 就是一个跨域的注解 Spring MVC 从4.2版本开始增加了对CORS的支持 CORS介绍请看这里&#xff1a;https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Access_control_CORS 参考isea533&#xff1a;https://blog.csdn.net/…