gRPC之gRPC流

1、gRPC流

从其名称可以理解,流就是持续不断的传输。有一些业务场景请求或者响应的数据量比较大,不适合使用普通的

RPC 调用通过一次请求-响应处理,一方面是考虑数据量大对请求响应时间的影响,另一方面业务场景的设计不一

定需要一次性处理完所有数据,这时就可以使用流来分批次传输数据。

HTTP/2中有两个概念,流(stream)与帧(frame),其中帧作为HTTP/2中通信的最小传输单位,通常一个请

求或响应会被分为一个或多个帧传输,流则表示已建立连接的虚拟通道,可以传输多次请求或响应。每个帧中包含

Stream Identifier,标志所属流。HTTP/2通过流与帧实现多路复用,对于相同域名的请求,通过Stream

Identifier标识可在同一个流中进行,从而减少连接开销。 而gRPC基于HTTP/2协议传输,自然而然也实现了流式

传输,其中gRPC中共有以下三种类型的流:

1、服务端流式响应

2、客户端流式请求

3、两端双向流式

本篇主要讲讲如何实现gRPC三种流式处理。

gRPC的stream只需要在service的rpc方法描述中通过 stream 关键字指定启用流特性就好了。

1.1 单向流

单向流是指客户端和服务端只有一端开启流特性,这里的单向特指发送数据的方向。

  • 当服务端开启流时,客户端和普通 RPC 调用一样通过一次请求发送数据,服务端通过流分批次响应。

  • 当客户端开启流时,客户端通过流分批次发送请求数据,服务端接完所有数据后统一响应一次。

1.1.1 服务端流

定义一个 MultiPong 方法,在服务端开启流,功能是接收到客户端的请求后响应10次 pong 消息。

ping.proto文件的编写:

// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";
// 定义PingPong服务
service PingPong {// Ping发送ping请求,接收pong响应// 服务端流模式,在响应消息前添加stream关键字rpc MultiPong(PingRequest) returns (stream PongResponse);
}// PingRequest请求结构
message PingRequest {// value字段为string类型string value = 1; 
}// PongResponse 响应结构
message PongResponse {// value字段为string类型string value = 1; 
}

ping.pb.go文件的生成:

$ protoc --go_out=plugins=grpc:. ping.proto

服务端实现,server.go的编写,第二个参数为 stream 对象的引用,可以通过它的 Send 方法发送数据。

package mainimport (// 引入编译生成的包pb "demo/protos/ping""google.golang.org/grpc""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// MultiPong 服务端流模式
func (s *PingPongServer) MultiPong(req *pb.PingRequest, stream pb.PingPong_MultiPongServer) error {for i := 0; i < 10; i++ {data := &pb.PongResponse{Value: "pong"}// 发送消息err := stream.Send(data)if err != nil {return err}}return nil
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatalln(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 20:51:04 listen on 7009

客户端实现,client.go的编写,请求方式和普通 RPC 没有区别,重点关注对响应数据流的处理,通过一个 for

循环接收数据直到结束。

package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatalln(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)// 获得对 stream 对象的引用stream, err := client.MultiPong(context.Background(), &pb.PingRequest{Value: "ping"})if err != nil {log.Fatalln(err)}// 循环接收响应数据流for {msg, err := stream.Recv()if err != nil {// 数据结束if err == io.EOF {break}log.Fatalln(err)}log.Println(msg.Value)}
}
# 客户端运行
$ go run client.go
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
# 目录结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│   └── ping
│       └── ping.pb.go
└── server.go2 directories, 6 files

1.1.2 客户端流

定义一个 MultiPing 方法,在客户端开启流,功能是持续发送多个 ping 请求,服务端统一响应一次。

ping.proto文件的编写:

// ping.proto
// 指定proto版本
syntax = "proto3"; 
// 指定包名
package protos;     
// 指定go包路径
option go_package = "protos/ping";
// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应// 客户端流模式,在请求消息前添加 stream 关键字rpc MultiPing(stream PingRequest) returns (PongResponse);
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}

ping.pb.go文件的生成:

$ protoc --go_out=plugins=grpc:. ping.proto

服务端实现,server.go的编写,只有一个参数为 stream 对象的引用,可以通过它的 Recv 方法接收数据。使

SendAndClose 方法关闭流并响应,服务端可以根据需要提前关闭。

package mainimport ("fmt"// 引入编译生成的包pb "demo/protos/ping""google.golang.org/grpc""io""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// MultiPing 客户端流模式
func (s *PingPongServer) MultiPing(stream pb.PingPong_MultiPingServer) error {msgs := []string{}for {// 提前结束接收消息if len(msgs) > 5 {return stream.SendAndClose(&pb.PongResponse{Value: "ping enough, max 5"})}msg, err := stream.Recv()if err != nil {// 客户端消息结束,返回响应信息if err == io.EOF {return stream.SendAndClose(&pb.PongResponse{Value: fmt.Sprintf("got %d ping", len(msgs))})}return err}msgs = append(msgs, msg.Value)}
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatalln(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 21:26:42 listen on 7009

客户端实现,client.go的编写,调用 MultiPing 方法时不再指定请求参数,而是通过返回的 stream 对象的

Send 分批发送数据。

package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""log"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatalln(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)// 获得对stream对象的引用// 调用并得到stream对象stream, err := client.MultiPing(context.Background())if err != nil {log.Fatalln(err)}// 发送数据for i := 0; i < 6; i++ {data := &pb.PingRequest{Value: "ping"}err = stream.Send(data)if err != nil {log.Fatalln(err)}}// 发送结束并获取服务端响应res, err := stream.CloseAndRecv()if err != nil {log.Fatalln(err)}log.Println(res.Value)
}
# 启动客户端
# 发送3个ping
$ go run client.go
2023/02/10 21:32:31 got 3 ping
# 发送6个ping
$ go run client.go
2023/02/10 21:32:31 ping enough, max 5
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│   └── ping
│       └── ping.pb.go
└── server.go2 directories, 6 files

1.2 双向流

双向流是指客户端在发送数据和服务端响应数据的过程中都启用流特性,实际上单向流只是双向流的特例,有了上

面的基础,双向流就很好理解了。

定义一个 MultiPingPong 方法,在客户端和服务端都开启流,功能是服务端每接收到两个 ping 就响应一次

pong。

ping.proto编写:

// ping.proto
// 指定proto版本
syntax = "proto3"; 
// 指定包名
package protos;     
// 指定go包路径
option go_package = "protos/ping";// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应// 双向流模式rpc MultiPingPong(stream PingRequest) returns (stream PongResponse);
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}

ping.pb.go文件的生成:

$ protoc --go_out=plugins=grpc:. ping.proto

服务端实现,server.go的编写,同样通过 streamRecvSend 方法接收和发送数据。

package mainimport (pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}func (s *PingPongServer) MultiPingPong(stream pb.PingPong_MultiPingPongServer) error {msgs := []string{}for {// 接收消息msg, err := stream.Recv()if err != nil {if err == io.EOF {break}return err}msgs = append(msgs, msg.Value)// 每收到两个消息响应一次if len(msgs)%2 == 0 {err = stream.Send(&pb.PongResponse{Value: "pong"})if err != nil {return err}}}return nil
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatal(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 21:26:42 listen on 7009

客户端实现,client.go的编写,这里在另外一个 goroutine 里处理接收数据的逻辑来演示同时发送和接收数

据。

package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log""time"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatal(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)stream, err := client.MultiPingPong(context.Background())if err != nil {log.Fatal(err)}// 在另一个goroutine中处理接收数据c := make(chan struct{})go func(stream pb.PingPong_MultiPingPongClient, c chan struct{}) {defer func() {c <- struct{}{}}()for {msg, err := stream.Recv()if err != nil {if err == io.EOF {break}log.Fatal(err)}log.Printf("recv:%s\n", msg.Value)}}(stream, c)// 发送数据for i := 0; i < 6; i++ {data := &pb.PingRequest{Value: "ping"}err = stream.Send(data)if err != nil {log.Fatal(err)}log.Printf("send:%s\n", data.Value)// 延时一段时间发送,等待响应结果time.Sleep(500 * time.Millisecond)}// 结束发送stream.CloseSend()// 等待接收完成<-c
}
# 启动客户端
$ go run client.go
2023/02/10 21:48:26 send:ping
2023/02/10 21:48:26 send:ping
2023/02/10 21:48:26 recv:pong
2023/02/10 21:48:27 send:ping
2023/02/10 21:48:27 send:ping
2023/02/10 21:48:27 recv:pong
2023/02/10 21:48:28 send:ping
2023/02/10 21:48:28 send:ping
2023/02/10 21:48:28 recv:pong
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│   └── ping
│       └── ping.pb.go
└── server.go2 directories, 6 files

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/82111.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLM预训练之RLHF(一):RLHF及其变种

在ChatGPT引领的大型语言模型时代&#xff0c;国内外的大模型呈现爆发式发展&#xff0c;尤其是以年初的LLaMA模型为首的开源大模型和最近百川智能的baichuan模型&#xff0c;但无一例外&#xff0c;都使用了「基于人类反馈的强化学习」&#xff08;RLHF&#xff09;来提升语言…

7.代理模式

1.UML 2.代码 #include <iostream> using namespace std;class Subject{ public:virtual void Request() 0; };class RealSubject:public Subject { public:virtual void Request(){cout << "RealSubject" << endl;} }; class Proxy:public Subj…

【结构型】代理模式(Proxy)

目录 代理模式(Proxy)适用场景代理模式实例代码&#xff08;Java&#xff09; 代理模式(Proxy) 为其他对象提供一种代理以控制对这个对象的访问。Proxy 模式适用于在需要比较通用和复杂的对象指针代替简单的指针的时候。 适用场景 远程代理 (Remote Proxy) 为一个对象在不同…

【Git】03-GitHub

文章目录 1. GitHub核心功能2. GitHub搜索项目3. GitHub搭建个人博客4. 团队项目创建5. git工作流选择5.1 需要考虑的因素5.2 主干开发5.2 Git Flow5.3 GitHub Flow5.4 GitLab Flow(带生产分支)5.4 GitLab Flow(带环境分支)5.4 GitLab Flow(带发布分支) 6. 分支集成策略7. 启用…

功能基础篇3——Python中的输入输出、文件读写、序列化

IO 文件读写 open open()为内置函数&#xff0c;用于读写文件 mode 读写 x&#xff0c;create&#xff0c;创建&#xff0c;文件存在报错&#xff0c;可写不可读r&#xff0c;read&#xff0c;读入&#xff0c;默认读写方式&#xff0c;文件不存在报错&#xff0c;可读不可…

redis桌面连接工具Another Redis Desktop Manager使用介绍

Another Redis Desktop Manager是一种类似于navicat的数据库连接工具&#xff0c;专门用来连接redis&#xff0c;使用起来非常简单方便&#xff0c;在这里推荐给大家。 没有用过这个软件的&#xff0c;首先通过下面的网盘链接下载Another Redis Desktop Manager 百度网盘redi…

权限提升WIN篇(腾讯云,CS,MSF)

溢出漏洞 信息收集 操作系统版本ver&#xff0c;systeminfo漏洞补丁信息systeminfo操作系统位数systeminfo杀软防护tasklist /svc网络netstat -ano,ipconfig当前权限whoami 筛选EXP 根据前面的信息收集中的系统版本&#xff0c;位数和补丁情况筛选出合适的EXP 提权 根据EX…

Ubuntu22.04 Opencv4.5.1 CPU和GPU编译攻略,Opencv CPU和GPU编译保姆教程 亲自测试。

1、安装opencv依赖 安装时最好更换一下源。 sudo apt-get -y update sudo apt-get -y install cmake git libgtk2.0-dev pkg-config libavcodec-dev libavformat-dev libswscale-dev sudo apt-get -y install libgtk-3-dev gfortran openexr libatlas-base-dev python3-dev pyt…

flutter开发实战-长按TextField输入框cut、copy设置为中文复制、粘贴

flutter开发实战-长按TextField输入框cut、copy设置为中文复制、粘贴 在开发过程中&#xff0c;需要长按TextField输入框cut、copy设置为中文“复制、粘贴”&#xff0c;这里记录一下设置的代码。 一、pubspec.yaml设置flutter_localizations 在pubspec.yaml中设置flutter_l…

[python 刷题] 238 Product of Array Except Self

[python 刷题] 238 Product of Array Except Self 题目&#xff1a; Given an integer array nums, return an array answer such that answer[i] is equal to the product of all the elements of nums except nums[i]. The product of any prefix or suffix of nums is guar…

程序员必须掌握的算法

引言 作为一名程序员&#xff0c;掌握一些重要的算法是必不可少的。算法是解决问题的方法和步骤&#xff0c;对于程序员来说&#xff0c;熟悉和掌握一些常见的算法可以提高编程能力&#xff0c;解决复杂的计算问题。与此同时&#xff0c;算法是计算机科学中的核心概念&#xff…

pgsql/mysql/clickhouse性能对比

目前公司使用的关系型数据库版本较老&#xff0c;有计划对关系型数据库进行一次升级。有两个方案&#xff0c;一是将现在的mysql5.7升级到8.0&#xff0c;二是将数据库替换成pgsql15&#xff0c;所以对这两个库的查询写入和资源损耗进行一次对比。 前置准备 服务器 两台配置…

Cilium 1.11:服务网格的未来已来

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Linux Ubuntu命令行快速配置C++开发环境

本文介绍在Linux操作系统的Ubuntu版本中&#xff0c;基于命令行&#xff0c;快速配置C 编辑、编译、运行的代码开发环境的简便方法。 在之前的文章Linux操作系统Ubuntu 22.04配置Visual Studio Code与C代码开发环境的方法(https://blog.csdn.net/zhebushibiaoshifu/article/det…

星际争霸之小霸王之小蜜蜂(十三)--接着奏乐接着舞

系列文章目录 星际争霸之小霸王之小蜜蜂&#xff08;十二&#xff09;--猫有九条命 星际争霸之小霸王之小蜜蜂&#xff08;十一&#xff09;--杀杀杀 星际争霸之小霸王之小蜜蜂&#xff08;十&#xff09;--鼠道 星际争霸之小霸王之小蜜蜂&#xff08;九&#xff09;--狂鼠之…

什么是集成测试?集成测试方法有哪些?

1、基本概念&#xff1a; 将软件集成起来后进行测试。集成测试又叫子系统测试、组装测试、部件测试等。集成测试主要是针对软件高层设计进行测试&#xff0c;一般来说是以模块和子系统为单位进行测试。 2、集成测试包含的层次&#xff1a; 1. 模块内的集成&#xff0c;主要是…

Linux sed

1.sed介绍 sed:Stream Editor&#xff0c;流编辑器、行编辑器、逐行编辑 sed将每行内容读入到“内存”中&#xff0c;在内存中进行处理&#xff0c;将结果返回给屏幕&#xff0c;此段内存空间称为模式空间。 sed默认不编辑原文件&#xff0c;仅对模式空间的数据进行处理&…

OA 电子审批流程是什么?

公司中&#xff0c;最最最常见也是最最最多的就是——各种审批。 我当年第一次实习&#xff0c;在一家国企的行政部门&#xff0c;我们部门领导那个时候最主要的工作就是“打通流程”&#xff0c;咱也不知道他在打通什么流程&#xff0c;反正这个很重要就是了。 结果&#xf…

CSS动效合集之实现气泡发散动画

前言 &#x1f44f;CSS动效合集之实现气泡发散动画&#xff0c;速速来Get吧~ &#x1f947;文末分享源代码。记得点赞关注收藏&#xff01; 1.实现效果 2.实现步骤 定义一个数组bubbles&#xff0c;用来存储气泡列表的基本新&#xff0c;w表示宽高&#xff0c;x表示绝对定位…

【Python基础】P01S01 变量与字符串

P01S01 变量与字符串 变量变量的命名规则 字符串字符串的表达方法字符串的基本操作 数整数与浮点数数中下划线常量 注释 变量 变量的命名规则 变量名只能包含字母、数字和下划线&#xff1b; 变量名能以字母或下划线开头&#xff0c;但是不能以数字开头&#xff1b; 变量名不能…