文章目录
- 1 proto文件编辑
- 2 生成C++代码
- 2.1 生成protobuf(反)序列化代码
- 2.2 生成服务框架代码
- 3 同步server端
- 3.1 命名空间
- 3.2 重写服务
- 3.3 启动服务
- 3.4 完整代码
- 4 同步client端
- 4.1 命名空间
- 4.2 定义客户端
- 4.3 完整代码
- 5 异步server端
手把手写rpc范例流程:
- 编写proto文件
- 根据proto文件生成对应的.cc和.h文件
- server程序继承Service
- client程序使用stub
- 编译server和client程序。
1 proto文件编辑
IM.Login.proto
syntax = "proto3";
// 命名空间一般和文件名保持一致
package IM.Login;
// 定义服务
service ImLogin {
// 定义服务函数
rpc Regist (IMRegistReq) returns (IMRegistRes) {}
rpc Login (IMLoginReq) returns (IMLoginRes){}
}
// 注册账号
message IMRegistReq{
string user_name = 1; // 用户名
string password = 2; // 密码
}
message IMRegistRes{
string user_name = 1;
uint32 user_id = 2;
uint32 result_code = 3; // 返回0的时候注册正常
}
// 登录账号
message IMLoginReq{
string user_name = 1;
string password = 2;
}
message IMLoginRes{
uint32 user_id = 1;
uint32 result_code = 2; // 返回0的时候正确
}
2 生成C++代码
2.1 生成protobuf(反)序列化代码
在 .proto 文件中定义了数据结构,这些数据结构是面向开发者和业务程序的,并不面向存储和传输。当
需要把这些数据进行存储或传输时,就需要将这些结构数据进行序列化、反序列化以及读写。通过
protoc 这个编译器,ProtoBuf 将会为我们提供相应的接口代码。可通过如下命令生成相应的接口代
码:
// $SRC_DIR: .proto 所在的源目录
// --cpp_out: 生成 c++ 代码
// $DST_DIR: 生成代码的目标目录
// xxx.proto: 要针对哪个 proto 文件生成接口代码
protoc -I=$SRC_DIR --cpp_out=$DST_DIR $SRC_DIR/xxx.proto
// 如:
protoc -I ./ --cpp_out=. simple.proto
- -I 是 --proto_path 的缩写形式,
$SRC_DIR
指定在解析导入指令时查找 .proto 文件的目录。如果省
略,则使用当前目录。可以通过多次传递 --proto_path 选项来指定多个导入目录,他们将按顺序
搜索。 - 执行下面命令后,将在$DST_DIR目录下生成 xxx.pb.h 和 xxx.ph.cc文件
2.2 生成服务框架代码
// 执行下面命令后,将在当前目录下生成 simple.grpc.pb.h 和 simple.grpc.pb.cc 文件
protoc -I ./ --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin`
simple.proto
// 上面的 `which grpc_cpp_plugin` 也可以替换为 grpc_cpp_plugin 程序的路径(如果不在系统PATH下)
// 生成的代码需要依赖第一步生成序列化代码,所以在使用的时候必须都要有(生成的时候不依赖)
这里我们生成对应要使用的文件:
protoc --cpp_out=. IM.Login.proto
protoc --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=/usr/local/bin/grpc_cpp_plugin IM.Login.proto
或protoc -I . --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` *.proto
生成C++代码IM.Login.pb.h和IM.Login.pb.cc, IM.Login.grpc.pb.h和IM.Login.grpc.pb.cc。
3 同步server端
3.1 命名空间
记住首先一定要开放命名空间
// 命名空间
// grcp
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
3.2 重写服务
定义服务端的类,继承 proto 文件定义的 grpc 服务 ImLogin ,重写 grpc 服务定义的方法
class IMLoginServiceImpl : public ImLogin::Service {// 注册virtual Status Regist(ServerContext* context, const IMRegistReq* request,IMRegistRes* response) override {std::cout << "Regist user_name: " << request->user_name() << std::endl;response->set_user_name(request->user_name());response->set_user_id(10);response->set_result_code(0);return Status::OK;}// 登录virtual Status Login(ServerContext* context, const IMLoginReq* request,IMLoginRes* response) override {std::cout << "Login user_name: " << request->user_name() << std::endl;response->set_user_id(10);response->set_result_code(0);return Status::OK;}
};
3.3 启动服务
std::string server_address("0.0.0.0:50051");
/* 定义重写的服务类 */
ImLoginServiceImpl service;
/* 创建工厂类 */
ServerBuilder builder;
/* 监听端口和地址 */
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 5000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
/* 注册服务 */
builder.RegisterService(&service);
/** 创建和启动一个RPC服务器*/
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
/* 进入服务事件循环 */
server->Wait();
3.4 完整代码
#include <iostream>
#include <string>// grpc头文件
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>// 包含我们自己proto文件生成的.h
#include "IM.Login.pb.h"
#include "IM.Login.grpc.pb.h"// 命名空间
// grcp
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;class IMLoginServiceImpl : public ImLogin::Service {// 注册virtual Status Regist(ServerContext* context, const IMRegistReq* request, IMRegistRes* response) override {std::cout << "Regist user_name: " << request->user_name() << std::endl;response->set_user_name(request->user_name());response->set_user_id(10);response->set_result_code(0);return Status::OK;}// 登录virtual Status Login(ServerContext* context, const IMLoginReq* request, IMLoginRes* response) override {std::cout << "Login user_name: " << request->user_name() << std::endl;response->set_user_id(10);response->set_result_code(0);return Status::OK;}};void RunServer()
{std::string server_addr("0.0.0.0:50051");// 创建一个服务类IMLoginServiceImpl service;ServerBuilder builder;builder.AddListeningPort(server_addr, grpc::InsecureServerCredentials());builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 5000);builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);builder.RegisterService(&service);//创建/启动std::unique_ptr<Server> server(builder.BuildAndStart());std::cout << "Server listening on " << server_addr << std::endl;// 进入服务循环server->Wait();
}// 怎么编译?
// 手动编译
// 通过cmake的方式
int main(int argc, const char** argv)
{RunServer();return 0;
}
4 同步client端
4.1 命名空间
记住首先一定要开放命名空间
// 命名空间
// grcp
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
4.2 定义客户端
定义客户端的类,实现两个方法用来发送grpc请求以及接收grpc响应
class ImLoginClient
{
public:ImLoginClient(std::shared_ptr<Channel> channel):stub_(ImLogin::NewStub(channel)) {}void Regist(const std::string &user_name, const std::string &password) {IMRegistReq request;request.set_user_name(user_name);request.set_password(password);IMRegistRes response;ClientContext context;std::cout << "-> Regist req" << std::endl;Status status = stub_->Regist(&context, request, &response);if(status.ok()) {std::cout << "user_name:" << response.user_name() << ", user_id:" << response.user_id() << std::endl;} else {std::cout << "user_name:" << response.user_name() << "Regist failed: " << response.result_code()<< std::endl;}}void Login(const std::string &user_name, const std::string &password) {IMLoginReq request;request.set_user_name(user_name);request.set_password(password);IMLoginRes response;ClientContext context;std::cout << "-> Login req" << std::endl;Status status = stub_->Login(&context, request, &response);if(status.ok()) {std::cout << "user_id:" << response.user_id() << " login ok" << std::endl;} else {std::cout << "user_name:" << request.user_name() << "Login failed: " << response.result_code()<< std::endl;}}private:std::unique_ptr<ImLogin::Stub> stub_;
};
4.3 完整代码
#include <iostream>
#include <memory>
#include <string>
// /usr/local/include/grpcpp/grpcpp.h
#include <grpcpp/grpcpp.h>// 包含我们自己proto文件生成的.h
#include "IM.Login.pb.h"
#include "IM.Login.grpc.pb.h"// 命名空间
// grcp
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;class ImLoginClient
{
public:ImLoginClient(std::shared_ptr<Channel> channel):stub_(ImLogin::NewStub(channel)) {}void Regist(const std::string &user_name, const std::string &password) {IMRegistReq request;request.set_user_name(user_name);request.set_password(password);IMRegistRes response;ClientContext context;std::cout << "-> Regist req" << std::endl;Status status = stub_->Regist(&context, request, &response);if(status.ok()) {std::cout << "user_name:" << response.user_name() << ", user_id:" << response.user_id() << std::endl;} else {std::cout << "user_name:" << response.user_name() << "Regist failed: " << response.result_code()<< std::endl;}}void Login(const std::string &user_name, const std::string &password) {IMLoginReq request;request.set_user_name(user_name);request.set_password(password);IMLoginRes response;ClientContext context;std::cout << "-> Login req" << std::endl;Status status = stub_->Login(&context, request, &response);if(status.ok()) {std::cout << "user_id:" << response.user_id() << " login ok" << std::endl;} else {std::cout << "user_name:" << request.user_name() << "Login failed: " << response.result_code()<< std::endl;}}private:std::unique_ptr<ImLogin::Stub> stub_;
};// 照葫芦画瓢
int main()
{// 服务器的地址std::string server_addr = "localhost:50051";ImLoginClient im_login_client(grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials()));std::string user_name = "darren";std::string password = "123456";im_login_client.Regist(user_name, password);im_login_client.Login(user_name, password);return 0;
}
5 异步server端
//异步多类调用,实际项目中使用多类的比一个类多函数的要多,因为一个类多函数new的效率低,
//并且还可能产生无用数据/*** Copyright 2015 gRPC authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.**/#include <iostream>
#include <memory>
#include <string>
#include <thread>#include "IM.Login.grpc.pb.h"
#include "IM.Login.pb.h"#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;class ServerImpl final {public:~ServerImpl() {server_->Shutdown();// Always shutdown the completion queue after the server.cq_->Shutdown();}// There is no shutdown handling in this code.void Run() { // 启动std::string server_address("0.0.0.0:50051");ServerBuilder builder;// Listen on the given address without any authentication mechanism.builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());// Register "service_" as the instance through which we'll communicate with// clients. In this case it corresponds to an *asynchronous* service.builder.RegisterService(&service_);// Get hold of the completion queue used for the asynchronous communication// with the gRPC runtime.cq_ = builder.AddCompletionQueue();// Finally assemble the server.server_ = builder.BuildAndStart();std::cout << "Server listening on " << server_address << std::endl;// Proceed to the server's main loop.HandleRpcs();}private:// Class encompasing the state and logic needed to serve a request.class CallData {public:// Take in the "service" instance (in this case representing an asynchronous// server) and the completion queue "cq" used for asynchronous communication// with the gRPC runtime.CallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq): service_(service), cq_(cq), status_(CREATE) {std::cout << "CallData constructing, this: " << this<< std::endl; // darren 增加// Invoke the serving logic right away.Proceed();}virtual ~CallData(){}virtual void Proceed() {// std::cout << "CallData Prceed" << std::endl//;return;}// 通用的// The means of communication with the gRPC runtime for an asynchronous// server.ImLogin::AsyncService* service_;// The producer-consumer queue where for asynchronous server notifications.ServerCompletionQueue* cq_;// Context for the rpc, allowing to tweak aspects of it such as the use// of compression, authentication, as well as to send metadata back to the// client.ServerContext ctx_;// 有差异的// What we get from the client.// IMRegistReq request_;// // What we send back to the client.// IMRegistRes reply_;// // The means to get back to the client.// ServerAsyncResponseWriter<IMRegistRes> responder_;// Let's implement a tiny state machine with the following states.enum CallStatus { CREATE, PROCESS, FINISH };CallStatus status_; // The current serving state.};class RegistCallData : public CallData {public:RegistCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq) :CallData(service, cq), responder_(&ctx_) {Proceed();}~RegistCallData() {}void Proceed() override {// std::cout << "RegistCallData Prceed" << std::endl//;std::cout << "this: " << this<< " RegistCallData Proceed(), status: " << status_<< std::endl; // darren 增加if (status_ == CREATE) { // 0std::cout << "this: " << this << " RegistCallData Proceed(), status: "<< "CREATE" << std::endl;// Make this instance progress to the PROCESS state.status_ = PROCESS;// As part of the initial CREATE state, we *request* that the system// start processing SayHello requests. In this request, "this" acts are// the tag uniquely identifying the request (so that different CallData// instances can serve different requests concurrently), in this case// the memory address of this CallData instance.service_->RequestRegist(&ctx_, &request_, &responder_, cq_, cq_, this);} else if (status_ == PROCESS) { // 1std::cout << "this: " << this << " RegistCallData Proceed(), status: "<< "PROCESS" << std::endl;// Spawn a new CallData instance to serve new clients while we process// the one for this CallData. The instance will deallocate itself as// part of its FINISH state.new RegistCallData(service_, cq_); // 1. 创建处理逻辑reply_.set_user_name(request_.user_name());reply_.set_user_id(10);reply_.set_result_code(0);// And we are done! Let the gRPC runtime know we've finished, using the// memory address of this instance as the uniquely identifying tag for// the event.status_ = FINISH;responder_.Finish(reply_, Status::OK, this);} else {std::cout << "this: " << this << " RegistCallData Proceed(), status: "<< "FINISH" << std::endl;GPR_ASSERT(status_ == FINISH);// Once in the FINISH state, deallocate ourselves (RegistCallData).delete this;}}private:IMRegistReq request_;IMRegistRes reply_;ServerAsyncResponseWriter<IMRegistRes> responder_;};class LoginCallData : public CallData {public:LoginCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq) :CallData(service, cq), responder_(&ctx_) {Proceed();}~LoginCallData() {}void Proceed() override {// std::cout << "LoginCallData Prceed" << std::endl//;std::cout << "this: " << this<< " LoginCallData Proceed(), status: " << status_<< std::endl; // darren 增加if (status_ == CREATE) { // 0std::cout << "this: " << this << " LoginCallData Proceed(), status: "<< "CREATE" << std::endl;// Make this instance progress to the PROCESS state.status_ = PROCESS;// As part of the initial CREATE state, we *request* that the system// start processing SayHello requests. In this request, "this" acts are// the tag uniquely identifying the request (so that different CallData// instances can serve different requests concurrently), in this case// the memory address of this CallData instance.service_->RequestLogin(&ctx_, &request_, &responder_, cq_, cq_, this);} else if (status_ == PROCESS) { // 1std::cout << "this: " << this << " LoginCallData Proceed(), status: "<< "PROCESS" << std::endl;// Spawn a new CallData instance to serve new clients while we process// the one for this CallData. The instance will deallocate itself as// part of its FINISH state.new LoginCallData(service_, cq_); // 1. 创建处理逻辑reply_.set_user_id(10);reply_.set_result_code(0);// And we are done! Let the gRPC runtime know we've finished, using the// memory address of this instance as the uniquely identifying tag for// the event.status_ = FINISH;responder_.Finish(reply_, Status::OK, this);} else {std::cout << "this: " << this << " LoginCallData Proceed(), status: "<< "FINISH" << std::endl;GPR_ASSERT(status_ == FINISH);// Once in the FINISH state, deallocate ourselves (LoginCallData).delete this;}}private:IMLoginReq request_;IMLoginRes reply_;ServerAsyncResponseWriter<IMLoginRes> responder_;};// This can be run in multiple threads if needed.void HandleRpcs() { // 可以运行在多线程// Spawn a new CallData instance to serve new clients.new RegistCallData(&service_, cq_.get()); //new LoginCallData(&service_, cq_.get());void* tag; // uniquely identifies a request.bool ok;while (true) {// Block waiting to read the next event from the completion queue. The// event is uniquely identified by its tag, which in this case is the// memory address of a CallData instance.// The return value of Next should always be checked. This return value// tells us whether there is any kind of event or cq_ is shutting down.std::cout << "before cq_->Next "<< std::endl; // 1. 等待消息事件 darren 增加GPR_ASSERT(cq_->Next(&tag, &ok));std::cout << "after cq_->Next " << std::endl; // darren 增加GPR_ASSERT(ok);std::cout << "before static_cast" << std::endl; // darren 增加static_cast<CallData*>(tag)->Proceed();std::cout << "after static_cast" << std::endl; // darren 增加}}std::unique_ptr<ServerCompletionQueue> cq_;ImLogin::AsyncService service_;std::unique_ptr<Server> server_;
};int main(int argc, char** argv) {ServerImpl server;server.Run();return 0;
}