RPC 框架

RPC 全称 Remote Procedure Call——远程过程调用。

  • RPC技术简单说就是为了解决远程调用服务的一种技术,使得调用者像调用本地服务一样方便透明。
  • RPC是一种通过网络从远程计算机程序上请求服务,不需要了解底层网络技术的协议。

集群和分布式

集群:集群(cluster)是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务。在不同的服务器中部署相同的功能。

分布式:指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务。不同服务器中部署不同的功能,通过网络连接起来,组成一个完整的系统。

分布式是以缩短单个任务的执行时间来提升效率的,而集群则是通过提高单位时间内执行的任务数来提升效率。

为什么要有RPC?

服务化:微服务化,跨平台的服务之间远程调用;
分布式系统架构:分布式服务跨机器进行远程调用;
服务可重用:开发一个公共能力服务,供多个服务远程调用。
系统间交互调用:两台服务器A、B,服务器 A 上的应用 a 需要调用服务器 B 上的应用 b 提供的方法,而应用 a 和应用 b 不在一个内存空间,不能直接调用,此时,需要通过网络传输来表达需要调用的语义及传输调用的数据。

使用场景

  • 大型网站:内部涉及多个子系统,服务、接口较多。
  • 注册发现机制:如Nacos、Dubbo等,一般都有注册中心,服务有多个实例,调用方调用的哪个实例无感知。
  • 安全性:不暴露资源。
  • 服务化治理:微服务架构、分布式架构。

常用RPC技术或框架

  • 应用级的服务框架:阿里的 Dubbo/Dubbox、Google gRPC、Spring Boot/Spring Cloud。
  • 远程通信协议:RMI、Socket、SOAP(HTTP XML)、REST(HTTP JSON)。
  • 通信框架:MINA 和 Netty

RPC 原理

在这里插入图片描述
RPC 是指计算机 A 上的进程,调用另外一台计算机 B 上的进程,其中 A 上的调用进程被挂起,而 B 上的被调用进程开始执行,当值返回给 A 时,A 进程继续执行。调用方可以通过使用参数将信息传送给被调用方,而后可以通过传回的结果得到信息。而这一过程,对于开发人员来说是透明的。

远程过程调用采用客户机/服务器(C/S)模式。请求程序就是一个客户机,而服务提供程序就是一台服务器。和常规或本地过程调用一样,远程过程调用是同步操作,在远程过程结果返回之前,需要暂时中止请求程序。使用相同地址空间的低权进程或低权线程允许同时运行多个远程过程调用。

在这里插入图片描述

RPC五大模块及交互关系

在这里插入图片描述

  • user(客户端)
  • user-stub(客户端存根)
  • RPCRuntime(RPC通信包)
  • server-stub(服务端存根)
  • server(服务端)

用户端:当用户希望进行远程调用时,实际上是调用的本地 user-stub 中相应的代码。user-stub 负责将调用的规范和参数打包成一个或多个包,通过 RPCRuntime(RPC通信包)传输到被调用机器。
服务端:服务端接收到这些数据包后,对应的 RPCRuntime(RPC通信包)将它们传递给 server-stub。然后 server-stub 将它们解包,并调用对应的本地实现。同时用户端的调用进程挂起,等待服务端返回结果包。当服务端调用完成时,返回到 server-stub,并通过服务端的RPCRuntime 将结果传回用户端对应的 RPCRuntime(RPC通信包)挂起的进程中。然后通过 user-stub 解包,最后将它们返回给用户。

如果把用户端和服务端代码放在一台机器上,直接绑定在一起,不使用 user-stub 和 server-stub,程序仍然可以工作。RPCRuntime(RPC通信包)是Cedar系统的一个标准部分,因此不用程序员编写通信相关代码,但是 user-stub 和 server-stub 是由一个叫做 Lupine 的程序自动生成的,也不需要程序员编写对应包处理层面的代码。

RPC 业务实现

Callee 对外提供远端可调用方法 LoginRegister,要在 user.proto 中进行注册(service UserServiceRpc)。在Callee中的Login方法接受 LoginRequest message,执行完逻辑后返回LoginResponse message 给 Caller。

Caller 可以调用 UserServiceRpc_Stub::Login发起远端调用,而 Callee 则继承UserServiceRpc类并重写UserServiceRpc::Login函数,实现Login函数的处理逻辑。这是 protobuf 提供的接口,需要服务方法提供者重写这个 Login 函数。

class UserService : public fixbug::UserServiceRpc{  //使用在rpc服务发布端(rpc服务提供者)public:bool Login(std::string name, std::string pwd){std::cout << "doing local service : LOGIN " << std::endl;std::cout << "name: " << name << " pwd: "<< pwd << std::endl;return true;}//新增的测试方法bool Register(uint32_t id,std::string name,std::string pwd){std::cout << "doing local service: Register" << std::endl;std::cout << "id:" << id <<" name:" << name << " pwd:" << pwd << std::endl;return true;}// 重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的// 1. caller --> Login(LoginRequest) --> muduo --> callee// 2. callee --> Login(LoginRequest) --> 交到下面重写的这个Login方法上了void Login(::google::protobuf::RpcController* controller,const ::fixbug::LoginRequest* request,::fixbug::LoginResponse* response,::google::protobuf::Closure* done){//框架给业务.上报了请求参数LoginRequest,应用获取相应数据做本地业务std::string name = request->name();std::string pwd = request->pwd();// 做本地业务bool login_result = Login(name, pwd); // 把响应写入包括错误码、 错误消息、返回值fixbug::ResultCode *code = response->mutable_result();code->set_errcode(0);code->set_errmsg("");response->set_success(login_result);//执行回调操作执行, 响应对象数据的序列化和网络发送 (都是由框架来完成的)done->Run();}void Register(::google::protobuf::RpcController* controller,const ::fixbug::RegisterRequest* request,::fixbug::RegisterResponse* response,::google::protobuf::Closure* done){uint32_t id = request->id();std::string name = request->name();std::string pwd = request->pwd();//开始做本地业务bool ret = Register(id, name, pwd);//填充回调结果response->mutable_result()->set_errcode(0);response->mutable_result()->set_errmsg("");response->set_success(ret);done->Run();}
};

RPC 服务提供

  1. RpcProvider 是一个服务器,接收来自 rpc 客户端的请求,且能在一定程度上承载高并发的需求(考虑多个 rpcClient 给当前 rpcProvider 发送 rpc 调用请求)。
  2. 一个 rpcclient 发送请求过来调用一个远程方法,那么 rpcProvider 收到这个请求之后,能根据请求所携带的数据自动调用发布的 rpc 方法,那么请求必须包含服务名、方法名、以及参数,这样 rpcProvider 才知道怎么调用。即 buffer = service_name + method_name + args。
//框架提供的专门负责发布rpc服务的网络对象类
class RpcProvider{
public://这里是框架提供给外部使用的,可以发布rpc方法的函数接口//此处应该使用Service类,而不是指定某个方法void NotifyService(google::protobuf::Service *service);//启动rpc服务节点,开始提供rpc远程网络调用服务void Run();private://组合 EventLoopmuduo::net::EventLoop m_eventLoop;//service服务类型信息struct ServiceInfo{google::protobuf::Service *m_service;//保存服务对象std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;//保存服务方法};//存储注册成功的服务对象和其服务方法的所有信息std::unordered_map<std::string,ServiceInfo> m_serviceMap;// 新的 socket 连接时的回调void OnConnection(const muduo::net::TcpConnectionPtr &conn);// 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应void OnMessage(const muduo::net::TcpConnectionPtr &conn,muduo::net::Buffer *buffer,muduo::Timestamp);//Closure的回调操作,用于序列化RPC的响应和网络发送void SendRpcResponse(const muduo::net::TcpConnectionPtr&,google::protobuf::Message* );
}; 
int main(int argc, char *argv[])
{//先调用框架的初始化操作 provider -i config.conf,从init方法读取配置服务,比如IP地址和端口号MprpcApplication::Init(argc,argv);//项目提供者,让我们可以发布该服务RpcProvider provider;//把UserService对象发布到rpc节点上provider.NotifyService(new UserService());//启动一个rpc服务发布节点,run以后,进程进入阻塞状态,等待远程的rpc请求provider.Run();return 0;
}

NotifyService 函数可以将UserService服务对象及其提供的方法进行预备发布。发布完服务对象后再调用Run()就将预备发布的服务对象及方法注册到ZooKeeper上并开启了对远端调用的网络监听。

Muduo提供的网络模块监听到连接事件并处理完连接逻辑后会调用OnConnection函数,监听到已建立的连接发生可读事件后会调用OnMessage函数

RpcProvider::NotifyService() 实现

Service_Info结构体内定义了一个服务对象,以及这个服务对象内提供的方法们(以std::unordered_map形式存储)

将传入进来的服务对象 service 进行预备发布。其实说直白点就是将这个 service 服务对象及其提供的方法的 Descriptor 描述类,存储在RpcProvider::m_serviceMap中。

/*
service_name <=> service 描述  => service* 记录服务对象=> method_name => method 方法对象
json protobuf
*///这里是框架提供给外部使用的,可以发布rpc方法的函数接口
//此处应该使用Service类,而不是指定某个方法
void RpcProvider::NotifyService(google::protobuf::Service *service){//服务表ServiceInfo service_info;//服务表//获取了服务对象的描述信息const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();//获取服务的名字std::string service_name = pserviceDesc->name();//获取服务对象service的方法数量int methodCnt= pserviceDesc->method_count();std::cout<<"service name:"<<service_name<<std::endl;    // 添加日志信息后更改for(int i=0; i<methodCnt; ++i){//获取了服务对象指定下标的服务方法的描述(抽象描述)const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i);std::string method_name = pmethodDesc->name();//插入服务service_info.m_methodMap.insert({method_name, pmethodDesc});printf("method_name:%s \n",method_name.c_str());}//可以使用该表来调用方法service_info.m_service = service;m_serviceMap.insert({service_name, service_info});}

RpcProvider::Run() 实现

将待发布的服务对象及其方法发布到ZooKeeper上,同时利用Muduo库提供的网络模块开启对RpcServer的(IP, Port)的监听。

// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run(){// 获取配置文件中的 ip 和端口号初始化结构体std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());muduo::net::InetAddress address(ip, port);// 为了方便用户使用框架,在 Run 方法中封装 muduo// 创建 TcpServer 对象muduo::net::TcpServer tcpServer_(&m_eventLoop, address, "MprpcProvider");// 绑定连接回调和消息读写回调方法tcpServer_.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));tcpServer_.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置 muduo 库的线程数tcpServer_.setThreadNum(4);//把当前rpc节点上要发布的服务全部注册在zk上,让rpc client可以从zk上发现服务//session的timeout默认为30s,zkclient的网络I/O线程1/3的timeout内不发送心跳则丢弃此节点ZkClient zkCli;zkCli.Start();//链接zkserverfor(auto &sp:m_serviceMap){//service_namestd::string service_path ="/"+sp.first;//拼接路径zkCli.Create(service_path.c_str(),nullptr,0);//创建临时性节点for(auto &mp:sp.second.m_methodMap){//service_name/method_namestd::string method_path=service_path+"/"+mp.first;//拼接服务器路径和方法路径char method_path_data[128]={0};sprintf(method_path_data,"%s:%d",ip.c_str(),port);//向data中写入路径//创建节点,ZOO_EPHEMERAL表示临时节点zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);}}std::cout << "MprpcProvider start service at: " << ip << ':' << port << '\n';// 启动网络服务tcpServer_.start();m_eventLoop.loop();}

RpcProvider::OnConnection() 实现

// 新的 socket 连接时的回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn){if(!conn->connected()){//和rpcclient的链接断开了conn->shutdown();}
}

RpcProvider::OnMessage() 实现

Caller 端发起远程调用的时候, 会对callee的rpcserver发起tcp连接,rpcserver接受连接后,开启对客户端连接描述符的可读事件监听。caller将请求的服务方法及参数发给callee的rpcserver,此时rpcserver上的muduo网络模块监听到该连接的可读事件,然后就会执行OnMessage(…)函数逻辑。

该方法表示已建立连接用户的读写事件操作,如果有一个远程 RPC 服务的调用请求,那么OnMessage方法就会响应。

  1. 首先要从网络上接收的远程rpc调用请求的字符流;
  2. 从字符流中读取前4个字节的内容,将头部的大小转换成二进制存到这四字节里,不可能会超出范围;
  3. 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息;
  4. 获取rpc方法参数的字符流数据,略过recv_buf的前面的头部信息(header_size和header_str),4字节加header_size即为开始的位置;
  5. 获取service对象和method对象;
  6. 生成rpc方法调用的请求request和响应response参数;
  7. 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法。
/*
在框架内部需要提前协商好通信使用的protobuf数据类型:比如发送过来的数据类型为:service_name,method_name,args
需要定义proto的message类型,进行数据头的序列化和反序列化,为防止TCP的粘包,需要对各个参数进行参数的长度明确定义header_size(4字节) + header_str + args_str已建立连接的用户的读写事件回调,网络上如果有一个远程的rpc服务请求,则onmessge方法就会响应
*/// 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp){//获取到数据,即网络上接受的远程rpc调用请求的字符流, Login和argsstd::string recv_buf= buffer->retrieveAllAsString();//读取header_size,此时的整数若按照字符串格式发送,读取时会出现问题,所以需要直接按二进制发送//从字符流中读取前四个字节的内容uint32_t header_size=0;recv_buf.copy((char*)&header_size,4,0);//根据header_size读取数据头的原始字符流,反序列化数据,得到rpc的详细请求数据std::string rpc_header_str=recv_buf.substr(4,header_size);//substr从4开始读读取header_size个字节的数据mprpc::RpcHeader rpcHeader;std::string service_name;//用于存储反序列化成功的服务名字std::string method_name;//用于存储反序列化成功的服务方法uint32_t args_size;//用于存储反序列化成功的参数个数//开始反序列化,参数接受类型为引用,返回值为bool型if(rpcHeader.ParseFromString(rpc_header_str)){//数据头反序列化成功service_name=rpcHeader.service_name();method_name=rpcHeader.method_name();args_size=rpcHeader.args_size();}else {//数据头反序列化失败std::cout<<"rpc_header_str: "<<rpc_header_str<<"parse error!"<<std::endl;return;}//获取rpc参数方法的字符流数据std::string args_str=recv_buf.substr(4+header_size,args_size);//打印调试信息std::cout << "======================================" << std::endl;std::cout << "header_size: " << header_size << std::endl;std::cout << "rpc_header_str" << rpc_header_str << std::endl;std::cout << "service_name: " << service_name << std::endl;std::cout << "method_name: " << method_name << std::endl;std::cout << "args_str: " << args_str << std::endl;std::cout << "======================================" << std::endl;//获取service对象和method对象auto it = m_serviceMap.find(service_name);if(it == m_serviceMap.end()){//如果方法不存在std::cout << service_name << "is not exist!" << std::endl;return;}auto mit = it->second.m_methodMap.find(method_name);if(mit == it->second.m_methodMap.end()){//如果服务提供的方法不存在std::cout << service_name << ":" << method_name << "is not exists!" << std::endl;return;}google::protobuf::Service *service=it->second.m_service;    // 获取service对象,对应Userserviceconst google::protobuf::MethodDescriptor *method=mit->second;   // 获取method对象,对应Login方法//生成rpc方法调用的请求request和相应response参数google::protobuf::Message *request = service->GetRequestPrototype(method).New();//生成一个新对象if(!request->ParseFromString(args_str)){std::cout << "request parse error, content:" << args_str << std::endl;return;}google::protobuf::Message *response = service->GetResponsePrototype(method).New();//生成一个新对象//给下面的method方法的调用,绑定一个Closure的回调函数,因为模板的实参推演失败,所以需要指定类型google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,const muduo::net::TcpConnectionPtr&,google::protobuf::Message*>(this, &RpcProvider::SendRpcResponse, conn,response);//在框架上根据远端rpc请求,调用当前rpc节点上发布的方法//相当于UserService调用了Login方法service->CallMethod(method, nullptr, request, response, done);
}

NewCallback函数会返回一个google::protobuf::Closure类的对象,该Closure类其实相当于一个闭包。这个闭包捕获了一个成员对象的成员函数,以及这个成员函数需要的参数。然后闭包类提供了一个方法Run(),当执行这个闭包对象的Run()函数时,他就会执行捕获到的成员对象的成员函数,也就是相当于执行void RpcProvider::SendRpcResponse(conn, response);,这个函数可以将reponse消息体发送给Tcp连接的另一端,即caller

CallMethod 将服务名方法名进行组装,并用protobuf提供的序列化方法序列化,然后通过服务名方法名查找ZooKeeper服务器上提供该服务方法的RpcServer的地址信息,然后返回。接着再将请求的服务方法及其参数组装并序列化,向RpcServer发起tcp连接请求,连接建立后将序列化的数据发送给RpcServer,然后再等待接收来自RpcServer的返回消息体。

RpcProvider::SendRpcResponse() 实现

//Closure的回调操作,用于序列化RPC的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn,google::protobuf::Message* response){std::string response_str;//response进行序列化if(response->SerializeToString(&response_str)){//序列化成功后,通过网络把rpc方法执行的结果发送回rpc的调用方conn->send(response_str);} else {std::cout<<"Serialize response error!"<<std::endl;}//模拟http的短链接服务,由rpcprovider主动断开连接conn->shutdown();
}

RPC 服务调用

调用方需要利用到的是 Stub 类。Stub 类需要提供一个带参数的构造函数,需要重写这个实参 RpcChannel。

class MprpcChannel:public google::protobuf::RpcChannel
{
public://所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送void CallMethod(const google::protobuf::MethodDescriptor* method,google::protobuf::RpcController* controller, const google::protobuf::Message* request,google::protobuf::Message* response, google::protobuf::Closure* done);
};

提供方调用函数的方法:MprpcChannel::CallMethod,调用方的框架逻辑就是将访问的对象,函数,参数序列化,socket连接到zookeeper,获取对应的 response。

//所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,google::protobuf::RpcController* controller, const google::protobuf::Message* request,google::protobuf::Message* response, google::protobuf::Closure* done)
{const google::protobuf::ServiceDescriptor* sd=method->service();std::string service_name=sd->name();    //service namestd::string method_name=method->name(); //method name//获取参数的序列化字符串长度 args_sizeuint32_t args_size = 0;std::string args_str;if(request->SerializeToString(&args_str)){//序列化成功args_size=args_str.size();} else {controller->SetFailed("serialize request error!");//保存错误信息// std::cout <<"serialize request error!"<< std::endl;return;}//定义rpc的请求headermprpc::RpcHeader rpcHeader;rpcHeader.set_service_name(service_name);rpcHeader.set_method_name(method_name);rpcHeader.set_args_size(args_size);uint32_t header_size = 0;std::string rpc_header_str;if(rpcHeader.SerializeToString(&rpc_header_str)){   // response进行序列化   header_size = rpc_header_str.size();} else {// std::cout <<"serialize rpc header error!"<< std::endl;    // 优化controller->SetFailed("serialize rpc header error!");return;}//组织待发送的rpc请求的字符串std::string send_rpc_str;send_rpc_str.insert(0, std::string((char *)&header_size, 4));   // header_sizesend_rpc_str += rpc_header_str; // rpcheadersend_rpc_str += args_str;   // argsstd::cout<<"======================================"<<std::endl;std::cout<<"header_size: "<<header_size<<std::endl;std::cout<<"rpc_header_str"<<rpc_header_str<<std::endl;std::cout<<"service_name: "<<service_name<<std::endl;std::cout<<"method_name: "<<method_name<<std::endl;std::cout<<"args_str: "<<args_str<<std::endl;std::cout<<"======================================"<<std::endl;//使用TCP编程,完成rpc方法的远程调用int clientfd = socket(AF_INET, SOCK_STREAM, 0);if(-1 == clientfd){// std::cout << "create socket error! errno: "<< errno << std::endl;    //改用 controller 记录错误信息// exit(EXIT_FAILURE);char errtxt[512]={0};sprintf(errtxt,"create socket error! errno: %d",errno);controller->SetFailed(errtxt);return;}// std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");// uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());/*rpc调用方向调用service_name服务,需要查询zk上该服务所在的host信息*/ZkClient zkCli;zkCli.Start();std::string method_path="/"+service_name+"/"+method_name;//获取ip地址和端口号std::string host_data=zkCli.GetData(method_path.c_str());if(host_data==""){controller->SetFailed(method_path+" is not exist!");return;}int idx=host_data.find(":");//分割符if(idx==-1){controller->SetFailed(method_path+" address is invalid!");return;}std::string ip=host_data.substr(0,idx); //从字符串中返回一个指定的子串uint32_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());   //把参数 str 所指向的字符串转换为一个整数struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(port);server_addr.sin_addr.s_addr = inet_addr(ip.c_str());//链接rpc服务节点if(-1 == connect(clientfd,(struct sockaddr*)&server_addr,sizeof(server_addr))){// std::cout<<"connect error!errno: "<<errno<<std::endl;// close(clientfd);// exit(EXIT_FAILURE);close(clientfd);char errtxt[512]={0};sprintf(errtxt,"connect error! errno: %d",errno);controller->SetFailed(errtxt);return;}//发送rpc请求if(-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0)){// std::cout<<"send error!errno: "<<errno<<std::endl;// close(clientfd);// return;close(clientfd);char errtxt[512]={0};sprintf(errtxt,"send error! errno: %d",errno);controller->SetFailed(errtxt);return;}//接受rpc请求的响应值char recv_buf[1024]={0};int recv_size = 0;if(-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0))){close(clientfd);char errtxt[512]={0};sprintf(errtxt,"recv error! errno: %d",errno);controller->SetFailed(errtxt);return;}//反序列化rpc调用的响应数据std::string response_str(recv_buf, 0, recv_size);   //bug点:recv_buf遇到\0后的数据不再读取,导致反序列化失败//解决方案:使用string转换时会遇到\0,由于字符串特性导致不再读取,因为protobuf支持从数组转换,所以换方法直接从Array反序列化// if(!response->ParseFromString(response_str)){if(!response->ParsePartialFromArray(recv_buf,recv_size)){// std::cout<<"parse error! response_str:"<<response_str<<std::endl;// close(clientfd);// return;close(clientfd);char errtxt[512]={0};sprintf(errtxt,"arse error!! response_str: %s",response_str.c_str());controller->SetFailed(errtxt);return;}close(clientfd);
}

zookeeper

ZooKeeper 在这里作为服务方法的管理配置中心,负责管理服务方法提供者对外提供的服务方法。
Callee提前将本端对外提供的服务方法名及自己的通信地址信息(IP:Port)注册到ZooKeeper。
当Caller发起远端调用时,会先拿着自己想要调用的服务方法名询问 ZooKeeper,ZooKeeper 告知Caller想要调用的服务方法在哪台服务器上(ZooKeeper返回目标服务器的IP:Port给Caller),Caller便向目标服务器Callee请求服务方法调用。Callle在本地执行相应服务方法后将结果返回给Caller。

安装java环境

在这里插入图片描述

1.sudo apt-get install openjdk-8-jdk
2. 配置环境变量,编辑如下文件:vim ~/.bashrc
在最后一行加:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

3.测试jdk是否安装成功:java -version
在这里插入图片描述

Ubuntu安装JDK

Zookeeper分布式协调服务

下载 zookeeper.tar.gz,解压后
1.cd conf,将 zoo_sample.cfg 改名为 zoo.cfgmv zoo_sample.cfg zoo.cfg
2.进入bin目录,启动zkServer, ./zkServer.sh start
3.可以通过netstat查看zkServer的端口,在bin目录启动zkClient.sh链接zkServer,熟悉zookeeper怎么组织节点

在这里插入图片描述
在这里插入图片描述

zk的原生开发API(c/c++接口)

1.sudo ./configure
2.sudo make
在这里插入图片描述
zookeeper 源码编译生成C函数接口,在 ./configure 后生成的 Makefile 文件中,默认是将警告当成错误的,因此导致上图中的警告,总是以错误形式展现,编译失败

进入到生成的 Makefile 中,修改第548行,将AM_CFLAGS -Wall -Werror 改为 AM_CFLAGS -Wall上述问题

Linux安装zookeeper原生C API接口出现的make编译错误

3.make install

zookeeper 项目应用

ZooKeeper相当于是一个特殊的文件系统,不过和普通文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据,目录节点不行。ZooKeeper内部为了保持高吞吐和低延迟,再内存中维护了一个树状的目录结构,这种特性使ZooKeeper不能存放大量数据,每个节点存放数据的上线为1M。

服务对象名在ZooKeeper中以永久性节点的形式存在,当RpcServer与ZooKeeper断开连接后,整个节点还是会存在。方法对象名则以临时性节点存在,RpcServer与ZooKeeper断开后临时节点被删除。临时节点上带着节点数据,在本项目中,节点数据就是提供该服务方法的RpcServer的通信地址(IP+Port)

//封装的zk客户端类
class ZkClient
{
public:ZkClient(); ~ZkClient();//zkclinet启动链接zkservervoid Start();//在zkserver上根据指定的path创建znode节点void Create(const char *path,const char *data,int datalen,int state=0);//传入参数指定的znode节点路径,或者znode节点的值std::string GetData(const char *path);private://zk的客户端句柄zhandle_t *m_zhandle;
};
#include"zookeeperutil.h"
#include"mprpcapplication.h"
#include<semaphore.h>
#include<iostream>// 全局的 watcher 观察器  zkserver 给 zkclient 的通知
// 参数 type 和 state 分别是 ZooKeeper 服务端返回的事件类型和连接状态
void global_watcher(zhandle_t *zh,int type,int state,const char *path,void *watcherCtx)
{if(type==ZOO_SESSION_EVENT) //回调的消息类型是和会话相关的消息类型{if(state==ZOO_CONNECTED_STATE)  //zkclient和zkserver链接成功{sem_t *sem=(sem_t*)zoo_get_context(zh);sem_post(sem);  //信号量资源加一}}
}ZkClient::ZkClient():m_zhandle(nullptr)
{}ZkClient::~ZkClient()
{if(m_zhandle!=nullptr){zookeeper_close(m_zhandle);//关闭句柄释放资源}
}//zkclinet启动链接zkserver
void ZkClient::Start()
{//加载zk的IP和端口号,默认为2181std::string host=MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");std::string port=MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");std::string connstr=host+":"+port;//调用原生API,端口与IP,回调函数,会话超时时间/*zookeeper_mt:多线程版本zookeeper的API客户端程序提供了三个线程API调用线程网络I/O线程:专门在一个线程里处理网络I/Owatcher回调线程*/m_zhandle=zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);// 仅仅通过判断接口返回的句柄是否为NULL,并不能表示句柄是可用的。// 因为,会话的建立过程是异步的,必须等到会话状态变成ZOO_CONNECTED_STATE才表示句柄可用。if(nullptr==m_zhandle){std::cout<<"zookeeper_init error!"<<std::endl;exit(EXIT_FAILURE);}sem_t sem;sem_init(&sem,0,0); //初始化资源为0,用于多线程间的同步// 将刚才定义的同步信号量sem通过这个 zoo_set_context 函数可以传递给 m_zhandle 进行保存。// 在global_watcher中可以将这个sem从m_zhandle取出来使用。zoo_set_context(m_zhandle,&sem);    //设置上下文,添加额外信息sem_wait(&sem); // 阻塞结束后才连接成功!!!std::cout<<"zookeeper_init success!"<<std::endl;}
//在zkserver上根据指定的path创建znode节点
void ZkClient::Create(const char *path,const char *data,int datalen,int state)
{char path_buffer[128];int bufferlen=sizeof(path_buffer);int flag;//检查该节点是否存在flag=zoo_exists(m_zhandle,path,0,nullptr);if(ZNONODE==flag)//该节点并不存在{//创建指定path的znode节点flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buffer,bufferlen);if(flag==ZOK){std::cout<<"znode create success... path:"<<path<<std::endl;}else{std::cout<<"flag:"<<flag<<std::endl;std::cout<<"znode create error... path:"<<path<<std::endl;exit(EXIT_FAILURE);}}
}
//传入参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{char buffer[64];int bufferlen=sizeof(buffer);int flag=zoo_get(m_zhandle,path,0,buffer,&bufferlen,nullptr);//获取信息与返回值if(flag!=ZOK)//如果获取失败{std::cout<<"get znode error... path:"<<path<<std::endl;return "";}else{//获取成功return buffer;}}

watcher 机制就是ZooKeeper客户端对某个 znode 建立一个watcher事件,当该znode发生变化时,这些ZK客户端会收到ZK服务端的通知,然后ZK客户端根据znode的变化来做出业务上的改变。

ZooKeeper服务端收到来自客户端 callee 的连接请求后,服务端为节点创建会话(此时这个节点状态发生改变),服务端会返回给客户端callee一个事件通知,然后触发watcher回调(执行global_watcher函数).

总结

深入浅出RPC服务(一)RPC来源-论文解读
深入浅出RPC服务(二)不同层的网络协议
RPC 详解
RPC——RPC协议介绍及原理详解
C++实现轻量级RPC分布式网络通信框架
https://blog.csdn.net/weixin_52344401/article/details/131343863

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/842961.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Freertos的工训机器人

一. 工训机器人 V1 1. 实物 将自制的F4开发板放置车底板下方&#xff0c;节省上方空间&#xff0c;且能保证布线方便整齐。 2. SW仿真 使用SolidWorks进行仿真&#xff0c;且绘制3D打印件。 工训仿真 3.3D打印爪测试 机械爪测试 二. 工训机器人 V2 1. 实物 工训机器人V2不同于…

国密协议网关与IPSec VPN技术:保障数据安全传输的新途径

国密协议网关IPSec VPN隧道技术是一种结合了国家密码管理局&#xff08;简称国密&#xff09;的加密算法和IPSec VPN隧道技术的安全通信解决方案。 IPSec&#xff08;Internet Protocol Security&#xff09;是互联网协议安全的一种标准&#xff0c;用于保护网络通信的安全性和…

共筑信创新生态:DolphinDB 与移动云 BC-Linux 完成兼容互认

近日&#xff0c;DolphinDB 数据库软件 V2.0 与中国移动通信集团公司的移动云天元操作系统 BC-Linux 完成兼容性适配认证。经过双方共同严格测试&#xff0c;DolphinDB 性能及稳定性等各项指标表现优异&#xff0c;满足功能及兼容性测试要求。 此次 DolphinDB 成功通过移动云 B…

微服务-Nacos-安装-集成SpringBoot

微服务-SpringCloud-ALibaba-Nacos Nacos 是阿里巴巴推出的 SpringCloud的组件 官网:什么是 Nacos 主要是为了解决微服务的架构中 服务治理的问题服务治理就是进行服务的自动化管理&#xff0c;其核心是服务的注册与发现。 服务注册&#xff1a;服务实例将自身服务信息注册…

使用BigDecimal定义的实体类字段返回给前台的是字符串类型,如何返回数字类型

目录 前言&#xff1a; 问题现象&#xff1a; 解决方法&#xff1a; 效果&#xff1a; 前言&#xff1a; 做项目的时候数据字段通常定义为bigdecimal类型&#xff0c;方便进行运算&#xff0c;但是发现接口调用后返回给前台的是字符串&#xff0c;这篇博文讲的是如何将定义…

1109 擅长C(测试点0,1,2,3)

当你被面试官要求用 C 写一个“Hello World”时&#xff0c;有本事像下图显示的那样写一个出来吗&#xff1f; ..C.. .C.C. C...C CCCCC C...C C...C C...C CCCC. C...C C...C CCCC. C...C C...C CCCC. .CCC. C...C C.... C.... C.... C...C .CCC. CCCC. C...C C...C C...C C…

【香橙派 AIpro】OrangePi AIpro :教育、机器人、无人机领域的超级AI大脑,华为昇腾处理器驱动的AI开发板新标杆

【OrangePi AIpro&#xff1a;教育、机器人、无人机领域的超级AI大脑&#xff0c;华为昇腾处理器驱动的AI开发板新标杆】 文章目录 一、开箱与初印象1. 初印象2. 上手开机3. 安装和运行 TightVNC 远程桌面3.1. 安装 TightVNC 服务器3.2. 启动 VNC 服务器3.3. 在 Windows 上使用…

Java 字符串处理

Java 是一种广泛使用的编程语言&#xff0c;而字符串处理是 Java 编程中非常重要的一部分。Java 提供了丰富的字符串操作功能&#xff0c;通过 String 类和 StringBuilder、StringBuffer 类来处理字符串。 一、Java 字符串的创建 1. 使用字面量 在 Java 中&#xff0c;字符串…

应急响应-网页篡改-技术操作只指南

初步判断 网页篡改事件区别于其他安全事件地明显特点是&#xff1a;打开网页后会看到明显异常。 业务系统某部分出现异常字词 网页被篡改后&#xff0c;在业务系统某部分网页可能出现异常字词&#xff0c;例如&#xff0c;出现赌博、色情、某些违法APP推广内容等。2019年4月…

Oracle创建用户时提示ORA-65096:公用用户名或角色名无效

Oracle创建用户时提示“ORA-65096&#xff1a;公用用户名或角色名无效” 如下图所示&#xff1a; 解决方法&#xff1a;在新增用户名前面加上C##或者c##就可以解决无效问题&#xff0c;具体什么原因还不清楚&#xff0c;需要再研究一下。

一机实现All in one,NAS如何玩转虚拟机!

常言道&#xff0c;中年男人玩具有三宝 充电器、路由器、NAS 你问我NAS的魔力在哪里&#xff1f; 一机实现All in one洒洒水啦 那NAS又如何玩转虚拟机呢? 跟我来 0基础也能轻松get! NAS如何玩转虚拟机 铁威马NAS的VirtualBox的简单易用&#xff0c;可虚拟的系统包括Win…

python核心编程(二)

python面向对象 一、基本理论二、 面向对象在python中实践2.1 如何去定义类2.2 通过类创建对象2.3 属性相关2.4 方法相关 三、python对象的生命周期,以及周期方法3.1 概念3.2 监听对象的生命周期 四、面向对象的三大特性4.1 封装4.2 继承4.2.1 概念4.2.1 目的4.2.2 分类4.2.3 t…

cgicc开发(文件上传)

//cgicc文件上传封装 void UploadSoftware() {// 初始化CGIC环境Cgicc cgi;// 获取上传的文件file_iterator fileIter cgi.getFile("button_browse"); //from表单中,输入为文件属性(typefile)的name属性值if (fileIter cgi.getFiles().end()){ #if (DEBUG true)co…

软件设计师中级 重点 笔记

文章目录 下午题目网络DNS域名解析分类&#xff1a;域名协议简介网络设备 算法软件工程实体联系图&#xff08;E-R图&#xff09; 其它 下午题目 数据流图补充原则 22年下半年真题 更早-真题大全 答题技巧 网络 DNS域名解析分类&#xff1a; 递归查询的顺序&#xff1a;1.本…

电脑重要文件如何加密保护?教你两种方法

加密是保护电脑重要文件的常见方法&#xff0c;可以有效避免文件数据泄露。那么&#xff0c;电脑重要文件该如何加密保护呢&#xff1f;下面小编就来教你两种方法&#xff0c;帮助你解决文件安全问题。 超级加密3000 超级加密3000是一款专业的电脑数据加密软件&#xff0c;可以…

流量被劫持?不怕,轻松Get 防“窃”技巧!

流量劫持是一种恶意行为&#xff0c;攻击者会在用户访问网站时&#xff0c;将其流量重定向到第三方站点上&#xff0c;导致用户访问的不是原始目标站点。这种行为不仅会影响网站的品牌形象&#xff0c;还会导致用户流失和信息泄露等严重后果。本文将探讨网站如何应对流量劫持。…

SurfaceFinger layer创建过程

SurfaceFinger layer创建过程 引言 本篇博客重点分析app创建Surface时候&#xff0c;SurfaceFlinger是如何构建对应的Layer的主要工作有那些&#xff01; 这里参考的Android源码是Android 13 aosp&#xff01; app端创建Surface 其核心流程可以分为如下接部分: app使用w,h,fo…

window.location.search取不到值

window.location.search window.location.search没有值的原因&#xff1a; URL中使用了 hash &#xff08;指URL中带有#符号&#xff09;,导致URL后面携带的参数被location.hash截取走了&#xff0c;你再使用window.location.search得到的就是空值 打印 window.location 其实…

收银系统源码--零售连锁店铺如何选择适合自己的收银系统?

如果你现在还认为小便利店只要简单的收款&#xff0c;只有大型的连锁便利店才需要收银软件和管理软件&#xff0c;那你就错了&#xff0c;连锁品牌的便利店是必须要用到专业的收银软件&#xff0c;但是小微型的便利店更应该要用专门的软件&#xff0c; 在各行各业逐步革新互联网…

ORCLE删除数据库文件

在实际操作中很少会去删除数据库文件&#xff0c;但是凡事都有例外&#xff0c;由于一些特殊原因&#xff0c;例如存储方式变化、磁盘空间不够等&#xff0c;需要调整和删除一些无效的数据库文件&#xff0c;本文介绍一下实践出来的一种删除数据库文件的操作方式。 删除前请对数…