gRPC 流式调用

gRPC 使用 Protocol buffers 作为接口定义语言(IDL)来描述服务接口和输入输出消息的结构,目前支持 4 种定义服务方法类型:

类型说明
简单 RPC客户端传入一个请求对象,服务端返回一个结果对象
客户端流式 RPC客户端传入多个请求对象,服务端返回一个结果对象
服务端流式 RPC客户端传入一个请求对象,服务端返回多个结果对象
双向流式 RPC客户端传入多个请求对象,服务端返回多个结果对象

RPC 定义

简单 RPC:一般这种方式使用较多,如下:定义 SayHello 方法,输入 HelloRequest,返回 HelloResponse 。

1
2
3
4
5
6
7
8
9
10
11
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}

而流式 RPC 定义与 简单 RPC 的区别只是在请求或返回参数前增加了 stream 关键词,如下:

1
2
3
4
5
6
7
8
service HelloService {
// 客户端流式 RPC
rpc SayHello1 (stream HelloRequest) returns (HelloResponse);
// 服务端流式 RPC
rpc SayHello2 (HelloRequest) returns (stream HelloResponse);
// 双向流式 RPC
rpc SayHello3 (stream HelloRequest) returns (stream HelloResponse);
}

gRPC 能支持流式调用本质是因为 gRPC 通信是基于 HTTP/2 实现的,HTTP/2 具有流的概念,流是为了实现 HTTP/2 的多路复用。流是服务器和客户端在 HTTP/2 连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程。

使用场景

在 gRPC 中消息接收大小 GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH 默认是 4M,如果大于该值,则会提示:Error: grpc: received message larger than max (xxxxxx vs. 4194304),当然我们可以修改默认值解决问题,但如果默认值支持过大对服务器资源也是一种消耗,这时候其实应该考虑使用流式调用,有效将数据进行分批处理,提高性能。

示例

这里主要介绍一下双向流式 RPC(客户端和服务端流式 RPC 类似),完整代码请 前往这里查看 。双向流模拟功能是客户端流式输入文件路径,服务端针对每个文件每次最多读取 1M 的数据返回,客户端拿到数据后生成新文件。

接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package GrpcStream;

service StreamTest {
// 双向流程 RPC
rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {}
}

message BidirectionalStreamRequest {
// 文件路径
string file_path = 1;
}
message BidirectionalStreamResponse {
// 文件路径
string file_path = 1;
// 数据
bytes data = 2;
}

代码实现

这里是基于 .NET Core 3.0 使用 gRPC,可以通过 VS 预置的 gRPC 服务 模板来创建服务端,创建后将默认的 porto 文件替换成上面的内容。

服务端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public override async Task BidirectionalStream(IAsyncStreamReader<BidirectionalStreamRequest> requestStream, IServerStreamWriter<BidirectionalStreamResponse> responseStream, ServerCallContext context)
{
var i = 0;
// 监听客户端数据输入
while (await requestStream.MoveNext())
{
// 打印次数
Console.WriteLine(i++);
using var fs = File.Open(requestStream.Current.FilePath, FileMode.Open);
var leftSize = fs.Length;
// 1M
var buff = new byte[1048576];
while (leftSize > 0)
{
var len = await fs.ReadAsync(buff);
leftSize -= len;
Console.WriteLine($"response {requestStream.Current.FilePath} {len} bytes");
// 流式返回数据
await responseStream.WriteAsync(new BidirectionalStreamResponse
{
FilePath = requestStream.Current.FilePath,
Data = ByteString.CopyFrom(buff, 0, len)
});
}
}
}

客户端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 测试文件,key 是已存在的文件,value 是需要生成的文件
static readonly Dictionary<string, string> fileDic = new Dictionary<string, string>()
{
{@"d:\dapr\daprd_windows_amd64.zip", @"d:\dapr\daprd_windows_amd64_new.zip" },
{@"d:\dapr\injector_windows_amd64.zip", @"d:\dapr\injector_windows_amd64_new.zip" },
};
static StreamTest.StreamTestClient client;

static async Task Main(string[] args)
{
// 连接 gRPC 服务
var channel = GrpcChannel.ForAddress("https://localhost:5001");
client = new StreamTest.StreamTestClient(channel);
await BidirectionalStreamTestAsync();
Console.ReadKey();
}

static async Task BidirectionalStreamTestAsync()
{
using var call = client.BidirectionalStream();
var responseTask = Task.Run(async () =>
{
// 接收返回值
var iterator = call.ResponseStream;
// 监听服务端数据返回
while (await iterator.MoveNext())
{
Console.WriteLine($"write to new file {fileDic[iterator.Current.FilePath]} {iterator.Current.Data.Length} bytes");
// 写入新文件
using var fs = new FileStream(fileDic[iterator.Current.FilePath], FileMode.Append);
iterator.Current.Data.WriteTo(fs);
}
});

var rand = new Random();
foreach (var item in fileDic)
{
// 流式输入
await call.RequestStream.WriteAsync(new BidirectionalStreamRequest
{
FilePath = item.Key
});
await Task.Delay(rand.Next(200));
}
await call.RequestStream.CompleteAsync();
await responseTask;
}

执行结果:

640?wx_fmt=png

参考资料

  • gRPC Concepts


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/313212.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

模型压缩案例-SSDYou only look once

http://write.blog.csdn.NET/postedit 在上一篇文章中&#xff0c;介绍了以regionproposal来检测的框架&#xff0c;这一系列速度和精度不断提高&#xff0c;但是还是无法达到实时。存在的主要问题为&#xff1a;速度不够快&#xff0c;主要原因是proposal比较多&#xff0c;特…

.NET如何将字符串分隔为字符

前言如果这是一道面试题&#xff0c;答案也许非常简单&#xff1a;.ToCharArray()&#xff0c;这基本正确……我们以“AB吉??????”作为输入参数&#xff0c;首先如果按照“正常”处理的思路&#xff0c;用 .ToCharArray()&#xff0c;然后转换为 JSON&#xff08;以便方…

Orleans 知多少 | 3. Hello Orleans

1. 引言是的&#xff0c;Orleans v3.0.0 已经发布了&#xff0c;并已经完全支持 .NET Core 3.0。所以&#xff0c;Orleans 系列是时候继续了&#xff0c;抱歉&#xff0c;让大家久等了。万丈高楼平地起&#xff0c;这一节我们就先来了解下Orleans的基本使用。2. 模板项目讲解在…

.NET Core 3.0之深入源码理解ObjectPool(二)

写在前面前文主要介绍了ObjectPool的一些理论基础&#xff0c;本文主要从源码角度理解Microsoft.Extensions.ObjectPool是如何实现的。下图为其三大核心组件图&#xff1a;核心组件ObjectPoolObjectPool是一个泛型抽象类&#xff0c;里面只有两个抽象方法&#xff0c;Get和Retu…

VC维学习

http://www.flickering.cn/machine_learning/2015/04/vc维的来龙去脉/ 说说历史Hoeffding不等式Connection to Learning学习可行的两个核心条件Effective Number of HypothesesGrowth FunctionBreak Point与ShatterVC BoundVC dimension深度学习与VC维小结参考文献 VC维在机器学…

.NET Core 3.0 一个 jwt 的轻量角色/用户、单个API控制的授权认证库

作者&#xff1a;痴者工良&#xff08;朋友合作原创&#xff09;来源&#xff1a;https://www.cnblogs.com/whuanle/p/11743406.html目录说明一、定义角色、API、用户二、添加自定义事件三、注入授权服务和中间件三、如何设置API的授权四、添加登录颁发 Token五、部分说明六、验…

.NET Core 3.0 构建和部署

Default Executables 默认可执行文件 在 dotnet build 或 dotnet publish 期间&#xff0c;将创建一个与你使用的 SDK 的环境和平台相匹配的可执行文件。 和其他本机可执行文件一样&#xff0c;可以使用这些可执行文件执行相同操作&#xff0c;例如&#xff1a; 可以双击可执行…

为什么我会了SOA,你们还要逼我学微服务?

菜菜哥&#xff0c;我最近需要做一个项目&#xff0c;老大让我用微服务的方式来做那挺好呀&#xff0c;微服务现在的确很流行我以前在别的公司都是以SOA的方式&#xff0c;SOA也是面向服务的方式呀的确&#xff0c;微服务和SOA有相同之处面向服务的架构&#xff08;SOA&#xf…

面对万物互联的智能世界,你是否也想分一杯羹

第六届世界互联网大会于10月20日至22日在浙江乌镇顺利举行。作为世界互联网大会“13”架构的重要组成部分&#xff0c;“互联网之光”博览会以“智能互联网、开放合作——携手共建网络空间命运共同体”为主题&#xff0c;集中展示了全球范围内的互联网新技术、新成果、新产品、…

你必须知道的容器监控 (2) cAdvisor

# 实验环境&#xff1a;阿里云ECS主机&#xff08;两台&#xff09;&#xff0c;CentOS 7.401—cAdvisor简介为了解决容器的监控问题&#xff0c;Google开发了一款容器监控工具cAdvisor&#xff08;Container Advisor&#xff09;&#xff0c;它为容器用户提供了对其运行容器的…

代码阅读

http://alanse7en.github.io/caffedai-ma-jie-xi-4/ 三. 从一个比较宏观的层面上去了解caffe怎么去完成一些初始化的工作和使用Solver的接口函数&#xff0c;本文将主要分为四部分的内容&#xff1a; Google Flags的使用Register Brew Function的宏的定义和使用train()函数的…

动手造轮子:实现一个简单的依赖注入(一)

动手造轮子&#xff1a;实现一个简单的依赖注入(一)Intro在上一篇文章中主要介绍了一下要做的依赖注入的整体设计和大概编程体验&#xff0c;这篇文章要开始写代码了&#xff0c;开始实现自己的依赖注入框架。类图首先来温习一下上次提到的类图服务生命周期服务生命周期定义&am…

Qt 调试Caffe

http://blog.csdn.net/xg123321123/article/details/52817658 1.下载并安装Qt Creator 下载页面&#xff0c;推荐使用4.x版本&#xff0c;比如&#xff1a; Qt Creator 4.1.0 for Linux 64-bit下载的是run包&#xff0c;安装方法&#xff1a; cd到下载目录 chmod x xxx.run …

.NET Core 3.0 本地工具

.NET Core从最早期的版本就开始支持全局工具了。如果仅仅需要在某个项目中或某个文件夹中使用特定的工具&#xff0c;那么.NET Core 3.0就允许您这样做。 使用.NET Core 3.0&#xff0c;您可以在特定的文件夹下安装“本地”工具&#xff0c;它的作用范围仅限于该文件夹及其子文…

Nsight 调试 Caffe

http://blog.csdn.net/u014114990/article/details/47723877 右键属性菜单的General->Code Analysis->Paths and Symbols下进行加入&#xff1a; Includes下加入程序需要用到的头文件的路径&#xff1a; Library Path下添加需要用到库文件的路径&#xff1a; 具体用到的…

张高兴的 .NET Core IoT 入门指南:(五)PWM 信号输出

什么是 PWM在解释 PWM 之前首先来了解一下电路中信号的概念&#xff0c;其中包括模拟信号和数字信号。模拟信号是一种连续的信号&#xff0c;与连续函数类似&#xff0c;在图形上表现为一条不间断的连续曲线。数字信号为只能取有限个数值的信号&#xff0c;比如计算机中的高电平…

.NET Core 3 对 IoT 应用程序的高级支持:System.Device.Gpio

System.Device.Gpio 是一个全新的 .Net Core 开源库&#xff0c;它旨在使 IoT&#xff08;物联网&#xff09;应用程序能够通过其 GPIO 引脚或其他 I/O 控制硬件与传感器、显示器和输入设备进行交互。该库是由社区维护的多个设备绑定集合来进行增强实现的。正如微软 .NET 项目…

代码阅读2

http://blog.csdn.net/u014568921/article/details/53995455 首先&#xff0c;要知道caffe里的卷积核都是三维的 在caffe中卷积核是三维的还是二维的&#xff1f; 下面分割线之间的内容来自http://blog.csdn.NET/u014114990/article/details/51125776 /*********************…

一文带你了解华为云DevCloud为何能全面领跑中国DevOps云服务市场

近日&#xff0c;国际权威调研机构IDC发布了《IDC MarketScape&#xff1a;中国DevOps云服务市场2019厂商评估》报告&#xff0c;该报告从战略和能力两个维度对国内主流DevOps云厂商进行了评估&#xff0c;报告显示&#xff0c;华为云位于 IDC MarketScape “中国DevOps云服务 …