在调用远程发布的rpc方法的Login时,我们使用了UserServiceRpc_Stub
类,即fixbug::UserServiceRpc_Stub stub(new MprpcChannel());
,来看看这个类的底层
实际上,是一个RpcChannel
类,进入RpcChannel
类
底层又是一个抽象类,它实现了一个虚函数CallMethod
,在看看Login的底层,
实际上就是调用了这个CallMethod函数,这是一个虚函数,那么我们在远程调用rpc方法的时候,就需要自己写一个channel类去继承RpcChannel
类,然后重写CallMethod
。这样在fixbug::UserServiceRpc_Stub stub(new MprpcChannel());
使用stub来调用Login方法的时候,底层就调用了我们重写的CallMethod
,这就是我们接下来要分析的MprpcChannel类。
MprpcChannel类
底层就一个函数CallMethod
,所有通过stub代理对象调用的rpc方法,统一做rpc方法调用的数据数据序列化和网络传送。
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,google::protobuf::RpcController* controller, const google::protobuf::Message* request,google::protobuf::Message* response, google::protobuf::Closure* done)
{const google::protobuf::ServiceDescriptor* sd=method->service();std::string service_name=sd->name();std::string method_name=method->name();//获取参数的序列化字符串长度 args_sizeuint32_t args_size=0;std::string args_str;if(request->SerializeToString(&args_str)){args_size=args_str.size();}else{controller->SetFailed("Serialize request error!");return;}//定义rpc的请求headermprpc::RpcHeader rpcHeader;rpcHeader.set_service_name(service_name);rpcHeader.set_method_name(method_name);rpcHeader.set_args_size(args_size);uint32_t header_size=0;std::string rpc_header_str;if(rpcHeader.SerializeToString(&rpc_header_str)){header_size=rpc_header_str.size();}else{controller->SetFailed("Serialize rpc header error!");return;}//组织待发送的rpc请求的字符串std::string send_rpc_str;send_rpc_str.insert(0,std::string((char*)&header_size,4));send_rpc_str += rpc_header_str;//rpcheadersend_rpc_str += args_str;//args// 打印调试信息std::cout << "=============================" << std::endl;std::cout << "header_size: " << header_size << std::endl;std::cout << "rpc_header_str: " << rpc_header_str << std::endl;std::cout << "service_name: " << service_name << std::endl;std::cout << "method_name: " << method_name << std::endl;std::cout << "args_str: " << args_str << std::endl;std::cout << "=============================" << std::endl;//使用tcp编程 完成rpc方法的远程调用int clientfd=socket(AF_INET,SOCK_STREAM,0);if(-1==clientfd){char errtxt[512]={0};sprintf(errtxt,"create socket error! errno:%d",errno);controller->SetFailed(errtxt);return;}ZkClient zkCli;zkCli.Start();std::string method_path="/"+service_name+"/"+method_name;//127.0.0.1:8000std::string host_data=zkCli.GetData(method_path.c_str());if(host_data==""){controller->SetFailed(method_path+" is not exist!");return;}int idx=host_data.find(":");if(idx==-1){controller->SetFailed(method_path+" address is invalid!");return;}std::string ip=host_data.substr(0,idx);uint16_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());struct sockaddr_in server_addr;server_addr.sin_family=AF_INET;server_addr.sin_port=htons(port);server_addr.sin_addr.s_addr=inet_addr(ip.c_str());//连接rpc服务节点if(-1==connect(clientfd,(struct sockaddr*)&server_addr,sizeof(server_addr))){close(clientfd);char errtxt[512]={0};sprintf(errtxt,"connect error! errno:%d",errno);controller->SetFailed(errtxt);return;}//发送rpc请求if(-1==send(clientfd,send_rpc_str.c_str(),send_rpc_str.size(),0)){close(clientfd); char errtxt[512]={0};sprintf(errtxt,"send error! errno:%d",errno);controller->SetFailed(errtxt);return;}//接受rpc请求的响应值char recv_buf[1024]={0};int recv_size=0;if(-1==(recv_size=recv(clientfd,recv_buf,1024,0))){close(clientfd);char errtxt[512]={0};sprintf(errtxt,"recv error! errno:%d",errno);controller->SetFailed(errtxt);return;}//反序列化rpc调用的响应数据std::string response_str(recv_buf,0,recv_size);if(!(response->ParseFromArray(recv_buf,recv_size))){close(clientfd);char errtxt[512]={0};sprintf(errtxt,"parse error! errno:%s",recv_buf);controller->SetFailed(errtxt);return;}close(clientfd);}
- 通过method,得到服务以及方法的名字
- 在调用端,我们传入了rpc方法的请求参数
fixbug::LoginRequest request;
request.set_name("zhang san");
request.set_pwd("123456");
在user.proto中,我们对LoginRequest 进行如下定义:
message LoginRequest
{bytes name=1;bytes pwd=2;
}
- 所以通过
request->SerializeToString(&args_str)
我们获取了获取参数的序列化字符串长度 args_size; - 根据service_name,method_name,args_size定义rpc的请求header;
- 我们待发送的请求header形式是:header_size+service_name method_name args_size+args,获取header_size以后,组织待发送的rpc请求的字符串;
- 使用tcp编程 完成rpc方法的远程调用,创建套接字 => 获取ip和port => 连接rpc服务节点 => 发送rpc请求 => 接受rpc请求的响应值;
- 反序列化rpc调用的响应数据。
ZooKeeper
在这里,我们又一次看到了ZkClient的使用,在RpcProvider类中,我们也看到过它,它起到一个什么样的作用呢?
ZooKeeper是一个分布式服务框架,为分布式应用提供一致性协调服务的中间件,对于ZooKeeper,我们在之前的博客中进行了简单介绍包括它的安装和简单使用。
在本项目中,callee将对外提供的服务对象及其方法以及网络地址信息注册在ZooKeeper服务上;caller则通过访问ZooKeeper在整个分布式环境中获取自己想要调用的远端服务对象方法在哪一台设备上(网络地址信息),并向该设备直接发送服务方法调用请求。
先来看看ZooKeeper类的封装。
ZooKeeper类
重要成员变量
zhandle_t *m_zhandle;
- zk的客户端句柄
重要成员函数
构造析构
ZkClient::ZkClient():m_zhandle(nullptr){}ZkClient::~ZkClient()
{if(m_zhandle!=nullptr){zookeeper_close(m_zhandle);}
}
- 构造函数将句柄m_zhandle进行初始化为空;
- 析构函数关闭句柄,释放资源。
void Start();
zkclient启动连接zkserver
void ZkClient::Start()
{std::string host=MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");std::string port=MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");std::string connstr=host+":"+port;//zk要求是这样的格式m_zhandle=zookeeper_init(connstr.c_str(),global_watcher,30000,nullptr,nullptr,0);//发起的动作都没有产生过if(nullptr==m_zhandle){std::cout<<"zookeeper_init error!"<<std::endl;exit(EXIT_FAILURE);}//信号量sem_t sem;sem_init(&sem,0,0);//像句柄资源上设置上下文zoo_set_context(m_zhandle,&sem);//阻塞等待global_watcher发送连接成功ZOO_CONNECTED_STATE的信号sem_wait(&sem);std::cout<<"zookeeper_init success!"<<std::endl;
}
- zookeeper_mt:多线程版本
zookeeper的API客户端提供了三个线程
API调用线程
网络I/O线程 zookeeper_init底层直接调用pthread_create(底层是poll)会创建一个线程,专门发起IO操作
watcher回调线程 当客户端接收到zkserver的响应时 - zookeeper_init 异步的 创建会话,注意zookeeper_init成功只代表创建m_zhandle句柄资源成功,不代表zk与zkserver连接成功。
- 通过
zoo_set_context
像句柄资源上设置上下文,即给入信号量来判断是否连接成功,即state==ZOO_CONNECTED_STATE时连接成功。
void global_watcher(zhandle_t *zh,int type,int state,const char *path,void *watcherCtx)
{if(type==ZOO_SESSION_EVENT){if(state==ZOO_CONNECTED_STATE) //zkclient zkserver连接成功{//在句柄上获取信号量sem_t *sem=(sem_t*)zoo_get_context(zh);//给信号量加一sem_post(sem);}}
}
- 连接成功后,通过
zoo_get_context
,获取信号量,唤醒信号量,这时zookeeper_init success!
void Create();
void ZkClient::Create(const char *path,const char *data,int datalen,int state)
{char path_buffer[128];int bufferlen=sizeof(path_buffer);int flag;//判断path表示的znode节点是否存在,存在不在重复创建flag=zoo_exists(m_zhandle,path,0,nullptr);//节点不存在if(ZNONODE==flag){flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buffer,bufferlen);if(flag==ZOK){std::cout<<"znode create success... path"<<path<<std::endl;}else{std::cout<<"flag:"<<flag<<std::endl;std::cout<<"znode create error... path: "<<path<<std::endl;exit(EXIT_FAILURE);}}
}
- 调用zk底层
zoo_exists
函数,判断path表示的znode节点是否存在,存在的话不在重复创建; - 节点不存在的情况下,创建指定path的znode节点,实际上是对
zoo_create
的调用;注意state 就是底层的flags 默认是永久性节点,如果是ZOO_EPHEMERAL则为临时性;
std::string GetData();
根据参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{char buffer[64];int bufferlen=sizeof(buffer);//同步 ZOK operation completed successfullyint flag=zoo_get(m_zhandle,path,0,buffer,&bufferlen,nullptr);if(flag!=ZOK){std::cout<<"znode get error... path: "<<path<<std::endl;return "";}else{return buffer;}
}
- 调用zk底层
zoo_get
来获取值;
使用:在pcprovider.cc
中
zkCli.Start();for(auto &sp:m_serviceMap)
{std::string service_path="/"+sp.first;zkCli.Create(service_path.c_str(),nullptr,0);for(auto &mp:sp.second.m_methodMap){std::string method_path = service_path+"/"+mp.first;char method_path_data[128]={0};sprintf(method_path_data,"%s:%d",ip.c_str(),port);zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);}
}
- 在这里我们定义 服务名 service_name 为永久性节点 ,不会随着服务器的宕机等被zk删除,也就是说当RpcServer与ZooKeeper断开连接后,整个节点还是会存在。而将方法名 method_name定义为临时节点,服务不在运行时就不要这个节点了,否则会造成还有这个方法的假象。
- 服务名节点的创建 service_path,例如:/UserServiceRpc
- 方法名节点的创建 method_path,例如/UserServiceRpc/Login;
- 将服务的ip和port存储为节点的数据,
sprintf(method_path_data,"%s:%d",ip.c_str(),port);
;
使用:在mprpcchannel.cc
中
ZkClient zkCli;
zkCli.Start();std::string method_path="/"+service_name+"/"+method_name;//127.0.0.1:8000
std::string host_data=zkCli.GetData(method_path.c_str());
if(host_data=="")
{controller->SetFailed(method_path+" is not exist!");return;
}int idx=host_data.find(":");
if(idx==-1)
{controller->SetFailed(method_path+" address is invalid!");return;
}std::string ip=host_data.substr(0,idx);
uint16_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());
- 根据
method_path="/"+service_name+"/"+method_name;
我们在zk中查找是否有服务器提供此类方法,如果有拿到ip以及port。
在这里,我们对zookeeper的使用时比较简单的,实际上就是就是对zookeeper库的zoo_init、zoo_create、zoo_get等方法的封装,为RpcServer提供简易的接口,实现RpcServer连接ZooKeeper(ZkClient::Start()
)、RpcServer在ZooKeeper上创建节点(ZkClient::Create()
)、RpcServer根据节点路径path(/服务名/方法名)从ZooKeeper服务器上获取节点中携带的数据(ZkClient::GetData()
)。对于更深层次的理解,有需要的伙伴们还是建议去找更加详细的资料或者直接去看源码~