[分布式网络通讯框架]----RpcProvider实现

在上一节userservice.cc的主函数中,我们初始化以后实例化了一个RpcProvider对象provider。接着调用了它的NotifyService(new UserService)方法,将UserService服务对象及其提供的方法进行预备发布。发布完服务对象后再调用Run()就将预备发布的服务对象及方法注册到ZooKeeper上并开启了对远端调用的网络监听。接下来我们看看RpcProvider的具体实现。

rpcprovider

该类是Rpc框架提供的专门发布RPC服务方法的网络对象类。

重要成员变量

muduo::net::EventLoop m_eventLoop;struct ServiceInfo
{google::protobuf::Service *m_service; std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;
};std::unordered_map<std::string,ServiceInfo> m_serviceMap;
  • EventLoop大家都陌生
  • ServiceInfo类,组织了一个service服务类型信息,里面包含了服务对象m_service,以及服务对象方法m_methodMap,在user.proto中注册的rpc远端调用方法Login和Register都是用google::protobuf::MethodDescriptor类来描述的。
  • m_serviceMap 存储注册成功的服务对象和其服务方法的所有信,一台服务器上可能会提供多个Service服务对象,m_serviceMap存储了多个Service_Info结构体。

重要成员函数

void NotifyService();

这里是框架提供给外部使用的,可以发布rpc方法的函数接口,它的参数是google::protobuf::Service *service决定了也可以接受任意的service。

为什么要使用google::protobuf::Service *service呢?
userservice.cc中我们知道UserService是继承自UserServiceRpc,而UserServiceRpc又是继承自google::protobuf::Service类。
在这里插入图片描述
这就是C++的多态设计,rpcprovider作为Rpc通信框架的一部分,是服务于业务层的,我们不能让其只服务与某一个业务,即void NotifyService(UserService *service);,对于不同的业务我们再去定义其他的类。所以protobuf就提供了google::protobuf::Service基类来描述服务对象。传递对象的时候传递基类指针指向派生类实例,使Rpc框架中定义的类方法解耦于业务层,这样就可以接受任意类型的service。

void RpcProvider::NotifyService(google::protobuf::Service *service)
{ServiceInfo service_info;// 获取了服务对象的描述信息const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();// 获取服务的名字std::string service_name = pserviceDesc->name();// 获取服务对象service对象的方法的数量int methodCnt = pserviceDesc->method_count();// std::cout << "service_name:" << service_name << std::endl;LOG_INFO("service_name:%s",service_name.c_str());for (int i = 0; i < methodCnt; i++){// 获取了服务对象指定下标的服务方法的描述(抽象描述)const google::protobuf::MethodDescriptor *pmethodDesc = pserviceDesc->method(i);std::string method_name = pmethodDesc->name();service_info.m_methodMap.insert({method_name, pmethodDesc});//std::cout << "method_name:" << method_name << std::endl;LOG_INFO("method_name:%s",method_name.c_str());}service_info.m_service = service;m_serviceMap.insert({service_name, service_info});
}
  • 定义了一个ServiceInfo对象service_info,用来保存service服务类型信息。
  • ServiceDescriptor对象pserviceDesc通过底层的GetDescriptor()函数来获取给定的消息对象的描述符,通过pserviceDesc,调用底层的方法我们可以获得服务的名字以及对应的方法数量。
  • 通过循环,得到方法对应的名字和方法的描述放入结构体service_info的m_methodMap中。
  • 最后将service_name, service_info一起放入m_serviceMap中。这样我们就获得了服务对应的方法以及方法对应的描述。
void Run();

负责启动rpc服务节点,开始提供rpc远程网络调用服务

void RpcProvider::Run()
{std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());muduo::net::InetAddress address(ip, port);// 创建TcpServer对象muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");// 绑定连接回调和消息读写回调的方法 分离了网络代码和业务代码server.setConnectionCallback(std::bind(&RpcProvider::OnConnection,this, std::placeholders::_1));server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置muduo库的线程数量server.setThreadNum(4);ZkClient zkCli;// 连接zk服务zkCli.Start();for(auto &sp:m_serviceMap){//    /service_name   /UserServiceRpcstd::string service_path="/"+sp.first;zkCli.Create(service_path.c_str(),nullptr,0);for(auto &mp:sp.second.m_methodMap){std::string method_path = service_path+"/"+mp.first;char method_path_data[128]={0};sprintf(method_path_data,"%s:%d",ip.c_str(),port);zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);}}std::cout << "RpcProvider statrt service at ip: " << ip<< " port: " << port << std::endl;// 启动网络服务server.start();m_eventLoop.loop();
}
  • 调用MprpcApplication的方法获取了响应的ip和port,接下来就是我们在muduo库中剖析的网络通讯过程,得到ip和port组装了address,创建tcpserver对象,注册连接回调和消息读写回调的方法,分离网络代码和业务代码,设置muduo库的线程数量。
  • 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以在zk上发现服务,关于zk之后会分析到。
  • 启动网络服务
void OnConnection();

连接回调

void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{if (!conn->connected()){// rpc client的连接断开了conn->shutdown();}
}
void OnMessage( );

已建立连接用户的读写事件回调,如果远程有一个rpc服务的调用请求,那么OnMessage方法就会响应

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp)
{std::string revc_buf = buffer->retrieveAllAsString();uint32_t header_size = 0;revc_buf.copy((char *)&header_size, 4, 0);std::string rpc_header_str = revc_buf.substr(4, header_size);mprpc::RpcHeader rpcHeader;std::string service_name;std::string method_name;uint32_t args_size;if (rpcHeader.ParseFromString(rpc_header_str)){// 数据头反序列化成功service_name = rpcHeader.service_name();method_name = rpcHeader.method_name();args_size = rpcHeader.args_size();}else{// 数据头反序列化失败std::cout << "rpc_header_str:" << rpc_header_str<< " parse error!" << std::endl;return;}// 获取rpc方法参数的字符流数据std::string args_str = revc_buf.substr(4 + header_size, args_size);// 获取service对象和method对象auto it = m_serviceMap.find(service_name);if (it == m_serviceMap.end()){// 没有对应的服务对象std::cout << service_name << " is not exist!" << std::endl;return;}auto mit = it->second.m_methodMap.find(method_name);if (mit == it->second.m_methodMap.end()){// 没有对应的服务对象std::cout << service_name << ": "<< method_name << " is not exist!" << std::endl;return;}// 获取service对象 new UserServicegoogle::protobuf::Service *service = it->second.m_service;// 获取method对象 Loginconst google::protobuf::MethodDescriptor *method = mit->second;// 生成rpc方法调用的请求request和响应response参数google::protobuf::Message *request = service->GetRequestPrototype(method).New();if (!request->ParseFromString(args_str)){std::cout << " request parse error, content: " << args_str << std::endl;return;}google::protobuf::Message *response = service->GetResponsePrototype(method).New();google::protobuf::Closure *done=google::protobuf::NewCallback<RpcProvider,const muduo::net::TcpConnectionPtr&,google::protobuf::Message*>(this, &RpcProvider::SendRpcResponse, conn, response);service->CallMethod(method, nullptr, request, response, done);
}
  • 网络上接受的远程rpc调用请求的字符流 ,并从中读取前4个字节的内容,这里我们按照header_size(4个字节)+hear_str+args_str进行存放,前四个字节是服务的名字和方法的名字一起的长度,通过这四个字节,我们可以分辨出来名字和参数。
  • 通过从字符流中读取前4个字节的内容,得到header_size,并根据其读取数据头的原始字符流,反序列化数据。
  • 在定义RpcHeader时我们按照以下结构进行定义,这样通过反序列化,我们就得到了相应的方法以及参数长度
syntax="proto3";
package mprpc;message RpcHeader
{bytes service_name=1;   //类名bytes method_name=2;    //方法名uint32 args_size=3;     //参数长度(参数序列化后的长度)
}
  • 通过service_name以及method_name在之前定义的m_serviceMap中,找到相应的service对象(UserService)和method对象(Login);
  • 生成rpc方法调用的请求request和响应response参数;
  • CallMethod函数中最后一个参数为google::protobuf::Closure *done,这里我们绑定一个Closure的回调函数SendRpcResponse,通过网络把rpc方法执行的结果发送会rpc的调用方。Closure类其实相当于一个闭包。这个闭包捕获了一个成员对象的成员函数例如login函数,以及这个成员函数需要的参数。然后闭包类提供了一个方法Run(),当执行这个闭包对象的Run()函数时,他就会执行捕获到的成员对象的成员函数,也就是相当于执行void RpcProvider::SendRpcResponse(conn, response);,这个函数可以将reponse消息体发送给Tcp连接的另一端,即caller。
  • 也就是在userservice.cc中的Login()函数中,最后调用done->Run(),实际上就是调用了RpcProvider::SendRpcResponse(conn, response);将response消息体作为Login处理结果发送回caller。
    在这里插入图片描述
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response)
{std::string response_str;//response 进行序列化if(response->SerializeToString(&response_str)){//序列化成功后,通过网络把rpc方法执行的结果发送会rpc的调用方conn->send(response_str);}else{std::cout<<"Serialize response_str error!"<<std::endl;}//模拟http的短链接服务,由rpcprovider主动断开连接conn->shutdown();
}
  • 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法,也就是service->CallMethod(method, nullptr, request, response, done);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/36831.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

企业级Windows设备日志采集工具

永久免费: 前往Gitee最新版本 更新内容 进一步提升工程师部署采集客户端效率. 打开根Url,自动跳转到部署页面.(原工程师需输入很长的Url);新增复制同类客户端同步任务功能.优化客户端分组操作;文件同步到服务器后,可配置文件名增加时间戳; 介绍 定时全量或增量采集工控机,…

项目分层--简单图书管理系统

分层情况 实体类Book代码 //实体类 public class Book {private int id;private String name;private int bsum;public Book() {}public Book(int id, String name, int bsum) {this.id id;this.name name;this.bsum bsum;}public int getId() {return id;}public void set…

9. Revit API UI: UIView、UIDocument、框选聚焦

9. Revit API UI: UIView、UIDocument、框选聚焦 UI命名空间下的API&#xff0c;到这里差不多就押送讲完了&#xff0c;同Application那篇所讲的几个类与接口&#xff0c;都是带UI的对应了一个不带UI的&#xff0c;如UIApplication和Application&#xff0c;作用呢&#xff0c…

Jenkins 下使用 Node 和 Npm(借助 nvm-wrapper 插件)构建前端程序

一、前言 搭建完Jenkins后&#xff0c;如何使用node进行构建前端呢&#xff0c;多个项目会使用的node的多个版本。如何动态指定node的版本进行构建呢。 方案一&#xff1a; 安装多个node版本&#xff0c;然后进行指定。这样比较麻烦。 方案二&#xff1a; 使用Jenkins的nv…

Spring相关面试题(三)

29 如何在所有的BeanDefinition注册完成后&#xff0c;进行扩展 Bean工厂的后置处理器&#xff0c;在所有的Bean注册完成后&#xff0c;就被执行。 public class A implements BeanFactoryPostProcessor {private String name "a class";private B b; ​public St…

ARM芯片架构(RTOS)

前言&#xff1a;笔记韦东山老师的rtos教程&#xff0c;连接放在最后 #ARM介绍 arm芯片属于精简指令集risc&#xff0c;所用的指令比较简单&#xff0c;ARM架构是一种精简指令集&#xff08;RISC&#xff09;架构&#xff0c;广泛应用于移动设备、嵌入式系统、物联网等领域。AR…

2. jenkins发布java项目

jenkins发布java项目 一、环境描述二、部署tomcat业务服务器三、部署git服务器&#xff0c;上传测试代码1、部署git服务器2、上传测试代码 四、jenkins对接组件1、安装必要的插件2、对接git客户端3、对接maven工具4、配置maven需要的jdk5、配置gitlab服务器的连接6、在jenkins上…

【Django】网上蛋糕项目商城-首页

概念 本文在上一文章搭建完数据库&#xff0c;以及创建好项目之后&#xff0c;以及前端静态文件后&#xff0c;对项目的首页功能开发。 后端代码编写 这里我们使用pymysql模块对数据库进行操作&#xff0c;获取数据。因此需要在dos窗口使用以下指令下载该库文件 pip instal…

新型发电系统——光伏行业推动能源转型

一、发展背景 “十四五”期间&#xff0c;随着“双碳”目标提出及逐步落实&#xff0c;本就呈现出较好发展势头的分布式光伏发展有望大幅提速。就“十四五”光伏发展规划&#xff0c;国家发改委能源研究所可再生能源发展中心副主任陶冶表示&#xff0c;“双碳”目标意味着国家…

MySQL改密

这里写目录标题 更改登录密码&#xff1a;有权限账号能登录mysql中&#xff1a;有权限账号不能登录mysql中&#xff1a;mysql5.6版本命令mysql5.7版本命令修改密码8.0版本改完后&#xff1a; mysql登录不上了本机安装了5.6后&#xff0c;又安装了mysql8.0 更改登录密码&#xf…

易查分小程序丨查询开始和截止时间如何设置?

老师在发布查询时&#xff0c;希望让学生家长在指定的时间段才能查询&#xff0c;应该如何实现&#xff1f; 通过查询时段功能&#xff0c;老师可以自主设置查询开始和截止时间&#xff0c;下面就来教给大家如何使用吧&#xff01; 设置查询时段演示效果 &#x1f4cc;使用教程…

ASP.NETMVC-简单例子-数据库查询+razor使用+项目发布

环境&#xff1a; win10&#xff0c;SQL Server 2008 R2 参考&#xff1a; asp.net mvc框架之EF的使用 - black娃 - 博客园 https://www.cnblogs.com/fjiqiang/p/11131365.html 目录 数据库查询要求思路操作 razor使用项目发布要求实现 数据库查询 要求 从服务器的数据库中查…

干货分享 | 学会这7个工具方法,数字化转型规划不是难题

提到数字化转型&#xff0c;首要做的便是分析企业现有的业务流程和价值流&#xff0c;发现企业利润来源的关键点&#xff0c;进而有针对性的数字化转型。要实现传统业务向数字化业务的转变&#xff0c;制定出高效、灵活的业务流程优化策略显得至关重要&#xff0c;这样才能找到…

LED裸眼3D显示屏:开启视觉新体验

随着科技的不断进步&#xff0c;LED显示屏作为一种新型的显示技术&#xff0c;已经被广泛应用于各个领域。而其中&#xff0c;LED裸眼3D显示屏更是因其独特的技术原理和令人震撼的视觉效果&#xff0c;成为了业界关注的焦点。 裸眼3D显示屏是一种前沿的显示技术&#xff0c;它…

Java | Leetcode Java题解之第201题数字范围按位与

题目&#xff1a; 题解&#xff1a; class Solution {public int rangeBitwiseAnd(int m, int n) {while (m < n) {// 抹去最右边的 1n n & (n - 1);}return n;} }

获取当前操作系统的名称platform.system()

【小白从小学Python、C、Java】 【考研初试复试毕业设计】 【Python基础AI数据分析】 获取当前操作系统的名称 platform.system() [太阳]选择题 在Python中&#xff0c;platform.system() 函数被用来获取什么信息&#xff1f; import platform print("【执行】platform.s…

05 threeJs基础---阵列立方体和相机适配体验立方体

1.增加相机视角fov 注&#xff1a; 范围更大&#xff0c;意味着可以看到渲染范围更大&#xff0c;远小近大的视觉效果更明显 fov:眼球张开的角度&#xff0c;0时相当于闭眼。aspect:可视区域横纵比。near:眼睛能看到的最近垂直距离。far&#xff1a;眼睛能看到的最远垂直距离。…

由于没有远程桌面授权服务器怎么办?

在现代的工作环境中&#xff0c;远程访问和远程桌面控制已经成为一项日益重要的需求。随着企业和组织的扩张&#xff0c;人们经常需要在不同的地点之间共享文件和应用程序。由于缺乏远程桌面授权服务器&#xff0c;这一过程可能会变得困难和不安全。 远程桌面授权服务器是一种…

day02-登录模块-主页鉴权

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1.分析登录流程1.1传统思路是登录校验通过之后&#xff0c;直接调用接口&#xff0c;获取token之后&#xff0c;跳转到主页1.2vue-element-admin模板的登录思路&…

基于盲信号处理的声音分离-基于改进的信息最大化的ICA算法

基于信息最大化的ICA算法的主要依据是使输入端与输出端的互信息达到最大&#xff0c;且输出各个分量之间的相关性最小化&#xff0c;即输出各个分量之间互信息量最小化&#xff0c;其算法的系统框图如图所示。 基于信息最大化的ICA算法的主要依据是使输入端与输出端的互信息达到…