一.消息分发Dispatcher实现
Dispatcher 就是“消息分发中枢”:根据消息类型 MType,把消息派发给对应的处理函数(Handler)执行。
初版:
#pragma once
#include "net.hpp"
#include "message.hpp"namespace wws
{class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;void registerHandler(MType mtype,const MessageCallback &handler){std::unique_lock<std::mutex> lock(_mutex);_handlers.insert(std::make_pair(mtype,handler));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息类型对应的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second(conn,msg);return;}//没有找到指定类型的处理回调 但我们客户端和服务端都是我们自己设计的 因此不可能出现这种情况ELOG("收到未知类型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,MessageCallback> _handlers;};
}
使用方法 :服务端为例
#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"void onRpcRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Rpc请求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::RpcResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);rpc_rsp->setResult(33);conn->send(rpc_rsp);
}
void onTopicRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Topic请求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::TopicResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);conn->send(rpc_rsp);
}int main()
{//server建立收到rpc topic请求时对应的调用函数auto dispatcher=std::make_shared<wws::Dispatcher>();dispatcher->registerHandler(wws::MType::REQ_RPC,onRpcRequest);//注册映射关系dispatcher->registerHandler(wws::MType::REQ_TOPIC,onTopicRequest);//注册映射关系auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);server->setMessageCallback(message_cb);server->start();return 0;
}
回调函数调用过程:
1.服务端定义了两个函数onRpcRequest收到rpc请求的回调函数,onTopicRequest收到topic请求的回调函数(两个函数的参数部分都是一样的)。
2.创建Dispatcher类对象,调用registerHandler函数把请求类型(mtype)和对应的回调函数建立映射(因为上面回调函数的类型都是一样的,所以可以用map进行同一管理)。
3.把Dispatcher::onMessage函数设置成消息回调函数。在Dispatcher::onMessage函数内部会根据不同的消息类型,找到对应的回调函数并进行调用。
但是设置的两个回调函数中onRpcRequest,onTopicRequest。它们第二个参数类型都是基类BaseMessage,虽然传入的对象是子类对象RpcResponse TopicResponse,但仍无法访问它们对应子类的成员函数。
如果把第二个参数基类Base换成它们对应的子类,能访问了,但这就会导致函数的类型不一样了,就不能用map进行统一管理。
有没有什么办法即能用map进行统一管理,还能在回调函数中调用到子类的函数。
方法一:直接在回调函数中通过
dynamic_cast
/ std::dynamic_pointer_cast将父类->子类可以通过
dynamic_cast
将基类指针(或引用)转换为子类类型,以便访问子类特有的成员函数。前提是你使用的是多态(polymorphic)类,也就是基类至少要有一个虚函数//Base基类 Derived子类 void test(Base* basePtr) {// 裸指针写法Derived* derivedPtr = dynamic_cast<Derived*>(basePtr);if (derivedPtr) {derivedPtr->specialFunction(); // 成功转换,调用子类函数} else {cout << "dynamic_cast failed!" << endl;} } //智能指针写法 std::shared_ptr<Base> basePtr = std::make_shared<Derived>(); // 用智能指针创建对象 std::shared_ptr<Derived> derivedPtr = std::dynamic_pointer_cast<Derived>(basePtr); // 类型安全转换
项目 裸指针写法 智能指针写法 创建方式 new Derived()
std::make_shared<Derived>()
类型转换 dynamic_cast<Derived*>(Base*)
std::dynamic_pointer_cast<Derived>(shared_ptr<Base>)
返回值类型 Derived*
std::shared_ptr<Derived>
生命周期管理 手动 delete 自动释放,无内存泄漏风险 安全检查 ✅ 运行时类型检查,失败返回 nullptr
✅ 同样运行时类型检查,失败返回空智能指针 是否影响引用计数 ❌ 不涉及引用计数 ✅ 新建了一个共享控制块引用
缺点 说明 ❌ 调用方需要知道消息类型 调用者必须手动 dynamic_cast
到对应子类,否则不能访问子类内容❌ 有一定运行时开销 dynamic_cast
需要在运行时检查类型,会略有性能损失(但一般能接受)❌ 容易出错 如果类型错了,就返回 nullptr
,还要额外判断、处理错误❌ 易破坏封装 上层代码要知道并显式转换为子类,增加了耦合度和类型暴露
方法二:模板 + 继承 + 多态
先看代码实现:
#pragma once #include "net.hpp" #include "message.hpp"namespace wws {//让统一的父类指针指向不同的子类对象//通过调用父类虚函数,调用不同子类onMessage类型转换(dynamic<>)完成后的函数调用class Callback{public:using ptr=std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)=0;};template<typename T>class CallbackT:public Callback{public:using ptr=std::shared_ptr<CallbackT<T>>;//根据消息类型重新定义出函数类型using MessageCallback =std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>;CallbackT(const MessageCallback &handler):_handler(handler){}void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)override{auto type_msg=std::dynamic_pointer_cast<T>(msg);_handler(conn,type_msg);}private:MessageCallback _handler;};class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;template<typename T> //加typename表示这是一个类型void registerHandler(MType mtype,const typename CallbackT<T>::MessageCallback &handler)//传的是类对象 T是消息类型(BaseMessage子类){std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<CallbackT<T>>(handler);_handlers.insert(std::make_pair(mtype,cb));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息类型对应的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second->onMessage(conn,msg);//调用类中的回调函数return;}//没有找到指定类型的处理回调 但我们客户端和服务端都是我们自己设计的 因此不可能出现这种情况ELOG("收到未知类型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,Callback::ptr> _handlers;//second是一个基类指针}; }
使用方法:
不用传BaseMessage基类,直接传子类。但在设置时指明Typed消息类型。
因为回调函数hadler类型不同,我们就用模板类根据T(消息类型)生成不同的类,里面是保存的是不同类型的回调函数。但根据模板类生成的类,类型不一样还是不能统一放入map中。
所以再定义一个父类 里面的回调函数设置为虚函数,这些不同的模板类作为子类继承父类并实现各自的回调函数。map中的handler存的不再是回调函数,而是一个父类指针,通过父类指针调用子类的回调函数。
具体过程:
使用模板类
CallbackT<T>
根据消息类型T
生成不同的子类,它们内部包装了各自类型的回调函数。(因为子类传入的是一个函数类型,保存的一个函数类型,而T是一个消息类型。还需要MessageCallback将T消息类型和回调函数类型绑定在一起)这些模板类继承自统一的基类
Callback
,并重写了其onMessage()
虚函数。当我们在调用registerHandler注册回调函数时,会创建一个
CallbackT<T>
的对象并获取其指针,最后设置到map中。在
Dispatcher
中,map<MType, Callback::ptr>
存储的是基类指针,但实际指向的是不同子类CallbackT<T>
的对象。当调用
onMessage()
时,基类指针会通过虚函数机制调用对应子类的实现,再通过dynamic_pointer_cast
将消息转换为正确的类型,最终调用具体的业务回调函数。
方法一和方法二对比:
相比直接在 handler 里 dynamic_cast,现在的设计通过模板和多态封装了类型转换逻辑,使回调函数 更简洁、更安全、更可维护,Dispatcher 也更具通用性和扩展性。
对比点 ✅ 当前封装方式<br>(模板 + 多态) ❌ 直接在每个 handler 里手动 dynamic_cast 💡 代码复用性 回调逻辑封装在模板中,避免重复写类型转换代码 每个回调都要重复写一次 dynamic_pointer_cast ✅ 类型安全 编译器自动根据模板类型 T
限定传入的函数签名由开发者自己保证类型正确,容易写错 🎯 接口统一 Dispatcher 接收统一的 BaseMessage::ptr
,自动调用类型对应的 handler接口统一,但每次回调前都得“手动猜”消息类型 🧼 代码整洁性 Handler 业务函数专注于处理业务,不掺杂类型转换代码 handler 代码里混入了类型转换、错误判断等杂项 🔄 可扩展性 新增消息类型只需 registerHandler<T>
一行,无需修改 dispatcher 逻辑每新增一种类型,都要写新回调 + 自己处理类型转换 🔒 类型封装 类型转换封装在 CallbackT<T>::onMessage
内,调用者无感知显式暴露类型细节,破坏封装性 🧠 可维护性 Dispatcher 管理逻辑集中、结构清晰 回调函数多时,容易混乱、出错
二.服务端-RpcRouter实现
组织和处理客户端发来的 RPC 请求,并调用对应的业务逻辑进行响应。
这个模块主要由 4 个类构成:
类名 作用 VType
参数类型的枚举,例如整数、字符串、对象等 ServiceDescribe
描述一个服务方法的参数、返回值、回调函数等 ServiceManager
管理多个服务(增删查) RpcRouter
处理客户端发来的 RPC 请求,协调调用服务
#include "../common/net.hpp"
#include "../common/message.hpp"namespace wws
{
namespace server
{//枚举类 VType 定义参数与返回值的类型enum class VType{BOOL = 0,INTEGRAL,NUMERIC,STRING,ARRAY,OBJECT,};// 服务描述类class ServiceDescribe{public:using ptr=std::shared_ptr<ServiceDescribe>;using ServiceCallback=std::function<void(const Json::Value&,Json::Value&)>;using ParamDescribe=std::pair<std::string,VType>;//参数名称 类型ServiceDescribe(std::string &&method_name,ServiceCallback &&callback,std::vector<ParamDescribe>&& params_desc,VType return_type):_method_name(std::move(method_name)),_callback(std::move(callback)),_params_desc(std::move(params_desc)),_return_type(return_type){}//返回名称const std::string & method(){return _method_name;}//校验传入参数是否符合要求(1.字段完整 + 2.类型匹配)bool paramCheck(const Json::Value¶ms)//{"nums1",11}{for(auto&desc:_params_desc){//1.判断是否有该字段if(params.isMember(desc.first)==false){ELOG("没有 %s 参数字段",desc.first.c_str());return false;}//2.判断该字段类型是否正确if(check(desc.second,params[desc.first])==false){ELOG("%s参数字段类型错误",desc.first.c_str());return false;}}return true;}bool call(const Json::Value& params,Json::Value&result){_callback(params,result);if(rtypeCheck(result)==false){ELOG("回调处理函数中响应信息类型错误");return false;}return true;}private:// 判断return类型是否正确bool rtypeCheck(const Json::Value &val){return check(_return_type, val);}//判断val对象的类型是否和vtype一致 Json::Value兼容任何JSON类型(int、string、array、object 等)bool check(VType vtype,const Json::Value &val){switch(vtype){case VType::BOOL :return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::STRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name;//方法名称ServiceCallback _callback;//实际的业务回调函数std::vector<ParamDescribe> _params_desc;//参数字段格式描述vector<参数名称,对应的类型>VType _return_type;//结果类型描述 };//对比 直接在ServiceDescribe中set各参数 的优点:构造完后ServiceDescribe 的成员就不再修改,仅读取 天然线程安全。//若多个线程同时调用 setXxx() 方法 会出现线程安全的问题 需要在每个set函数中加锁class SDescribeFactory{public:void setMethodName(const std::string&name){_method_name=name;}void setReturnType(VType vtype){_return_type=vtype;}void setParamDesc(const std::string &pname,VType vtype){_params_desc.push_back(ServiceDescribe::ParamDescribe(pname,vtype));}void setCallback(const ServiceDescribe::ServiceCallback&cb){_callback=cb;}ServiceDescribe::ptr build(){return std::make_shared<ServiceDescribe>(_method_name,_callback,_params_desc,_return_type);}private:std::string _method_name;//方法名称ServiceDescribe::ServiceCallback _callback; // 实际的业务回调函数std::vector<ServiceDescribe::ParamDescribe> _params_desc; // 参数字段格式描述vector<参数名称,对应的类型>VType _return_type; // 结果类型描述};//服务管理类 增删查class ServiceManager{public:using ptr=std::shared_ptr<ServiceManager>;void insert(const ServiceDescribe::ptr&desc)//增{std::unique_lock<std::mutex> lock(_mutex);_service.insert(std::make_pair(desc->method(),desc));}ServiceDescribe::ptr select(const std::string &method_name)//查{std::unique_lock<std::mutex> lock(_mutex);auto it=_service.find(method_name);if(it==_service.end()){return ServiceDescribe::ptr();}return it->second;}void remove(const std::string &method_name)//删{_service.erase(method_name);}private:std::mutex _mutex;std::unordered_map<std::string,ServiceDescribe::ptr> _service;//函数名称 对应服务};class RpcRouter{public:using ptr=std::shared_ptr<ServiceDescribe>;//对注册到Dispatcher模块针对rpc请求进行回调处理的业务函数void onRpcRequest(const BaseConnection::ptr&conn,RpcRequest::ptr &request){//1.根据用户请求的方法描述 判断当前服务端能否提供对应的服务auto service=_service_manager->select(request->method());if(service.get()==nullptr){ELOG("未找到%s服务",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_NOT_FOUND_SERVICE);}//2.进行参数校验 确定能否提供服务if(service->paramCheck(request->params())==false){ELOG("%s服务参数校验失败",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INVALID_PARAMS);}//3.调用业务回调接口进行处理Json::Value result;bool ret=service->call(request->params(),result);if(ret==false){ELOG("%s服务参调用出错",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INTERNAL_ERROR);}//4.向客户端发送结果return response(conn,request,result,RCode::RCODE_OK);}//提供服务注册接口void registerMethod(const ServiceDescribe::ptr &service ){_service_manager->insert(service);}private://响应对象void response(const BaseConnection::ptr&conn,RpcRequest::ptr&req,const Json::Value&res,RCode rcode){auto msg=MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(wws::MType::RSP_RPC);msg->setRcode(rcode);msg->setResult(res);conn->send(msg);}private:ServiceManager::ptr _service_manager;};
}
}
ServiceDescribe
:服务描述类每一个服务方法(如
Add
、Translate
)都有一个ServiceDescribe
对象,它记录:
方法名
_method_name
参数信息
_params_desc
(字段名 + 类型)返回值类型
_return_type
实际处理逻辑
_callback
功能:
paramCheck()
:检查客户端传来的参数是否完整且类型匹配。
call()
:调用业务函数,处理请求,并校验响应类型,输出型参数Json::Value&result获取结果。
RpcRouter
:RPC 请求核心调度器处理流程(onRpcRequest):
1. 获取客户端请求的方法名:request->method() 2. 查找是否有对应的服务:_service_manager->select() 3. 参数检查:service->paramCheck() 4. 调用回调函数处理:service->call() 5. 构建响应消息:RpcResponse 6. 通过 conn->send(msg) 返回结果
注册流程(registerMethod()):
RpcRouter router; router.registerMethod(service); // 将服务注册到服务管理器
为什么要用 SDescribeFactory 工厂模式而不是在 ServiceDescribe 中直接使用 setXxx() 方法进行设置ServiceDescribe的各个参数?
通过DescribeFactory 工厂模式,造完后ServiceDescribe 的成员就不再修改,仅读取 天然线程安全。
如果在ServiceDescribe 设置set(),若多个线程同时调用 setXxx() 方法 会出现线程安全的问题 需要在每个set函数中加锁。
客户端发送 RpcRequest↓RpcRouter::onRpcRequest()//进行处理↓_service_manager->select(method)//查找方法↓ServiceDescribe::paramCheck(params)//校验参数↓ServiceDescribe::call() → 执行业务逻辑//执行回调↓RpcRouter::response() → 发送响应
三.客户端-RpcRouter实现
🔹 1.
describe
(请求描述体)
封装一个请求的基本信息:
request
: 包含 RID(请求 ID)、MType(消息类型)、Body(请求体)
std::promise<response>
:用于 future 异步
callback
:用于 callback 异步
RType
: 标识异步类型(ASYNC / CALLBACK)🔹 2.
Requestor::send(...)
send(connection, request, callback)
→ 异步 callback 模式
send(connection, request, std::future<response>)
→ future 模式将
describe
存入map<rid, describe>
中,等待响应回调🔹 3.
onResponse(connection, response)
收到响应后,根据 RID 从
map<rid, describe>
查找对应的describe
按照
RType
分发:
如果是
CALLBACK
,调用 callback如果是
ASYNC
,通过promise.set_value(...)
实现 future 结果🔹 4.
Dispatcher<mt, handler>
对
MType
进行派发处理(主要针对订阅/通知类型的消息,不含 RID)
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>namespace wws
{
namespace client
{class Requestor{public:using ptr=std::shared_ptr<Requestor>;using RequestCallback=std::function<void(const BaseMessage::ptr&)>;using AsyncResponse=std::future<BaseMessage::ptr>;//请求描述的结构体struct RequestDescribe{using ptr=std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;//请求消息指针RType rtype; //请求类型 异步/回调std::promise<BaseMessage::ptr> response;//用于 async 模式,设置结果set_value 给 future 返回值RequestCallback callback;//用于 callback 模式,响应到来时触发用户逻辑};//收到应答 根据rid找到对应的请求设置结果 或者 调用回调 void onReponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg){std::string rid=msg->rid();RequestDescribe::ptr rdp=getDescribe(rid);//根据id进行查找if(rdp.get()==nullptr){ELOG("收到响应%s,但是未找到对应的请求描述!",rid.c_str());return;}//异步请求把应答msg作为结果设置到promise中,让future就绪if(rdp->rtype==RType::REQ_ASYNC){rdp->response.set_value(msg);//promise.set_value(value); 手动设置值 让std::future<BaseMessage::ptr>变为就绪。}//回调请求 有回调函数就进行调用else if(rdp->rtype==RType::REQ_CALLBACK){if(rdp->callback) rdp->callback(msg);}elseELOG("请求类型未知");//收到应答 删除rid对应的请求描述delDescribe(rid);}//1.异步请求发送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,AsyncResponse&async_rsp){//创建请求描述对象(newDescribe内部完成插入map)RequestDescribe::ptr rdp=newDescribe(req,RType::REQ_ASYNC);if(rdp.get()==nullptr){ELOG("构建请描述对象失败");return false;}//get_future()关联std::future<>async_rsp 和 std::promise<>response//promise.set_value(value) 被调用,就能async_rsp.get()获取值async_rsp=rdp->response.get_future();return true;}//2.同步请求发送(发送完请求后,立刻调用get()阻塞等待set_value()设置后获取结果)//可以在上层进行get()阻塞等待,也是同样效果bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,BaseMessage::ptr&rsp){AsyncResponse rsp_future;bool ret=send(conn,req,rsp_future);if(ret==false) return false;rsp=rsp_future.get();//阻塞等待值就绪return true;}//3.回调请求发送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&rep,RequestCallback&cb){//创建请求描述对象(newDescribe内部完成插入map)RequestDescribe::ptr rdp=newDescribe(rep,RType::REQ_CALLBACK,cb);if(rdp.get()==nullptr){ELOG("构建请描述对象失败");return false;}conn->send(rep);return true;}private://1.新增RequestDescribe::ptr newDescribe(const BaseMessage::ptr&req,RType rtype,const RequestCallback&cb=RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);//构建请求描述对象 并插入到mapRequestDescribe::ptr rd=std::make_shared<RequestDescribe>();rd->request=req;rd->rtype=rtype;if(rtype==RType::REQ_CALLBACK&&cb)rd->callback=cb;_request_desc.insert(std::make_pair(req->rid(),rd));//插入到mapreturn rd;}//2.查找RequestDescribe::ptr getDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);auto it=_request_desc.find(rid);if(it==_request_desc.end()){return RequestDescribe::ptr();}return it->second;}//3.删除void delDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string,RequestDescribe::ptr> _request_desc;//id->请求消息};
}
}
用户发起请求│▼
Requestor::send(...)│▼
创建 RequestDescribe (含回调 or promise)│▼
加入 map<rid, describe>│▼
发送消息给服务器⬇(服务器响应)Requestor::onResponse(...)│▼找回 describe -> 判断 rtype├── CALLBACK → 执行 callback└── ASYNC → set promise▼清除 map<rid, describe>
四.客户端RpcCaller实现
1. 构造函数
RpcCaller(const Requestor::ptr&)
初始化传入一个
Requestor
实例;
Requestor
负责发送消息、注册回调、接收服务端响应等;
RpcCaller
是调用层,Requestor
是通信层。2. 同步调用接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,Json::Value& result)
1.先构建RpcRequest请求
2.调用requestor->同步send(),同步阻塞发送请求,并拿到rsp_msg。
3.将rsp_msg的Basemessage类型转换为 RpcResponse,取出正文结果 result();
3. 异步 Future 调用接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,std::future<Json::Value>& result)
创建
promise
对象,用于异步回填响应;通过
shared_ptr
管理promise
生命周期,绑定到回调函数中;通过
_requestor->send(...)
注册异步回调;回调触发后由
Callback()
设置promise.set_value()
;调用方使用返回的
future
进行.get()
即可拿到结果。4. 异步回调接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,JsonResponseCallback& cb)
直接注册用户定义的回调
cb
到请求;内部通过
Callback1()
做包装处理:
类型转换为
RpcResponse
错误处理
最后调用用户传入的
cb(result)
传回结果。
#include "requestor.hpp"namespace wws
{
namespace client
{class RpcCaller{public:using ptr=std::shared_ptr<RpcCaller>;using JsonAsyncResponse=std::future<Json::Value>;using JsonResponseCallback=std::function<void(const Json::Value&)>;//requestor中的处理是针对BaseMessage进行处理的(因为要对所有请求进行处理,不单单对Rpc请求处理)//用于在rpc caller中针对结果的处理是对RpcResponse里边的result进行的RpcCaller(const Requestor::ptr &requestor):_requestor(requestor){}//1.同步调用 1.连接conn 2.方法名method 3.方法参数params 4.结果resultbool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,Json::Value&result){//1.组织请求auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2.发送请求 因为send()是重载函数 参数的类型必须保持一致(req_msg子类RpcRequest->基类BaseMessage)bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg);if(ret==false){ELOG("同步Rpc请求失败");return false;}//3.等待响应 响应信息存放在rsp_msg此时是Base基类,需要转成RpcResponse应答auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return false;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc同步请求出错:%s",errReason(rpc_rsp_msg->rcode()));return false;}result=rpc_rsp_msg->result();//返回响应消息里面的正文return true;}//2.异步 Future 调用 向服务器发送异步回调请求 设置回调函数 //回调函数中传入一个promise对象 在回调函数中对promise设置数据//异步请求返回的是BaseMessage对象,用户想要的是message里面正文的结果Valuebool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,std::future<Json::Value>&result){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// //这个json_promise对象是一个局部变量,等出了call作用域就会消失// //与它关联的future result在外部get()获取结果时,会异常// std::promise<Json::Value> json_promise;// result=json_promise.get_future();//future和promise建立关联 以后future.get()获取结果auto json_promise=std::make_shared<std::promise<Json::Value>>();result=json_promise->get_future();//std::bind() 对json_promise传值传参,shared_ptr引用计数 +1 此时引用计数==2//退出call作用域 引用计数-- 再等callback被触发完毕并释放后引用计数--,才会析构//shared_ptr引用计数是否加1,只和bind对json_promise指针的捕获方式有关,与函数的参数声明是否引用json_promise指针无关Requestor::RequestCallback cb=std::bind(&RpcCaller::Callback,this,json_promise,std::placeholders::_1);bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),cb);if(ret==false){ELOG("异步Rpc请求失败");return false;}return true;}//3.异步回调bool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,JsonResponseCallback&cb){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false){ELOG("异步Rpc请求失败");return false;}return true;}private:void Callback1(const JsonResponseCallback&cb,const BaseMessage::ptr&msg){//先判断结果对不对auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc回调请求出错:%s",errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}//const BaseMessage::ptr&msg Request参数是拿到响应后传递的void Callback(std::shared_ptr<std::promise<Json::Value>>result,const BaseMessage::ptr&msg){auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc异步请求出错:%s",errReason(rpc_rsp_msg->rcode()));return ;}//promise.set_value()设置正文结果 result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};
}
}
为什么不让 Requestor::send() 直接处理 RpcRequest,而是让它只处理 BaseMessage,由 RpcCaller 来封装具体业务(如 RpcRequest/RpcResponse)?
1.因为要对所有请求进行处理,不单单对Rpc请求处理。还可以处理主题 服务等消息类型
2.解耦业务协议与通信通道,只做消息传递与回调处理。
模块 职责 Requestor
发送消息、注册和触发回调,处理响应派发 RpcCaller
构造 RPC 请求、解析 RPC 响应,组织业务逻辑
为什么 bind 捕获 shared_ptr 能延长 promise 的生命周期?
shared_ptr+bind(值捕获)+外部拷贝保存
本质就是:
只要某个 shared_ptr 指向的对象,引用计数 不为 0,那它就不会被销毁;
所以我们通过创建另一个生命周期更长的 shared_ptr(比如通过 bind() 值捕获并在外部保存),来延长这段资源的生命周期。
(不建议std::ref()引用捕获 如果外部shared_ptr_obj的被销毁,cb 里的引用就变成悬垂指针)
auto cb = std::bind(&func, std::ref(shared_ptr_obj));
如果std::promise<Json::Value> json_promise;直接创建一个promise对象,这是一个局部对象等出了call函数(离开作用域),就会析构。
这就会导致与它关联的future result在外部get()获取结果时,会异常。
auto json_promise=std::make_shared<std::promise<Json::Value>>();创建promise对象并用shared指针来管理它,虽然出了call函数 引用计数-- 为0还是会析构。
但我们在bind绑定的时候,对json_promise进行了传值,拷贝了一份
shared_ptr
对象 进入bind
内部的闭包,引用计数+1.auto cb = std::bind(&RpcCaller::Callback,this,json_promise, // 👈 这里复制了一份 shared_ptr,引用计数 +1std::placeholders::_1);
写成void Callback(const std::shared_ptr<std::promise<Json::Value>>&result,const BaseMessage::ptr&msg) 接收引用(不增加引用计数)。对bind传值引用计数+1无影响
如果是下面这种拷贝引用计数会再+1,但仅限Callback运行时,调用结束后立刻-1,只是暂时的。
所以说现在,在call函数中有被shared指针指向的promise对象引用计数为2,一个make_shared返回的,一个bind闭包值捕获的。
bind闭包对象cb(auto cb=std::bind(...)),它本质不也是一个局部对象吗?call函数结束不还是和另一个局部变量一样会销毁,promise的生命周期怎么会延长呢?
call函数中的cb是否销毁已经无关,因为在函数的外部已经拷贝保存了一份了。
它通常会被传出
call()
函数,交给Requestor::send()
并被保存在异步响应回调表里!send()会调用newDescibe,把闭包对象存入rd请求描述对象中,再插入到map进行统一管理。
(注意newDescribe虽然是const&获取的cb 不增加引用计数,但这根本无关紧要,因为rd->callback=cb;会把它拷贝到了 RequestDescribe::callback 中,这是才生命周期延长的关键)
等到onResponse收到应答并处理完,就delDescribe()删除对应请求描述信息,保存的回调函数cb也析构,里面保存的json_promise也会析构,所以说ptr+bind值捕获+外部保存
保证了 json_promise 会存活到 callback 被调用。
总结:
1.Callback(param) 中的 shared_ptr 是按值传递,因此会导致引用计数 +1,
但这个 +1 的生命周期仅限于函数执行期间,Callback() 结束后 param 被销毁,引用计数立即 -1。
2.而 bind(...) 捕获 shared_ptr 时(通过值捕获),会将其拷贝一份保存在闭包对象中(如 auto cb),
这会导致引用计数 +1,无论是否调用 Callback,引用计数都存在,直到闭包对象销毁为止。
那闭包对象什么时候销毁呢?delDescribe()
3.需要注意的是,cb 本身是 call() 函数内的局部变量,但它并不会随着 call() 结束而失效。
虽然 call() 返回后 cb 变量本身会被销毁,但在此之前 cb 已经被作为参数传入 _requestor->send(),
并被内部保存到了 _request_desc 表中,作为 RequestDescribe 的一部分长期持有。
4.也就是说:call() 中定义的 cb 虽然是局部变量,但它在作用域结束前被拷贝并传出,生命周期已经被延长。
所以 cb 中捕获的 json_promise(shared_ptr)依然活着,call 函数中的 cb 就算析构,也不影响闭包内部捕获的 promise 的生命周期。
5.只有当服务端响应到达,回调被触发后,执行 cb(msg),再通过 delDescribe(rid) 删除请求描述对象,rd保存的回调函数cb闭包进行析构,值捕获的json_promise被释放,引用计数==0 promise 被析构.
6.相比之下,如果只是创建了一个局部的 shared_ptr(比如 json_promise),没有通过 bind、lambda、线程等传出去,
它就会在函数结束时被立即析构,future 将失效,调用 .get() 会抛出异常(broken promise)。
✅ 因此,虽然 cb 是局部变量,但它在 call() 结束前被拷贝传出,生命周期被框架托管,保证了回调所需的 promise 安全存活。
这是异步通信框架中延长资源生命周期的关键机制,确保异步流程完整闭环。
局部变量当作参数传参,并不会延长它的生命周期; 它的生命周期仍然只属于它原来的作用域; 如果你想延长生命周期,必须使用 shared_ptr,并在外部再持有一份拷贝!
举个例子:
std::shared_ptr<int> create() {auto p = std::make_shared<int>(100); // p 是局部变量return p; // ✅ 返回值是“拷贝一份”,原来的 p 会被销毁,但返回值还持有控制块 }auto x = create(); // 返回值拷贝给 x,资源不会被销毁
情况 局部变量会不会被销毁 解释 普通局部变量(无指针托管) ✅ 会 作用域结束立即销毁 传值作为函数参数 ✅ 会 参数是拷贝,原变量不延长 返回 shared_ptr ✅ 原变量销毁,但返回值持有副本,资源不析构 shared_ptr 被 bind 捕获 ❌ 不会(+1) 延长生命周期直到闭包结束
服务端测试代码
相较于之前实现的客户端,我们不再是直接给dispatcher传业务层函数,而是传RpcRouter的处理对应请求的回调函数,由该函数再从注册到RpcRouter中的具体实现函数中找用户需要的函数,并进行回调返回响应结果。
#include "../common/message.hpp"
#include "../common/net.hpp"#include "../common/dispatcher.hpp"
#include "../server/rpc_router.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{auto router=std::make_shared<wws::server::RpcRouter>();std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//1.注册Add函数到RpcRouterrouter->registerMethod(desc_factory->build());//bind绑定RpcRouter收到消息的回调函数->cbauto cb=std::bind(&wws::server::RpcRouter::onRpcRequest,router.get(),std::placeholders::_1,std::placeholders::_2);auto dispatcher=std::make_shared<wws::Dispatcher>();//2.把RpcRouter回调函数onRpcRequest设置到dispatcher中dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC,cb);auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);//3.把Dispatcher的回调函数onMessage设置到server中server->setMessageCallback(message_cb);server->start();return 0;
}
从图示上可以看到,整个链路依次是:
server 收到消息
调用 messageCallback
触发 onMessage 进入 dispatcher
Dispatcher 发现消息类型是
REQ_RPC
,调用 RpcRouter::onRpcRequestRouter 找到 Add 方法并调用其回调函数
Add 函数执行计算并返回结果
server->setMessageCallback(message_cb);
这里的
message_cb
是通过std::bind(&wws::Dispatcher::onMessage, dispatcher.get(), ...)
绑定的。当服务器收到任何消息时,就会调用
dispatcher->onMessage(...)
。
dispatcher->onMessage(...)
Dispatcher
根据消息的类型(这里为REQ_RPC
)找到先前注册的回调处理函数。这一步对应代码
dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, cb);
。所以当消息类型匹配到
REQ_RPC
时,就执行cb
。
cb
->std::bind(&wws::server::RpcRouter::onRpcRequest, router.get(), ...)
这个
cb
本质上就是对RpcRouter::onRpcRequest
的一次包装。当调用
cb
时,实际上就是执行router->onRpcRequest(...)
。
RpcRouter::onRpcRequest(...)
在路由器里,根据请求中的“方法名称”(比如
"Add"
)找到对应的回调函数。此处就是在之前
router->registerMethod(...)
时注册的Add
方法回调。调用
Add(const Json::Value& req, Json::Value& rsp)
最终执行我们自定义的逻辑(如取
nums1
、nums2
,相加后存入rsp
)。
客户端测试代码
#include "../common/dispatcher.hpp"
#include "../client/requestor.hpp"
#include "../client/rpc_caller.hpp"
#include <thread>
#include <chrono>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{auto requestor=std::make_shared<wws::client::Requestor>();auto caller=std::make_shared<wws::client::RpcCaller>(requestor);auto dispatcher=std::make_shared<wws::Dispatcher>();auto rsp_cb=std::bind(&wws::client::Requestor::onResponse,requestor.get(),std::placeholders::_1,std::placeholders::_2);//wws::RpcResponse->wws::BaseMessage //rsp_cb绑定的函数参数为Requestor::onResponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg)//而registerHandler注册需要的函数类型 std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>; 第二个参数必须也为BaseMessage::ptr,所以T传BaseMessagedispatcher->registerHandler<wws::BaseMessage>(wws::MType::RSP_RPC,rsp_cb);auto client=wws::ClientFactory::create("127.0.0.1",9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);client->setMessageCallback(message_cb);client->connect();auto conn=client->connection();//1.同步调用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=caller->call(conn,"Add",params,result);if(ret!=false){DLOG("result: %d", result.asInt());}//2.异步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=caller->call(conn,"Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回调params["nums1"]=55;params["nums2"]=66;ret = caller->call(conn,"Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));client->shutdown();return 0;
}
RpcCaller 调用
call("Add", params, rsp)
用户在业务代码里直接写
caller->call(...)
,想要调用服务端的 “Add” 方法并等待结果。RpcCaller 内部执行 “AddDesc(...)”
call(...)
方法内部会先构造一个带唯一请求ID的RpcRequest
,然后调用 Requestor 的相关方法(示意中称作AddDesc
)来“登记”该请求:
存储该请求ID
记录调用类型(同步/异步/回调)
如果是回调式,还会记录用户传入的回调函数
发送请求到服务端
Requestor 记完请求描述后,就会通过网络连接将
RpcRequest
发送给远程服务端。服务端处理并返回
RpcResponse
当服务端收到 “Add” 请求后,进行实际的加法运算或其他业务逻辑,然后打包
RpcResponse
返回给客户端。客户端接收响应 -> 分发到 Requestor::onResponse()
客户端网络层读到响应后,先通过 Dispatcher 分发,根据消息类型(如
RSP_RPC
)找到之前绑定的回调,即Requestor::onResponse(...)
。
Requestor::onResponse()
根据响应里的 “请求ID=111” 查到对应的“请求描述 (desc)”,确定是哪个请求、用什么方式处理(同步阻塞唤醒或执行用户回调等),并把结果交给调用方。RpcCaller 最终返回结果给用户
对于异步调用
call(...)
,当响应到来时,onResponse() 会调用 rdp->response.set_value(msg),把响应 msg 设置到 promise 中。用户future.get()获取结果。若是回调式调用,则
Requestor::onResponse()
会直接执行用户的回调函数,把结果带给用户。
Requestor 和 RpcCaller 的关系:
Requestor 负责管理请求–响应映射。它用一个请求描述(包含唯一 ID、请求类型和处理机制)来记录每次发送的请求。当响应到来时,根据请求 ID 查找描述,
如果是异步请求(REQ_ASYNC),则调用 promise.set_value(msg) 使关联 future 就绪;
如果是回调请求(REQ_CALLBACK),则直接调用用户注册的回调函数。
RpcCaller 则对外提供 RPC 调用接口。它负责构造 RPC 请求(设置方法名、参数、生成请求 ID),并调用 Requestor 的 send() 方法来登记请求并发送消息。用户可以选择同步(阻塞等待 future.get())、异步(返回 future)或回调式调用。
二者协同工作:RpcCaller 构造并发送请求,而 Requestor 负责匹配响应并将结果传递给上层。
RpcCaller用户调用的接口,用户传入要调用的对象 参数 方式,根据方式(同步 异步 回调)的不同选择不同的call进行调用,1.先根据参数构建请求消息2.调用Requester里面对应的send()。
Requester中send()会先构建请求描述对象(call传入的请求消息 请求类型...) 并建立请求-id间的映射(等收到应答时根据id找到对应的请求描述对象),再完成发送。
等服务端返回应答,Dispatcher根据消息的类型,找到client的Requestor中onResponse处理应答的回调函数,它根据id找到对应的请求描述,再根据请求描述中的类型,进行set_value设置结果或者callback()调用回调。最后删除该请求描述
set_value后,get()获取到结果,阻塞结束返回上层
再返回到call进行检查并返回结果
RpcCaller::call()
的三种调用方式流程
同步调用 (
call(conn, method, params, result)
)客户端调用call,把连接 函数名 参数 用于获取结果的Value对象
进入同步调用call. 先根据传入的参数组织请求(里面设置了请求类型REQ_RPC)
调用Requester中send()发送请求
using AsyncResponse=std::future<BaseMessage::ptr>;先创建一个future对象用于后面get()阻塞获取结果。再调用异步send(),因为异步和同步的区别只是在于用户什么时候get()阻塞获取结果,异步+立刻get()==同步。
再看看异步send()
先调用newDescribe,里面会创建请求描述对象(传入回调函数会设置回调函数)和UUID建立映射关系用map管理起来。
conn->send()发送请求。
之后服务端进行处理,返回应答,server::messageCallback->Disoatcher::onMessage->根据请求类型找到对应的回调函数,RSP_RPC对应的就是requestor::onResponse()处理应答的回调函数。
进入onResponse,先根据UUID找到对应的请求描述,根据请求描述的类型,看是异步(同步里面调用的异步),还是回调,进行相对的处理。
是同步,就set_value设置结果。
设置完结果,futrue就绪get()获取结果
上层的result就获取到了结果,进行输出。
RpcCaller::call(conn, method, params, result) // 同步调用└──> Requestor::send(conn, req, rsp) // 调用同步版本 send()└──> send(conn, req, rsp_future) // 调用异步 Future 版本 send()└──> 创建 RequestDescribe 并存储└──> 阻塞等待 future.get()└──> Dispatcher::onMessage(conn, msg)└──> Requestor::onResponse(conn, msg)└──> rdp->response.set_value(msg) // future.set_value() 解除阻塞└──> 解析 RpcResponse 并返回 result // get() 获取结果并返回
经过的关键函数:
RpcCaller::call(conn, method, params, result)
Requestor::send(conn, req, rsp)
Requestor::send(conn, req, rsp_future)
(异步版本)
Dispatcher::onMessage(conn, msg)
Requestor::onResponse(conn, msg)
rdp->response.set_value(msg)
future.get()
解除阻塞
异步 Future 调用 (
call(conn, method, params, future)
)不传Json::Value result直接获取结果,std::future<Json::Value> res_futre,让用户自己get()获取结果。
result=json_promise->get_future();管理promise,set_value后用户就可以get()获取结果。
还绑定了Callback,传入回调函数cb,创建请求描述对象时设置回调函数cb
此时设置的类型为REQ_CALLBACK,收到应答,找到对应请求描述,会调用设置的回调函数cb
cb bind绑定的是Callbcak函数 它会set_value设置结果,让用户的future就绪,get()获取结果
RpcCaller::call(conn, method, params, future) // 异步 Future 调用└──> 创建 RpcRequest 并设置参数└──> 创建 std::promise<Json::Value> 和 future 关联└──> 绑定 Callback (关联 promise 和 result)└──> Requestor::send(conn, req, cb) // 传入回调函数 cb,onResponse() 解析后触发└──> 创建 RequestDescribe 并存储└──> 发送请求,不阻塞└──> Dispatcher::onMessage(conn, msg) // 收到服务端响应└──> Requestor::onResponse(conn, msg)└──> 通过 rid 查找 RequestDescribe└──> Callback 触发 set_value(msg) // 解析结果并设置 promise└──> future.get() 解除阻塞,获取结果 // 用户上层调用 future.get() 阻塞获取结果
经过的关键函数:
RpcCaller::call(conn, method, params, res_future)
Requestor::send(conn, req, cb)
Requestor::onResponse(conn, msg)
rdp->callback(msg)
Callback
解析msg
并set_value()
res_future.get()
解除阻塞
异步回调调用 (
call(conn, method, params, cb)
)回调,不直接设置结果,而是调用用户传来的函数(callback)。
call异步回调,和异步futrue一样设置回调函数Callback1.(但回调函数不同)
也是调用的这个send()
接下来也是 调用请求描述对象中设置的回调函数即Callback1
而Callback1不和Callback一样set_value设置结果,而是调用上层传来的函数,它返回结果给用户。
RpcCaller::call(conn, method, params, cb) // 异步回调调用└──> 创建 RpcRequest 并设置参数└──> 绑定 Callback1└──> Requestor::send(conn, req, cb) // 调用异步回调版本 send()└──> 创建 RequestDescribe 并存储└──> 发送请求,不阻塞└──> Dispatcher::onMessage(conn, msg) // 收到服务端返回消息└──> Requestor::onResponse(conn, msg)└──> 通过 rid 查找 RequestDescribe└──> Callback1 触发用户自定义的 cb(msg)└──> 用户自定义的回调函数解析结果
RpcCaller::call(conn, method, params, cb)
Requestor::send(conn, req, cb)
Requestor::onResponse(conn, msg)
rdp->callback(msg)
触发Callback1
用户自定义的
callback(msg)
解析结果
五.注册中心---服务端rpc_registry
服务端如何实现服务信息的管理:
服务端需要1.提供注册服务2.发现的请求业务处理
1.需要将 哪个服务 能够由 哪个主机提供 管理起来 hash<method,vector<provide>>
进行服务发现时,能返回谁能提供指定服务
2.需要将 哪个主机 发现过 哪个服务 管理起来
当进行服务通知的时候,能通知给对应发现者 <method,vector<discoverer>>
3.需要 哪个连接 对应 哪个服务提供者 管理起来 hash<conn,provider>
当一个连接断开时 能知道哪个主机的哪些服务下线了,然后才能给发现者通知xxx的xxx服务下线了。
4.需要 哪个连接 对应 哪个服务发现者 管理起来 hash<conn.discoverer>
当一个连接断开时 如果有服务上下线 就不需要给它进行通知了
1️⃣
ProviderManager
(服务提供者管理)维护服务提供者信息,进行服务注册、删除和查询。
提供
addProvider()
、delProvider()
、getProvider()
和methodHosts()
等方法。
2️⃣
DiscovererManager
(服务发现者管理)维护服务发现者信息,进行服务发现、删除和通知。
提供
addDisecoverer()
、delDisoverer()
、onlineNotify()
和offlineNotify()
等方法。
3️⃣
PDManager
(核心管理器)处理服务请求、注册、发现、上线/下线通知以及连接断开后的清理逻辑。
提供
onServiceRequest()
、onConnShutdown()
等核心逻辑。处理服务的响应,包括:
registryResponse()
:服务注册应答
discoverResponse()
:服务发现应答
errorResponse()
:错误处理
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set>
namespace wws
{
namespace server
{//服务提供者class ProviderManager{public:using ptr=std::shared_ptr<ProviderManager>;struct Provider{using ptr=std::shared_ptr<Provider>;std::mutex _mutex; BaseConnection::ptr conn;Address host; //主机信息ip+portstd::vector<std::string> methods; //所有提供的方法Provider(const BaseConnection::ptr&c,const Address&h):conn(c),host(h){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};//新的服务提供者进行服务注册void addProvider(const BaseConnection::ptr&c,const Address &h,const std::string&method){Provider::ptr provider;{std::unique_lock<std::mutex> lock(_mutex);auto it=_conns.find(c);//找连接对应的提供者if(it!=_conns.end()){provider=it->second;}//找不到就创建 并新增连接->提供者else{provider=std::make_shared<Provider>(c,h);_conns.insert(std::make_pair(c,provider));}//method方法被哪些提供者提供 增加提供者auto &providers=_providers[method];providers.insert(provider);}//提供者内更新记录 能提供的方法provider->appendMethod(method);}//服务提供者断开连接时 获取它的信息 用于服务下线通知Provider::ptr getProvider(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return Provider::ptr();return it->second;}//服务提供者断开连接时 删除它的关联信息void delProvider(const BaseConnection::ptr&c)//连接->提供者->所有提供方法{std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;for(auto&method:it->second->methods)//找到该服务提供者的所有方法{auto&providers=_providers[method];//根据方法 从提供该方法的提供者中再进行删除//1.providers容器是vectro版本// //手动查找并按迭代器删除 providers.erase(it->second)// //但是 erase(it->second) 并不会起到按值删除的作用,因为 erase() 按值删除时,只有 std::vector 在 C++20 之后才引入 erase 和 erase_if// auto provider_it = std::find(providers.begin(), providers.end(), it->second);// if (provider_it != providers.end())// {// providers.erase(provider_it);// }//set 直接支持 erase(it->second)providers.erase(it->second);}//删除连接与服务提供者的关系_conns.erase(it);}//返回 method对应的提供者std::vector<Address> methodHosts(const std::string &method){std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end())return std::vector<Address>();std::vector<Address> result(it->second.begin(), it->second.end());return result;}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Provider::ptr>> _providers;//方法->提供者主机std::unordered_map<BaseConnection::ptr,Provider::ptr> _conns;//连接->提供者}; //服务发现者class DiscovererManager{public:using ptr=std::shared_ptr<DiscovererManager>;struct Discoverer{using ptr=std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn;//发现者关联的客户端std::vector<std::string> methods;//发现过的服务Discoverer(const BaseConnection::ptr&c):conn(c){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.push_back(method);}};//当客户端进行服务发现的时候新增发现者 新增服务名称?Discoverer::ptr addDisecoverer(const BaseConnection::ptr&c,const std::string &method){Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);//找连接对应的服务发现者auto it=_conns.find(c);if(it!=_conns.end()){discoverer=it->second;}else{discoverer=std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c,discoverer));}//method方法被哪些发现者发现了 增加发现者auto &discoverers=_discoverers[method];discoverers.insert(discoverer);}//在发现者中 增加已经发现的方法discoverer->appendMethod(method);return discoverer;}//发现者不需要被get() 发现者下线不需要通知 所以不需要进行get()获取对象后进行下线通知//发现者客户端断开连接时 找到发现者信息 删除关联信息void delDisoverer(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;//找到发现过的方法for(auto&method:it->second->methods){//从发现过method的所有发现者 中找要进行删除的发现者auto&discoverers=_discoverers[method];discoverers.erase(it->second);}//删除conn->discoverer_conns.erase(it);}//新的服务提供者上线 上线通知void onlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_ONLINE);}//服务提供者断开连接 下线通知void offlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string &method,const Address&host,wws::ServiceOptype optype){std::unique_lock<std::mutex> _mutex;//先判断该方法有没有被人发现过 auto it=_discoverers.find(method);//没有就不用进行任何处理if(it==_discoverers.end())return;//对发现过该方法的发现者一个个进行通知auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服务请求(注册、发现、上线、下线)msg_req->setMethod(method);msg_req->setOptype(optype);//服务操作类型 for(auto&discoverers:it->second){discoverers->conn->send(msg_req);}}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Discoverer::ptr>> _discoverers;//该方法被哪些发现者发现了std::unordered_map<BaseConnection::ptr,Discoverer::ptr> _conns;//连接->发现者(连接断开->对应发现者->删除vector中的发现者)};class PDManager{public:using ptr=std::shared_ptr<PDManager>;//处理服务请求 并返回应答void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr &msg) {//先判断服务操作请求:服务注册/服务发现auto optype=msg->optype();if(optype==ServiceOptype::SERVICE_REGISTRY)//服务注册{//1.新增服务提供者 _providers->addProvider(conn,msg->host(),msg->method());//2.对该方法的发现者进行服务上线通知_discoverers->onlineNotify(msg->method(),msg->host());//3.返回应答return registryResponse(conn,msg);}else if(optype==ServiceOptype::SERVICE_DISCOVERY)//服务发现{//新增服务发现者 _discoverers->addDisecoverer(conn,msg->method());return discoverResponse(conn,msg);}else{ELOG("收到服务操作请求,但操作类型错误"); return errorResponse(conn,msg);}}//连接断开void onConnShutdown(const BaseConnection::ptr&conn)//{//这个要断开的连接1.提供者下线 2.发现者下线//1.获取提供者信息 为空说明不是提供者auto provider=_providers->getProvider(conn);if(provider.get()!=nullptr){//提供者下线//1.提供者的每个方法都要下线 通知对应发现者for(auto&method:provider->methods){_discoverers->offlineNotify(method,provider->host);}//2.删除对该提供者的管理_providers->delProvider(conn);}//2.到这 可能是发现者 就算不是会直接返回空//直接删除对该发现者的管理_discoverers->delDisoverer(conn);}private://错误响应void errorResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)msg_rsp->setRcode(RCode::RCODE_INVALID_OPTYPE); //无效 msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);//服务操作类型 未知 conn->send(msg_rsp);}//注册应答void registryResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);//服务操作类型 注册conn->send(msg_rsp);}//发现应答 method方法有哪些主机可以提供 void discoverResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid()); msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);//服务操作类型 发现msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)std::vector<Address> hosts=_providers->methodHosts(msg->method());if(hosts.empty()){msg_rsp->setRcode(RCode::RCODE_NOT_FOUND_SERVICE);return conn->send(msg_rsp);}msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setMethod(msg->method());msg_rsp->setHost(hosts);return conn->send(msg_rsp);}private:ProviderManager::ptr _providers;DiscovererManager::ptr _discoverers;};
}
}
📡 服务注册流程
1️⃣ 客户端(Provider)通过 `registryMethod()` 发送 `SERVICE_REGISTRY` 请求 2️⃣ `PDManager::onServiceRequest()` 处理注册请求 3️⃣ `ProviderManager::addProvider()` 注册服务 4️⃣ `DiscovererManager::onlineNotify()` 通知发现者服务上线 5️⃣ `PDManager::registryResponse()` 发送注册结果
🔍 服务发现流程
1️⃣ 客户端(Discoverer)通过 `addDisecoverer()` 发送 `SERVICE_DISCOVERY` 请求 2️⃣ `PDManager::onServiceRequest()` 处理发现请求 3️⃣ `DiscovererManager::addDisecoverer()` 记录发现者 4️⃣ `ProviderManager::methodHosts()` 获取 `method` 对应的主机 5️⃣ `PDManager::discoverResponse()` 发送发现结果
🔥 服务下线流程
1️⃣ 连接断开时触发 `PDManager::onConnShutdown()` 2️⃣ 通过 `ProviderManager::getProvider()` 获取 `Provider` 3️⃣ `DiscovererManager::offlineNotify()` 通知发现者服务下线 4️⃣ `ProviderManager::delProvider()` 删除 `Provider`
🕵️♂️ 发现者下线流程
1️⃣ 连接断开时触发 `PDManager::onConnShutdown()` 2️⃣ 通过 `DiscovererManager::delDisoverer()` 删除 `Discoverer` 3️⃣ 清理 `_conns` 和 `_discoverers` 的映射
六.注册中心---客户端rpc_registry
客户端的功能比较分离,注册端和发现端根本就不在同一个主机上。
因此客户端的注册和发现功能是完全分开的。
1.作为服务提供者 需要一个能进行服务注册的接口
连接注册中心 进行服务注册
2.作为服务发现者 需要一个能进行服务发现的接口,需要将获取到的提供对应服务的主机信息管理起来 hash<method,vector<host>> 一次发现,多次使用,没有的话再次进行发现。
需要进行服务上线/下线通知请求的处理(需要向dispatcher提供一个请求处理的回调函数)
#pragma
#include"requestor.hpp"namespace wws
{
namespace client
{// 服务提供者类:负责将服务注册到服务注册中心class Provider{public:using ptr=std::shared_ptr<Provider>;Provider(const Requestor::ptr&requestor):_requestor(requestor){}//进行服务注册的接口bool registryMethod(const BaseConnection::ptr&conn,const std::string &method,const Address&host){// 1. 创建 ServiceRequest 请求消息auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服务请求(注册、发现、上线、下线)msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);//服务操作类型 注册// 2. 发送请求并同步等待响应BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);//同步请求if(ret==false){ELOG("%s服务注册失败",method.c_str());return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(service_rsp.get()==nullptr){ELOG("响应类型向下转换失败");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服务注册失败 原因%s",errReason(service_rsp->rcode()));return false;}return true;}private:Requestor::ptr _requestor;//负责发送请求、接收响应};class MethodHost{public:using ptr=std::shared_ptr<MethodHost>;MethodHost(const std::vector<Address>&hosts):_hosts(hosts.begin(),hosts.end()),_idx(0){}void appendHost(const Address&host){//新的服务上线后进行调用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address&host){//服务下线进行调用std::unique_lock<std::mutex> lock(_mutex);//vector删除效率O(n)效率低,但更多的操作还是 随机访问[] 进行RR轮转,所以vector是最合适的for(auto it=_hosts.begin();it!=_hosts.end();it++){if(*it!=host){_hosts.erase(it);break;}}}Address chooseHost(){std::unique_lock<std::mutex> lock(_mutex);size_t pos=_idx++ %_hosts.size();//1.pos=_idx%size 2._idx+=1return _hosts[pos];}bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty();}private:std::mutex _mutex;size_t _idx;//当前 RR 轮转索引//vector 提供了 O(1) 的索引访问,可以快速实现RR轮转机制。std::vector<Address> _hosts;};class Discoverer{public:Discoverer(const Requestor::ptr &requestor){}//服务发现的接口bool serviceDiscovery(const BaseConnection::ptr&conn,const std::string&method,Address&host){//当前有method方法对应的提供服务者 直接返回host地址{std::unique_lock<std::mutex> lock(_mutex);auto it=_method_hosts.find(method);if(it!=_method_hosts.end()){if (it->second->empty() == false){host = it->second->chooseHost();return true;}}}//当前没有对应的服务者//1.构建服务发现请求auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);//消息类型msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("服务发现失败!");return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(!service_rsp.get()){ELOG("服务发现失败! 响应类型转换失败");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服务发现失败! 错误原因%s",errReason(service_rsp->rcode()).c_str());return false;}//2.看服务发现完后有没有新提供者增加std::unique_lock<std::mutex> _mutex;auto method_host=std::make_shared<MethodHost>(service_rsp->hosts());if(method_host->empty()){ELOG("%s服务发现失败!没有能提供服务的主机",method.c_str());return false;}host=method_host->chooseHost();_method_hosts[method]=method_host;//更新method方法->提供者address(可能method方法已经在map中所以=赋值)return true;}//给Dispathcer模块进行服务上线下线请求处理的回调函数void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){//1.先判断是上线/下线 都不是就不处理auto optype=msg->optype();std::string method=msg->method();std::unique_lock<std::mutex> lock(_mutex);//2.上线通知if(optype==ServiceOptype::SERVICE_ONLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//该method方法不存在 创建MethodHost初始化并添加入map中auto method_host=std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method]=method_host;}else{//存在直接加it->second->appendHost(msg->host());}}//3.下线通知else if(optype==ServiceOptype::SERVICE_OFFLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//该method方法不存在 直接return//不需要把method方法从map中移除 前面已经判断过map中method方法为空的情况return;}else{//存在直接删除it->second->removeHost(msg->host());}}}private:std::mutex _mutex;std::unordered_map<std::string,MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;};
}
}
1.客户端中的provider服务提供者,有服务注册接口registryMethod,构建服务注册请求借助Requestor send()到服务端的的注册接口。收到应答后判断应用是否正确。
2.客户端的discoverer服务发现者,有服务发现接口serviceDiscovery,先开当前有没有method方法对应的提供者,有就返回一个提供者。没有就构建服务发现请求,并检查是否有误,无错误后 再判断是否有新增的提供者,有就返回提供者并更新method->提供者的映射,没有就直接返回.
3.MethodHost类 管理一个method方法对应的所以提供者的主机信息,并提供RR轮转功能,按顺序返回method方法对应的提供者。
1️⃣
Provider
类:服务提供者
作用:
负责将服务注册到服务注册中心。
主要功能:
registryMethod()
:
发送
ServiceRequest
注册请求到服务中心。检查
msg_rsp
响应是否成功注册。内部成员:
_requestor
:
Requestor::ptr
类型,负责请求发送和接收响应。
2️⃣
MethodHost
类:服务主机管理
作用:
维护多个
Address
主机地址,并提供负载均衡(RR 轮转)功能。主要功能:
appendHost()
:新增一个服务主机地址。
removeHost()
:移除一个服务主机地址。
chooseHost()
:
通过 RR 轮转选择一个主机地址。
empty()
:判断是否为空。内部成员:
_mutex
:互斥锁保护_hosts
访问安全。
_idx
:当前 RR 轮转索引。
_hosts
:std::vector<Address>
存储主机地址列表。
3️⃣
Discoverer
类:服务发现者
作用:
发现服务并维护
MethodHost
对象,进行服务上线/下线通知的管理。主要功能:
serviceDiscovery()
:
发现服务并更新
_method_hosts
缓存。选择主机地址
host
并返回。
onServiceRequest()
:
处理服务上线/下线的请求。
动态添加/删除
MethodHost
及其地址信息。内部成员:
_mutex
:互斥锁保护_method_hosts
。
_method_hosts
:
std::unordered_map<std::string, MethodHost::ptr>
,映射method -> host
。
_requestor
:
Requestor::ptr
发送发现请求。
4️⃣
Requestor
类:请求器
作用:
负责向服务中心发送请求并获取响应。
核心功能:
send()
:
向服务中心同步发送
msg_req
请求。接收
msg_rsp
响应并返回bool
标志位。
🔥 类之间的关系
Provider
通过_requestor
发送ServiceRequest
进行服务注册。
Discoverer
通过_requestor
发送ServiceRequest
进行服务发现。
MethodHost
由Discoverer
维护,并提供 RR 轮转机制选择主机。
Discoverer
监听onServiceRequest()
来处理服务上线/下线。
RR轮询(Round-Robin)
一个method方法可以对应多个提供者,用户请求method方法,我们应该返回哪一个提供者,才能实现最大资源利用呢?
我们可以通过RR轮询按照固定顺序轮流将请求分配到不同的主机。
1.维护一个递增索引 _idx。
2.每次请求时,选择 _hosts[_idx % _hosts.size()] 作为当前主机。
3._idx 自增,当 _idx >= _hosts.size() 时自动重置。
RR轮询需要随机访问[ ],所以管理提供者的容器最好选择vector< >。
RR 轮询机制的价值
✅ 1. 负载均衡: 均匀分配请求,防止主机过载,提高系统吞吐量。
✅ 2. 实现简单: 只需维护一个_idx
递增索引,选择主机只需O(1)
时间复杂度。
✅ 3. 故障规避: 结合removeHost()
机制,可以自动剔除故障主机,提升可靠性。
✅ 4. 提升系统吞吐量: 通过多个主机并行处理请求,提升系统的整体性能。
✅ 5. 维护成本低: 逻辑简单,维护成本极低,不需要监控主机状态。
为什么选择
vector<Address>
作为主机管理容器
✅ 1. 访问速度快
vector
提供 O(1) 的随机访问能力,可以通过pos
索引直接获取provider
。轮询核心逻辑依赖于
vector[pos]
进行主机选择,比map
的 O(log n) 查找速度更快,适合高频访问场景。
✅ 2. 内存布局紧凑
vector
采用 连续内存存储,有利于 CPU 缓存命中,提升访问效率。在轮询过程中,只需访问固定内存位置,避免了内存跳转带来的性能损耗。
✅ 3. 删除主机速度适中
删除主机时虽然
removeHost()
的复杂度是 O(n),但主机上下线事件发生频率远低于请求频率。即使主机上下线处理稍慢,但
chooseHost()
仍然保持 O(1) 的快速访问。
⚠️ 注意:
vector
删除主机时会触发内存移动,导致性能下降,因此不适合频繁上下线的场景。但在主机变更较少的场景下,
vector
的整体性能优于list
或map
。
七.对服务发现与注册的封装
一.客户端 rpc_client
封装客户端:三大功能模块
一、业务功能:
基础 RPC 功能
服务注册功能
服务发现功能
二、基础底层模块:
网络通信客户端模块(由
BaseClient
封装)
🧱 类结构封装解析
1.
RegistryClient
:服务注册客户端
构造时连接注册中心地址
提供
registryMethod()
方法:业务提供者向注册中心注册服务成员模块包含:
_provider
:服务提供者
_requestor
:发送请求组件
_dispatcher
:调度器
_client
:基础通信客户端2.
DiscoveryClient
:服务发现客户端
构造时连接注册中心地址
提供
registryDiscovery()
方法:业务调用方向注册中心发现服务成员模块包含:
_discoverer
:服务发现器
_requestor
、_dispatcher
、_client
:同上3.
RpcClient
:RPC 核心客户端
构造参数
enableDiscovery
决定是否开启服务发现模式:
若为
true
:连接的是注册中心若为
false
:连接的是具体的服务提供者提供多种调用方式:
同步调用(返回
result
)异步 future 调用(返回
std::future
)异步 callback 调用(传入回调函数)
内部组合:
_discovery_client
:可选服务发现客户端
_caller
:RPC 调用管理器
_requestor
、_dispatcher
、_client
:同样是基础通信组件
在构建rpc客户端时,我们用长连接还是短连接?
1.当客户端调用call()请求对应方法时,RpcClient 内部调用 DiscoveryClient 进行服务发现 向 注册中心 Registry Server 发送服务发现请求
2.注册中心再返回对应方法的提供者主机地址。
3.RpcClient 用RR轮询从中选出来一个地址创建rpc client客户端并连接对应方法的提供者主机
4.服务提供者 Provider 接收到请求处理完返回结果给客户端
5.客户端回调触发,返回响应给用户。
短链接:创建一个rpc client客户端对象,连接服务提供者,进行rpc调用,调用结束后就销毁关闭客户端。
✅ 优点:
实现简单,按需连接、按需释放;
没有资源长期占用问题。
❌ 缺点:
性能差:每次调用都要建立和销毁 TCP 连接,连接成本高;
不利于高频 RPC 场景;
异步处理时管理麻烦:可能连接刚断,回调结果还没处理。
长连接:调用完后并不会销毁关闭客户端,而是将客户端放入连接池。后续还需要访问该主机的该方法,就会从连接池中找到原本的客户端对象,进行rpc调用。若该主机的该服务下线,需要从池中删除对应客户端连接。
✅ 优点:
高性能:避免频繁连接/断开,尤其是重复调用同一服务时;
适合高并发、低延迟系统。
❌ 缺点:
管理复杂,需要处理:
服务下线、连接失效的自动剔除;
异步/并发安全;
池容量、连接空闲策略等。
在我们这个项目中我们选择长连接。
主要还是短连接异步处理时管理麻烦。
短链接,客户端进行完rpc调用就会关闭,后面服务提供者返回结果给客户端,客户端没了收到收到应答的回调函数onResponse 也就不能把结果设置道promise中,上层futrue就不能就绪。
回调不触发,业务逻辑“卡住”
尤其是你用std::future
或promise
等异步等待对象,结果永远收不到。触发回调时访问已被释放的连接对象,导致崩溃
比如回调中引用了RpcClient
或connection
,但连接已经析构。异步响应结果丢失,日志无记录,bug 难排查
你会觉得“调用失败了但程序没报错”,其实是 TCP 在你没注意时被关掉了。异步+短连接的问题在于连接的生命周期和异步结果不一致,导致回调无法安全执行。
解决方法:在rpc调用结束后不关闭客户端,而是设置一个回调函数,确保收到收到响应处理完再关闭客户端。
#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"namespace wws
{
namespace client
{//服务注册客户端class RegistryClient{public:using ptr=std::shared_ptr<RegistryClient>;//构造函数传入注册中心的地址信息 用于连接注册中心RegistryClient(const std::string&ip,int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务注册接口bool registryMethod(const std::string&method, Address&host){return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//服务发现客户端class DiscoveryClient{public:using ptr=std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息 用于连接注册中心DiscoveryClient(const std::string&ip,int port,const Discoverer::OfflineCallback &cb):_requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor,cb)),_dispatcher(std::make_shared<Dispatcher>()){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//当注册中心向客户端进行上线/下线通知时触发的回调函数auto req_cb=std::bind(&client::Discoverer::onServiceRequest,_discoverer.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE,req_cb);//消息回调函数 所有收到的消息,统一交由 Dispatcher::onMessage 分发给对应 handlerauto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool registryDiscovery(const std::string&method, Address&host){return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//rpc客户端class RpcClient{public:using ptr=std::shared_ptr<RpcClient>;// enableDiscovery--是否启用服务发现功能 也决定了传入地址信息是注册中心地址 还是提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<wws::client::RpcCaller>(_requestor)){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//1.如果启用的服务发现 地址信息是注册中心的地址 是服务发现客户端需要连接的地址 则通过地址信息实例化discover_client//需要经过服务发现获取提供者address再获取对应的clientif(_enableDiscovery){//设置服务下线回调auto offline_cb=std::bind(&RpcClient::delClient,this,std::placeholders::_1);_discovery_client=std::make_shared<DiscoveryClient>(ip,port,offline_cb);}//2.如果没有启用服务发现 则地址信息是服务提供者的地址 则直接创建客户端实例化好rpc_client//直接根据提供的ip+port创建对应的clientelse{auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_rpc_client=ClientFactory::create(ip,port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}//1.同步bool call( const std::string &method,const Json::Value ¶ms, Json::Value &result){//获取clientBaseClient::ptr client=getClient(method);if(client.get()==nullptr)return false;//通过客户端连接 发送rpc请求return _caller->call(client->connection(),method,params,result);}//2.异步futurebool call( const std::string &method,const Json::Value ¶ms, std::future<Json::Value> &result){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, result); }//3.异步回调bool call(const std::string &method,const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, cb); }private://创建clientBaseClient::ptr newClient(const Address &host){auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();//添加到连接池putClient(host,client);return client;}//根据address从连接池查找client 没有返回空BaseClient::ptr getClient(const Address&host){std::unique_lock<std::mutex> lock(_mutex);auto it=_rpc_clients.find(host);if(it==_rpc_clients.end()){return BaseClient::ptr();}return it->second;}//根据method获取client://1.method->服务发现->获取目标Address->从连接池获取/没有直接创建//2.用户传入的ip+port->直接获取已经创建的clientBaseClient::ptr getClient(const std::string method){//1.服务发现获取的ip+port BaseClient::ptr client;if(_enableDiscovery){//1.通过服务发现 获取服务提供者地址信息Address host;bool ret=_discovery_client->registryDiscovery(method,host);if(ret==false){ELOG("当前%s服务 没找到服务提供者",method.c_str());return BaseClient::ptr();}//2.查看连接池中是否有对应的客户端 有就直接用 没有就创建client=getClient(host);if(client.get()==nullptr){client=newClient(host);}}//2.用户提供的ip+port创建的clientelse{client=_rpc_client;}return client;}void putClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host,client));}void delClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash{size_t operator()(const Address&host){std::string addr=host.first+std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;//网络服务发现 用户传方法名method client用方法名找提供者地址进行连接RpcCaller::ptr _caller;Requestor::ptr _requestor;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//未启用服务发现// RpcClient rpc(false, "127.0.0.1", 8080);// rpc.call("Add", ...); // 直接用 _rpc_client 调用std::mutex _mutex;//<"127.0.0.1",client1>std::unordered_map<Address,BaseClient::ptr,AddressHash>_rpc_clients;//服务发现的客户端连接池// RpcClient rpc(true, registry_ip, port);// rpc.call("Add", ...); // 自动发现、自动连接、自动发请求};
}
}
unordered_map<>自定义类型作key
我们用哈希表来管理客户端连接池时:
我们知道在哈希表中是通过key值找到对应的val值的,但并不是直接用我们传过去的数当key,需要进行哈希值计算。(1.int size_t bool直接强转 2.char* string
h = h * 131 + static_cast<unsigned char>(c); // 类似 BKDR hash)
而库中实现了string、int、float 等基本类型的哈希值计算,但这个Address pair<string,int>是个自定义类型,需要我们自己重载哈希值计算。其实就算把string+int融合成string,再套用库中对string类型计算的哈希函数。
STL中 pair 的哈希组合方法
template <typename T1, typename T2> struct hash<std::pair<T1, T2>> {size_t operator()(const std::pair<T1, T2>& p) const {size_t h1 = std::hash<T1>()(p.first);size_t h2 = std::hash<T2>()(p.second);return h1 ^ (h2 << 1); } };
操作 目的 std::hash<X>()
获取单个字段的 hash 值 << 1
左移扰动哈希位,使两个字段 hash 分布更开 ^
异或混合两个 hash,避免简单叠加导致冲突 效果 更均匀、稳定、不易冲突的哈希组合值
既然选择长连接+连接池的做法,那就要处理当服务下线时 在连接池中删除对应的client
怎么做?设置回调函数,在服务下线时进行调用。
1.RpcClinet初始化DiscoveryClient时传入回调函数
2.DiscoveryClient 传给-> Discoverer
3.Discoverer再把回调函数cb设置到成员变量中
4.Client::onServiceRequest处理服务下线时,除了删除该方法中下线的主机地址Address,还要删除连接池中连接它的client
二.服务端rpc_server (包含注册中心服务端 rpc服务端)
🌐 服务端实现业务功能:
提供 RPC 服务
服务注册与发现机制中提供者的管理(服务注册)和消费者的管理(服务发现)
📦 封装的三类服务端组件:
RPC 服务端
负责接收并处理 RPC 请求。
注册中心服务端
负责管理服务提供者与消费者的注册信息。
发布订阅服务端(后续实现)
用于实现基于事件的通信机制(如 pub-sub 模式),暂未实现。
🛠 实现细节说明:
1. 注册中心服务端
是一个纯粹的服务端,用于管理提供者和消费者信息。
核心功能是处理服务注册与发现的请求。
2. RPC 服务端
实际由两部分组成:
RPC 服务端:用于接收和响应 RPC 请求。
服务注册客户端:启动后自动连接注册中心,并将自己能提供的服务注册上去。
#pragma once
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_router.hpp"
#include "rpc_registry.hpp"#include <set>
namespace wws
{
namespace server
{//服务注册服务端class RegistryServer{public:using ptr=std::shared_ptr<RegistryServer>;RegistryServer(int port):_pd_manager(std::make_shared<PDManager>()),_dispatcher(std::make_shared<Dispatcher>()){auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);_server = wws::ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb=std::bind(&RegistryServer::onConnShutdown,this,std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}private:void onConnShutdown(const BaseConnection::ptr&conn){_pd_manager->onConnShutdown(conn);}private:PDManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};//rpc服务端class RpcServer{public:using ptr=std::shared_ptr<RpcServer>;RpcServer(const Address&access_addr,bool enableRegistry=false,const Address®istry_server_addr=Address()):_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<wws::server::RpcRouter>()),_dispatcher(std::make_shared<wws::Dispatcher>()){//启用服务注册if(_enableRegistry){_reg_client=std::make_shared<client::RegistryClient>(registry_server_addr.first,registry_server_addr.second);}//成员server是一个rpcserver 用于提供rpc服务auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, rpc_cb);//创建一个监听指定端口(如 8080)的 RPC 服务器对象_server = wws::ServerFactory::create(access_addr.second);//默认监听 0.0.0.0:port 我监听所有可用 IPauto message_cb = std::bind(&wws::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}void registerMethod(const ServiceDescribe::ptr &service)//服务描述类{if(_enableRegistry){_reg_client->registryMethod(service->method(),_access_addr);//把 method->本主机地址 发给注册中心 表示自己可以提供method方法}_router->registerMethod(service);//本地注册 把服务描述service注册到RpcRouter中的服务管理类 onRpcRequest接收到客户端请求时进行路由分发调用}void start(){_server->start();}private:bool _enableRegistry;Address _access_addr;//自己的对外服务地址(客户端要连接我就来这)client::RegistryClient::ptr _reg_client;//注册客户端,用于连接注册中心并注册本地服务RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}
注册中心 服务端 客户端代码测试
1.先启动注册中心服务端 并设置port. 处理服务注册请求 服务发现请求 以及服务上下线通知
2.再启动rpc服务端 ,rpc服务端可以通过_reg_client注册中心客户端进行服务注册,但需要启动服务注册_enableRegistry=ture,提供注册中心客户端的Address。
还可以注册本地服务方法,内部注册到
RpcRouter
路由器中,用于请求到来时快速查找并调用对应的回调函数。3.启动rpc客户端,rpc客户端有两种连接方式 启动服务发现 根据方法名向注册中心查询服务地址,然后连接对应的服务端。不启动 直接连用户传入的服务提供者的地址
用户再调用call 通过客户端连接 发送rpc请求
rpc_server.cpp
#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{//build生产一个 服务描述类 函数名 + 参数类型+结果类型 + 函数地址(进行回调)std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//wws::Address("127.0.0.1",9090) 监听9090 在9090端口提供服务 true 表示启动服务注册 "127.0.0.1",8080 注册中心的地址信息,创建client用于连接注册中心wws::server::RpcServer server(wws::Address("127.0.0.1",9090),true,wws::Address("127.0.0.1",8080));//server.registerMethod(desc_factory->build());server.start();return 0;
}
registry_sever.cpp
#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"int main()
{//实例化服务端wws::server::RegistryServer reg_server(8080);reg_server.start();return 0;
}
rpc_client.cpp
#include "../../client/rpc_client.hpp"
#include "../../common/detail.hpp"
#include <thread>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{wws::client::RpcClient client(true,"127.0.0.1",8080);//1.同步调用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=client.call("Add",params,result);//client内找对应Add方法对应提供者的连接if(ret!=false){DLOG("result: %d", result.asInt());}//2.异步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=client.call("Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回调params["nums1"]=55;params["nums2"]=66;ret = client.call("Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}
八.发布订阅服务端实现rpc_topic
1. Dispatcher 模块(右上角)
作用:
JSON-RPC 框架中的请求分发核心
判断 RPC 消息类型 → 调用对应业务模块(如 PubSubManager)
Dispatcher::registerMethod("topic", &PubSubManager::onTopicRequest)
2. onTopicRequest 回调函数
位置:
PubSubManager
中的统一入口函数
作用:
接收来自 Dispatcher 的请求
根据操作类型调用对应处理函数:
类型 功能函数调用 创建主题 topicCreate()
删除主题 topicRemove()
订阅主题 topicSubscribe()
取消订阅 topicCancel()
发布消息 topicPublish()
3. PubSubManager 核心模块(图中心)
职责:
管理两个核心 map
操作对应的
Topic
和Subscriber
数据结构std::unordered_map<std::string, Topic::ptr> _topics; std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
4.Subscriber 结构(图左下)
struct Subscriber {BaseConnection::ptr conn;std::unordered_set<std::string> topics; };
图示结构:
每个订阅者关联一个连接
还记录了自己订阅的所有主题名
5. topic 结构(图左上)
struct Topic {std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; };
图示结构:
每个主题有一个名称
内部维护一组订阅它的订阅者指针
主要用途:
当消息发布到该主题时: → 遍历
set<subscriber>
并调用conn->send(msg)
6. 两张 map 映射关系(图中左)
map<topic_name, topic> map<Connection, Subscriber>
构成了典型的双向映射系统:
topic_name -> topic -> subscribers
conn -> subscriber -> topics
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>
namespace wws
{
namespace server
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;TopicManager();// 给Dispathcer模块进行服务上线下线请求处理的回调函数void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {TopicOptype topic_optype=msg->optype();bool ret=true;switch(topic_optype){//主题创建case TopicOptype::TOPIC_CREATE: topicCreate(conn,msg);break;//主题删除case TopicOptype::TOPIC_REMOVE: topicRemove(conn,msg);break;//主题订阅case TopicOptype::TOPIC_SUBSCRIBE: topicSubscribe(conn,msg);break;//取消主题订阅case TopicOptype::TOPIC_CANCEL: topicCancel(conn,msg);break;//主题消息发布case TopicOptype::TOPIC_PUBLISH: topicPublish(conn,msg);break;//返回应答 无效操作类型default: return errorResponse(conn,msg,RCode::RCODE_INVALID_OPTYPE);}if(!ret) return errorResponse(conn,msg,RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn,msg);}//一个订阅者在连接断开时的处理 删除其关联的数据void onShutdown(const BaseConnection::ptr&conn){//消息发布者断开连接 不处理; 消息订阅者断开连接需要删除管理数据//1.判断断开连接的是否是订阅者 不是直接返回Subscriber::ptr subscriber;//断开连接的订阅者对象std::vector<Topic::ptr> topics;//受影响的主题对象{auto it = _subscribers.find(conn);if (it == _subscribers.end())return;// 2.获取受影响的主题对象subscriber=it->second;for(auto&topic_name:subscriber->topics){auto topic_it=_topics.find(topic_name);if(topic_it==_topics.end())continue;topics.push_back(topic_it->second);}//3.从订阅者映射map中删除 订阅者_subscribers.erase(it);}//4.从对应主题对象中删除订阅者for(auto&topic:topics){topic->removeSubscriber(subscriber);}}private://错误响应void errorResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg,RCode rcode){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);//主题响应 msg_rsp->setRcode(rcode);conn->send(msg_rsp);}//注册应答void topicResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);msg_rsp->setRcode(RCode::RCODE_OK);conn->send(msg_rsp);}//创建主题void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {std::unique_lock<std::mutex> lock(_mutex);//构建一个主题对象 添加映射关系的管理std::string topic_name=msg->topicKey();//主题名称auto topic=std::make_shared<Topic>(topic_name);_topics.insert(std::make_pair(topic_name,topic));}//删除主题void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1.查看当前主题 有哪些订阅者 再从订阅者中删除主题信息//_topics[topic_name]->subscribers subscribers->topics订阅的所有主题名称//2.删除主题数据 _topics[topic_name]->Subscriber 主题名称和主题对象的映射关系std::string topic_name=msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers;{std::unique_lock<std::mutex> lock(_mutex);//删除主题前 先找出订阅该主题的订阅者auto it=_topics.find(topic_name);if(it==_topics.end())return;subscribers=it->second->subscribers;_topics.erase(it);//删除主题名称->topic} for(auto&subscriber:subscribers)subscriber->removeSTopic(topic_name);}// 主题订阅bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1.先找出主题对象topic 订阅者对象subscriber// 没有主题对象就报错 没有订阅者对象就构建Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}// 2.在主题对象中 新增一个订阅者对象管理的连接; 在订阅者对象中新增一个订阅的主题topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}}//取消主题订阅void topicCancel(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){// 1.先找出主题对象topic 订阅者对象subscriberTopic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end())topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end())subscriber = sub_it->second;// 2.在主题对象中 删除当前订阅者对象管理的连接; 在订阅者对象中删除对应订阅的主题if(subscriber) subscriber->removeSTopic(msg->topicKey());if(subscriber && topic) topic->removeSubscriber(subscriber);}}// 主题发布bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic=topic_it->second;}topic->pushMessage(msg);return true;}private:// 每个客户端连接会对应一个订阅者对象,记录它当前订阅了哪些主题struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr conn;std::unordered_set<std::string> topics; // 订阅者订阅的主题名称Subscriber(const BaseConnection::ptr &c): conn(c){}// 增加主题void appendTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.insert(topic_name);}// 删除主题void removeSTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.erase(topic_name);}};// 每个主题包含一个主题名 + 当前所有的订阅者struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; // 当前主题订阅者Topic(const std::string &name): topic_name(name){}// 增加订阅者void appendSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.insert(subscriber);}// 删除订阅者void removeSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.erase(subscriber);}// 给该主题的所有订阅者发消息void pushMessage(const BaseMessage::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : subscribers){subscriber->conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};
}
}
九.发布订阅客户端实现rpc_topic
一、发布订阅客户端的角色划分
消息发布客户端
创建主题
删除主题
发布消息(向某个主题发布)
消息订阅客户端
创建主题
删除主题
订阅某主题的消息
取消订阅某主题
二、整体模块设计思路
对外的五个操作接口
针对“主题”的操作,包括:
创建
删除
订阅
取消订阅
发布
对外的一个消息处理接口
提供给 dispatcher 模块,进行消息分发处理
相当于 dispatcher 收到消息发布请求后,查找有哪些订阅者,并调用对应的回调函数将消息推送过去
内部的数据管理
管理“主题名称”与“消息处理回调函数”的映射关系
#pragma once
#include"requestor.hpp"namespace wws
{
namespace client
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;using SubCallback=std::function<void(const std::string &key,const std::string&msg)>;//主题创建bool create(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_CREATE);}//删除bool remove(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_REMOVE);}//订阅主题 SubCallback收到主题新消息的进行的回调bool subscribe(const BaseConnection::ptr&conn,const std::string &key,const SubCallback&cb){//当我们订阅了主题 可能发布者会马上发布该主题的内容 //这时候如果cb还没有设置到map中就无法执行回调函数 所以先设置回调函数到map中addSubscribe(key,cb);bool ret=commonRequest(conn,key,TopicOptype::TOPIC_SUBSCRIBE);if(ret==false){//请求发送失败 删除map中对应的cbdelSubscribe(key);return false;}return true;}//取消订阅bool cancel(const BaseConnection::ptr&conn,const std::string &key){delSubscribe(key);return commonRequest(conn,key,TopicOptype::TOPIC_CANCEL);}//发布消息(向某个主题发布)bool publish(const BaseConnection::ptr&conn,const std::string &key,const std::string &msg){return commonRequest(conn,key,TopicOptype::TOPIC_PUBLISH,msg);}// 当收到服务端推送的消息时调用,触发对应订阅者的回调处理逻辑 (设置给dispatcher收到对应主题消息 进行回调处理)void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){//1.先判断该消息的操作类型是否为 发布消息auto type=msg->optype();if(type!=TopicOptype::TOPIC_PUBLISH){ELOG("收到了错误类型的主题操作");return;}//2.取出主题名称 以及消息内容 后面调用cbstd::string topic_key=msg->topicKey();std::string topic_msg=msg->topicMsg();//3.调用cbauto callback=getSubscribe(topic_key);if(!callback){ELOG("收到了%s主题信息 但该主题无对应回调",topic_key.c_str());return;}callback(topic_key,topic_msg);}private:void addSubscribe(const std::string &key,const SubCallback&cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key,cb));}void delSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback& getSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);auto it=_topic_callbacks.find(key);if(it==_topic_callbacks.end())return SubCallback();return it->second;}bool commonRequest(const BaseConnection::ptr&conn,const std::string &key,TopicOptype type,const std::string &msg=""){//1.构造请求对象 并填充数据auto msg_req=MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setTopicKey(key);msg_req->setOptype(type);if(type==TopicOptype::TOPIC_PUBLISH)msg_req->setTopicMsg(msg);//2.向服务端发送请求 等待响应BaseMessage::ptr msg_rsp;//发请求 + 等响应 + 反序列化响应 + 返回响应对象msg_rspbool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("主题创建请求失败");return false;}//3.判断请求处理是否成功auto topic_rsp_msg=std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if(!topic_rsp_msg){ELOG("主题响应 向下类型转换失败");return false;}if(topic_rsp_msg->rcode()!=RCode::RCODE_OK);{ELOG("主题创建请求出错:%s",errReason(topic_rsp_msg->rcode()).c_str());return false;}return true;}private:std::mutex _mutex;//根据主题查找对应的回调函数执行std::unordered_map<std::string,SubCallback> _topic_callbacks;Requestor::ptr _requestor;};
}
}
十.topicServer topicClient封装