gRPC三种流和消息格式

文章目录

    • 消息格式
      • RPC流
      • 长度前缀的消息分帧
      • 请求消息
      • 响应信息
    • 三种流
      • 一元RPC
      • 服务流RPC
      • 客户流RPC
      • 双工流RPC
  • 代码仓库

消息格式

RPC流

服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。

  • 调用存根方法
  • 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置content-typeapplication/grpc
  • 到达服务端,会先检查请求头是不是gRPC请求,否则返回415

长度前缀的消息分帧

在写入消息前,先写入长度消息表明每条消息的大小。

每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB

帧首中还有单字节无符号整数,用来表明数据是否进行了压缩
为1表示使用 message-encoding中的编码机制进行了压缩

在这里插入图片描述

请求消息

客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记

1、对于gRPC 都是POST

2、协议:Http/Https

3、/服务名/方法名

4、目标URI的主机名

5、对不兼容代理的检测,gRPC下这个值必须为 trailers

6、超时时间

7、媒体类型

8、压缩类型

在这里插入图片描述

在这里插入图片描述

当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧

在这里插入图片描述

响应信息

服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers

END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer

在这里插入图片描述

在这里插入图片描述

三种流

一元RPC

通信时始终只有一个请求和一个响应

在这里插入图片描述

protocol buffer

syntax = "proto3";
package hello;
// 第一个分割参数,输出路径;第二个设置生成类的包路径option go_package = "./proto/hello";// 设置服务名称
service Greeter {// 设置方法rpc SayHello (HelloRequest) returns (HelloReply) {}
}// 请求信息用户名.
message HelloRequest {string name = 1;
}// 响应信息
message HelloReply {string message = 1;
}

服务端

package mainimport ("context""flag""fmt""log""net""google.golang.org/grpc"pb "mygrpc/proto/hello"
)var (port = flag.Int("port", 50051, "The server port")
)type server struct {pb.UnimplementedGreeterServer
}func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {log.Printf("Received: %v", in.GetName())return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer()// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("server listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

客户端

package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

服务流RPC

通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应。
在这里插入图片描述

在protobuf中的 service添加以下代码

rpc searchOrders(google.protobuf.StringValue) returns (stream Order);

服务端代码

package mainimport ("context""flag""fmt""google.golang.org/grpc""io""log"pb "mygrpc/proto/hello""net"
)var (port = flag.Int("port", 50051, "The service port")
)type server struct {pb.UnimplementedGreeterServer
}func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {log.Printf("Received: %v", in.GetName())return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {log.Printf("Recved %v", req.GetName())// 具体返回多少个response根据业务逻辑调整for i := 0; i < 10; i++ {// 通过 send 方法不断推送数据err := stream.Send(&pb.HelloReply{})if err != nil {log.Fatalf("Send error:%v", err)return err}}return nil
}
func (s *server) UpdateOrders(stream pb.Greeter_UpdateOrdersServer) error {for {log.Println("开始接受客户端的流")// Recv 对客户端发来的请求接收order, err := stream.Recv()if err == io.EOF {// 流结束,关闭并发送响应给客户端return stream.Send(&pb.HelloReply{Message: "接受客户流结束"})}if err != nil {return err}// 更新数据log.Printf("Order ID : %s - %s", order.GetName(), "Updated")}
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer()// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

客户端代码

package mainimport ("context""flag""io""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer func(conn *grpc.ClientConn) {err := conn.Close()if err != nil {}}(conn)// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"})for {// 客户端 Recv 方法接收服务端发送的流searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

客户流RPC

客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果

在这里插入图片描述

在protobuf中的 service添加以下代码

rpc updateOrders(stream HelloRequest) returns (stream HelloReply);

服务端代码

func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {ordersStr := "Updated Order IDs : "for {// Recv 对客户端发来的请求接收order, err := stream.Recv()if err == io.EOF {// 流结束,关闭并发送响应给客户端return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})}if err != nil {return err}// 更新数据orderMap[order.Id] = *orderlog.Printf("Order ID : %s - %s", order.Id, "Updated")ordersStr += order.Id + ", "}
}

客户端代码

package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer func(conn *grpc.ClientConn) {err := conn.Close()if err != nil {}}(conn)// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法updateStream, err := c.UpdateOrders(ctx)if err != nil {log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err)}// Updating order 1if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err)}// Updating order 2if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err)}// 发送关闭信号并接收服务端响应err = updateStream.CloseSend()if err != nil {log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)}log.Printf("客户端流传输结束")
}

双工流RPC

对应的业务就比如实时的消息流

在这里插入图片描述

protobuf

  // 设置双工rpcrpc processOrders(stream HelloRequest) returns (stream HelloReply);

服务端

package mainimport ("flag""fmt""google.golang.org/grpc""io""log"pb "mygrpc/proto/hello""net""sync"
)var (port = flag.Int("port", 50051, "The service port")
)type server struct {pb.UnimplementedGreeterServer
}func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error {var (waitGroup sync.WaitGroup // 一组 goroutine 的结束// 设置通道msgCh = make(chan *pb.HelloReply))// 计数器加1waitGroup.Add(1)// 消费队列中的内容go func() {// 计数器减一defer waitGroup.Done()for {v := <-msgChfmt.Println(v)err := stream.Send(v)if err != nil {fmt.Println("Send error:", err)break}}}()waitGroup.Add(1)// 向队列中添加内容go func() {defer waitGroup.Done()for {req, err := stream.Recv()if err == io.EOF {break}if err != nil {log.Fatalf("recv error:%v", err)}fmt.Printf("Recved :%v \n", req.GetName())msgCh <- &pb.HelloReply{Message: "服务端传输数据"}}close(msgCh)}()// 等待 计数器问0 推出waitGroup.Wait()// 返回nil表示已经完成响应return nil
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer()// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

客户端

package mainimport ("context""flag""fmt""io""log""sync""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer func(conn *grpc.ClientConn) {err := conn.Close()if err != nil {}}(conn)// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 设置var wg sync.WaitGroup// 调用指定方法stream, _ := c.ProcessOrders(ctx)if err != nil {panic(err)}// 3.开两个goroutine 分别用于Recv()和Send()wg.Add(1)go func() {defer wg.Done()for {req, err := stream.Recv()if err == io.EOF {fmt.Println("Server Closed")break}if err != nil {continue}fmt.Printf("Recv Data:%v \n", req.GetMessage())}}()wg.Add(1)go func() {defer wg.Done()for i := 0; i < 2; i++ {err := stream.Send(&pb.HelloRequest{Name: "hello world"})if err != nil {log.Printf("send error:%v\n", err)}}// 4. 发送完毕关闭streamerr := stream.CloseSend()if err != nil {log.Printf("Send error:%v\n", err)return}}()wg.Wait()log.Printf("服务端流传输结束")
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/206352.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【dig命令查询方法】

dig&#xff08;Domain Information Groper&#xff09;是一个用于查询DNS&#xff08;域名系统&#xff09;的命令行工具&#xff0c;它可以帮助您获取关于域名的各种信息&#xff0c;如IP地址、MX记录、NS记录等。下面是dig的详细使用教程。 基本语法&#xff1a; dig [ser…

文件被删除了怎么恢复?3个宝藏方法,快来get!

“我是一个学生党&#xff0c;期末的一些资料保存在电脑上&#xff0c;但是不知道是不是被我误删了&#xff0c;导致很多文件都找不到了。文件被删除了怎么恢复呢&#xff1f;大家帮我出出主意吧&#xff01;” 对于经常在电脑上保存各种文件的用户来说&#xff0c;文件误删除是…

IPC之五:使用 System V 信号量集解决经典的‘哲学家就餐问题‘

IPC 是 Linux 编程中一个重要的概念&#xff0c;IPC 有多种方式&#xff0c;本文主要介绍信号量集(Semaphore Sets)&#xff0c;尽管信号量集被认为是 IPC 的一种方式&#xff0c;但实际上通常把信号量集用于进程间同步或者资源访问互斥&#xff0c;信号量集和共享内存(Shared …

系列学习前端之第 2 章:一文精通 HTML

全套学习 HTMLCSSJavaScript 代码和笔记请下载网盘的资料&#xff1a; 链接: https://pan.baidu.com/s/1-vY2anBdrsBSwDZfALZ6FQ 提取码: 6666 HTML 全称&#xff1a;HyperText Markup Language&#xff08;超文本标记语言&#xff09; 1、 HTML 标签 1. 标签又称元素&#…

PR剪辑视频素材|真实胶片特效转场PR项目模板文件

Premiere转场模板&#xff0c;8个真实胶片特效转场PR项目模板视频素材。可以编辑和自定义媒体占位符。在视频的开头、中间和结尾使用。包含视频教程。 适用软件&#xff1a;PR2023及更新版本 | 分辨率&#xff1a;38402160 (4K) | 文件大小&#xff1a;16.43MB | 无需插件 来自…

AI并行计算:CUDA和ROCm

1 介绍 1.1 CUDA CUDA&#xff08;Compute Unified Device Architecture&#xff09;是Nvidia于2006年推出的一套通用并行计算架构&#xff0c;旨在解决在GPU上的并行计算问题。其易用性和便捷性能够方便开发者方便的进行GPU编程&#xff0c;充分利用GPU的并行能力&#xff0…

如何使用ArcGIS Pro制作类似CAD的尺寸注记

经常使用CAD制图的朋友应该比较熟悉CAD内的尺寸标注&#xff0c;这样的标注看起来直观且简洁&#xff0c;那么在ArcGIS Pro内能不能制作这样尺寸注记呢&#xff0c;答案是肯定的&#xff0c;这里为大家介绍一下制作的方法&#xff0c;希望能对你有所帮助。 数据来源 本教程所…

OpenGL ES 帧缓冲对象介绍和使用示例

一、介绍 1. 帧缓冲对象 默认情况下&#xff0c;OpenGL渲染的目标是屏幕&#xff0c;但如果你不想直接渲染到屏幕上&#xff0c;还需要对渲染结果做某些后期处理、渲染到纹理、阴影映射等操作&#xff0c;便可以使用帧缓冲对象&#xff0c;实现离屏渲染。 帧缓冲对象&#x…

JSON字符串转泛型对象

JSON字符串转泛型对象 以下问题只仅限于博主自身遇到&#xff0c;不代表绝对出现问题 相关类展示&#xff1a; 参数基类 public class BaseParams { }基类 public abstract class AbstractPush<Params extends BaseParams> {protected abstract void execute(Params…

怎么选护眼灯?考公必备护眼灯推荐

随着现在的近视问题受到越来越广泛的关注&#xff0c;各种护眼产品的需求量也一直在快速增加&#xff0c;其中最受广大家长、学生党欢迎的还是护眼台灯。因为专业的护眼台灯不仅可以补充我们夜晚工作、学习时不足的光亮&#xff0c;避免眼睛长时间处于昏暗的环境导致疲劳。 而…

018 OpenCV 人脸检测

目录 一、环境 二、分类器原理 2.1、概述 2.2、工作原理 三、人脸检测代码 一、环境 本文使用环境为&#xff1a; Windows10Python 3.9.17opencv-python 4.8.0.74 二、分类器原理 CascadeClassifier是OpenCV&#xff08;开源计算机视觉库&#xff09;中的一个强大的类…

day01十五模拟第二期

2 2——608#include <stdio.h> #include <math.h>int main(int argc, const char * argv[]) {int res1;for(int i1;i<2023;i){res(res*2)%1000;}printf("%d",res );return 0; }3 4-----4169 #include <stdio.h> #include<string.h> #inc…

C语言——指针(五)

&#x1f4dd;前言&#xff1a; 上篇文章C语言——指针&#xff08;四&#xff09;更加深入的介绍了不同类型指针的特点&#xff0c;这篇文章主要想记录一下函数与指针的结合运用以及const和assert关于指针的用法&#xff1a; 1&#xff0c;函数与指针 2&#xff0c;const 3&am…

3.PyTorch——常用神经网络层

import numpy as np import pandas as pd import torch as t from PIL import Image from torchvision.transforms import ToTensor, ToPILImaget.__version__2.1.13.1 图像相关层 图像相关层主要包括卷积层&#xff08;Conv&#xff09;、池化层&#xff08;Pool&#xff09;…

node.js和浏览器之间的区别

node.js是什么 Node.js是一种基于Chrome V8引擎的JavaScript运行环境&#xff0c;可以在服务器端运行JavaScript代码 Node.js 在浏览器之外运行 V8 JavaScript 引擎。 这使得 Node.js 非常高效。 浏览器如何运行js代码 nodejs运行环境 在浏览器中&#xff0c;大部分时间你所…

Linux系统调试课:USB 常用调试方法

文章目录 一、USB调试工具有哪些二、USB相关节点2.1、USB枚举成功标志2.2、USB speed查询2.3、USB 查询PID、VID沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇章主要 介绍 USB 常用调试方法。 一、USB调试工具有哪些

Sanic:一个极速Python Web框架

更多Python学习内容&#xff1a;ipengtao.com 大家好&#xff0c;我是彭涛&#xff0c;今天为大家分享 Sanic&#xff1a;一个极速Python Web框架&#xff0c;全文3500字&#xff0c;阅读大约12分钟。 随着 Web 应用的日益复杂&#xff0c;选择一个高性能的 Web 框架变得尤为…

扫描器的使用

漏扫器 注意事项 扫描器会给客户的业务造成影响。比如&#xff0c;如果存在sql注入漏洞&#xff08;重大的漏洞&#xff09;的话&#xff0c;会给客户的数据库插入脏数据&#xff0c;后果很严重 主机漏扫 针对IP地址和网段的漏洞扫描&#xff0c;例如&#xff1a;22端口弱口…

LCM-LoRA:a universal stable-diffusion acceleration module

Consistency is All You Need - wrong.wang什么都不用做生成却快了十倍其实也并非完全不可能https://wrong.wang/blog/20231111-consistency-is-all-you-need/ 1.Stable diffusion实在预训练VAE空间训练diffusion model的结果。 2.consistency decoder是用consistency model技…

ISIS默认路由下发的各种机制

作者简介&#xff1a;大家好&#xff0c;我是Asshebaby&#xff0c;热爱网工&#xff0c;有网络方面不懂的可以加我一起探讨 :1125069544 个人主页&#xff1a;Asshebaby博客 当前专栏&#xff1a; 网络HCIP内容 特色专栏&#xff1a; 常见的项目配置 本文内容&am…