文盘Rust -- tonic-Rust grpc初体验 | 京东云技术团队

gRPC 是开发中常用的开源高性能远程过程调用(RPC)框架,tonic 是基于 HTTP/2 的 gRPC 实现,专注于高性能、互操作性和灵活性。该库的创建是为了对 async/await 提供一流的支持,并充当用 Rust 编写的生产系统的核心构建块。今天我们聊聊通过使用tonic 调用grpc的的具体过程。

工程规划

rpc程序一般包含server端和client端,为了方便我们把两个程序打包到一个工程里面 新建tonic_sample工程

cargo new tonic_sample

Cargo.toml 如下

[package]
name = "tonic_sample"
version = "0.1.0"
edition = "2021"[[bin]] # Bin to run the gRPC server
name = "stream-server"
path = "src/stream_server.rs"[[bin]] # Bin to run the gRPC client
name = "stream-client"
path = "src/stream_client.rs"[dependencies]
tokio.workspace = true
tonic = "0.9"
tonic-reflection = "0.9.2"
prost = "0.11"
tokio-stream = "0.1"
async-stream = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.7"
h2 = { version = "0.3" }
anyhow = "1.0.75"
futures-util = "0.3.28"[build-dependencies]
tonic-build = "0.9"

tonic 的示例代码还是比较齐全的,本次我们参考 tonic 的 streaming example。

首先编写 proto 文件,用来描述报文。 proto/echo.proto

syntax = "proto3";package stream;// EchoRequest is the request for echo.
message EchoRequest { string message = 1; }// EchoResponse is the response for echo.
message EchoResponse { string message = 1; }// Echo is the echo service.
service Echo {// UnaryEcho is unary echo.rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}// ServerStreamingEcho is server side streaming.rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}// ClientStreamingEcho is client side streaming.rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}// BidirectionalStreamingEcho is bidi streaming.rpc BidirectionalStreamingEcho(stream EchoRequest)returns (stream EchoResponse) {}
}

文件并不复杂,只有两个 message 一个请求一个返回,之所以选择这个示例是因为该示例包含了rpc中的流式处理,包扩了server 流、client 流以及双向流的操作。 编辑build.rs 文件

use std::{env, path::PathBuf};fn main() -> Result<(), Box<dyn std::error::Error>> {tonic_build::compile_protos("proto/echo.proto")?;Ok(())
}

该文件用来通过 tonic-build 生成 grpc 的 rust 基础代码

完成上述工作后就可以构建 server 和 client 代码了

stream_server.rs

pub mod pb {tonic::include_proto!("stream");
}use anyhow::Result;
use futures_util::FutureExt;
use pb::{EchoRequest, EchoResponse};
use std::{error::Error,io::ErrorKind,net::{SocketAddr, ToSocketAddrs},pin::Pin,thread,time::Duration,
};
use tokio::{net::TcpListener,sync::{mpsc,oneshot::{self, Receiver, Sender},Mutex,},task::{self, JoinHandle},
};
use tokio_stream::{wrappers::{ReceiverStream, TcpListenerStream},Stream, StreamExt,
};
use tonic::{transport::Server, Request, Response, Status, Streaming};
type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {let mut err: &(dyn Error + 'static) = err_status;loop {if let Some(io_err) = err.downcast_ref::<std::io::Error>() {return Some(io_err);}// h2::Error do not expose std::io::Error with `source()`// https://github.com/hyperium/h2/pull/462if let Some(h2_err) = err.downcast_ref::<h2::Error>() {if let Some(io_err) = h2_err.get_io() {return Some(io_err);}}err = match err.source() {Some(err) => err,None => return None,};}
}#[derive(Debug)]
pub struct EchoServer {}#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> {let req_str = req.into_inner().message;let response = EchoResponse { message: req_str };Ok(Response::new(response))}type ServerStreamingEchoStream = ResponseStream;async fn server_streaming_echo(&self,req: Request<EchoRequest>,) -> EchoResult<Self::ServerStreamingEchoStream> {println!("EchoServer::server_streaming_echo");println!("\tclient connected from: {:?}", req.remote_addr());// creating infinite stream with requested messagelet repeat = std::iter::repeat(EchoResponse {message: req.into_inner().message,});let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));let (tx, rx) = mpsc::channel(128);tokio::spawn(async move {while let Some(item) = stream.next().await {match tx.send(Result::<_, Status>::Ok(item)).await {Ok(_) => {// item (server response) was queued to be send to client}Err(_item) => {// output_stream was build from rx and both are droppedbreak;}}}println!("\tclient disconnected");});let output_stream = ReceiverStream::new(rx);Ok(Response::new(Box::pin(output_stream) as Self::ServerStreamingEchoStream))}async fn client_streaming_echo(&self,_: Request<Streaming<EchoRequest>>,) -> EchoResult<EchoResponse> {Err(Status::unimplemented("not implemented"))}type BidirectionalStreamingEchoStream = ResponseStream;async fn bidirectional_streaming_echo(&self,req: Request<Streaming<EchoRequest>>,) -> EchoResult<Self::BidirectionalStreamingEchoStream> {println!("EchoServer::bidirectional_streaming_echo");let mut in_stream = req.into_inner();let (tx, rx) = mpsc::channel(128);tokio::spawn(async move {while let Some(result) = in_stream.next().await {match result {Ok(v) => tx.send(Ok(EchoResponse { message: v.message })).await.expect("working rx"),Err(err) => {if let Some(io_err) = match_for_io_error(&err) {if io_err.kind() == ErrorKind::BrokenPipe {eprintln!("\tclient disconnected: broken pipe");break;}}match tx.send(Err(err)).await {Ok(_) => (),Err(_err) => break, // response was droped}}}}println!("\tstream ended");});// echo just write the same data that was receivedlet out_stream = ReceiverStream::new(rx);Ok(Response::new(Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream))}
}#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {// 基础serverlet server = EchoServer {};Server::builder().add_service(pb::echo_server::EchoServer::new(server)).serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap()).await.unwrap();Ok(())
}

server 端的代码还是比较清晰的,首先通过 tonic::include_proto! 宏引入grpc定义,参数是 proto 文件中定义的 package 。我们重点说说 server_streaming_echo function 。这个function 的处理流程明白了,其他的流式处理大同小异。首先 通过std::iter::repeat function 定义一个迭代器;然后构建 tokio_stream 在本示例中 每 200毫秒产生一个 repeat;最后构建一个 channel ,tx 用来发送从stream中获取的内容太,rx 封装到response 中返回。 最后 main 函数 拉起服务。

client 代码如下

pub mod pb {tonic::include_proto!("stream");
}use std::time::Duration;
use tokio_stream::{Stream, StreamExt};
use tonic::transport::Channel;use pb::{echo_client::EchoClient, EchoRequest};fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {message: format!("msg {:02}", i),})
}async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) {for i in 0..num {let req = tonic::Request::new(EchoRequest {message: "msg".to_string() + &i.to_string(),});let resp = client.unary_echo(req).await.unwrap();println!("resp:{}", resp.into_inner().message);}
}async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {let stream = client.server_streaming_echo(EchoRequest {message: "foo".into(),}).await.unwrap().into_inner();// stream is infinite - take just 5 elements and then disconnectlet mut stream = stream.take(num);while let Some(item) = stream.next().await {println!("\treceived: {}", item.unwrap().message);}// stream is droped here and the disconnect info is send to server
}async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {let in_stream = echo_requests_iter().take(num);let response = client.bidirectional_streaming_echo(in_stream).await.unwrap();let mut resp_stream = response.into_inner();while let Some(received) = resp_stream.next().await {let received = received.unwrap();println!("\treceived message: `{}`", received.message);}
}async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {let in_stream = echo_requests_iter().throttle(dur);let response = client.bidirectional_streaming_echo(in_stream).await.unwrap();let mut resp_stream = response.into_inner();while let Some(received) = resp_stream.next().await {let received = received.unwrap();println!("\treceived message: `{}`", received.message);}
}#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap();println!("Unary echo:");unary_echo(&mut client, 10).await;tokio::time::sleep(Duration::from_secs(1)).await;println!("Streaming echo:");streaming_echo(&mut client, 5).await;tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions// Echo stream that sends 17 requests then graceful end that connectionprintln!("\r\nBidirectional stream echo:");bidirectional_streaming_echo(&mut client, 17).await;// Echo stream that sends up to `usize::MAX` requests. One request each 2s.// Exiting client with CTRL+C demonstrate how to distinguish broken pipe from// graceful client disconnection (above example) on the server side.println!("\r\nBidirectional stream echo (kill client with CTLR+C):");bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;Ok(())
}

测试一下,分别运行 server 和 client

cargo run --bin stream-server
cargo run --bin stream-client

在开发中,我们通常不会再 client 和 server都开发好的情况下才开始测试。通常在开发server 端的时候采用 grpcurl 工具进行测试工作

grpcurl -import-path ./proto -proto echo.proto list
grpcurl -import-path ./proto -proto  echo.proto describe stream.Echo
grpcurl -plaintext -import-path ./proto -proto  echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho

此时,如果我们不指定 -import-path 参数,执行如下命令

grpcurl -plaintext 127.0.0.1:50051 list

会出现如下报错信息

Failed to list services: server does not support the reflection API

让服务端程序支持 reflection API

首先改造build.rs

use std::{env, path::PathBuf};fn main() -> Result<(), Box<dyn std::error::Error>> {let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());tonic_build::configure().file_descriptor_set_path(out_dir.join("stream_descriptor.bin")).compile(&["proto/echo.proto"], &["proto"]).unwrap();Ok(())
}

file_descriptor_set_path 生成一个文件,其中包含为协议缓冲模块编码的 prost_types::FileDescriptorSet 文件。这是实现 gRPC 服务器反射所必需的。

接下来改造一下 stream-server.rs,涉及两处更改。

新增 STREAM_DESCRIPTOR_SET 常量

pub mod pb {tonic::include_proto!("stream");pub const STREAM_DESCRIPTOR_SET: &[u8] =tonic::include_file_descriptor_set!("stream_descriptor");
}

修改main函数

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {// 基础server// let server = EchoServer {};// Server::builder()//     .add_service(pb::echo_server::EchoServer::new(server))//     .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())//     .await//     .unwrap();// tonic_reflection let service = tonic_reflection::server::Builder::configure().register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET).with_service_name("stream.Echo").build().unwrap();let addr = "0.0.0.0:50051".parse().unwrap();let server = EchoServer {};Server::builder().add_service(service).add_service(pb::echo_server::EchoServer::new(server)).serve(addr).await?;Ok(())
}

register_encoded_file_descriptor_set 将包含编码的 prost_types::FileDescriptorSet 的 byte slice 注册到 gRPC Reflection 服务生成器注册。

再次测试

grpcurl -plaintext 127.0.0.1:50051 list
grpcurl -plaintext 127.0.0.1:50051 describe stream.Echo

返回正确结果。

以上完整代码地址

作者:京东科技 贾世闻

来源:京东云开发者社区 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/84358.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git学习笔记9

Gitlab中的代码是要部署到生产服务器上。 CI&#xff1a; Continuous integration 简称CI&#xff1a; 是一种软件开发实践&#xff0c;即开发团队成员经常集成他们的工作&#xff0c;通常每个成员每天至少集成一次&#xff0c;也就意味着每天可能会发生多次集成。每次集成都…

动态规划之回文串问题

回文串 1. 回文子串2. 最长回文子串3. 分割回文串 IV4. 分割回文串 II5. 最长回文子序列6. 让字符串成为回⽂串的最⼩插⼊次数 1. 回文子串 1.题目链接&#xff1a;回文子串 2.题目描述&#xff1a; 给你一个字符串 s &#xff0c;请你统计并返回这个字符串中 回文子串 的数目…

多目标优化算法:基于非支配排序的鱼鹰优化算法(NSOOA)MATLAB

一、鱼鹰优化算法 鱼鹰优化算法&#xff08;Osprey optimization algorithm&#xff0c;OOA&#xff09;由Mohammad Dehghani 和 Pavel Trojovsk于2023年提出&#xff0c;其模拟鱼鹰的捕食行为。 Python&#xff1a;鱼鹰优化算法&#xff08;Osprey optimization algorithm&a…

新版发布 | Cloudpods v3.10.5 和 v3.9.13 正式发布

Cloudpods v3.10.5 本期发布中&#xff0c;ocboot 部署脚本有较多变化&#xff0c;首先支持以非 root 用户执行安装流程&#xff0c;其次响应社区的呼吁&#xff0c;增加了–stack 参数&#xff0c;允许 Allinone 一键安装仅包含私有云&#xff08;参数为 edge&#xff09;或云…

ESP8266 WiFi物联网智能插座—项目简介

目录 1、项目背景 2、设备节点功能 3、上位机功能 物联网虽然能够使家居设备和系统实现自动化、智能化管理&#xff0c;但是依然需要依靠更为先进的终端插座作为根本保障&#xff0c;插座是所有家用电器需要使用的电源设备&#xff0c;插座的有序智能管理&#xff0c;对于实…

服务器免密登录设置

例如服务器A想要免密连接服务器B&#xff0c;需要以下2个步骤 步骤1&#xff1a;在服务器A上执行命令ssh-keygen –t rsa&#xff0c;直接回车&#xff0c;会在默认路径/root/.ssh下生成私钥和公钥 步骤2&#xff1a;将服务器A上生成的公钥id_rsa.pub的内容&#xff0c;复制粘…

进程的管理

#include <unistd.h> void _exit(int status); #include <stdlib.h> void _Exit(int status); status参数&#xff1a;是进程退出时的状态信息&#xff0c;父进程在回收子进程资源的时候可以获取到 #include<stdio.h> #include<stdlib.h> #includ…

【C++】搜索二叉树底层实现

目录 一&#xff0c;概念 二&#xff0c;实现分析 1. 插入 &#xff08;1.&#xff09;非递归版本 &#xff08;2.&#xff09;递归版本 2. 打印搜索二叉树 3.查找函数 &#xff08;1.&#xff09;非递归版本 &#xff08;2.&#xff09;递归版本 4. 删除函数&#x…

flex弹性盒模型与阿里图标的使用

华子目录 flex布局flex布局原理flex使用三要素 阿里图标&#xff08;字体&#xff09; flex布局 相关学习网站&#xff1a;http://c.biancheng.net/css3/flex.html 1.flex是当前最主流的布局方式&#xff1a;用它布局起来更方便&#xff0c;取代了浮动的作用。 2.浮动布局有缺…

Redis混合模式持久化原理

前言 前面文章中我们也介绍过Redis的持久化方式有两种&#xff1a;rdb持久化和aof持久化&#xff0c;具体详情可查看之前文章redis持久化。rdb持久化还是aof持久化它们都有各自的缺点。 rdb和aof缺点 rdb持久化&#xff1a;由于是定期对内存数据快照进行持久化&#xff0c;因此…

宝塔重装注意事项

欢迎关注我的公众号&#xff1a;夜说猫&#xff0c;让一个贫穷的程序员不靠打代码也能吃饭~ 前言 宝塔8.0版本&#xff0c;宝塔卸载重装&#xff0c;或者重装Linux系统后重新安装宝塔也适用。 不能上来直接就执行安装宝塔脚本&#xff0c;除非之前没有安装过宝塔。 步骤 1、…

【Mysql主从配置方法---单主从】

Mysql主从 主服务器 创建用户 create user “for_rep”“从服务器IP地址” IDENTIFIED by “123456” 授权 grant replication slave on . to “for_rep”“从服务器IP地址” IDENTIFIED by “123456” 查看用户权限 SHOW GRANTS FOR “for_rep”“从服务器IP地址”; 修改M…

Flutter粒子生成演示

演示&#xff1a; 直接上代码&#xff1a; import dart:math; import dart:ui;import package:flutter/material.dart; import package:kq_flutter_widgets/widgets/chart/ex/extension.dart;class ParticleView extends StatefulWidget {const ParticleView({super.key});ove…

Vue 使用vue-cli构建SPA项目(超详细)

目录 一、什么是vue-cli 二&#xff0c;构建SPA项目 三、 运行SPA项目 前言&#xff1a; 在我们搭建SPA项目时候&#xff0c;我们必须去检查我们是否搭建好NodeJS环境 cmd窗口输入以下指令&#xff1a;去检查 node -v npm -v 一、什么是vue-cli Vue CLI&#xff08;Vu…

Qt/C++音视频开发53-本地摄像头推流/桌面推流/文件推流/监控推流等

一、前言 编写这个推流程序&#xff0c;最开始设计的时候是用视频文件推流&#xff0c;后面陆续增加了监控摄像头推流&#xff08;其实就是rtsp视频流&#xff09;、网络电台和视频推流&#xff08;一般是rtmp或者http开头m3u8结尾的视频流&#xff09;、本地摄像头推流&#…

短视频矩阵系统,短视频矩阵源码技术开发

开发短视频矩阵系统的源码需要以下步骤&#xff1a; 确定系统需求&#xff1a;根据客户的需求&#xff0c;确定系统的功能和特点&#xff0c;例如用户注册登录、视频上传、视频浏览、评论点赞等。 设计系统架构&#xff1a;根据系统需求&#xff0c;设计系统的整体架构&#x…

前端工程师路上的宝藏:不可错过的进阶必读文章!

JavaScript 《javascript高级程序设计》核心知识总结 必要性&#xff1a;⭐️⭐️⭐️⭐️ 难度&#xff1a;⭐️⭐️⭐️⭐️ 谏言&#xff1a;建议初学者先读一两遍红宝石书&#xff08;即JavaScript高级程序设计&#xff09;&#xff0c;犀牛书可以暂时不看&#xff08;…

csp初赛总结 那些年编程走过的坑 初高中信竞常考语法算法点

&#x1f618;个人主页&#xff1a;曲终酣兴晚的小书屋&#x1f496; &#x1f615;作者介绍&#xff1a;一个莽莽撞撞的&#x1f43b; &#x1f496;专栏介绍&#xff1a;日常生活&往事回忆 &#x1f636;‍&#x1f32b;️每日金句&#xff1a;祝大家心有山水不造作&…

typedef function<int (int,int)> func_t;

这段代码是C中用于创建函数类型别名&#xff08;function type alias&#xff09;的语法。让我们来逐步解释它&#xff1a; typedef: typedef 是C中的关键字&#xff0c;用于创建类型别名。它允许你为一个已存在的类型创建一个新的、易于使用的名称。 function: 这部分指定了要…

【Java 基础篇】Java同步代码块解决数据安全

多线程编程是现代应用程序开发中的常见需求&#xff0c;它可以提高程序的性能和响应能力。然而&#xff0c;多线程编程也带来了一个严重的问题&#xff1a;数据安全。在多线程环境下&#xff0c;多个线程同时访问和修改共享的数据可能导致数据不一致或损坏。为了解决这个问题&a…