go语言 grpc 拦截器

文章目录

    • 拦截器
      • 服务端拦截器
        • 一元拦截器
        • 流拦截器
      • 客户端拦截器
        • 一元拦截器
        • 流拦截`
      • 多个拦截器
  • 代码仓库

拦截器

gRPC拦截器(interceptor)是一种函数,它可以在gRPC调用之前和之后执行一些逻辑,例如认证、授权、日志记录、监控和统计等。拦截器函数是gRPC中非常重要的概念,它允许我们在服务端和客户端添加自定义逻辑,以满足业务需求和运维需求。

在gRPC中,拦截器函数通常通过实现grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口来定义。UnaryServerInterceptor用于拦截一元RPC请求,而StreamServerInterceptor用于拦截流式RPC请求。在客户端中,我们可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor来拦截gRPC调用。

在gRPC中,拦截器函数可以被链接起来,形成一个拦截器链。在这个拦截器链中,每个拦截器函数都可以处理请求并将其转发给下一个拦截器函数,或者直接返回响应。因此,我们可以在拦截器函数中编写不同的逻辑,例如实现认证、授权、监控和统计等。
以下是一些常见的gRPC拦截器:

  • 认证和授权拦截器:用于对gRPC调用进行身份验证和权限控制,例如检查token、验证用户名和密码、检查访问控制列表等;
  • 日志记录拦截器:用于记录gRPC调用的日志,例如记录请求的方法、参数、响应状态等;
  • 监控和统计拦截器:用于监控gRPC调用的性能和吞吐量,例如记录调用次数、响应时间、错误率等;
  • 缓存拦截器:用于在服务端或客户端缓存一些数据,例如缓存计算结果、缓存数据库查询结果等。

服务端拦截器

在这里插入图片描述

一元拦截器
package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 14:52:55 ======= [Server Interceptor]  /hello.Greeter/SayHello
2023/12/07 14:52:55  Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55  Post Proc Message : message:"Hello world"
流拦截器

流式拦截器需要对grpc.ServerStream进行包装,重新实现RecvMsg和SendMsg方法。

func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {log.Printf("Recved %v", req.GetName())// 具体返回多少个response根据业务逻辑调整for i := 0; i < 10; i++ {// 通过 send 方法不断推送数据err := stream.Send(&pb.HelloReply{})if err != nil {log.Fatalf("Send error:%v", err)return err}}return nil
}type wrappedStream struct {// 包装器流grpc.ServerStream
}
// 接受信息拦截器
func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))return w.ServerStream.RecvMsg(m)
}
// 发送消息拦截器
func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ServerStream.SendMsg(m)
}func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {return &wrappedStream{s}
}func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// 前置处理log.Println("====== [Server Stream Interceptor] ", info.FullMethod)// 包装器流调用 流RPCerr := handler(srv, newWrappedStream(ss))if err != nil {log.Printf("RPC failed with error %v", err)}return err
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

结果

GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor]  /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 开始服务端rpc流测试
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

客户端拦截器

在这里插入图片描述

一元拦截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {// 前置处理逻辑log.Println("Method : " + method)// 调用invoker 执行远程方法err := invoker(ctx, method, req, reply, cc, opts...)// 后置处理逻辑log.Println(reply)return err
}func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加拦截器if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 设置超时时间为一秒defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流拦截`
type wrappedStream struct {grpc.ClientStream
}func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.RecvMsg(m)
}func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.SendMsg(m)
}func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {return &wrappedStream{s}
}func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {// 前置处理逻辑log.Println("======= [Client Interceptor] ", method)// 调用streamer 来获取客户端流s, err := streamer(ctx, desc, cc, method, opts...)if err != nil {return nil, err}return newWrappedStream(s), nil
}func main(){// 注册拦截器到客户端流conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defer cancel()// 调用客户端流RPC方法searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})for {searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}
}

结果

2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客户端流传输结束

多个拦截器

在grpc中默认的拦截器不可以传多个,因为在源码中,存在一些问题

func chainUnaryClientInterceptors(cc *ClientConn) {interceptors := cc.dopts.chainUnaryIntsif cc.dopts.unaryInt != nil {interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)}var chainedInt UnaryClientInterceptorif len(interceptors) == 0 {chainedInt = nil} else if len(interceptors) == 1 {chainedInt = interceptors[0]} else {chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)}}cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

如果真的需要多个拦截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 链式方法。核心方法如下

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {n := len(interceptors)if n > 1 {lastI := n - 1return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {var (chainHandler grpc.UnaryInvokercurI         int)chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {if curI == lastI {return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)}curI++err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)curI--return err}return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)}}...
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/208706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iOS app切换后台时添加模糊遮罩层

仿 支付宝 退出后台后,App整个 增加模糊遮罩层 此处只介绍 在iOS13后 SceneDelegate 下的操作 原理就是 在 App 进入后台后 在 主window上添加一个 UIVisualEffectView 在进入前台后移除 直接上代码: 先声明: //先声明 /* blurView */ property (strong, nonatomic) UI…

逆波兰表达式求解计算器

利用逆波兰表达式求解计算器有以下几个步骤: 1. 去掉字符串中的空格 s s.replaceAll(" ", "")2. 讲字符串转换为中序表达式数组 def string_to_infixlist(s):ans []keep_num ""for i in range(len(s)):if s[i].isdigit():if i < len(s)…

docker学习(四、修改容器创建新的镜像推送到云上)

镜像是只读的&#xff0c;容器是可编辑的。Docker镜像是分层的&#xff0c;支持通过扩展镜像&#xff0c;创建新的镜像。 学到这里感觉docker跟git很想~~ 通过docker commit将修改的容器做成新的镜像 # 将容器做成新的镜像 docker commit -m"提交备注" -a"作…

【1day】泛微e-office OA系统sms_page.php接口SQL 注入漏洞学习

注:该文章来自作者日常学习笔记,请勿利用文章内的相关技术从事非法测试,如因此产生的一切不良后果与作者无关。 目录 一、漏洞描述 二、影响版本 三、资产测绘 四、漏洞复现

大创项目推荐 交通目标检测-行人车辆检测流量计数 - 大创项目推荐

文章目录 0 前言1\. 目标检测概况1.1 什么是目标检测&#xff1f;1.2 发展阶段 2\. 行人检测2.1 行人检测简介2.2 行人检测技术难点2.3 行人检测实现效果2.4 关键代码-训练过程 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 毕业设计…

什么是Nginx反向代理?Nginx反向代理配置指南

Nginx反向代理是一种常见的服务器架构模式&#xff0c;它可以将客户端请求转发到多个后端服务器上&#xff0c;从而实现负载均衡、高可用性和安全性。本文将介绍Nginx反向代理的基本概念和配置方法。 什么是Nginx反向代理&#xff1f; 在传统的Web服务器架构中&#xff0c;客户…

解决selenium使用.get()报错:unknown error: unsupported protocol

解决方法 将原来的&#xff1a; url "https://www.baidu.com" browser.get(url)替换为&#xff1a; url "https://www.baidu.com" browser.execute_script(f"window.location.replace({url});") # 直接平替 .get()问题解析 之前运行都是正…

【后端学前端学习记录】学习计划

1、个人背景 写了足够久的后端了&#xff0c;常用的语言基本上都接触过&#xff0c;没有在工作中写过前端 一直想做一些前端的工作&#xff0c;但是前端技能不足加上自己审美不行&#xff0c;写出的界面总是很丑 所以一直对前端做不好&#xff0c;也没有真正下手。 2、动机 种…

Navicat 技术指引 | 连接 GaussDB 分布式

Navicat Premium&#xff08;16.3.3 Windows 版或以上&#xff09;正式支持 GaussDB 分布式数据库。GaussDB 分布式模式更适合对系统可用性和数据处理能力要求较高的场景。Navicat 工具不仅提供可视化数据查看和编辑功能&#xff0c;还提供强大的高阶功能&#xff08;如模型、结…

SLAM ORB-SLAM2(11)单目初始化

SLAM ORB-SLAM2(11)单目初始化 1. 初始化工作1.1. 单应矩阵(Homography Matrix)1.2. 基础矩阵(Fundamental Matrix)1.3. 本质矩阵(Essential Matrix)1.4. 初始化过程2. 业务流程2.1. 创建单目初始化器2.2. 判断连续帧的特征点数目2.3. 在两帧中找匹配的特征点对2.4. 估…

软件兼容性测试:保障多样化用户体验的重要功能

随着移动设备和操作系统的快速发展&#xff0c;软件兼容性测试变得越发重要。这项测试确保软件在不同平台、设备和环境下都能够正常运行&#xff0c;提供一致而稳定的用户体验。下面是软件兼容性测试中的一些关键功能&#xff1a; 1. 跨平台兼容性测试 在不同操作系统上运行的软…

【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等

Flink 系列文章 一、Flink 专栏 Flink 专栏系统介绍某一知识点&#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分&#xff0c;比如术语、架构、编程模型、编程指南、基本的…

macOS Big Sur/Mac电脑安装vscode显示您没有权限来打开应用程序‘Visual Studio Code‘ 请联系您的电脑或网络管理员问题修复

错误方法 首先我以为我的权限不足。&#xff0c;需要去用户群组里设置。结果根本不是这个的问题。 1.在系统偏好设置->用户与群组检查了一下我的用户是不是管理员 结果发现是管理员 2.根据苹果提示&#xff0c;右键我的文件夹->显示简介->最下面的共享与权限 解锁&…

SAP UI5 walkthrough step5 Controllers

在这个章节&#xff0c;我们要做的是&#xff0c;将之前的text文本展示为一个按钮&#xff0c;并将声明绑定在点击按钮事件。 因为改的是外观&#xff0c;所以我们修改的是view.XML webapp/view/App.view.xml <mvc:ViewcontrollerName"ui5.walkthrough.controller.A…

element中el-select多选v-model是对象数组

文章目录 一、问题二、解决三、最后 一、问题 element中的el-select的v-model一般都是字符串或者字符串数组&#xff0c;但是有些时候后端接口要求该字段要传对象或者对象数组&#xff0c;如果再转换一次数据&#xff0c;对于保存配置和回显都是吃力不讨好的事情。如下所示&am…

SpringBoot 项目将jar 部署在服务器引用外部 配置文件

SpringBoot 官方给出了四种方式引用外部配置文件的方式 在jar包的同一目录下建一个config文件夹&#xff0c;然后把配置文件放到这个文件夹下(最常用)直接把配置文件放到jar包的同级目录在classpath下建一个config文件夹&#xff0c;然后把配置文件放进去在classpath下直接放配…

图片整理

Lily上课时使用字母数字图片教小朋友们学习英语单词&#xff0c;每次都需要把这些图片按照大小&#xff08;ASCII码值从小到大&#xff09;排列收好。请大家给Lily帮忙&#xff0c;通过代码解决。 Lily使用的图片使用字符"A"到"Z"、“a"到"z”、…

centos7做gitlab数据灾备项目地址指向问题

如果你在 CentOS 7 上使用 GitLab 时&#xff0c;它回复的数据指向了另一个服务器的地址&#xff0c;可能是因为配置文件中的一些设置不正确。 要解决这个问题&#xff0c;可以尝试以下几个步骤&#xff1a; 检查 GitLab 配置文件&#xff1a;打开 GitLab 的配置文件&#xf…

python写数据进es中

1、自定义inde为&#xff1a;xxxx&#xff0c;data_to_insert也可以自定义函数 from elasticsearch import Elasticsearch from datetime import datetime, timedelta es Elasticsearch([http://es地址1:9200, es地址2:9200, es地址3:9200]) current_date datetime.now() for…

NTP时钟同步服务器(校时服务器)技术参数分享

NTP时钟同步服务器&#xff08;校时服务器&#xff09;技术参数分享 网络校时服务器是一款先进的智能化高精度时钟同步设备。 网络校时服务器从 GPS、北斗、GLONASS、Galileo等导航定位卫星系统上获取标准时间信息&#xff0c;并通过 NTP/SNTP 或其他网络协议&#xff0c;在网络…