C++ Json-Rpc框架-3项目实现(2)

 一.消息分发Dispatcher实现

Dispatcher 就是“消息分发中枢”:根据消息类型 MType,把消息派发给对应的处理函数(Handler)执行。

初版:

#pragma once
#include "net.hpp"
#include "message.hpp"namespace wws
{class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;void registerHandler(MType mtype,const MessageCallback &handler){std::unique_lock<std::mutex> lock(_mutex);_handlers.insert(std::make_pair(mtype,handler));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息类型对应的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second(conn,msg);return;}//没有找到指定类型的处理回调 但我们客户端和服务端都是我们自己设计的 因此不可能出现这种情况ELOG("收到未知类型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,MessageCallback> _handlers;};
}

使用方法 :服务端为例

#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"void onRpcRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Rpc请求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::RpcResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);rpc_rsp->setResult(33);conn->send(rpc_rsp);
}
void onTopicRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Topic请求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::TopicResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);conn->send(rpc_rsp);
}int main()
{//server建立收到rpc topic请求时对应的调用函数auto dispatcher=std::make_shared<wws::Dispatcher>();dispatcher->registerHandler(wws::MType::REQ_RPC,onRpcRequest);//注册映射关系dispatcher->registerHandler(wws::MType::REQ_TOPIC,onTopicRequest);//注册映射关系auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);server->setMessageCallback(message_cb);server->start();return 0;
}

回调函数调用过程:

1.服务端定义了两个函数onRpcRequest收到rpc请求的回调函数,onTopicRequest收到topic请求的回调函数(两个函数的参数部分都是一样的)。

2.创建Dispatcher类对象,调用registerHandler函数把请求类型(mtype)和对应的回调函数建立映射(因为上面回调函数的类型都是一样的,所以可以用map进行同一管理)。

3.把Dispatcher::onMessage函数设置成消息回调函数。在Dispatcher::onMessage函数内部会根据不同的消息类型,找到对应的回调函数并进行调用。

但是设置的两个回调函数中onRpcRequest,onTopicRequest。它们第二个参数类型都是基类BaseMessage,虽然传入的对象是子类对象RpcResponse TopicResponse,但仍无法访问它们对应子类的成员函数。

如果把第二个参数基类Base换成它们对应的子类,能访问了,但这就会导致函数的类型不一样了,就不能用map进行统一管理。

有没有什么办法即能用map进行统一管理,还能在回调函数中调用到子类的函数。

方法一:直接在回调函数中通过dynamic_cast / std::dynamic_pointer_cast将父类->子类

可以通过 dynamic_cast 将基类指针(或引用)转换为子类类型,以便访问子类特有的成员函数。前提是你使用的是多态(polymorphic)类,也就是基类至少要有一个虚函数

//Base基类 Derived子类
void test(Base* basePtr) {// 裸指针写法Derived* derivedPtr = dynamic_cast<Derived*>(basePtr);if (derivedPtr) {derivedPtr->specialFunction(); // 成功转换,调用子类函数} else {cout << "dynamic_cast failed!" << endl;}
}
//智能指针写法
std::shared_ptr<Base> basePtr = std::make_shared<Derived>();  // 用智能指针创建对象
std::shared_ptr<Derived> derivedPtr = std::dynamic_pointer_cast<Derived>(basePtr);  // 类型安全转换
项目裸指针写法智能指针写法
创建方式new Derived()std::make_shared<Derived>()
类型转换dynamic_cast<Derived*>(Base*)std::dynamic_pointer_cast<Derived>(shared_ptr<Base>)
返回值类型Derived*std::shared_ptr<Derived>
生命周期管理手动 delete自动释放,无内存泄漏风险
安全检查✅ 运行时类型检查,失败返回 nullptr✅ 同样运行时类型检查,失败返回空智能指针
是否影响引用计数❌ 不涉及引用计数✅ 新建了一个共享控制块引用
缺点说明
❌ 调用方需要知道消息类型调用者必须手动 dynamic_cast 到对应子类,否则不能访问子类内容
❌ 有一定运行时开销dynamic_cast 需要在运行时检查类型,会略有性能损失(但一般能接受)
❌ 容易出错如果类型错了,就返回 nullptr,还要额外判断、处理错误
❌ 易破坏封装上层代码要知道并显式转换为子类,增加了耦合度和类型暴露

方法二:模板 + 继承 + 多态

先看代码实现:
 

#pragma once
#include "net.hpp"
#include "message.hpp"namespace wws
{//让统一的父类指针指向不同的子类对象//通过调用父类虚函数,调用不同子类onMessage类型转换(dynamic<>)完成后的函数调用class Callback{public:using ptr=std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)=0;};template<typename T>class CallbackT:public Callback{public:using ptr=std::shared_ptr<CallbackT<T>>;//根据消息类型重新定义出函数类型using MessageCallback =std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>;CallbackT(const MessageCallback &handler):_handler(handler){}void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)override{auto type_msg=std::dynamic_pointer_cast<T>(msg);_handler(conn,type_msg);}private:MessageCallback  _handler;};class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;template<typename T>                 //加typename表示这是一个类型void registerHandler(MType mtype,const typename CallbackT<T>::MessageCallback &handler)//传的是类对象 T是消息类型(BaseMessage子类){std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<CallbackT<T>>(handler);_handlers.insert(std::make_pair(mtype,cb));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息类型对应的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second->onMessage(conn,msg);//调用类中的回调函数return;}//没有找到指定类型的处理回调 但我们客户端和服务端都是我们自己设计的 因此不可能出现这种情况ELOG("收到未知类型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,Callback::ptr> _handlers;//second是一个基类指针};
}

使用方法:

不用传BaseMessage基类,直接传子类。但在设置时指明Typed消息类型。

因为回调函数hadler类型不同,我们就用模板类根据T(消息类型)生成不同的类,里面是保存的是不同类型的回调函数。但根据模板类生成的类,类型不一样还是不能统一放入map中。

所以再定义一个父类 里面的回调函数设置为虚函数,这些不同的模板类作为子类继承父类并实现各自的回调函数。map中的handler存的不再是回调函数,而是一个父类指针,通过父类指针调用子类的回调函数。

具体过程:

  • 使用模板类 CallbackT<T> 根据消息类型 T 生成不同的子类,它们内部包装了各自类型的回调函数。(因为子类传入的是一个函数类型,保存的一个函数类型,而T是一个消息类型。还需要MessageCallback将T消息类型和回调函数类型绑定在一起)

  • 这些模板类继承自统一的基类 Callback,并重写了其 onMessage() 虚函数。

  • 当我们在调用registerHandler注册回调函数时,会创建一个CallbackT<T>的对象并获取其指针,最后设置到map中。

  • Dispatcher 中,map<MType, Callback::ptr> 存储的是基类指针,但实际指向的是不同子类 CallbackT<T> 的对象。

  • 当调用 onMessage() 时,基类指针会通过虚函数机制调用对应子类的实现,再通过 dynamic_pointer_cast 将消息转换为正确的类型,最终调用具体的业务回调函数

方法一和方法二对比:

对比点✅ 当前封装方式<br>(模板 + 多态)❌ 直接在每个 handler 里手动 dynamic_cast
💡 代码复用性回调逻辑封装在模板中,避免重复写类型转换代码每个回调都要重复写一次 dynamic_pointer_cast
✅ 类型安全编译器自动根据模板类型 T 限定传入的函数签名由开发者自己保证类型正确,容易写错
🎯 接口统一Dispatcher 接收统一的 BaseMessage::ptr,自动调用类型对应的 handler接口统一,但每次回调前都得“手动猜”消息类型
🧼 代码整洁性Handler 业务函数专注于处理业务,不掺杂类型转换代码handler 代码里混入了类型转换、错误判断等杂项
🔄 可扩展性新增消息类型只需 registerHandler<T> 一行,无需修改 dispatcher 逻辑每新增一种类型,都要写新回调 + 自己处理类型转换
🔒 类型封装类型转换封装在 CallbackT<T>::onMessage 内,调用者无感知显式暴露类型细节,破坏封装性
🧠 可维护性Dispatcher 管理逻辑集中、结构清晰回调函数多时,容易混乱、出错
相比直接在 handler 里 dynamic_cast,现在的设计通过模板和多态封装了类型转换逻辑,使回调函数 更简洁、更安全、更可维护,Dispatcher 也更具通用性和扩展性。

二.服务端-RpcRouter实现

组织和处理客户端发来的 RPC 请求,并调用对应的业务逻辑进行响应

这个模块主要由 4 个类构成:

类名作用
VType参数类型的枚举,例如整数、字符串、对象等
ServiceDescribe描述一个服务方法的参数、返回值、回调函数等
ServiceManager管理多个服务(增删查)
RpcRouter处理客户端发来的 RPC 请求,协调调用服务
#include "../common/net.hpp"
#include "../common/message.hpp"namespace wws
{
namespace server
{//枚举类 VType 定义参数与返回值的类型enum class VType{BOOL = 0,INTEGRAL,NUMERIC,STRING,ARRAY,OBJECT,};// 服务描述类class ServiceDescribe{public:using ptr=std::shared_ptr<ServiceDescribe>;using ServiceCallback=std::function<void(const Json::Value&,Json::Value&)>;using ParamDescribe=std::pair<std::string,VType>;//参数名称 类型ServiceDescribe(std::string &&method_name,ServiceCallback &&callback,std::vector<ParamDescribe>&& params_desc,VType return_type):_method_name(std::move(method_name)),_callback(std::move(callback)),_params_desc(std::move(params_desc)),_return_type(return_type){}//返回名称const std::string & method(){return _method_name;}//校验传入参数是否符合要求(1.字段完整 + 2.类型匹配)bool paramCheck(const Json::Value&params)//{"nums1",11}{for(auto&desc:_params_desc){//1.判断是否有该字段if(params.isMember(desc.first)==false){ELOG("没有 %s 参数字段",desc.first.c_str());return false;}//2.判断该字段类型是否正确if(check(desc.second,params[desc.first])==false){ELOG("%s参数字段类型错误",desc.first.c_str());return false;}}return true;}bool call(const Json::Value& params,Json::Value&result){_callback(params,result);if(rtypeCheck(result)==false){ELOG("回调处理函数中响应信息类型错误");return false;}return true;}private:// 判断return类型是否正确bool rtypeCheck(const Json::Value &val){return check(_return_type, val);}//判断val对象的类型是否和vtype一致 Json::Value兼容任何JSON类型(int、string、array、object 等)bool check(VType vtype,const Json::Value &val){switch(vtype){case VType::BOOL :return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::STRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name;//方法名称ServiceCallback _callback;//实际的业务回调函数std::vector<ParamDescribe> _params_desc;//参数字段格式描述vector<参数名称,对应的类型>VType _return_type;//结果类型描述 };//对比 直接在ServiceDescribe中set各参数 的优点:构造完后ServiceDescribe 的成员就不再修改,仅读取 天然线程安全。//若多个线程同时调用 setXxx() 方法 会出现线程安全的问题 需要在每个set函数中加锁class SDescribeFactory{public:void setMethodName(const std::string&name){_method_name=name;}void setReturnType(VType vtype){_return_type=vtype;}void setParamDesc(const std::string &pname,VType vtype){_params_desc.push_back(ServiceDescribe::ParamDescribe(pname,vtype));}void setCallback(const ServiceDescribe::ServiceCallback&cb){_callback=cb;}ServiceDescribe::ptr build(){return std::make_shared<ServiceDescribe>(_method_name,_callback,_params_desc,_return_type);}private:std::string _method_name;//方法名称ServiceDescribe::ServiceCallback _callback;               // 实际的业务回调函数std::vector<ServiceDescribe::ParamDescribe> _params_desc; // 参数字段格式描述vector<参数名称,对应的类型>VType _return_type;                                       // 结果类型描述};//服务管理类 增删查class ServiceManager{public:using ptr=std::shared_ptr<ServiceManager>;void insert(const ServiceDescribe::ptr&desc)//增{std::unique_lock<std::mutex> lock(_mutex);_service.insert(std::make_pair(desc->method(),desc));}ServiceDescribe::ptr select(const std::string &method_name)//查{std::unique_lock<std::mutex> lock(_mutex);auto it=_service.find(method_name);if(it==_service.end()){return ServiceDescribe::ptr();}return it->second;}void remove(const std::string &method_name)//删{_service.erase(method_name);}private:std::mutex _mutex;std::unordered_map<std::string,ServiceDescribe::ptr> _service;//函数名称 对应服务};class RpcRouter{public:using ptr=std::shared_ptr<ServiceDescribe>;//对注册到Dispatcher模块针对rpc请求进行回调处理的业务函数void onRpcRequest(const BaseConnection::ptr&conn,RpcRequest::ptr &request){//1.根据用户请求的方法描述 判断当前服务端能否提供对应的服务auto service=_service_manager->select(request->method());if(service.get()==nullptr){ELOG("未找到%s服务",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_NOT_FOUND_SERVICE);}//2.进行参数校验 确定能否提供服务if(service->paramCheck(request->params())==false){ELOG("%s服务参数校验失败",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INVALID_PARAMS);}//3.调用业务回调接口进行处理Json::Value result;bool ret=service->call(request->params(),result);if(ret==false){ELOG("%s服务参调用出错",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INTERNAL_ERROR);}//4.向客户端发送结果return response(conn,request,result,RCode::RCODE_OK);}//提供服务注册接口void registerMethod(const ServiceDescribe::ptr &service ){_service_manager->insert(service);}private://响应对象void response(const BaseConnection::ptr&conn,RpcRequest::ptr&req,const Json::Value&res,RCode rcode){auto msg=MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(wws::MType::RSP_RPC);msg->setRcode(rcode);msg->setResult(res);conn->send(msg);}private:ServiceManager::ptr _service_manager;};
}
}

ServiceDescribe:服务描述类

每一个服务方法(如 AddTranslate)都有一个 ServiceDescribe 对象,它记录:

  • 方法名 _method_name

  • 参数信息 _params_desc(字段名 + 类型)

  • 返回值类型 _return_type

  • 实际处理逻辑 _callback

 功能:

  • paramCheck():检查客户端传来的参数是否完整且类型匹配。

  • call():调用业务函数,处理请求,并校验响应类型,输出型参数Json::Value&result获取结果。

RpcRouter:RPC 请求核心调度器

处理流程(onRpcRequest):

1. 获取客户端请求的方法名:request->method()
2. 查找是否有对应的服务:_service_manager->select()
3. 参数检查:service->paramCheck()
4. 调用回调函数处理:service->call()
5. 构建响应消息:RpcResponse
6. 通过 conn->send(msg) 返回结果

注册流程(registerMethod()):

RpcRouter router;
router.registerMethod(service); // 将服务注册到服务管理器

为什么要用 SDescribeFactory 工厂模式而不是在 ServiceDescribe 中直接使用 setXxx() 方法进行设置ServiceDescribe的各个参数?

通过DescribeFactory 工厂模式,造完后ServiceDescribe 的成员就不再修改,仅读取 天然线程安全。

如果在ServiceDescribe 设置set(),若多个线程同时调用 setXxx() 方法 会出现线程安全的问题 需要在每个set函数中加锁。

         客户端发送 RpcRequest↓RpcRouter::onRpcRequest()//进行处理↓_service_manager->select(method)//查找方法↓ServiceDescribe::paramCheck(params)//校验参数↓ServiceDescribe::call() → 执行业务逻辑//执行回调↓RpcRouter::response() → 发送响应

三.客户端-RpcRouter实现

🔹 1. describe(请求描述体)

  • 封装一个请求的基本信息:

    • request: 包含 RID(请求 ID)、MType(消息类型)、Body(请求体)

    • std::promise<response>:用于 future 异步

    • callback:用于 callback 异步

    • RType: 标识异步类型(ASYNC / CALLBACK)

🔹 2. Requestor::send(...)

  • send(connection, request, callback) → 异步 callback 模式

  • send(connection, request, std::future<response>) → future 模式

  • describe 存入 map<rid, describe> 中,等待响应回调

🔹 3. onResponse(connection, response)

  • 收到响应后,根据 RID 从 map<rid, describe> 查找对应的 describe

  • 按照 RType 分发:

    • 如果是 CALLBACK,调用 callback

    • 如果是 ASYNC,通过 promise.set_value(...) 实现 future 结果

🔹 4. Dispatcher<mt, handler>

  • MType 进行派发处理(主要针对订阅/通知类型的消息,不含 RID)

#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>namespace wws
{
namespace client
{class Requestor{public:using ptr=std::shared_ptr<Requestor>;using RequestCallback=std::function<void(const BaseMessage::ptr&)>;using AsyncResponse=std::future<BaseMessage::ptr>;//请求描述的结构体struct RequestDescribe{using ptr=std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;//请求消息指针RType rtype; //请求类型 异步/回调std::promise<BaseMessage::ptr> response;//用于 async 模式,设置结果set_value 给 future 返回值RequestCallback callback;//用于 callback 模式,响应到来时触发用户逻辑};//收到应答 根据rid找到对应的请求设置结果 或者 调用回调 void onReponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg){std::string rid=msg->rid();RequestDescribe::ptr rdp=getDescribe(rid);//根据id进行查找if(rdp.get()==nullptr){ELOG("收到响应%s,但是未找到对应的请求描述!",rid.c_str());return;}//异步请求把应答msg作为结果设置到promise中,让future就绪if(rdp->rtype==RType::REQ_ASYNC){rdp->response.set_value(msg);//promise.set_value(value); 手动设置值 让std::future<BaseMessage::ptr>变为就绪。}//回调请求 有回调函数就进行调用else if(rdp->rtype==RType::REQ_CALLBACK){if(rdp->callback) rdp->callback(msg);}elseELOG("请求类型未知");//收到应答 删除rid对应的请求描述delDescribe(rid);}//1.异步请求发送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,AsyncResponse&async_rsp){//创建请求描述对象(newDescribe内部完成插入map)RequestDescribe::ptr rdp=newDescribe(req,RType::REQ_ASYNC);if(rdp.get()==nullptr){ELOG("构建请描述对象失败");return false;}//get_future()关联std::future<>async_rsp 和 std::promise<>response//promise.set_value(value) 被调用,就能async_rsp.get()获取值async_rsp=rdp->response.get_future();return true;}//2.同步请求发送(发送完请求后,立刻调用get()阻塞等待set_value()设置后获取结果)//可以在上层进行get()阻塞等待,也是同样效果bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,BaseMessage::ptr&rsp){AsyncResponse rsp_future;bool ret=send(conn,req,rsp_future);if(ret==false) return false;rsp=rsp_future.get();//阻塞等待值就绪return true;}//3.回调请求发送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&rep,RequestCallback&cb){//创建请求描述对象(newDescribe内部完成插入map)RequestDescribe::ptr rdp=newDescribe(rep,RType::REQ_CALLBACK,cb);if(rdp.get()==nullptr){ELOG("构建请描述对象失败");return false;}conn->send(rep);return true;}private://1.新增RequestDescribe::ptr newDescribe(const BaseMessage::ptr&req,RType rtype,const RequestCallback&cb=RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);//构建请求描述对象 并插入到mapRequestDescribe::ptr rd=std::make_shared<RequestDescribe>();rd->request=req;rd->rtype=rtype;if(rtype==RType::REQ_CALLBACK&&cb)rd->callback=cb;_request_desc.insert(std::make_pair(req->rid(),rd));//插入到mapreturn rd;}//2.查找RequestDescribe::ptr getDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);auto it=_request_desc.find(rid);if(it==_request_desc.end()){return RequestDescribe::ptr();}return it->second;}//3.删除void delDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string,RequestDescribe::ptr> _request_desc;//id->请求消息};
}
}
用户发起请求│▼
Requestor::send(...)│▼
创建 RequestDescribe (含回调 or promise)│▼
加入 map<rid, describe>│▼
发送消息给服务器⬇(服务器响应)Requestor::onResponse(...)│▼找回 describe -> 判断 rtype├── CALLBACK → 执行 callback└── ASYNC    → set promise▼清除 map<rid, describe>

四.客户端RpcCaller实现

1. 构造函数 RpcCaller(const Requestor::ptr&)

  • 初始化传入一个 Requestor 实例;

  • Requestor 负责发送消息、注册回调、接收服务端响应等;

  • RpcCaller 是调用层,Requestor 是通信层。

2. 同步调用接口

bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,Json::Value& result)

1.先构建RpcRequest请求

2.调用requestor->同步send(),同步阻塞发送请求,并拿到rsp_msg。

3.将rsp_msg的Basemessage类型转换为 RpcResponse,取出正文结果 result();

3. 异步 Future 调用接口

bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,std::future<Json::Value>& result)
  • 创建 promise 对象,用于异步回填响应;

  • 通过 shared_ptr 管理 promise 生命周期,绑定到回调函数中;

  • 通过 _requestor->send(...) 注册异步回调;

  • 回调触发后由 Callback() 设置 promise.set_value()

  • 调用方使用返回的 future 进行 .get() 即可拿到结果。

4. 异步回调接口

bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,JsonResponseCallback& cb)
  • 直接注册用户定义的回调 cb 到请求;

  • 内部通过 Callback1() 做包装处理:

    • 类型转换为 RpcResponse

    • 错误处理

    • 最后调用用户传入的 cb(result) 传回结果。

#include "requestor.hpp"namespace wws
{
namespace client
{class RpcCaller{public:using ptr=std::shared_ptr<RpcCaller>;using JsonAsyncResponse=std::future<Json::Value>;using JsonResponseCallback=std::function<void(const Json::Value&)>;//requestor中的处理是针对BaseMessage进行处理的(因为要对所有请求进行处理,不单单对Rpc请求处理)//用于在rpc caller中针对结果的处理是对RpcResponse里边的result进行的RpcCaller(const Requestor::ptr &requestor):_requestor(requestor){}//1.同步调用  1.连接conn 2.方法名method 3.方法参数params 4.结果resultbool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value&params,Json::Value&result){//1.组织请求auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2.发送请求 因为send()是重载函数 参数的类型必须保持一致(req_msg子类RpcRequest->基类BaseMessage)bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg);if(ret==false){ELOG("同步Rpc请求失败");return false;}//3.等待响应 响应信息存放在rsp_msg此时是Base基类,需要转成RpcResponse应答auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return false;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc同步请求出错:%s",errReason(rpc_rsp_msg->rcode()));return false;}result=rpc_rsp_msg->result();//返回响应消息里面的正文return true;}//2.异步 Future 调用 向服务器发送异步回调请求 设置回调函数 //回调函数中传入一个promise对象 在回调函数中对promise设置数据//异步请求返回的是BaseMessage对象,用户想要的是message里面正文的结果Valuebool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value&params,std::future<Json::Value>&result){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// //这个json_promise对象是一个局部变量,等出了call作用域就会消失// //与它关联的future result在外部get()获取结果时,会异常// std::promise<Json::Value> json_promise;// result=json_promise.get_future();//future和promise建立关联 以后future.get()获取结果auto json_promise=std::make_shared<std::promise<Json::Value>>();result=json_promise->get_future();//std::bind() 对json_promise传值传参,shared_ptr引用计数 +1 此时引用计数==2//退出call作用域 引用计数-- 再等callback被触发完毕并释放后引用计数--,才会析构//shared_ptr引用计数是否加1,只和bind对json_promise指针的捕获方式有关,与函数的参数声明是否引用json_promise指针无关Requestor::RequestCallback cb=std::bind(&RpcCaller::Callback,this,json_promise,std::placeholders::_1);bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),cb);if(ret==false){ELOG("异步Rpc请求失败");return false;}return true;}//3.异步回调bool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value&params,JsonResponseCallback&cb){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false){ELOG("异步Rpc请求失败");return false;}return true;}private:void Callback1(const JsonResponseCallback&cb,const BaseMessage::ptr&msg){//先判断结果对不对auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc回调请求出错:%s",errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}//const BaseMessage::ptr&msg Request参数是拿到响应后传递的void Callback(std::shared_ptr<std::promise<Json::Value>>result,const BaseMessage::ptr&msg){auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc异步请求出错:%s",errReason(rpc_rsp_msg->rcode()));return ;}//promise.set_value()设置正文结果     result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};
}
}

为什么不让 Requestor::send() 直接处理 RpcRequest,而是让它只处理 BaseMessage,由 RpcCaller 来封装具体业务(如 RpcRequest/RpcResponse)?

1.因为要对所有请求进行处理,不单单对Rpc请求处理。还可以处理主题 服务等消息类型

2.解耦业务协议与通信通道,只做消息传递与回调处理。

模块职责
Requestor发送消息、注册和触发回调,处理响应派发
RpcCaller构造 RPC 请求、解析 RPC 响应,组织业务逻辑

为什么 bind 捕获 shared_ptr 能延长 promise 的生命周期?

shared_ptr+bind(值捕获)+外部拷贝保存

本质就是:

只要某个 shared_ptr 指向的对象,引用计数 不为 0,那它就不会被销毁

所以我们通过创建另一个生命周期更长的 shared_ptr(比如通过 bind() 值捕获并在外部保存),来延长这段资源的生命周期。

(不建议std::ref()引用捕获 如果外部shared_ptr_obj的被销毁,cb 里的引用就变成悬垂指针

auto cb = std::bind(&func, std::ref(shared_ptr_obj));

如果std::promise<Json::Value> json_promise;直接创建一个promise对象,这是一个局部对象等出了call函数(离开作用域),就会析构。

这就会导致与它关联的future result在外部get()获取结果时,会异常。

auto json_promise=std::make_shared<std::promise<Json::Value>>();创建promise对象并用shared指针来管理它,虽然出了call函数 引用计数-- 为0还是会析构。

但我们在bind绑定的时候,对json_promise进行了传值,拷贝了一份 shared_ptr 对象 进入 bind 内部的闭包,引用计数+1.

auto cb = std::bind(&RpcCaller::Callback,this,json_promise,   // 👈 这里复制了一份 shared_ptr,引用计数 +1std::placeholders::_1);

写成void Callback(const std::shared_ptr<std::promise<Json::Value>>&result,const BaseMessage::ptr&msg) 接收引用(不增加引用计数)。对bind传值引用计数+1无影响

如果是下面这种拷贝引用计数会再+1,但仅限Callback运行时,调用结束后立刻-1,只是暂时的。

所以说现在,在call函数中有被shared指针指向的promise对象引用计数为2,一个make_shared返回的,一个bind闭包值捕获的。

bind闭包对象cb(auto cb=std::bind(...)),它本质不也是一个局部对象吗?call函数结束不还是和另一个局部变量一样会销毁,promise的生命周期怎么会延长呢?

call函数中的cb是否销毁已经无关,因为在函数的外部已经拷贝保存了一份了。

它通常会被传出 call() 函数,交给 Requestor::send() 并被保存在异步响应回调表里!

send()会调用newDescibe,把闭包对象存入rd请求描述对象中,再插入到map进行统一管理。

(注意newDescribe虽然是const&获取的cb 不增加引用计数,但这根本无关紧要,因为rd->callback=cb;会把它拷贝到了 RequestDescribe::callback 中,这是才生命周期延长的关键)

等到onResponse收到应答并处理完,就delDescribe()删除对应请求描述信息,保存的回调函数cb也析构,里面保存的json_promise也会析构,所以说ptr+bind值捕获+外部保存

保证了 json_promise 会存活到 callback 被调用。

总结:

1.Callback(param) 中的 shared_ptr 是按值传递,因此会导致引用计数 +1,

但这个 +1 的生命周期仅限于函数执行期间,Callback() 结束后 param 被销毁,引用计数立即 -1。

2.而 bind(...) 捕获 shared_ptr 时(通过值捕获),会将其拷贝一份保存在闭包对象中(如 auto cb),

这会导致引用计数 +1,无论是否调用 Callback,引用计数都存在,直到闭包对象销毁为止

那闭包对象什么时候销毁呢?delDescribe()

3.需要注意的是,cb 本身是 call() 函数内的局部变量,但它并不会随着 call() 结束而失效。

虽然 call() 返回后 cb 变量本身会被销毁,但在此之前 cb 已经被作为参数传入 _requestor->send(),

并被内部保存到了 _request_desc 表中,作为 RequestDescribe 的一部分长期持有。

4.也就是说:call() 中定义的 cb 虽然是局部变量,但它在作用域结束前被拷贝并传出,生命周期已经被延长。

所以 cb 中捕获的 json_promise(shared_ptr)依然活着,call 函数中的 cb 就算析构,也不影响闭包内部捕获的 promise 的生命周期。

5.只有当服务端响应到达,回调被触发后,执行 cb(msg),再通过 delDescribe(rid) 删除请求描述对象rd保存的回调函数cb闭包进行析构,值捕获的json_promise被释放,引用计数==0 promise 被析构.

6.相比之下,如果只是创建了一个局部的 shared_ptr(比如 json_promise),没有通过 bind、lambda、线程等传出去,

它就会在函数结束时被立即析构,future 将失效,调用 .get() 会抛出异常(broken promise)。

✅ 因此,虽然 cb 是局部变量,但它在 call() 结束前被拷贝传出,生命周期被框架托管,保证了回调所需的 promise 安全存活。

这是异步通信框架中延长资源生命周期的关键机制,确保异步流程完整闭环。

局部变量当作参数传参,并不会延长它的生命周期; 它的生命周期仍然只属于它原来的作用域; 如果你想延长生命周期,必须使用 shared_ptr,并在外部再持有一份拷贝!

举个例子:

std::shared_ptr<int> create() {auto p = std::make_shared<int>(100);  // p 是局部变量return p;  // ✅ 返回值是“拷贝一份”,原来的 p 会被销毁,但返回值还持有控制块
}auto x = create(); // 返回值拷贝给 x,资源不会被销毁
情况局部变量会不会被销毁解释
普通局部变量(无指针托管)✅ 会作用域结束立即销毁
传值作为函数参数✅ 会参数是拷贝,原变量不延长
返回 shared_ptr原变量销毁,但返回值持有副本,资源不析构
shared_ptr 被 bind 捕获❌ 不会(+1)延长生命周期直到闭包结束

服务端测试代码

相较于之前实现的客户端,我们不再是直接给dispatcher传业务层函数,而是传RpcRouter的处理对应请求的回调函数,由该函数再从注册到RpcRouter中的具体实现函数中找用户需要的函数,并进行回调返回响应结果


#include "../common/message.hpp"
#include "../common/net.hpp"#include "../common/dispatcher.hpp"
#include "../server/rpc_router.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{auto router=std::make_shared<wws::server::RpcRouter>();std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//1.注册Add函数到RpcRouterrouter->registerMethod(desc_factory->build());//bind绑定RpcRouter收到消息的回调函数->cbauto cb=std::bind(&wws::server::RpcRouter::onRpcRequest,router.get(),std::placeholders::_1,std::placeholders::_2);auto dispatcher=std::make_shared<wws::Dispatcher>();//2.把RpcRouter回调函数onRpcRequest设置到dispatcher中dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC,cb);auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);//3.把Dispatcher的回调函数onMessage设置到server中server->setMessageCallback(message_cb);server->start();return 0;
}

从图示上可以看到,整个链路依次是:

  1. server 收到消息

  2. 调用 messageCallback

  3. 触发 onMessage 进入 dispatcher

  4. Dispatcher 发现消息类型是 REQ_RPC,调用 RpcRouter::onRpcRequest

  5. Router 找到 Add 方法并调用其回调函数

  6. Add 函数执行计算并返回结果

  • server->setMessageCallback(message_cb);

    • 这里的 message_cb 是通过 std::bind(&wws::Dispatcher::onMessage, dispatcher.get(), ...) 绑定的。

    • 当服务器收到任何消息时,就会调用 dispatcher->onMessage(...)

  • dispatcher->onMessage(...)

    • Dispatcher 根据消息的类型(这里为 REQ_RPC)找到先前注册的回调处理函数

    • 这一步对应代码 dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, cb);

    • 所以当消息类型匹配到 REQ_RPC 时,就执行 cb

  • cb -> std::bind(&wws::server::RpcRouter::onRpcRequest, router.get(), ...)

    • 这个 cb 本质上就是对 RpcRouter::onRpcRequest 的一次包装。

    • 调用 cb 时,实际上就是执行 router->onRpcRequest(...)

  • RpcRouter::onRpcRequest(...)

    • 在路由器里,根据请求中的“方法名称”(比如 "Add")找到对应的回调函数

    • 此处就是在之前 router->registerMethod(...) 时注册的 Add 方法回调。

  • 调用 Add(const Json::Value& req, Json::Value& rsp)

    • 最终执行我们自定义的逻辑(如取 nums1nums2,相加后存入 rsp)。

客户端测试代码

#include "../common/dispatcher.hpp"
#include "../client/requestor.hpp"
#include "../client/rpc_caller.hpp"
#include <thread>
#include <chrono>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{auto requestor=std::make_shared<wws::client::Requestor>();auto caller=std::make_shared<wws::client::RpcCaller>(requestor);auto dispatcher=std::make_shared<wws::Dispatcher>();auto rsp_cb=std::bind(&wws::client::Requestor::onResponse,requestor.get(),std::placeholders::_1,std::placeholders::_2);//wws::RpcResponse->wws::BaseMessage //rsp_cb绑定的函数参数为Requestor::onResponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg)//而registerHandler注册需要的函数类型 std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>; 第二个参数必须也为BaseMessage::ptr,所以T传BaseMessagedispatcher->registerHandler<wws::BaseMessage>(wws::MType::RSP_RPC,rsp_cb);auto client=wws::ClientFactory::create("127.0.0.1",9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);client->setMessageCallback(message_cb);client->connect();auto conn=client->connection();//1.同步调用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=caller->call(conn,"Add",params,result);if(ret!=false){DLOG("result: %d", result.asInt());}//2.异步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=caller->call(conn,"Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回调params["nums1"]=55;params["nums2"]=66;ret = caller->call(conn,"Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));client->shutdown();return 0;
}
  1. RpcCaller 调用 call("Add", params, rsp)

    • 用户在业务代码里直接写 caller->call(...),想要调用服务端的 “Add” 方法并等待结果。

  2. RpcCaller 内部执行 “AddDesc(...)”

    • call(...) 方法内部会先构造一个带唯一请求ID的 RpcRequest,然后调用 Requestor 的相关方法(示意中称作 AddDesc)来“登记”该请求:

      • 存储该请求ID

      • 记录调用类型(同步/异步/回调)

      • 如果是回调式,还会记录用户传入的回调函数

  3. 发送请求到服务端

    • Requestor 记完请求描述后,就会通过网络连接将 RpcRequest 发送给远程服务端。

  4. 服务端处理并返回 RpcResponse

    • 当服务端收到 “Add” 请求后,进行实际的加法运算或其他业务逻辑,然后打包 RpcResponse 返回给客户端。

  5. 客户端接收响应 -> 分发到 Requestor::onResponse()

    • 客户端网络层读到响应后,先通过 Dispatcher 分发,根据消息类型(如 RSP_RPC)找到之前绑定的回调,即 Requestor::onResponse(...)

    • Requestor::onResponse() 根据响应里的 “请求ID=111” 查到对应的“请求描述 (desc)”,确定是哪个请求、用什么方式处理(同步阻塞唤醒或执行用户回调等),并把结果交给调用方。

  6. RpcCaller 最终返回结果给用户

    • 对于异步调用 call(...),当响应到来时,onResponse() 会调用                        rdp->response.set_value(msg),把响应 msg 设置到 promise 中。用户future.get()获取结果。

    • 若是回调式调用,则 Requestor::onResponse()直接执行用户的回调函数,把结果带给用户。

Requestor 和 RpcCaller 的关系:

  • Requestor 负责管理请求–响应映射。它用一个请求描述(包含唯一 ID、请求类型和处理机制)来记录每次发送的请求。当响应到来时,根据请求 ID 查找描述,

    • 如果是异步请求(REQ_ASYNC),则调用 promise.set_value(msg) 使关联 future 就绪;

    • 如果是回调请求(REQ_CALLBACK),则直接调用用户注册的回调函数。

  • RpcCaller 则对外提供 RPC 调用接口。它负责构造 RPC 请求(设置方法名、参数、生成请求 ID),并调用 Requestor 的 send() 方法来登记请求并发送消息。用户可以选择同步(阻塞等待 future.get())、异步(返回 future)或回调式调用。

二者协同工作:RpcCaller 构造并发送请求,而 Requestor 负责匹配响应并将结果传递给上层。

RpcCaller用户调用的接口,用户传入要调用的对象 参数 方式,根据方式(同步 异步 回调)的不同选择不同的call进行调用,1.先根据参数构建请求消息2.调用Requester里面对应的send()。

Requester中send()会先构建请求描述对象(call传入的请求消息 请求类型...) 并建立请求-id间的映射(等收到应答时根据id找到对应的请求描述对象),再完成发送。

等服务端返回应答,Dispatcher根据消息的类型,找到client的Requestor中onResponse处理应答的回调函数,它根据id找到对应的请求描述,再根据请求描述中的类型,进行set_value设置结果或者callback()调用回调。最后删除该请求描述

set_value后,get()获取到结果,阻塞结束返回上层再返回到call进行检查并返回结果

RpcCaller::call() 的三种调用方式流程

同步调用 (call(conn, method, params, result))

客户端调用call,把连接 函数名 参数 用于获取结果的Value对象

进入同步调用call. 先根据传入的参数组织请求(里面设置了请求类型REQ_RPC)

        调用Requester中send()发送请求

 using AsyncResponse=std::future<BaseMessage::ptr>;先创建一个future对象用于后面get()阻塞获取结果。再调用异步send(),因为异步和同步的区别只是在于用户什么时候get()阻塞获取结果,异步+立刻get()==同步。

再看看异步send()

先调用newDescribe,里面会创建请求描述对象(传入回调函数会设置回调函数)和UUID建立映射关系用map管理起来。

conn->send()发送请求。

之后服务端进行处理,返回应答,server::messageCallback->Disoatcher::onMessage->根据请求类型找到对应的回调函数,RSP_RPC对应的就是requestor::onResponse()处理应答的回调函数。

 进入onResponse,先根据UUID找到对应的请求描述,根据请求描述的类型,看是异步(同步里面调用的异步),还是回调,进行相对的处理。

是同步,就set_value设置结果。


设置完结果,futrue就绪get()获取结果

上层的result就获取到了结果,进行输出。

RpcCaller::call(conn, method, params, result)  // 同步调用└──> Requestor::send(conn, req, rsp)       // 调用同步版本 send()└──> send(conn, req, rsp_future)     // 调用异步 Future 版本 send()└──> 创建 RequestDescribe 并存储└──> 阻塞等待 future.get()└──> Dispatcher::onMessage(conn, msg)└──> Requestor::onResponse(conn, msg)└──> rdp->response.set_value(msg) // future.set_value() 解除阻塞└──> 解析 RpcResponse 并返回 result  // get() 获取结果并返回

经过的关键函数:

  • RpcCaller::call(conn, method, params, result)

  • Requestor::send(conn, req, rsp)

  • Requestor::send(conn, req, rsp_future) (异步版本)

  • Dispatcher::onMessage(conn, msg)

  • Requestor::onResponse(conn, msg)

  • rdp->response.set_value(msg)

  • future.get() 解除阻塞

异步 Future 调用 (call(conn, method, params, future))

不传Json::Value result直接获取结果,std::future<Json::Value> res_futre,让用户自己get()获取结果。

result=json_promise->get_future();管理promise,set_value后用户就可以get()获取结果。

还绑定了Callback,传入回调函数cb,创建请求描述对象时设置回调函数cb

此时设置的类型为REQ_CALLBACK,收到应答,找到对应请求描述,会调用设置的回调函数cb

cb bind绑定的是Callbcak函数 它会set_value设置结果,让用户的future就绪,get()获取结果

RpcCaller::call(conn, method, params, future)  // 异步 Future 调用└──> 创建 RpcRequest 并设置参数└──> 创建 std::promise<Json::Value> 和 future 关联└──> 绑定 Callback (关联 promise 和 result)└──> Requestor::send(conn, req, cb)       // 传入回调函数 cb,onResponse() 解析后触发└──> 创建 RequestDescribe 并存储└──> 发送请求,不阻塞└──> Dispatcher::onMessage(conn, msg)      // 收到服务端响应└──> Requestor::onResponse(conn, msg)└──> 通过 rid 查找 RequestDescribe└──> Callback 触发 set_value(msg) // 解析结果并设置 promise└──> future.get() 解除阻塞,获取结果       // 用户上层调用 future.get() 阻塞获取结果

经过的关键函数:

  • RpcCaller::call(conn, method, params, res_future)

  • Requestor::send(conn, req, cb)

  • Requestor::onResponse(conn, msg)

  • rdp->callback(msg)

  • Callback 解析 msgset_value()

  • res_future.get() 解除阻塞

异步回调调用 (call(conn, method, params, cb))

回调,不直接设置结果,而是调用用户传来的函数(callback)。

call异步回调,和异步futrue一样设置回调函数Callback1.(但回调函数不同)

 也是调用的这个send() 接下来也是 调用请求描述对象中设置的回调函数即Callback1 而Callback1不和Callback一样set_value设置结果,而是调用上层传来的函数,它返回结果给用户。

RpcCaller::call(conn, method, params, cb)     // 异步回调调用└──> 创建 RpcRequest 并设置参数└──> 绑定 Callback1└──> Requestor::send(conn, req, cb)       // 调用异步回调版本 send()└──> 创建 RequestDescribe 并存储└──> 发送请求,不阻塞└──> Dispatcher::onMessage(conn, msg)      // 收到服务端返回消息└──> Requestor::onResponse(conn, msg)└──> 通过 rid 查找 RequestDescribe└──> Callback1 触发用户自定义的 cb(msg)└──> 用户自定义的回调函数解析结果
  • RpcCaller::call(conn, method, params, cb)

  • Requestor::send(conn, req, cb)

  • Requestor::onResponse(conn, msg)

  • rdp->callback(msg) 触发 Callback1

  • 用户自定义的 callback(msg) 解析结果

五.注册中心---服务端rpc_registry

服务端如何实现服务信息的管理:

服务端需要1.提供注册服务2.发现的请求业务处理

1.需要将 哪个服务 能够由 哪个主机提供 管理起来 hash<method,vector<provide>>

        进行服务发现时,能返回谁能提供指定服务

2.需要将 哪个主机 发现过 哪个服务 管理起来

        当进行服务通知的时候,能通知给对应发现者 <method,vector<discoverer>>

3.需要 哪个连接 对应 哪个服务提供者 管理起来 hash<conn,provider>

        当一个连接断开时 能知道哪个主机的哪些服务下线了,然后才能给发现者通知xxx的xxx服务下线了。

4.需要 哪个连接 对应 哪个服务发现者 管理起来 hash<conn.discoverer>

        当一个连接断开时 如果有服务上下线 就不需要给它进行通知了

  • 1️⃣ ProviderManager(服务提供者管理)

  • 维护服务提供者信息,进行服务注册、删除和查询。

  • 提供 addProvider()delProvider()getProvider()methodHosts() 等方法。


  • 2️⃣ DiscovererManager(服务发现者管理)

  • 维护服务发现者信息,进行服务发现、删除和通知。

  • 提供 addDisecoverer()delDisoverer()onlineNotify()offlineNotify() 等方法。


  • 3️⃣ PDManager(核心管理器)

  • 处理服务请求、注册、发现、上线/下线通知以及连接断开后的清理逻辑。

  • 提供 onServiceRequest()onConnShutdown() 等核心逻辑。

  • 处理服务的响应,包括:

    • registryResponse():服务注册应答

    • discoverResponse():服务发现应答

    • errorResponse():错误处理

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set> 
namespace wws
{
namespace server
{//服务提供者class ProviderManager{public:using ptr=std::shared_ptr<ProviderManager>;struct Provider{using ptr=std::shared_ptr<Provider>;std::mutex _mutex; BaseConnection::ptr conn;Address host; //主机信息ip+portstd::vector<std::string> methods; //所有提供的方法Provider(const BaseConnection::ptr&c,const Address&h):conn(c),host(h){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};//新的服务提供者进行服务注册void addProvider(const BaseConnection::ptr&c,const Address &h,const std::string&method){Provider::ptr provider;{std::unique_lock<std::mutex> lock(_mutex);auto it=_conns.find(c);//找连接对应的提供者if(it!=_conns.end()){provider=it->second;}//找不到就创建 并新增连接->提供者else{provider=std::make_shared<Provider>(c,h);_conns.insert(std::make_pair(c,provider));}//method方法被哪些提供者提供 增加提供者auto &providers=_providers[method];providers.insert(provider);}//提供者内更新记录 能提供的方法provider->appendMethod(method);}//服务提供者断开连接时 获取它的信息 用于服务下线通知Provider::ptr getProvider(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return Provider::ptr();return it->second;}//服务提供者断开连接时 删除它的关联信息void delProvider(const BaseConnection::ptr&c)//连接->提供者->所有提供方法{std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;for(auto&method:it->second->methods)//找到该服务提供者的所有方法{auto&providers=_providers[method];//根据方法 从提供该方法的提供者中再进行删除//1.providers容器是vectro版本// //手动查找并按迭代器删除 providers.erase(it->second)// //但是 erase(it->second) 并不会起到按值删除的作用,因为 erase() 按值删除时,只有 std::vector 在 C++20 之后才引入 erase 和 erase_if// auto provider_it = std::find(providers.begin(), providers.end(), it->second);// if (provider_it != providers.end())// {//     providers.erase(provider_it);// }//set 直接支持 erase(it->second)providers.erase(it->second);}//删除连接与服务提供者的关系_conns.erase(it);}//返回 method对应的提供者std::vector<Address> methodHosts(const std::string &method){std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end())return std::vector<Address>();std::vector<Address> result(it->second.begin(), it->second.end());return result;}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Provider::ptr>> _providers;//方法->提供者主机std::unordered_map<BaseConnection::ptr,Provider::ptr> _conns;//连接->提供者}; //服务发现者class DiscovererManager{public:using ptr=std::shared_ptr<DiscovererManager>;struct Discoverer{using ptr=std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn;//发现者关联的客户端std::vector<std::string> methods;//发现过的服务Discoverer(const BaseConnection::ptr&c):conn(c){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.push_back(method);}};//当客户端进行服务发现的时候新增发现者 新增服务名称?Discoverer::ptr addDisecoverer(const BaseConnection::ptr&c,const std::string &method){Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);//找连接对应的服务发现者auto it=_conns.find(c);if(it!=_conns.end()){discoverer=it->second;}else{discoverer=std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c,discoverer));}//method方法被哪些发现者发现了 增加发现者auto &discoverers=_discoverers[method];discoverers.insert(discoverer);}//在发现者中 增加已经发现的方法discoverer->appendMethod(method);return discoverer;}//发现者不需要被get() 发现者下线不需要通知 所以不需要进行get()获取对象后进行下线通知//发现者客户端断开连接时 找到发现者信息 删除关联信息void delDisoverer(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;//找到发现过的方法for(auto&method:it->second->methods){//从发现过method的所有发现者 中找要进行删除的发现者auto&discoverers=_discoverers[method];discoverers.erase(it->second);}//删除conn->discoverer_conns.erase(it);}//新的服务提供者上线 上线通知void onlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_ONLINE);}//服务提供者断开连接 下线通知void offlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string &method,const Address&host,wws::ServiceOptype optype){std::unique_lock<std::mutex> _mutex;//先判断该方法有没有被人发现过 auto it=_discoverers.find(method);//没有就不用进行任何处理if(it==_discoverers.end())return;//对发现过该方法的发现者一个个进行通知auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服务请求(注册、发现、上线、下线)msg_req->setMethod(method);msg_req->setOptype(optype);//服务操作类型 for(auto&discoverers:it->second){discoverers->conn->send(msg_req);}}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Discoverer::ptr>> _discoverers;//该方法被哪些发现者发现了std::unordered_map<BaseConnection::ptr,Discoverer::ptr> _conns;//连接->发现者(连接断开->对应发现者->删除vector中的发现者)};class PDManager{public:using ptr=std::shared_ptr<PDManager>;//处理服务请求 并返回应答void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr &msg) {//先判断服务操作请求:服务注册/服务发现auto optype=msg->optype();if(optype==ServiceOptype::SERVICE_REGISTRY)//服务注册{//1.新增服务提供者 _providers->addProvider(conn,msg->host(),msg->method());//2.对该方法的发现者进行服务上线通知_discoverers->onlineNotify(msg->method(),msg->host());//3.返回应答return registryResponse(conn,msg);}else if(optype==ServiceOptype::SERVICE_DISCOVERY)//服务发现{//新增服务发现者 _discoverers->addDisecoverer(conn,msg->method());return discoverResponse(conn,msg);}else{ELOG("收到服务操作请求,但操作类型错误"); return errorResponse(conn,msg);}}//连接断开void onConnShutdown(const BaseConnection::ptr&conn)//{//这个要断开的连接1.提供者下线 2.发现者下线//1.获取提供者信息 为空说明不是提供者auto provider=_providers->getProvider(conn);if(provider.get()!=nullptr){//提供者下线//1.提供者的每个方法都要下线 通知对应发现者for(auto&method:provider->methods){_discoverers->offlineNotify(method,provider->host);}//2.删除对该提供者的管理_providers->delProvider(conn);}//2.到这 可能是发现者 就算不是会直接返回空//直接删除对该发现者的管理_discoverers->delDisoverer(conn);}private://错误响应void errorResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)msg_rsp->setRcode(RCode::RCODE_INVALID_OPTYPE); //无效 msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);//服务操作类型 未知 conn->send(msg_rsp);}//注册应答void registryResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);//服务操作类型 注册conn->send(msg_rsp);}//发现应答 method方法有哪些主机可以提供 void discoverResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid()); msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);//服务操作类型 发现msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)std::vector<Address> hosts=_providers->methodHosts(msg->method());if(hosts.empty()){msg_rsp->setRcode(RCode::RCODE_NOT_FOUND_SERVICE);return conn->send(msg_rsp);}msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setMethod(msg->method());msg_rsp->setHost(hosts);return conn->send(msg_rsp);}private:ProviderManager::ptr _providers;DiscovererManager::ptr _discoverers;};
}
}

📡 服务注册流程

1️⃣ 客户端(Provider)通过 `registryMethod()` 发送 `SERVICE_REGISTRY` 请求
2️⃣ `PDManager::onServiceRequest()` 处理注册请求
3️⃣ `ProviderManager::addProvider()` 注册服务
4️⃣ `DiscovererManager::onlineNotify()` 通知发现者服务上线
5️⃣ `PDManager::registryResponse()` 发送注册结果

🔍 服务发现流程

1️⃣ 客户端(Discoverer)通过 `addDisecoverer()` 发送 `SERVICE_DISCOVERY` 请求
2️⃣ `PDManager::onServiceRequest()` 处理发现请求
3️⃣ `DiscovererManager::addDisecoverer()` 记录发现者
4️⃣ `ProviderManager::methodHosts()` 获取 `method` 对应的主机
5️⃣ `PDManager::discoverResponse()` 发送发现结果

🔥 服务下线流程

1️⃣ 连接断开时触发 `PDManager::onConnShutdown()`
2️⃣ 通过 `ProviderManager::getProvider()` 获取 `Provider`
3️⃣ `DiscovererManager::offlineNotify()` 通知发现者服务下线
4️⃣ `ProviderManager::delProvider()` 删除 `Provider`

🕵️‍♂️ 发现者下线流程

1️⃣ 连接断开时触发 `PDManager::onConnShutdown()`
2️⃣ 通过 `DiscovererManager::delDisoverer()` 删除 `Discoverer`
3️⃣ 清理 `_conns` 和 `_discoverers` 的映射

六.注册中心---客户端rpc_registry

客户端的功能比较分离,注册端和发现端根本就不在同一个主机上。

因此客户端的注册和发现功能是完全分开的。

1.作为服务提供者 需要一个能进行服务注册的接口

        连接注册中心 进行服务注册

2.作为服务发现者 需要一个能进行服务发现的接口,需要将获取到的提供对应服务的主机信息管理起来 hash<method,vector<host>> 一次发现,多次使用,没有的话再次进行发现。

                             需要进行服务上线/下线通知请求的处理(需要向dispatcher提供一个请求处理的回调函数)

#pragma 
#include"requestor.hpp"namespace wws
{
namespace client
{// 服务提供者类:负责将服务注册到服务注册中心class Provider{public:using ptr=std::shared_ptr<Provider>;Provider(const Requestor::ptr&requestor):_requestor(requestor){}//进行服务注册的接口bool registryMethod(const BaseConnection::ptr&conn,const std::string &method,const Address&host){// 1. 创建 ServiceRequest 请求消息auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服务请求(注册、发现、上线、下线)msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);//服务操作类型 注册// 2. 发送请求并同步等待响应BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);//同步请求if(ret==false){ELOG("%s服务注册失败",method.c_str());return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(service_rsp.get()==nullptr){ELOG("响应类型向下转换失败");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服务注册失败 原因%s",errReason(service_rsp->rcode()));return false;}return true;}private:Requestor::ptr _requestor;//负责发送请求、接收响应};class MethodHost{public:using ptr=std::shared_ptr<MethodHost>;MethodHost(const std::vector<Address>&hosts):_hosts(hosts.begin(),hosts.end()),_idx(0){}void appendHost(const Address&host){//新的服务上线后进行调用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address&host){//服务下线进行调用std::unique_lock<std::mutex> lock(_mutex);//vector删除效率O(n)效率低,但更多的操作还是 随机访问[] 进行RR轮转,所以vector是最合适的for(auto it=_hosts.begin();it!=_hosts.end();it++){if(*it!=host){_hosts.erase(it);break;}}}Address chooseHost(){std::unique_lock<std::mutex> lock(_mutex);size_t pos=_idx++ %_hosts.size();//1.pos=_idx%size 2._idx+=1return _hosts[pos];}bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty();}private:std::mutex _mutex;size_t _idx;//当前 RR 轮转索引//vector 提供了 O(1) 的索引访问,可以快速实现RR轮转机制。std::vector<Address> _hosts;};class Discoverer{public:Discoverer(const Requestor::ptr &requestor){}//服务发现的接口bool serviceDiscovery(const BaseConnection::ptr&conn,const std::string&method,Address&host){//当前有method方法对应的提供服务者 直接返回host地址{std::unique_lock<std::mutex> lock(_mutex);auto it=_method_hosts.find(method);if(it!=_method_hosts.end()){if (it->second->empty() == false){host = it->second->chooseHost();return true;}}}//当前没有对应的服务者//1.构建服务发现请求auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);//消息类型msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("服务发现失败!");return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(!service_rsp.get()){ELOG("服务发现失败! 响应类型转换失败");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服务发现失败! 错误原因%s",errReason(service_rsp->rcode()).c_str());return false;}//2.看服务发现完后有没有新提供者增加std::unique_lock<std::mutex> _mutex;auto method_host=std::make_shared<MethodHost>(service_rsp->hosts());if(method_host->empty()){ELOG("%s服务发现失败!没有能提供服务的主机",method.c_str());return false;}host=method_host->chooseHost();_method_hosts[method]=method_host;//更新method方法->提供者address(可能method方法已经在map中所以=赋值)return true;}//给Dispathcer模块进行服务上线下线请求处理的回调函数void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){//1.先判断是上线/下线 都不是就不处理auto optype=msg->optype();std::string method=msg->method();std::unique_lock<std::mutex> lock(_mutex);//2.上线通知if(optype==ServiceOptype::SERVICE_ONLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//该method方法不存在 创建MethodHost初始化并添加入map中auto method_host=std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method]=method_host;}else{//存在直接加it->second->appendHost(msg->host());}}//3.下线通知else if(optype==ServiceOptype::SERVICE_OFFLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//该method方法不存在 直接return//不需要把method方法从map中移除 前面已经判断过map中method方法为空的情况return;}else{//存在直接删除it->second->removeHost(msg->host());}}}private:std::mutex _mutex;std::unordered_map<std::string,MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;}; 
}
}

1.客户端中的provider服务提供者,有服务注册接口registryMethod,构建服务注册请求借助Requestor send()到服务端的的注册接口。收到应答后判断应用是否正确。

2.客户端的discoverer服务发现者,有服务发现接口serviceDiscovery,先开当前有没有method方法对应的提供者,有就返回一个提供者。没有就构建服务发现请求,并检查是否有误,无错误后 再判断是否有新增的提供者,有就返回提供者并更新method->提供者的映射,没有就直接返回.

3.MethodHost类 管理一个method方法对应的所以提供者的主机信息,并提供RR轮转功能,按顺序返回method方法对应的提供者。

1️⃣ Provider 类:服务提供者

  • 作用:

    • 负责将服务注册到服务注册中心。

  • 主要功能:

    • registryMethod()

      • 发送 ServiceRequest 注册请求到服务中心。

      • 检查 msg_rsp 响应是否成功注册。

    • 内部成员:

      • _requestor

        • Requestor::ptr 类型,负责请求发送和接收响应。


2️⃣ MethodHost 类:服务主机管理

  • 作用:

    • 维护多个 Address 主机地址,并提供负载均衡(RR 轮转)功能。

  • 主要功能:

    • appendHost():新增一个服务主机地址。

    • removeHost():移除一个服务主机地址。

    • chooseHost()

      • 通过 RR 轮转选择一个主机地址。

    • empty():判断是否为空。

  • 内部成员:

    • _mutex:互斥锁保护 _hosts 访问安全。

    • _idx:当前 RR 轮转索引。

    • _hostsstd::vector<Address> 存储主机地址列表。


3️⃣ Discoverer 类:服务发现者

  • 作用:

    • 发现服务并维护 MethodHost 对象,进行服务上线/下线通知的管理。

  • 主要功能:

    • serviceDiscovery()

      • 发现服务并更新 _method_hosts 缓存。

      • 选择主机地址 host 并返回。

    • onServiceRequest()

      • 处理服务上线/下线的请求。

      • 动态添加/删除 MethodHost 及其地址信息。

  • 内部成员:

    • _mutex:互斥锁保护 _method_hosts

    • _method_hosts

      • std::unordered_map<std::string, MethodHost::ptr>,映射 method -> host

    • _requestor

      • Requestor::ptr 发送发现请求。


4️⃣ Requestor 类:请求器

  • 作用:

    • 负责向服务中心发送请求并获取响应。

  • 核心功能:

    • send()

      • 向服务中心同步发送 msg_req 请求。

      • 接收 msg_rsp 响应并返回 bool 标志位。


🔥 类之间的关系

  1. Provider 通过 _requestor 发送 ServiceRequest 进行服务注册。

  2. Discoverer 通过 _requestor 发送 ServiceRequest 进行服务发现。

  3. MethodHostDiscoverer 维护,并提供 RR 轮转机制选择主机。

  4. Discoverer 监听 onServiceRequest() 来处理服务上线/下线。

RR轮询(Round-Robin)

一个method方法可以对应多个提供者,用户请求method方法,我们应该返回哪一个提供者,才能实现最大资源利用呢?

我们可以通过RR轮询按照固定顺序轮流将请求分配到不同的主机。

1.维护一个递增索引 _idx。

2.每次请求时,选择 _hosts[_idx % _hosts.size()] 作为当前主机。

3._idx 自增,当 _idx >= _hosts.size() 时自动重置。

RR轮询需要随机访问[ ],所以管理提供者的容器最好选择vector< >。

RR 轮询机制的价值

1. 负载均衡: 均匀分配请求,防止主机过载,提高系统吞吐量。
2. 实现简单: 只需维护一个 _idx 递增索引,选择主机只需 O(1) 时间复杂度。
3. 故障规避: 结合 removeHost() 机制,可以自动剔除故障主机,提升可靠性。
4. 提升系统吞吐量: 通过多个主机并行处理请求,提升系统的整体性能。
5. 维护成本低: 逻辑简单,维护成本极低,不需要监控主机状态。

为什么选择 vector<Address> 作为主机管理容器


1. 访问速度快

  • vector 提供 O(1) 的随机访问能力,可以通过 pos 索引直接获取 provider

  • 轮询核心逻辑依赖于 vector[pos] 进行主机选择,比 mapO(log n) 查找速度更快,适合高频访问场景。


2. 内存布局紧凑

  • vector 采用 连续内存存储,有利于 CPU 缓存命中,提升访问效率。

  • 在轮询过程中,只需访问固定内存位置,避免了内存跳转带来的性能损耗。


3. 删除主机速度适中

  • 删除主机时虽然 removeHost() 的复杂度是 O(n),但主机上下线事件发生频率远低于请求频率。

  • 即使主机上下线处理稍慢,但 chooseHost() 仍然保持 O(1) 的快速访问。


⚠️ 注意:

  • vector 删除主机时会触发内存移动,导致性能下降,因此不适合频繁上下线的场景。

  • 但在主机变更较少的场景下,vector 的整体性能优于 listmap

七.对服务发现与注册的封装

一.客户端 rpc_client

封装客户端:三大功能模块

一、业务功能:

  1. 基础 RPC 功能

  2. 服务注册功能

  3. 服务发现功能

二、基础底层模块:

  • 网络通信客户端模块(由 BaseClient 封装)


🧱 类结构封装解析

1. RegistryClient:服务注册客户端

  • 构造时连接注册中心地址

  • 提供 registryMethod() 方法:业务提供者向注册中心注册服务

  • 成员模块包含:

    • _provider:服务提供者

    • _requestor:发送请求组件

    • _dispatcher:调度器

    • _client:基础通信客户端

2. DiscoveryClient:服务发现客户端

  • 构造时连接注册中心地址

  • 提供 registryDiscovery() 方法:业务调用方向注册中心发现服务

  • 成员模块包含:

    • _discoverer:服务发现器

    • _requestor_dispatcher_client:同上

3. RpcClient:RPC 核心客户端

  • 构造参数 enableDiscovery 决定是否开启服务发现模式:

    • 若为 true:连接的是注册中心

    • 若为 false:连接的是具体的服务提供者

  • 提供多种调用方式:

    • 同步调用(返回 result

    • 异步 future 调用(返回 std::future

    • 异步 callback 调用(传入回调函数)

  • 内部组合:

    • _discovery_client:可选服务发现客户端

    • _caller:RPC 调用管理器

    • _requestor_dispatcher_client:同样是基础通信组件

在构建rpc客户端时,我们用长连接还是短连接?

1.当客户端调用call()请求对应方法时,RpcClient 内部调用 DiscoveryClient 进行服务发现 向 注册中心 Registry Server 发送服务发现请求

2.注册中心再返回对应方法的提供者主机地址。

3.RpcClient 用RR轮询从中选出来一个地址创建rpc client客户端并连接对应方法的提供者主机

4.服务提供者 Provider 接收到请求处理完返回结果给客户端

5.客户端回调触发,返回响应给用户。

短链接:创建一个rpc client客户端对象,连接服务提供者,进行rpc调用,调用结束后就销毁关闭客户端。

✅ 优点:

  • 实现简单,按需连接、按需释放

  • 没有资源长期占用问题。

❌ 缺点:

  • 性能差:每次调用都要建立和销毁 TCP 连接,连接成本高

  • 不利于高频 RPC 场景;

  • 异步处理时管理麻烦:可能连接刚断,回调结果还没处理。

长连接:调用完后并不会销毁关闭客户端,而是将客户端放入连接池。后续还需要访问该主机的该方法,就会从连接池中找到原本的客户端对象,进行rpc调用。若该主机的该服务下线,需要从池中删除对应客户端连接。

✅ 优点:

  • 高性能:避免频繁连接/断开,尤其是重复调用同一服务时;

  • 适合高并发、低延迟系统。

❌ 缺点:

  • 管理复杂,需要处理:

    • 服务下线、连接失效的自动剔除;

    • 异步/并发安全;

    • 池容量、连接空闲策略等。

在我们这个项目中我们选择长连接。

主要还是短连接异步处理时管理麻烦。

短链接,客户端进行完rpc调用就会关闭,后面服务提供者返回结果给客户端,客户端没了收到收到应答的回调函数onResponse   也就不能把结果设置道promise中,上层futrue就不能就绪。

  • 回调不触发,业务逻辑“卡住”
    尤其是你用 std::futurepromise 等异步等待对象,结果永远收不到。

  • 触发回调时访问已被释放的连接对象,导致崩溃
    比如回调中引用了 RpcClientconnection,但连接已经析构。

  • 异步响应结果丢失,日志无记录,bug 难排查
    你会觉得“调用失败了但程序没报错”,其实是 TCP 在你没注意时被关掉了。

异步+短连接的问题在于连接的生命周期和异步结果不一致,导致回调无法安全执行。

解决方法:在rpc调用结束后不关闭客户端,而是设置一个回调函数确保收到收到响应处理完再关闭客户端。

#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"namespace wws
{
namespace client
{//服务注册客户端class RegistryClient{public:using ptr=std::shared_ptr<RegistryClient>;//构造函数传入注册中心的地址信息 用于连接注册中心RegistryClient(const std::string&ip,int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务注册接口bool registryMethod(const std::string&method, Address&host){return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//服务发现客户端class DiscoveryClient{public:using ptr=std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息 用于连接注册中心DiscoveryClient(const std::string&ip,int port,const Discoverer::OfflineCallback &cb):_requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor,cb)),_dispatcher(std::make_shared<Dispatcher>()){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//当注册中心向客户端进行上线/下线通知时触发的回调函数auto req_cb=std::bind(&client::Discoverer::onServiceRequest,_discoverer.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE,req_cb);//消息回调函数 所有收到的消息,统一交由 Dispatcher::onMessage 分发给对应 handlerauto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool registryDiscovery(const std::string&method, Address&host){return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//rpc客户端class RpcClient{public:using ptr=std::shared_ptr<RpcClient>;// enableDiscovery--是否启用服务发现功能 也决定了传入地址信息是注册中心地址 还是提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<wws::client::RpcCaller>(_requestor)){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//1.如果启用的服务发现 地址信息是注册中心的地址 是服务发现客户端需要连接的地址 则通过地址信息实例化discover_client//需要经过服务发现获取提供者address再获取对应的clientif(_enableDiscovery){//设置服务下线回调auto offline_cb=std::bind(&RpcClient::delClient,this,std::placeholders::_1);_discovery_client=std::make_shared<DiscoveryClient>(ip,port,offline_cb);}//2.如果没有启用服务发现 则地址信息是服务提供者的地址 则直接创建客户端实例化好rpc_client//直接根据提供的ip+port创建对应的clientelse{auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_rpc_client=ClientFactory::create(ip,port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}//1.同步bool call( const std::string &method,const Json::Value &params, Json::Value &result){//获取clientBaseClient::ptr client=getClient(method);if(client.get()==nullptr)return false;//通过客户端连接 发送rpc请求return _caller->call(client->connection(),method,params,result);}//2.异步futurebool call( const std::string &method,const Json::Value &params, std::future<Json::Value> &result){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, result); }//3.异步回调bool call(const std::string &method,const Json::Value &params, const RpcCaller::JsonResponseCallback &cb){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, cb); }private://创建clientBaseClient::ptr newClient(const Address &host){auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();//添加到连接池putClient(host,client);return client;}//根据address从连接池查找client 没有返回空BaseClient::ptr getClient(const Address&host){std::unique_lock<std::mutex> lock(_mutex);auto it=_rpc_clients.find(host);if(it==_rpc_clients.end()){return BaseClient::ptr();}return it->second;}//根据method获取client://1.method->服务发现->获取目标Address->从连接池获取/没有直接创建//2.用户传入的ip+port->直接获取已经创建的clientBaseClient::ptr getClient(const std::string method){//1.服务发现获取的ip+port BaseClient::ptr client;if(_enableDiscovery){//1.通过服务发现 获取服务提供者地址信息Address host;bool ret=_discovery_client->registryDiscovery(method,host);if(ret==false){ELOG("当前%s服务 没找到服务提供者",method.c_str());return BaseClient::ptr();}//2.查看连接池中是否有对应的客户端 有就直接用 没有就创建client=getClient(host);if(client.get()==nullptr){client=newClient(host);}}//2.用户提供的ip+port创建的clientelse{client=_rpc_client;}return client;}void putClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host,client));}void delClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash{size_t operator()(const Address&host){std::string addr=host.first+std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;//网络服务发现 用户传方法名method client用方法名找提供者地址进行连接RpcCaller::ptr _caller;Requestor::ptr _requestor;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//未启用服务发现// RpcClient rpc(false, "127.0.0.1", 8080);// rpc.call("Add", ...);  // 直接用 _rpc_client 调用std::mutex _mutex;//<"127.0.0.1",client1>std::unordered_map<Address,BaseClient::ptr,AddressHash>_rpc_clients;//服务发现的客户端连接池// RpcClient rpc(true, registry_ip, port);// rpc.call("Add", ...);  //  自动发现、自动连接、自动发请求};
}
}

unordered_map<>自定义类型作key

我们用哈希表来管理客户端连接池时:

我们知道在哈希表中是通过key值找到对应的val值的,但并不是直接用我们传过去的数当key,需要进行哈希值计算。(1.int size_t bool直接强转 2.char* string 

h = h * 131 + static_cast<unsigned char>(c);  // 类似 BKDR hash)

而库中实现了string、int、float 等基本类型的哈希值计算,但这个Address pair<string,int>是个自定义类型,需要我们自己重载哈希值计算其实就算把string+int融合成string,再套用库中对string类型计算的哈希函数。

STL中 pair 的哈希组合方法

template <typename T1, typename T2>
struct hash<std::pair<T1, T2>> {size_t operator()(const std::pair<T1, T2>& p) const {size_t h1 = std::hash<T1>()(p.first);size_t h2 = std::hash<T2>()(p.second);return h1 ^ (h2 << 1); }
};
操作目的
std::hash<X>()获取单个字段的 hash 值
<< 1 左移扰动哈希位,使两个字段 hash 分布更开
^ 异或混合两个 hash,避免简单叠加导致冲突
效果更均匀、稳定、不易冲突的哈希组合值

既然选择长连接+连接池的做法,那就要处理当服务下线时 在连接池中删除对应的client

怎么做?设置回调函数,在服务下线时进行调用。

1.RpcClinet初始化DiscoveryClient时传入回调函数

2.DiscoveryClient 传给-> Discoverer

3.Discoverer再把回调函数cb设置到成员变量中

4.Client::onServiceRequest处理服务下线时,除了删除该方法中下线的主机地址Address,还要删除连接池中连接它的client

二.服务端rpc_server (包含注册中心服务端 rpc服务端)

🌐 服务端实现业务功能:

  1. 提供 RPC 服务

  2. 服务注册与发现机制中提供者的管理(服务注册)和消费者的管理(服务发现)


📦 封装的三类服务端组件:

  1. RPC 服务端

    • 负责接收并处理 RPC 请求。

  2. 注册中心服务端

    • 负责管理服务提供者与消费者的注册信息。

  3. 发布订阅服务端(后续实现)

    • 用于实现基于事件的通信机制(如 pub-sub 模式),暂未实现。


🛠 实现细节说明:

1. 注册中心服务端

  • 是一个纯粹的服务端,用于管理提供者和消费者信息

  • 核心功能是处理服务注册与发现的请求。

2. RPC 服务端

  • 实际由两部分组成:

    • RPC 服务端:用于接收和响应 RPC 请求。

    • 服务注册客户端:启动后自动连接注册中心,并将自己能提供的服务注册上去。

#pragma once
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_router.hpp"
#include "rpc_registry.hpp"#include <set> 
namespace wws
{
namespace server
{//服务注册服务端class RegistryServer{public:using ptr=std::shared_ptr<RegistryServer>;RegistryServer(int port):_pd_manager(std::make_shared<PDManager>()),_dispatcher(std::make_shared<Dispatcher>()){auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);_server = wws::ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb=std::bind(&RegistryServer::onConnShutdown,this,std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}private:void onConnShutdown(const BaseConnection::ptr&conn){_pd_manager->onConnShutdown(conn);}private:PDManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};//rpc服务端class RpcServer{public:using ptr=std::shared_ptr<RpcServer>;RpcServer(const Address&access_addr,bool enableRegistry=false,const Address&registry_server_addr=Address()):_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<wws::server::RpcRouter>()),_dispatcher(std::make_shared<wws::Dispatcher>()){//启用服务注册if(_enableRegistry){_reg_client=std::make_shared<client::RegistryClient>(registry_server_addr.first,registry_server_addr.second);}//成员server是一个rpcserver 用于提供rpc服务auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, rpc_cb);//创建一个监听指定端口(如 8080)的 RPC 服务器对象_server = wws::ServerFactory::create(access_addr.second);//默认监听 0.0.0.0:port 我监听所有可用 IPauto message_cb = std::bind(&wws::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}void registerMethod(const ServiceDescribe::ptr &service)//服务描述类{if(_enableRegistry){_reg_client->registryMethod(service->method(),_access_addr);//把 method->本主机地址 发给注册中心 表示自己可以提供method方法}_router->registerMethod(service);//本地注册 把服务描述service注册到RpcRouter中的服务管理类 onRpcRequest接收到客户端请求时进行路由分发调用}void start(){_server->start();}private:bool _enableRegistry;Address _access_addr;//自己的对外服务地址(客户端要连接我就来这)client::RegistryClient::ptr _reg_client;//注册客户端,用于连接注册中心并注册本地服务RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}

注册中心 服务端 客户端代码测试

1.先启动注册中心服务端 并设置port. 处理服务注册请求 服务发现请求 以及服务上下线通知

2.再启动rpc服务端 ,rpc服务端可以通过_reg_client注册中心客户端进行服务注册,但需要启动服务注册_enableRegistry=ture,提供注册中心客户端的Address。

还可以注册本地服务方法,内部注册到 RpcRouter 路由器中,用于请求到来时快速查找并调用对应的回调函数。

3.启动rpc客户端,rpc客户端有两种连接方式 启动服务发现 根据方法名向注册中心查询服务地址,然后连接对应的服务端。不启动 直接连用户传入的服务提供者的地址

用户再调用call 通过客户端连接 发送rpc请求

rpc_server.cpp

#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{//build生产一个 服务描述类 函数名 + 参数类型+结果类型 + 函数地址(进行回调)std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//wws::Address("127.0.0.1",9090) 监听9090 在9090端口提供服务 true 表示启动服务注册 "127.0.0.1",8080 注册中心的地址信息,创建client用于连接注册中心wws::server::RpcServer server(wws::Address("127.0.0.1",9090),true,wws::Address("127.0.0.1",8080));//server.registerMethod(desc_factory->build());server.start();return 0;
}

registry_sever.cpp

#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"int main()
{//实例化服务端wws::server::RegistryServer reg_server(8080);reg_server.start();return 0;
}

rpc_client.cpp


#include "../../client/rpc_client.hpp"
#include "../../common/detail.hpp"
#include <thread>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{wws::client::RpcClient client(true,"127.0.0.1",8080);//1.同步调用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=client.call("Add",params,result);//client内找对应Add方法对应提供者的连接if(ret!=false){DLOG("result: %d", result.asInt());}//2.异步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=client.call("Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回调params["nums1"]=55;params["nums2"]=66;ret = client.call("Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}

八.发布订阅服务端实现rpc_topic

1. Dispatcher 模块(右上角)

作用:

  • JSON-RPC 框架中的请求分发核心

  • 判断 RPC 消息类型 → 调用对应业务模块(如 PubSubManager)

Dispatcher::registerMethod("topic", &PubSubManager::onTopicRequest)

2. onTopicRequest 回调函数

位置: PubSubManager 中的统一入口函数
作用:

  • 接收来自 Dispatcher 的请求

  • 根据操作类型调用对应处理函数:

类型功能函数调用
创建主题topicCreate()
删除主题topicRemove()
订阅主题topicSubscribe()
取消订阅topicCancel()
发布消息topicPublish()

3. PubSubManager 核心模块(图中心)

职责:

  • 管理两个核心 map

  • 操作对应的 TopicSubscriber 数据结构

std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;

4.Subscriber 结构(图左下)

struct Subscriber {BaseConnection::ptr conn;std::unordered_set<std::string> topics;
};

图示结构:

  • 每个订阅者关联一个连接

  • 还记录了自己订阅的所有主题名

5. topic 结构(图左上)

struct Topic {std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers;
};

图示结构:

  • 每个主题有一个名称

  • 内部维护一组订阅它的订阅者指针

主要用途:

  • 当消息发布到该主题时: → 遍历 set<subscriber> 并调用 conn->send(msg)

6. 两张 map 映射关系(图中左)

map<topic_name, topic>
map<Connection, Subscriber>

构成了典型的双向映射系统:

  • topic_name -> topic -> subscribers

  • conn -> subscriber -> topics

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>
namespace wws
{
namespace server
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;TopicManager();// 给Dispathcer模块进行服务上线下线请求处理的回调函数void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {TopicOptype topic_optype=msg->optype();bool ret=true;switch(topic_optype){//主题创建case TopicOptype::TOPIC_CREATE: topicCreate(conn,msg);break;//主题删除case TopicOptype::TOPIC_REMOVE: topicRemove(conn,msg);break;//主题订阅case TopicOptype::TOPIC_SUBSCRIBE: topicSubscribe(conn,msg);break;//取消主题订阅case TopicOptype::TOPIC_CANCEL: topicCancel(conn,msg);break;//主题消息发布case TopicOptype::TOPIC_PUBLISH: topicPublish(conn,msg);break;//返回应答 无效操作类型default: return errorResponse(conn,msg,RCode::RCODE_INVALID_OPTYPE);}if(!ret) return errorResponse(conn,msg,RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn,msg);}//一个订阅者在连接断开时的处理 删除其关联的数据void onShutdown(const BaseConnection::ptr&conn){//消息发布者断开连接 不处理; 消息订阅者断开连接需要删除管理数据//1.判断断开连接的是否是订阅者 不是直接返回Subscriber::ptr subscriber;//断开连接的订阅者对象std::vector<Topic::ptr> topics;//受影响的主题对象{auto it = _subscribers.find(conn);if (it == _subscribers.end())return;// 2.获取受影响的主题对象subscriber=it->second;for(auto&topic_name:subscriber->topics){auto topic_it=_topics.find(topic_name);if(topic_it==_topics.end())continue;topics.push_back(topic_it->second);}//3.从订阅者映射map中删除 订阅者_subscribers.erase(it);}//4.从对应主题对象中删除订阅者for(auto&topic:topics){topic->removeSubscriber(subscriber);}}private://错误响应void errorResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg,RCode rcode){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);//主题响应 msg_rsp->setRcode(rcode);conn->send(msg_rsp);}//注册应答void topicResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);msg_rsp->setRcode(RCode::RCODE_OK);conn->send(msg_rsp);}//创建主题void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {std::unique_lock<std::mutex> lock(_mutex);//构建一个主题对象 添加映射关系的管理std::string topic_name=msg->topicKey();//主题名称auto topic=std::make_shared<Topic>(topic_name);_topics.insert(std::make_pair(topic_name,topic));}//删除主题void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1.查看当前主题 有哪些订阅者 再从订阅者中删除主题信息//_topics[topic_name]->subscribers  subscribers->topics订阅的所有主题名称//2.删除主题数据  _topics[topic_name]->Subscriber 主题名称和主题对象的映射关系std::string topic_name=msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers;{std::unique_lock<std::mutex> lock(_mutex);//删除主题前 先找出订阅该主题的订阅者auto it=_topics.find(topic_name);if(it==_topics.end())return;subscribers=it->second->subscribers;_topics.erase(it);//删除主题名称->topic}       for(auto&subscriber:subscribers)subscriber->removeSTopic(topic_name);}// 主题订阅bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1.先找出主题对象topic 订阅者对象subscriber//  没有主题对象就报错 没有订阅者对象就构建Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}// 2.在主题对象中 新增一个订阅者对象管理的连接; 在订阅者对象中新增一个订阅的主题topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}}//取消主题订阅void topicCancel(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){// 1.先找出主题对象topic 订阅者对象subscriberTopic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end())topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end())subscriber = sub_it->second;// 2.在主题对象中 删除当前订阅者对象管理的连接; 在订阅者对象中删除对应订阅的主题if(subscriber) subscriber->removeSTopic(msg->topicKey());if(subscriber && topic) topic->removeSubscriber(subscriber);}}// 主题发布bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic=topic_it->second;}topic->pushMessage(msg);return true;}private:// 每个客户端连接会对应一个订阅者对象,记录它当前订阅了哪些主题struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr conn;std::unordered_set<std::string> topics; // 订阅者订阅的主题名称Subscriber(const BaseConnection::ptr &c): conn(c){}// 增加主题void appendTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.insert(topic_name);}// 删除主题void removeSTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.erase(topic_name);}};// 每个主题包含一个主题名 + 当前所有的订阅者struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; // 当前主题订阅者Topic(const std::string &name): topic_name(name){}// 增加订阅者void appendSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.insert(subscriber);}// 删除订阅者void removeSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.erase(subscriber);}// 给该主题的所有订阅者发消息void pushMessage(const BaseMessage::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : subscribers){subscriber->conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};
}
}

九.发布订阅客户端实现rpc_topic

一、发布订阅客户端的角色划分

  • 消息发布客户端

    • 创建主题

    • 删除主题

    • 发布消息(向某个主题发布)

  • 消息订阅客户端

    • 创建主题

    • 删除主题

    • 订阅某主题的消息

    • 取消订阅某主题


二、整体模块设计思路

  1. 对外的五个操作接口

    • 针对“主题”的操作,包括:

      • 创建

      • 删除

      • 订阅

      • 取消订阅

      • 发布

  2. 对外的一个消息处理接口

    • 提供给 dispatcher 模块,进行消息分发处理

    • 相当于 dispatcher 收到消息发布请求后,查找有哪些订阅者,并调用对应的回调函数将消息推送过去

  3. 内部的数据管理

    • 管理“主题名称”与“消息处理回调函数”的映射关系

#pragma once
#include"requestor.hpp"namespace wws
{
namespace client
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;using SubCallback=std::function<void(const std::string &key,const std::string&msg)>;//主题创建bool create(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_CREATE);}//删除bool remove(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_REMOVE);}//订阅主题 SubCallback收到主题新消息的进行的回调bool subscribe(const BaseConnection::ptr&conn,const std::string &key,const SubCallback&cb){//当我们订阅了主题 可能发布者会马上发布该主题的内容 //这时候如果cb还没有设置到map中就无法执行回调函数 所以先设置回调函数到map中addSubscribe(key,cb);bool ret=commonRequest(conn,key,TopicOptype::TOPIC_SUBSCRIBE);if(ret==false){//请求发送失败 删除map中对应的cbdelSubscribe(key);return false;}return true;}//取消订阅bool cancel(const BaseConnection::ptr&conn,const std::string &key){delSubscribe(key);return commonRequest(conn,key,TopicOptype::TOPIC_CANCEL);}//发布消息(向某个主题发布)bool publish(const BaseConnection::ptr&conn,const std::string &key,const std::string &msg){return commonRequest(conn,key,TopicOptype::TOPIC_PUBLISH,msg);}// 当收到服务端推送的消息时调用,触发对应订阅者的回调处理逻辑 (设置给dispatcher收到对应主题消息 进行回调处理)void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){//1.先判断该消息的操作类型是否为 发布消息auto type=msg->optype();if(type!=TopicOptype::TOPIC_PUBLISH){ELOG("收到了错误类型的主题操作");return;}//2.取出主题名称 以及消息内容 后面调用cbstd::string topic_key=msg->topicKey();std::string topic_msg=msg->topicMsg();//3.调用cbauto callback=getSubscribe(topic_key);if(!callback){ELOG("收到了%s主题信息 但该主题无对应回调",topic_key.c_str());return;}callback(topic_key,topic_msg);}private:void addSubscribe(const std::string &key,const SubCallback&cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key,cb));}void delSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback& getSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);auto it=_topic_callbacks.find(key);if(it==_topic_callbacks.end())return SubCallback();return it->second;}bool commonRequest(const BaseConnection::ptr&conn,const std::string &key,TopicOptype type,const std::string &msg=""){//1.构造请求对象 并填充数据auto msg_req=MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setTopicKey(key);msg_req->setOptype(type);if(type==TopicOptype::TOPIC_PUBLISH)msg_req->setTopicMsg(msg);//2.向服务端发送请求 等待响应BaseMessage::ptr msg_rsp;//发请求 + 等响应 + 反序列化响应 + 返回响应对象msg_rspbool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("主题创建请求失败");return false;}//3.判断请求处理是否成功auto topic_rsp_msg=std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if(!topic_rsp_msg){ELOG("主题响应 向下类型转换失败");return false;}if(topic_rsp_msg->rcode()!=RCode::RCODE_OK);{ELOG("主题创建请求出错:%s",errReason(topic_rsp_msg->rcode()).c_str());return false;}return true;}private:std::mutex _mutex;//根据主题查找对应的回调函数执行std::unordered_map<std::string,SubCallback> _topic_callbacks;Requestor::ptr _requestor;};
}
}

十.topicServer topicClient封装

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/75777.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++算法优化实战:破解性能瓶颈,提升程序效率

C算法优化实战&#xff1a;破解性能瓶颈&#xff0c;提升程序效率 在现代软件开发中&#xff0c;算法优化是提升程序性能的关键手段之一。无论是在高频交易系统、实时游戏引擎&#xff0c;还是大数据处理平台&#xff0c;算法的高效性直接关系到整体系统的性能与响应速度。C作…

【PostgreSQL教程】PostgreSQL 特别篇之 语言接口连接PHP

博主介绍:✌全网粉丝22W+,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物联网、机器学习等设计与开发。 感兴趣的可…

山东大学软件学院创新项目实训开发日志(12)之将对话记录保存到数据库中

在之前的功能开发中&#xff0c;已经成功将deepseekAPI接口接入到springbootvue项目中&#xff0c;所以下一步的操作是将对话和消息记录保存到数据库中 在之前的开发日志中提到数据库建表&#xff0c;所以在此刻需要用到两个表&#xff0c;conversation表和message表&#xff…

Spring-注解编程

注解基础概念 1.什么是注解编程 指的是在类或者方法上加入特定的注解(XXX) 完成特定功能的开发 Component public classXXX{} 2.为什么要讲注解编程 1.注解开发方便 代码简洁 开发速度大大提高 2.Spring开发潮流 Spring2.x引入注解 Spring3.x完善注解 Springboot普及 推广注解…

Dify智能体平台源码二次开发笔记(5) - 多租户的SAAS版实现(2)

目录 前言 用户的查询 controller层 添加路由 service层 用户的添加 controller层 添加路由 service层-添加用户 service层-添加用户和租户关系 验证结果 结果 前言 完成租户添加功能后&#xff0c;下一步需要实现租户下的用户管理。基础功能包括&#xff1a;查询租…

基于若依的ruoyi-vue-plus的nbmade-boot在线表单的设计(一)架构方面的设计

希望大家一起能参与我的新开源项目nbmade-boot: 宁波智能制造低代码实训平台 主要目标是类似设计jeecgboot那样的online表单功能,因为online本身没有开源这部分代码,而我设计这个是完全开源的,所以希望大家支持支持,开源不容易。 1、数据库方面设计考虑 是在原来gen_table和…

WebFlux应用中获取x-www-form-urlencoded数据的六种方法

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

[Python基础速成]1-Python规范与核心语法

本系列旨在快速掌握Python&#xff0c;实现能够快速阅读和理解 Python 代码&#xff0c;并在可查阅语法的情况下进行 AI 学习。 本篇先了解一下Python基础知识。 本篇内容较菜鸟教程有所删减、方便快速构建大纲&#xff0c;且加入了PEP 8规范说明等有助于理解和编写代码的说明。…

农民剧团的春天与改变之路

杨天义&#xff0c;男&#xff0c;1966年9月生&#xff0c;中共党员&#xff0c;江西省吉安市吉水县水南农民剧团团长。 杨天义从废品收购起家&#xff0c;凭借自身的努力和奋斗&#xff0c;自筹资金100余万元建设了水南镇的第一座影剧院&#xff0c;组建了江西省吉安市吉水县…

python asyncio 的基本使用

1、引言 asyncio 是 Python 标准库中的一个库&#xff0c;提供了对异步 I/O 、事件循环、协程和任务等异步编程模型的支持。 asyncio 文档 2、进程、线程、协程 线程 线程是操作系统调度的基本单位&#xff0c;同一个进程中的多个线程共享相同的内存空间。线程之间的切换由操…

Leedcode刷题 | Day30_贪心算法04

一、学习任务 452. 用最少数量的箭引爆气球代码随想录435. 无重叠区间763. 划分字母区间 二、具体题目 1.452用最少数量的箭引爆气球452. 用最少数量的箭引爆气球 - 力扣&#xff08;LeetCode&#xff09; 在二维空间中有许多球形的气球。对于每个气球&#xff0c;提供的输…

Ant Design Vue 表格复杂数据合并单元格

Ant Design Vue 表格复杂数据合并单元格 官方合并效果 官方示例 表头只支持列合并&#xff0c;使用 column 里的 colSpan 进行设置。 表格支持行/列合并&#xff0c;使用 render 里的单元格属性 colSpan 或者 rowSpan 设值为 0 时&#xff0c;设置的表格不会渲染。 <temp…

C++ 标准库中的 <algorithm> 头文件算法总结

C 常用 <algorithm> 算法概览 C 标准库中的 <algorithm> 头文件提供了大量有用的算法&#xff0c;主要用于操作容器&#xff08;如 vector, list, array 等&#xff09;。这些算法通常通过迭代器来操作容器元素。 1. 非修改序列操作 std::all_of, std::any_of, s…

程序化广告行业(84/89):4A广告代理公司与行业资质解读

程序化广告行业&#xff08;84/89&#xff09;&#xff1a;4A广告代理公司与行业资质解读 大家好&#xff01;在探索程序化广告行业的道路上&#xff0c;每一次知识的分享都是我们共同进步的阶梯。一直以来&#xff0c;我都希望能和大家携手前行&#xff0c;深入了解这个充满机…

deepin使用autokey添加微信快捷键一键显隐ctrl+alt+w

打开deepin商店&#xff0c;搜索快捷键&#xff0c;找到autokey 快捷键管理&#xff0c;点击安装 点击右键新建文件夹 点击右键新建脚本 打开脚本并添加以下内容 import subprocess import time# ------------------ 配置项 ------------------ WM_CLASS "wechat…

文件内容课堂总结

Spark SQL是Spark用于结构化数据处理的模块&#xff0c;前身是Shark。Shark基于Hive开发&#xff0c;虽提升了SQL-on-Hadoop效率&#xff0c;但对Hive依赖过多。2014年6月1日Shark项目停止开发&#xff0c;团队将资源投入Spark SQL项目。Spark SQL具有诸多优点&#xff0c;如摆…

Zotero PDF Translate 翻译插件使用OpenAI API配置教程

PDF Translate&#xff1a;提升 Zotero 内置 PDF 阅读器的翻译功能 “PDF Translate” 是一款为 Zotero 设计的插件&#xff0c;旨在方便用户在 Zotero 内置的 PDF 阅读器中进行划词或段落翻译&#xff0c;辅助阅读外文文献。 一、 安装插件 下载插件&#xff1a; 访问 PDF T…

火山引擎旗下的产品

用户问的是火山引擎旗下的产品&#xff0c;我需要详细列出各个类别下的产品。首先&#xff0c;我得确认火山引擎有哪些主要业务领域&#xff0c;比如云计算、大数据、人工智能这些。然后&#xff0c;每个领域下具体有哪些产品呢&#xff1f;比如云计算方面可能有云服务器、容器…

C/C++程序中实现Python绑定多种技术路线

在C/C程序中实现Python绑定有多种技术路线&#xff0c;选择合适的方法取决于项目需求、性能要求和开发效率。以下是常见的几种方案&#xff0c;按易用性排序&#xff1a; 1. PyBind11&#xff08;推荐首选&#xff09; 特点&#xff1a;现代C库&#xff0c;语法简洁&#xff0…

【位运算】消失的两个数字

文章目录 面试题 17.19. 消失的两个数字解题思路 面试题 17.19. 消失的两个数字 面试题 17.19. 消失的两个数字 ​ 给定一个数组&#xff0c;包含从 1 到 N 所有的整数&#xff0c;但其中缺了两个数字。你能在 O(N) 时间内只用 O(1) 的空间找到它们吗&#xff1f; ​ 以任意…