[分布式网络通讯框架]----MprpcChannel以及ZkClient实现

在调用远程发布的rpc方法的Login时,我们使用了UserServiceRpc_Stub类,即fixbug::UserServiceRpc_Stub stub(new MprpcChannel());,来看看这个类的底层
在这里插入图片描述
实际上,是一个RpcChannel类,进入RpcChannel在这里插入图片描述
底层又是一个抽象类,它实现了一个虚函数CallMethod,在看看Login的底层,
在这里插入图片描述
实际上就是调用了这个CallMethod函数,这是一个虚函数,那么我们在远程调用rpc方法的时候,就需要自己写一个channel类去继承RpcChannel类,然后重写CallMethod。这样在fixbug::UserServiceRpc_Stub stub(new MprpcChannel());使用stub来调用Login方法的时候,底层就调用了我们重写的CallMethod,这就是我们接下来要分析的MprpcChannel类。

MprpcChannel类

底层就一个函数CallMethod,所有通过stub代理对象调用的rpc方法,统一做rpc方法调用的数据数据序列化和网络传送。

void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,google::protobuf::RpcController* controller, const google::protobuf::Message* request,google::protobuf::Message* response, google::protobuf::Closure* done)
{const google::protobuf::ServiceDescriptor* sd=method->service();std::string service_name=sd->name();std::string method_name=method->name();//获取参数的序列化字符串长度 args_sizeuint32_t args_size=0;std::string args_str;if(request->SerializeToString(&args_str)){args_size=args_str.size();}else{controller->SetFailed("Serialize request error!");return;}//定义rpc的请求headermprpc::RpcHeader rpcHeader;rpcHeader.set_service_name(service_name);rpcHeader.set_method_name(method_name);rpcHeader.set_args_size(args_size);uint32_t header_size=0;std::string rpc_header_str;if(rpcHeader.SerializeToString(&rpc_header_str)){header_size=rpc_header_str.size();}else{controller->SetFailed("Serialize rpc header error!");return;}//组织待发送的rpc请求的字符串std::string send_rpc_str;send_rpc_str.insert(0,std::string((char*)&header_size,4));send_rpc_str += rpc_header_str;//rpcheadersend_rpc_str += args_str;//args// 打印调试信息std::cout << "=============================" << std::endl;std::cout << "header_size: " << header_size << std::endl;std::cout << "rpc_header_str: " << rpc_header_str << std::endl;std::cout << "service_name: " << service_name << std::endl;std::cout << "method_name: " << method_name << std::endl;std::cout << "args_str: " << args_str << std::endl;std::cout << "=============================" << std::endl;//使用tcp编程 完成rpc方法的远程调用int clientfd=socket(AF_INET,SOCK_STREAM,0);if(-1==clientfd){char errtxt[512]={0};sprintf(errtxt,"create socket error! errno:%d",errno);controller->SetFailed(errtxt);return;}ZkClient zkCli;zkCli.Start();std::string method_path="/"+service_name+"/"+method_name;//127.0.0.1:8000std::string host_data=zkCli.GetData(method_path.c_str());if(host_data==""){controller->SetFailed(method_path+" is not exist!");return;}int idx=host_data.find(":");if(idx==-1){controller->SetFailed(method_path+" address is invalid!");return;}std::string ip=host_data.substr(0,idx);uint16_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());struct sockaddr_in server_addr;server_addr.sin_family=AF_INET;server_addr.sin_port=htons(port);server_addr.sin_addr.s_addr=inet_addr(ip.c_str());//连接rpc服务节点if(-1==connect(clientfd,(struct sockaddr*)&server_addr,sizeof(server_addr))){close(clientfd);char errtxt[512]={0};sprintf(errtxt,"connect error! errno:%d",errno);controller->SetFailed(errtxt);return;}//发送rpc请求if(-1==send(clientfd,send_rpc_str.c_str(),send_rpc_str.size(),0)){close(clientfd); char errtxt[512]={0};sprintf(errtxt,"send error! errno:%d",errno);controller->SetFailed(errtxt);return;}//接受rpc请求的响应值char recv_buf[1024]={0};int recv_size=0;if(-1==(recv_size=recv(clientfd,recv_buf,1024,0))){close(clientfd);char errtxt[512]={0};sprintf(errtxt,"recv error! errno:%d",errno);controller->SetFailed(errtxt);return;}//反序列化rpc调用的响应数据std::string response_str(recv_buf,0,recv_size);if(!(response->ParseFromArray(recv_buf,recv_size))){close(clientfd);char errtxt[512]={0};sprintf(errtxt,"parse error! errno:%s",recv_buf);controller->SetFailed(errtxt);return;}close(clientfd);}
  • 通过method,得到服务以及方法的名字
  • 在调用端,我们传入了rpc方法的请求参数
fixbug::LoginRequest request;
request.set_name("zhang san");
request.set_pwd("123456");

在user.proto中,我们对LoginRequest 进行如下定义:

message LoginRequest
{bytes name=1;bytes pwd=2;
}
  • 所以通过request->SerializeToString(&args_str)我们获取了获取参数的序列化字符串长度 args_size;
  • 根据service_name,method_name,args_size定义rpc的请求header;
  • 我们待发送的请求header形式是:header_size+service_name method_name args_size+args,获取header_size以后,组织待发送的rpc请求的字符串;
  • 使用tcp编程 完成rpc方法的远程调用,创建套接字 => 获取ip和port => 连接rpc服务节点 => 发送rpc请求 => 接受rpc请求的响应值;
  • 反序列化rpc调用的响应数据。

ZooKeeper

在这里,我们又一次看到了ZkClient的使用,在RpcProvider类中,我们也看到过它,它起到一个什么样的作用呢?
 ZooKeeper是一个分布式服务框架,为分布式应用提供一致性协调服务的中间件,对于ZooKeeper,我们在之前的博客中进行了简单介绍包括它的安装和简单使用。
 在本项目中,callee将对外提供的服务对象及其方法以及网络地址信息注册在ZooKeeper服务上;caller则通过访问ZooKeeper在整个分布式环境中获取自己想要调用的远端服务对象方法在哪一台设备上(网络地址信息),并向该设备直接发送服务方法调用请求。
 先来看看ZooKeeper类的封装。

ZooKeeper类

重要成员变量
zhandle_t *m_zhandle;
  • zk的客户端句柄
重要成员函数
构造析构
ZkClient::ZkClient():m_zhandle(nullptr){}ZkClient::~ZkClient()
{if(m_zhandle!=nullptr){zookeeper_close(m_zhandle);}
}
  • 构造函数将句柄m_zhandle进行初始化为空;
  • 析构函数关闭句柄,释放资源。
void Start();

zkclient启动连接zkserver

void ZkClient::Start()
{std::string host=MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");std::string port=MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");std::string connstr=host+":"+port;//zk要求是这样的格式m_zhandle=zookeeper_init(connstr.c_str(),global_watcher,30000,nullptr,nullptr,0);//发起的动作都没有产生过if(nullptr==m_zhandle){std::cout<<"zookeeper_init error!"<<std::endl;exit(EXIT_FAILURE);}//信号量sem_t sem;sem_init(&sem,0,0);//像句柄资源上设置上下文zoo_set_context(m_zhandle,&sem);//阻塞等待global_watcher发送连接成功ZOO_CONNECTED_STATE的信号sem_wait(&sem);std::cout<<"zookeeper_init success!"<<std::endl;
}
  • zookeeper_mt:多线程版本
    zookeeper的API客户端提供了三个线程
    API调用线程
    网络I/O线程 zookeeper_init底层直接调用pthread_create(底层是poll)会创建一个线程,专门发起IO操作
    watcher回调线程 当客户端接收到zkserver的响应时
  • zookeeper_init 异步的 创建会话,注意zookeeper_init成功只代表创建m_zhandle句柄资源成功,不代表zk与zkserver连接成功。
  • 通过zoo_set_context像句柄资源上设置上下文,即给入信号量来判断是否连接成功,即state==ZOO_CONNECTED_STATE时连接成功。
void global_watcher(zhandle_t *zh,int type,int state,const char *path,void *watcherCtx)
{if(type==ZOO_SESSION_EVENT){if(state==ZOO_CONNECTED_STATE) //zkclient zkserver连接成功{//在句柄上获取信号量sem_t *sem=(sem_t*)zoo_get_context(zh);//给信号量加一sem_post(sem);}}
}
  • 连接成功后,通过zoo_get_context,获取信号量,唤醒信号量,这时zookeeper_init success!
void Create();
void ZkClient::Create(const char *path,const char *data,int datalen,int state)
{char path_buffer[128];int bufferlen=sizeof(path_buffer);int flag;//判断path表示的znode节点是否存在,存在不在重复创建flag=zoo_exists(m_zhandle,path,0,nullptr);//节点不存在if(ZNONODE==flag){flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buffer,bufferlen);if(flag==ZOK){std::cout<<"znode create success... path"<<path<<std::endl;}else{std::cout<<"flag:"<<flag<<std::endl;std::cout<<"znode create error... path: "<<path<<std::endl;exit(EXIT_FAILURE);}}
}
  • 调用zk底层zoo_exists函数,判断path表示的znode节点是否存在,存在的话不在重复创建;
  • 节点不存在的情况下,创建指定path的znode节点,实际上是对zoo_create的调用;注意state 就是底层的flags 默认是永久性节点,如果是ZOO_EPHEMERAL则为临时性;
std::string GetData();

根据参数指定的znode节点路径,获取znode节点的值

std::string ZkClient::GetData(const char *path)
{char buffer[64];int bufferlen=sizeof(buffer);//同步 ZOK operation completed successfullyint flag=zoo_get(m_zhandle,path,0,buffer,&bufferlen,nullptr);if(flag!=ZOK){std::cout<<"znode get error... path: "<<path<<std::endl;return "";}else{return buffer;}
}
  • 调用zk底层zoo_get来获取值;
使用:在pcprovider.cc
zkCli.Start();for(auto &sp:m_serviceMap)
{std::string service_path="/"+sp.first;zkCli.Create(service_path.c_str(),nullptr,0);for(auto &mp:sp.second.m_methodMap){std::string method_path = service_path+"/"+mp.first;char method_path_data[128]={0};sprintf(method_path_data,"%s:%d",ip.c_str(),port);zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);}
}
  • 在这里我们定义 服务名 service_name 为永久性节点 ,不会随着服务器的宕机等被zk删除,也就是说当RpcServer与ZooKeeper断开连接后,整个节点还是会存在。而将方法名 method_name定义为临时节点,服务不在运行时就不要这个节点了,否则会造成还有这个方法的假象。
  • 服务名节点的创建 service_path,例如:/UserServiceRpc
  • 方法名节点的创建 method_path,例如/UserServiceRpc/Login;
  • 将服务的ip和port存储为节点的数据,sprintf(method_path_data,"%s:%d",ip.c_str(),port);
使用:在mprpcchannel.cc
ZkClient zkCli;
zkCli.Start();std::string method_path="/"+service_name+"/"+method_name;//127.0.0.1:8000
std::string host_data=zkCli.GetData(method_path.c_str());
if(host_data=="")
{controller->SetFailed(method_path+" is not exist!");return;
}int idx=host_data.find(":");
if(idx==-1)
{controller->SetFailed(method_path+" address is invalid!");return;
}std::string ip=host_data.substr(0,idx);
uint16_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());
  • 根据method_path="/"+service_name+"/"+method_name;我们在zk中查找是否有服务器提供此类方法,如果有拿到ip以及port。

在这里,我们对zookeeper的使用时比较简单的,实际上就是就是对zookeeper库的zoo_init、zoo_create、zoo_get等方法的封装,为RpcServer提供简易的接口,实现RpcServer连接ZooKeeper(ZkClient::Start())、RpcServer在ZooKeeper上创建节点(ZkClient::Create())、RpcServer根据节点路径path(/服务名/方法名)从ZooKeeper服务器上获取节点中携带的数据(ZkClient::GetData())。对于更深层次的理解,有需要的伙伴们还是建议去找更加详细的资料或者直接去看源码~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36680.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Amazon OpenSearch Service 现在支持 JSON Web Token(JWT)身份验证和授权

最近&#xff0c;Amazon OpenSearch 推出了一个新功能&#xff0c;支持 JWT 认证和授权。虽然这个功能在开源的 OpenSearch 中早已存在&#xff0c;但在托管的 Amazon OpenSearch 中的实现一直不够理想。 此前的授权方式 控制台登录 内部数据库&#xff1a;使用基本的用户名…

Android开发系列(十一)Jetpack Compose之Dialog

Dialogs是在应用程序中显示一些额外信息或进行用户交互的常见功能。Jetpack Compose中的Dialog可以通过使用AlertDialog组件来创建。 基本用法 下面通过示例来了解Dialog的使用。 OptIn(ExperimentalMaterial3Api::class) Composable fun AlertDialogExample(onDismissReques…

Redis 7.x 系列【9】数据类型之自动排重集合(Set)

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 前言2. 常用命令2.1 SADD2.2 SCARD2.3 SISMEMBER2.4 SREM2.5 SSCAN2.6 SDIFF2.7 SU…

DiAtom 共生菌固氮作用产生的碳输出(ANACONDAS)

Amazon iNfluence on the Atlantic: CarbOn export from Nitrogen fixation by DiAtom Symbioses (ANACONDAS) 亚马逊对大西洋的影响&#xff1a;DiAtom 共生菌固氮作用产生的碳输出&#xff08;ANACONDAS&#xff09; 简介 该研究项目探讨了亚马逊河羽流对热带北大西洋西部…

ECharts 源码代码规范

代码规范 - Apache EChartsApache ECharts&#xff0c;一款基于JavaScript的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表。https://echarts.apache.org/zh/coding-standard.html 源文件 [强制] JavaScr…

STM32-hal库学习(4)--usart/uart通信 (同时显示在oled)

前言&#xff1a; 关于usart详解&#xff1a; stm32-USART通信-CSDN博客 因为在oled上显示&#xff0c;我们直接在上一个工程进行修改&#xff1a; STM32_hal库学习&#xff08;3&#xff09;-OLED显示-CSDN博客 其他配置与oled显示工程保持不变&#xff0c;打开oled文件的…

并发编程工具集——Lock和Condition(上)(十二)

简述&#xff1a;Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程&#xff0c;其中 Lock 用于解决互斥问题&#xff0c;Condition 用于解决同步问题。 再造管程的理由和期望 理由&#xff1a;synchronized 没有办法解决“破坏不可抢占条件方案”。 原因是synchroniz…

Linux kernel 与 设备树

Linux kernel 与 设备树 1 介绍1.1 概述1.2 发展历程1.3 各版本发布时间及特色1.4 Linux 单内核1.5 Linux 内核网址1.6 NXP 官方镜像与 野火 鲁班猫镜像的区别 2 Linux 内核组成2.1 进程管理2.2 内存管理2.3 文件系统2.4 设备管理2.5 网络功能 3 Linux 内核编译3.1 编译 Kernel…

小程序发布必须进行软件测试吗?测试内容有哪些?

在如今移动互联网时代&#xff0c;小程序已成为许多企业广泛采用的一种营销手段&#xff0c;然而&#xff0c;发布小程序之前进行充分的软件测试是至关重要的&#xff0c;因为它不仅可以确保小程序的质量&#xff0c;还可以避免潜在的风险和损失。 在进行小程序发布前进行软件…

可逆质子陶瓷电化学电池(R-PCEC)作为新型能量存储与转换装置开发应用价值大

可逆质子陶瓷电化学电池&#xff08;R-PCEC&#xff09;作为新型能量存储与转换装置开发应用价值大 可逆质子陶瓷电化学电池&#xff08;R-PCEC&#xff09;&#xff0c;同时具有燃料电池与电解槽功能&#xff0c;能够实现电能、化学能相互转化&#xff0c;是最具有发展前景的能…

「51媒体」政企活动媒体宣发如何做?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 媒体宣传加速季&#xff0c;100万补贴享不停&#xff0c;一手媒体资源&#xff0c;全国100城线下落地执行。详情请联系胡老师。 政企活动媒体宣发是一个系统性的过程&#xff0c;需要明确…

餐饮冷库安全守护神:可燃气体报警器检定的科学性与有效性

随着餐饮业的快速发展&#xff0c;冷库成为储存食材、保证食品质量的重要场所。 然而&#xff0c;由于冷库环境的特殊性&#xff0c;如密封性强、温度低、湿度大等&#xff0c;一旦冷库内发生可燃气体泄露&#xff0c;后果将不堪设想。因此&#xff0c;在餐饮冷库中安装并合理…

SpringBoot集成道历(实现道历日期查询)

官网地址&#xff1a;官网地址https://6tail.cn/calendar/api.html 1、导入依赖 <dependency><groupId>cn.6tail</groupId><artifactId>lunar</artifactId><version>1.3.9</version></dependency><dependency><group…

MQTT遗嘱信息(2)

接前一篇文章&#xff1a;MQTT遗嘱信息&#xff08;1&#xff09; 本文内容参考&#xff1a; 什么是MQTT遗嘱消息&#xff1f;如何配置和处理遗嘱消息&#xff1f;_mqtt last will-CSDN博客 MQTT 协议学习&#xff1a;Retained&#xff08;保留消息&#xff09; 与 LWT&#x…

shark云原生-日志管理体系-filebeat

文章目录 1. deploy 文件1.1 RBAC1.2. DaemonSet1.2.1. Elasticsearch 连接信息1.2.2. Volume 1.3. ConfigMap1.3.1. 日志收集路径1.3.2. 日志事件输出目标 2. 在控制平面节点上运行Filebeat3. 查看输出3.1. 关于处理器 processors 4. 日志收集配置4.1. 手动指定日志收集路径4.…

基于SpringBoot前后端分离旅游出行管理系统设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f;感兴趣的可以先收藏起来&#xff0c;还…

聚观早报 | iPhone 16核心硬件曝光;三星Galaxy全球新品发布会

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 6月28日消息 iPhone 16核心硬件曝光 三星Galaxy全球新品发布会 苹果正多方下注布局AI商店 黄仁勋2024年薪酬3400…

Zynq7000系列FPGA中的DMA控制器简介(一)

DMA控制器&#xff08;DMAC&#xff09;使用64位AXI主接口来执行与系统存储器和PL外围设备之间的DMA数据传输&#xff0c;操作频率同CPU_2x的时钟速率。传输由DMA指令执行引擎控制。DMA引擎运行在一个小指令集上&#xff0c;该指令集提供了一种灵活的指定DMA传输的方法。这种方…

【电路笔记】-MOSFET放大器

MOSFET放大器 文章目录 MOSFET放大器1、概述2、电路图3、电气特性3.1 ** I D = F ( V G S ) I_D=F(V_{GS}) ID​=F(VGS​)**特性3.2 I D = F ( V D S ) I_D=F(V_{DS}) ID​=F(VDS​)特性4、MOSFET放大器5、输入和输出电压6、电压增益7、总结1、概述 在前面的文章中,我们已经…

基本的 Spring Boot 配置步骤和常见的配置项【创建,配置,日志,数据库,安全,MVC】

基本的 Spring Boot 配置步骤和常见的配置项【创建&#xff0c;配置&#xff0c;日志&#xff0c;数据库&#xff0c;安全&#xff0c;MVC】 学习总结 1、掌握 JAVA入门到进阶知识(持续写作中……&#xff09; 2、学会Oracle数据库入门到入土用法(创作中……&#xff09; 3、…