brpc的二次封装以及brpc与etcd的联合

目的:

搭配etcd的注册中心管理能知道谁能提供什么服务,并用rpc进行服务调用

封装思想:

信道管理,将不同服务主机的通信信道管理起来

封装:

1.指定的信道管理类

一个服务通常会有多个节点,每个节点都会有自己的信道类,建立信道与服务的映射关系,服务一对多信道。

2.总体的信道管理类

管理多个服务的信道管理类管理起来

tips:我们没必要将所有的服务信道都建立起来,我们得申明我们关心的服务,不关心的就可以不管理这个服务

#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <iostream>
#include "./logger.hpp"
namespace common{
class ServerChannel
{
public:using ChannelPtr = std::shared_ptr<brpc::Channel>;using Ptr = std::shared_ptr<ServerChannel>;ServerChannel(const std::string &servername): _service_name(servername), _index(0){}void append(const std::string &host){ChannelPtr newchannel=std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 尝试连接时长Default: 200 (milliseconds)options.timeout_ms = -1; // rpc调用等待时间// Max duration of RPC over this Channel RPC调用在信道的期间options.protocol = "baidu_std";options.max_retry = 3;int ret = newchannel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(newchannel);_hosts.insert({host, newchannel});}// 服务器下线了一个节点,则把这个信道删除void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto score = _hosts.find(host);if (score == _hosts.end()){// 没找到LOG_WARN("没有找到该服务中的服务为{}主机名为{}的信道", _service_name, host);return;}// 找到了ChannelPtr scoreptr = score->second;for (auto it = _channels.begin(); it != _channels.end(); ++it){if (*it == scoreptr){_channels.erase(it);break;}}_hosts.erase(host);LOG_INFO("服务为{}主机名为{}的信道已经删除", _service_name, host);}// 选择一个信道给这个服务ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0){LOG_INFO("该服务{}中暂时没有信道", _service_name);return ChannelPtr();}int pos = _index++ % _channels.size();return _channels[pos];}private:int32_t _index;std::mutex _mutex;std::string _service_name;std::vector<ChannelPtr> _channels;// 节点与信道的位图std::unordered_map<std::string, ChannelPtr> _hosts;
};class ServerManager
{
public:using Ptr = std::shared_ptr<ServerManager>;ServerManager() {}ServerChannel::ChannelPtr choose(const std::string &instance_name){std::string servicename=instance_name;std::unique_lock<std::mutex> lock(_mutex);auto serversret = _servers.find(servicename);if (serversret == _servers.end()){LOG_INFO("当前服务{}没有可使用的节点!", servicename);return ServerChannel::ChannelPtr();}// 该服务被订阅且上线了,提供一个该服务的节点else{       auto score = _servers.find(servicename);ServerChannel::Ptr serverchannal = score->second;return serverchannal->choose();}}void onlinechannel(const std::string &instance_name, const std::string &host){ServerChannel::Ptr onlineserver;std::string servicename=get_service_name(instance_name);{std::unique_lock<std::mutex> lock(_mutex);auto followret = _follows.find(servicename);if (followret == _follows.end()){LOG_INFO("该服务{}-{}上线了,暂时没有被订阅", servicename, host);return;}// 新增该服务中的一个节点// 当该服务未上线auto serversret = _servers.find(servicename);if (serversret == _servers.end()){// 没有找到该服务 则添加该服务onlineserver = std::make_shared<ServerChannel>(servicename);_servers.insert({servicename, onlineserver});}else{onlineserver = serversret->second;}if (!onlineserver){LOG_ERROR("新增 {} 服务管理节点失败!", servicename);return;}// 已经存在该服务}onlineserver->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", servicename, host);}void offlinechannel(const std::string &instance_name, const std::string &host){ServerChannel::Ptr onlineserver;std::string servicename=get_service_name(instance_name);{std::unique_lock<std::mutex> lock(_mutex);auto followret = _follows.find(servicename);if (followret == _follows.end()){LOG_INFO("该服务{}-{}下线了,暂时没有被订阅", servicename, host);return;}// 删除该服务中的一个节点// 当该服务未上线auto serversret = _servers.find(servicename);if (serversret == _servers.end()){// 没有找到该服务LOG_INFO("没有找到该服务", servicename);return;}else{onlineserver = serversret->second;}if (!onlineserver){LOG_ERROR("删除 {} 服务管理节点失败!", servicename);return;}// 已经存在该服务}onlineserver->remove(host);LOG_DEBUG("{}-{} 服务下线该节点", servicename, host);}void declared(const std::string &servername){std::unique_lock<std::mutex> lock(_mutex);_follows.insert(servername);}std::string get_service_name(const std::string &server_instance_name){int pos=server_instance_name.find_last_of('/');if(pos==std::string::npos){return server_instance_name;}std::string retstring = server_instance_name.substr(0,pos);return retstring;}private:std::mutex _mutex;// 订阅的服务 没有被订阅的服务不提供节点使用std::unordered_set<std::string> _follows;// 服务名称 和 该服务的信道管理器std::unordered_map<std::string, ServerChannel::Ptr> _servers;
};
}

brpc与etcd的联合

服务端:1.构建echo服务 2.搭建RPC服务器 3.启动RPC服务器 4.在etcd上注册这个RPC服务

客户端: 1.构造RPC信道管理对象 2.再构造etcd的监控对象 3.再从RPC信道管理对象中获取提供echo服务的信道 4.发起echo业务

​​

​​

由于choose找的是servicename : /service/echo

而在新增服务时是 /service/echo/instance 所以在新增的回调函数中要采取截断的方法

​rigistry

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"
#include "../common/etcd.hpp"
#include "../common/channel.hpp"
#include<gflags/gflags.h>
#include <functional>
#include <brpc/server.h>
#include <brpc/closure_guard.h>
#include <butil/logging.h>
#include "main.pb.cc"
DEFINE_bool(run_mode,false,"这是运行的模式默认为调试模式false");
DEFINE_string(log_file,"","发布模式下指定的文件默认为空");
DEFINE_int32(log_level,0,"调试模式下默认的等级TRACE");DEFINE_string(Host,"http://127.0.0.1:2379","服务注册中心地址");
DEFINE_string(instance_Host,"127.0.0.1:7070","新上线服务的访问地址");
DEFINE_string(base_service,"/service","服务器监控目录");
DEFINE_string(instance_service,"/echo/instance","新上线的服务");
//一个 echo 中应该有多个 key - value ,如果定义成echo的话 只能对应一个服务的访问地址
//一个 echo 中应该有多个instance - value
DEFINE_int32(listen_post,7070,"新上线服务的访问地址");
class EchoServiceImpl:public example::EchoService               
{public:EchoServiceImpl(){}~EchoServiceImpl() override{} void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) override{brpc::ClosureGuard rpc_guard(done);// ~ClosureGuard() {//if (_done) {//_done->Run();//}std::cout<<"收到了消息"<<request->message()<<std::endl;response->set_message(request->message()+"好的,我知道了");//_done->Run();}};
int main(int argc,char * argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode,FLAGS_log_file,FLAGS_log_level);//1.启动rpc服务器 并添加服务//2.etcd内注册该服务 以及 访问该服务的 地址logging::LoggingSettings logger;logger.logging_dest= logging :: LoggingDestination::LOG_TO_NONE;logging::InitLogging(logger);//2.创建服务器并添加业务brpc::Server server;EchoServiceImpl echosservice;int n=server.AddService(&echosservice,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if(n<0){std::cout<<"AddService error"<<std::endl;exit(0);}//函数不匹配,源码有可能设置的是指针类型,tips//3.设置服务器选项,并且启动服务器brpc::ServerOptions opt;opt.idle_timeout_sec=-1;//尝试连接时间 超时则退出opt.num_threads=1;//number of pthreads that server runs on. 单线程; //number of requests processed in parallel(并行)// Default: 0 (unlimited)int m=server.Start(FLAGS_listen_post,&opt);if(m<0){std::cout<<"server Start error"<<std::endl;exit(1);}Rigistry::ptr rigserver=std::make_shared<Rigistry>(FLAGS_Host);rigserver->registry(FLAGS_base_service+FLAGS_instance_service,FLAGS_instance_Host);//运行等待服务结束server.RunUntilAskedToQuit();//运行等待服务结束
}

discovey

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"
#include "../common/etcd.hpp"
#include "../common/channel.hpp"
#include <gflags/gflags.h>
#include <functional>
#include <brpc/server.h>
#include <brpc/closure_guard.h>
#include <butil/logging.h>
#include "main.pb.cc"
DEFINE_bool(run_mode, false, "这是运行的模式默认为调试模式false");
DEFINE_string(log_file, "", "发布模式下指定的文件默认为空");
DEFINE_int32(log_level, 0, "调试模式下默认的等级TRACE");DEFINE_string(Host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(instance_Host, "168.198.0.1::8080", "新上线服务的访问地址");DEFINE_string(instance_service, "/echo", "新上线的服务");
DEFINE_string(base_dir, "/service", "新上线的服务");
DEFINE_string(call_service, "/service/echo", "新上线的服务");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1.创建服务管理类 并申明订阅的服务ServerManager::Ptr server = std::make_shared<ServerManager>();server->declared(FLAGS_call_service);// 2.将两个回调函数绑定,创建发现对象auto onlineput = std::bind(&ServerManager::onlinechannel, (server.get()), std::placeholders::_1, std::placeholders::_2);auto offlineput = std::bind(&ServerManager::offlinechannel, (server.get()), std::placeholders::_1, std::placeholders::_2);// 3.发现对象发现该服务,创建Echo_ServicestubDiscovery::ptr Disserver = std::make_shared<Discovery>(FLAGS_Host, FLAGS_base_dir, onlineput, offlineput);while (1){auto channel = server->choose(FLAGS_call_service);if (!channel){std::this_thread::sleep_for(std::chrono::seconds(1));return -1;}example::EchoService_Stub echoServiceStub(channel.get());brpc::Controller *control = new brpc::Controller();control->Reset();example::EchoRequest request1;request1.set_message("你好啊少年");example::EchoResponse *response1 = new example::EchoResponse();// 4.Echo_Servicestub调用Echo服务// rpc业务调用echoServiceStub.Echo(control, &request1, response1, nullptr);if (control->Failed()){std::cout << "rpc echo service failed" << control->ErrorText() << std::endl;delete control;delete response1;std::this_thread::sleep_for(std::chrono::seconds(1));continue;}std::cout << "client收到响应" << response1->message() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));}return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62277.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【提升效率】如何写好一份详细设计文档

版本日期修订人描述V1.02024/12/6nick huang创建文档 背景 CSDN在发起“如何做好一份技术文档”的活动。 想起我最近在写一份详细设计&#xff0c;有一些感受&#xff1a; 一份考虑较周全的“详细设计文档模板”能起到质量保底的作用。 当一名初级技术人员需要编写详细设计文…

电阻计RM3544、RM3545的使用

目录&#xff1a; 一、电阻计与PC通讯 1、硬件连接 2、RmLogger.exe的使用 二、RM3545测量35uΩ电阻 一、电阻计与PC通讯 1、硬件连接 可以设置USB或COM口(串口)连接PC&#xff0c;也可以设置为“打印”输出。 1&#xff09;使用USB连接PC 2&#xff09;使用串口连接PC …

Jenkins 的HTTP Request 插件为什么不能配置Basic认证了

本篇遇到的问题 还是因为Jenkins需要及其所在的OS需要升级&#xff0c;升级策略是在一台新服务器上安装和配置最新版本的Jenkins&#xff0c; 当前的最新版本是&#xff1a; 2.479.2 LTS。 如果需要这个版本的话可以在官方站点下载&#xff0c;也可以到如下地址下载&#xff1…

uniapp 封装自定义头部导航栏

封装原因 项目中有时候需要使用自定义的头部导航栏&#xff0c;原生的无法满足需求 参数 属性名描述示例title标题字符串&#xff1a;首页bgColor背景色字符串&#xff1a;#ffftype左侧的操作内容字符串&#xff1a;all&#xff0c;详细值请在下方查看 参数解释 type all…

docker学习笔记(五)--docker-compose

文章目录 常用命令docker-compose是什么yml配置指令详解versionservicesimagebuildcommandportsvolumesdepends_on docker-compose.yml文件编写 常用命令 命令说明docker-compose up启动所有docker-compose服务&#xff0c;通常加上-d选项&#xff0c;让其运行在后台docker-co…

Linux中inode

磁盘的空间管理 如何对磁盘空间进行管理&#xff1f; 假设在一块大小为500G的磁盘中&#xff0c;500*1024*1024524288000KB。在磁盘中&#xff0c;扇区是磁盘的基本单位&#xff08;一般大小为512byte&#xff09;&#xff0c;而文件系统访问磁盘的基本单位是4KB&#xff0c;因…

5G扬帆乘劲风,遨游通讯赋能千行百业谱新篇

在大型工厂&#xff0c;轻触手机屏幕&#xff0c;实时库存数据、人员定位等信息便跃然眼前、一目了然&#xff1b;在边远油田&#xff0c;动动手指&#xff0c;即可实时查询设备温度、危险气体浓度等信息&#xff0c;大数据瞬间尽在“掌”握……在遨游5G防爆智能手机的助力下&a…

RT Thread Studio新建STM32F407IG工程文件编译提示错误

编译提示错误 原因: RT 源码使用4.0.3的话&#xff0c;请用STM32F4支持包的0.2.2版本&#xff0c;就不会出错了。 如果支持包用0.2.3版本的话&#xff0c;需要用RT内核4.1.0版本。0.2.3 版本更新了一些针对内核4.1.0的驱动代码&#xff0c;这几个定义都是4.1.0里的。

学生管理系统(java)

1.前期准备 &#xff08;1&#xff09;新建java项目 &#xff08;2&#xff09;新建java软件包以及三个文件Student.java,Student.txt,StuSystem.java Student.java package student_management_system;public class Student {private String id;private String name;private…

JavaWeb学习(2)(Cookie原理(超详细)、HTTP无状态)

目录 一、HTTP无状态。 &#xff08;1&#xff09;"记住我"&#xff1f; &#xff08;2&#xff09;HTTP无状态。 &#xff08;3&#xff09;信息存储客户端中。如何处理&#xff1f; 1、loaclStorage与sessionStorage。 2、Cookie。 二、Cookie。 &#xff08;1&…

SpringBoot教程(三十二) SpringBoot集成Skywalking链路跟踪

SpringBoot教程&#xff08;三十二&#xff09; | SpringBoot集成Skywalking链路跟踪 一、Skywalking是什么&#xff1f;二、Skywalking与JDK版本的对应关系三、Skywalking下载四、Skywalking 数据存储五、Skywalking 的启动六、部署探针 前提&#xff1a; Agents 8.9.0 放入 …

flask创建templates目录存放html文件

首先&#xff0c;创建flask项目&#xff0c;在pycharm中File --> New Project&#xff0c;选择Flask项目。 然后&#xff0c;在某一目录下&#xff0c;新建名为templates的文件夹&#xff0c;这时会是一个普通的文件夹。 然后右击templates文件夹&#xff0c;选择Unmark as …

Java进阶(注解,设计模式,对象克隆)

Java进阶(注解&#xff0c;设计模式&#xff0c;对象克隆) 一. 注解 1.1 什么是注解 java中注解(Annotation)&#xff0c;又称java标注&#xff0c;是一种特殊的注释 可以添加在包&#xff0c;类&#xff0c;成员变量&#xff0c;方法&#xff0c;参数等内容上 注解会随同…

部署loki,grafana 以及springcloud用法举例

文章目录 场景docker 部署grafanadocker-compose部署loki维护配置文件 local-config.yaml维护docker-compose.yml配置启动 grafana 添加loki数据源springcloud用法举例查看loki的explore,查看日志 场景 小公司缺少运维岗位&#xff0c;需要研发自己部署日志系统&#xff0c;elk…

keil报错---connection refused due to device mismatch

解决办法如下&#xff1a; 记得改成1 把Enable取消

第三节、电机定速转动【51单片机-TB6600驱动器-步进电机教程】

摘要&#xff1a;本节介绍用定时器定时的方式&#xff0c;精准控制脉冲时间&#xff0c;从而控制步进电机速度 一、计算过程 1.1 电机每一步的角速度等于走这一步所花费的时间&#xff0c;走一步角度等于步距角&#xff0c;走一步的时间等于一个脉冲的时间 w s t e p t … ……

vue中pdf.js的使用,包括pdf显示,跳转指定页面,高亮关键词

目录 一、下载pdf.js 二、引入到本地的项目中 三、实现预览pdf 四、跳转到指定页面 五、利用pdf里面的find查找关键词 六、修改页面大小为实际大小 一、下载pdf.js https://github.com/mozilla/pdf.js 里面有很多的版本&#xff0c; 高版本的可能浏览器不兼容或者还要考…

OD B卷【连续字母长度】

题目 给定一个字符串&#xff0c;只包含大写字母&#xff0c;求在包含同一字母的子串中&#xff0c;长度第k长的子串的长度&#xff0c;相同字母只取最长的那个子串。 输入描述&#xff1a; 第一行输入一个子串&#xff08;长【1,100】&#xff09;&#xff0c;只包含大写字母…

python中的 Pydantic 框架介绍

Pydantic 框架介绍 Pydantic 是一个用于数据验证和设置管理的 Python 库。它主要通过数据模型类的定义来处理 JSON 数据、解析请求和响应数据&#xff0c;并提供自动化的验证和转换。Pydantic 主要用于处理 Python 类型的安全性和验证&#xff0c;尤其在 FastAPI 等现代 Pytho…

桥接模式和组合模式的区别

桥接模式&#xff08;Bridge Pattern&#xff09;和组合模式&#xff08;Composite Pattern&#xff09;都是结构型设计模式&#xff0c;旨在解决对象结构的复杂性问题&#xff0c;但它们的应用场景和目的有所不同。以下是它们的区别&#xff1a; 1. 定义与目的 桥接模式&…