bprc二次封装

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 一、封装的思想
  • 二、封装单个服务的信道管理类
    • 1.成员变量
    • 2.成员函数
  • 三、封装总体的服务信道管理类
    • 1.成员变量
    • 2.成员函数
  • 四.etcd和brpc联合测试
    • 1.服务注册客户端
    • 2.服务发现客户端


一、封装的思想

brpc的二次封装:
brpc本质上来说–进行rpc调用的,但是向谁调用什么服务得管理起来 – 搭配etcd实现的注册中心管理
原因:通过注册中心,能够获知谁能提供什么服务,进而能够连接它发起这个服务调用。
封装思想:
主要是管理起来网络通信的信道----将不同服务节点主机的通信信道管理起来
封装的是服务节点信道的管理,而不是rpc调用的管理。
封装:
1.指定服务的信道管理类:
一个服务可能会有多个节点提供服务,每个节点都有自己的channe
建立服务与信道的映射关系,并且关系是一对多,采用RR轮转策略进行获取
2.总体的服务信道管理类
将多个服务的信道管理对象管理起来

二、封装单个服务的信道管理类

1.成员变量

1.需要一个string来表视当前服务的信道管理类的服务名称因为我们是把一个服务的信道管理起来,所以需要知道该服务的名称。
2.一个服务可能会有多个主机,也就是一个服务可能会有多个信道。所以我么需要一个数组来存储当前服务节点的信道。
3.当一个节点主机下线后,我们需要删除释放这个channel,所以我们需要一个哈希表来进行主机地址与channel的映射关系,方便我们进行删除。
4.需要一个rr轮转下标,当需要对该服务进行rpc调用时,可以使用rr轮转负载均衡式的返回channel.
5.需要一个互斥锁,保证容器的线程安全。

 std::mutex _mutex;int32_t _index; //rr轮转下标std::string _service_name;  //服务名称std::vector<ChannelPtr> _channels;  //当前服务对应的通信集合std::unordered_map<std::string,ChannelPtr> _hosts;  //主机地址与信道映射关系

2.成员函数

构造函数中,只需要初始化服务名称,然后轮转下标初始化为0.

ServiceChannel(const std::string& service_name):_index(0),_service_name(service_name){}

当服务上线一台主机时,需要创建一个channel进行管理,那么该函数中需要有一个参数,就是主机的地址。

void append(const std::string& host){//创建一个channel,并进行连接ChannelPtr channel = std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = -1;options.timeout_ms = -1;options.max_retry = 3;options.protocol = "baidu_std";int ret = channel->Init(host.c_str(), &options);if (ret == -1) {LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_hosts.insert(std::make_pair(host, channel));_channels.push_back(channel);}

对应的,当主机下线时,需要将该主机的channel删除并释放

 void remove(const std::string& host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it == _hosts.end()) {LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);return;}for (auto vit = _channels.begin(); vit != _channels.end(); ++vit) {if (*vit == it->second) {_channels.erase(vit);break;}}_hosts.erase(it);}

当需要对该服务发起rpc调用时,我们可以返回该服务的channel。通过rr轮转方式实现一个负载均衡。

ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0) {LOG_ERROR("当前没有能够提供 {} 服务的节点!", _service_name);return ChannelPtr();}int32_t idx = _index++ % _channels.size();return _channels[idx];}

三、封装总体的服务信道管理类

我们的服务有多个,而每一个服务也会有多个主机节点。因此我们需要把这些服务总的管理起来。

1.成员变量

1.需要一个哈希表来实现服务名称和该服务的信道管理类的映射关系。
2.项目有多个服务,而我们并不是对所有的服务都要关心的。我们需要使用一个unordered_set来记录我们关心的服务。
3.一个互斥锁,保证容器的线程安全。

2.成员函数

添加一个关心的服务,只需要把服务名称传递进来,进行一个插入即可

 void declared(const std::string& service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}

当一个对应的服务上线一个主机节点时,为该服务节点主机创建一个channel,并管理起来。这里直接调用上面的服务的信道管理类就行。

void onServiceOnline(const std::string &service_instance,const std::string& host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()) {LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);return;}//先获取管理对象,没有则创建,有则添加节点auto sit = _services.find(service_name);if (sit == _services.end()) {service = std::make_shared<ServiceChannel>(service_name);_services.insert(std::make_pair(service_name, service));}else {service = sit->second;}}if (!service) {LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return ;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}

服务主机节点下线时,需要找到对应服务的信道管理类,来进行一个对应主机节点的channel的删除和释放。

void onServiceOffline(const std::string &service_instance,const std::string& host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()) {LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);return;}//先获取管理对象,没有则创建,有则添加节点auto sit = _services.find(service_name);if (sit == _services.end()) {LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);return;}service = sit->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}

上面的两个接口,是当服务的主机节点上线和下线时调用的,形参中的两个参数就是服务名称和主机节点。
那么什么时候会调用这两个函数呢?我们怎么知道什么时候主机上线下线呢?在前面我们封装了一个etcd,在服务发现客户端中,我们是同watcher来监控一个服务。当服务的数据发送改变时,就会调用我们传入的两个回调函数,那么这里的两个函数就可以作为俩个回调函数传入。也就是当服务新增节点和删除节点时调用。

获取指定服务的节点信道,传入指定服务,返回一个服务的信道。

//获取指定服务的节点信道ServiceChannel::ChannelPtr choose(const std::string& service_name){std::unique_lock<std::mutex> lock(_mutex);auto sit = _services.find(service_name);if (sit == _services.end()) {LOG_ERROR("当前没有能够提供 {} 服务的节点!", service_name);return ServiceChannel::ChannelPtr();}return sit->second->choose();}

四.etcd和brpc联合测试

前面我们封装了etcd,有一个服务注册客户端和一个服务发现客户端。那么我们可以用刚刚封装的brpc和etcd进行一个连接测试。

我们是可以用brpc进行rpc调用的,但是向谁进行rpc调用呢?我们可以使用服务发现客户端对服务进行一个监控,当服务注册客户端注册一个服务后,服务发现客户端就会触发一个回调函数,在这个回调函数就会对触发的事件进行一个判断,如果是新增事件,则调用用户设置的put_cb,如果是删除事件,则调用用户的del_cb;那么我们是不是可以把我们刚刚封装的bprc中的onServiceOnline和onServiceOffline作为对应的回调函数设置进去呢?

大体思路:

  • 服务注册客户端这边,首先创建一个brpc服务器,然后新增EchoService服务,并启动服务器。最后向etcd中注册一个/service/echo/instance服务,并指明自己的主机地址,表明自己的主机可以提供echo服务。
  • 服务发现客户端这边,首先创建一个总的信道管理类对象,然后构造一个服务发现对象,对/service/echo服务进行监控。接着通过总的信道管理类对象获取到一个channel,通过这个channel,就可以创建stub进行服务调用。

1.服务注册客户端

首先创建一个服务器对象,因为我们要提供rpc服务。

    brpc::Server server;

接着新增服务。

//3.向服务器对象中,新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret == -1) {std::cout << "添加Rpc服务失败!\n";return -1;}

启动服务器,监听8085端口。

    brpc::ServerOptions options;options.idle_timeout_sec = -1;options.num_threads = 1;ret = server.Start(8085,&options);if (ret == -1) {std::cout << "启动服务器失败!\n";return -1;}

向etcd注册中心注册一个/service/echo/instance服务,设置val为自己的主机地址。代表当前主机可以提供echo服务。
此时etcd注册中心就会有一个{key:/service/echo/instance ; val:127.0.0.1:8085};

//5.注册服务Registry::ptr rClient = std::make_shared<Registry>("127.0.0.1:2379");rClient->registry("/service/echo/instance","127.0.0.1:8085");

2.服务发现客户端

先构建以恶个总的信道管理对象,需要通过这个对象来"知道"哪个主机可以提供服务。但具体是怎么发现的呢?可以看到,我们调用了declared这个函数代表我们关心/service/echo这个服务。此时这个字符串就会被插入到一个unordered_set中。其次我们将这个对象中的两个成员函数获取到了。这两个函数后续我们要设置进服务发现客户端中。

 //1.先构建Rpc信道管理对象auto sm = std::make_shared<ServiceManager>();sm->declared("/service/echo");auto put_cb = std::bind(&ServiceManager::onServiceOnline,sm.get(),std::placeholders::_1,std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline,sm.get(),std::placeholders::_1,std::placeholders::_2);

构造服务发现对象,这里的构造函数我们传入了四个参数,第一个参数是etcd注册中心的地址,因为我们要和etcd通信.
第二个参数是我们要监控的服务。我们的服务发现对象中有一个成员watcher,他会对指定的服务进行监控,如服务数据有变化,就会进行处理。而这里的第二个参数就是watcher进行监控的服务。另外这个监控它是会进行递归的,/service/下的所有目录发现变化,他都可以感知到。第三个第四个函数就是我们设置进的回调函数,当watcher感知到服务发送变化,会获取到发送变化的服务的key和val,也就是服务名称和主机地址。这是就可以调用我们设置的两个回调函数,我们的回调函数刚好可以接受两个参数。

而这两个参数一个是/service/echo/instance,一个是127.0.0.1:8085。我们的服务信道管理类是管理 服务的信道的。我们服务发现客户端关心的是/service/echo服务。而这里的/service/echo/instance本质上是一个/service/echo服务。所以在onServiceOnline和onServiceOffline中我们对这个参数进行了一个劫取子串,只需要/service/echo这一部分。然后为这一部分创建一个服务信道管理类对象。

//2. 构造服务发现对象Discovery::ptr dclient = std::make_shared<Discovery>("127.0.0.1:2379","/service", put_cb, del_cb);

接着就是获取服务的channel,我们关心哪个服务就获取什么服务的channel.

 //3. 通过Rpc信道管理对象,获取提供Echo服务的信道ServiceChannel::ChannelPtr channel = sm->choose("/service/echo");if(channel == nullptr){return -1;}

通过channel来发起rpc调用

//4.发起rpc调用example::EchoService_Stub stub(channel.get());example::EchoRequest req;req.set_message("你好,Lkm");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();stub.Echo(cntl, &req, rsp, nullptr);if(cntl->Failed() == true){std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << rsp->message() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/879733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

透明屏幕有普通屏幕有哪些优点

针对透明玻璃屏幕的安装方案&#xff0c;我们需要综合考虑多个因素&#xff0c;包括安装环境、屏幕尺寸、重量、安全要求以及视觉效果等。以下是一个概括性的安装方案框架&#xff0c;供您参考&#xff1a; 一、前期准备 1.1 需求分析 明确透明玻璃屏幕的使用场景&#xff08…

聊聊对别人表示真正的关注

在工作和生活中,那些重要人士所得到的关注已经很多了&#xff0c;所以你不能只关注那些重要的人&#xff0c;对那些保洁门卫、前台等也需要我们给予真心的关注。 他们可使你的生活正常有序&#xff0c;但却经常被你忽略&#xff0c;见面打个招呼时常跟他们聊一聊&#xff0c;这…

C++速通LeetCode中等第4题-三数之和

解题思路&#xff1a;先排序&#xff0c;固定第一个数&#xff0c;用两个指针分别指向右侧剩余数列的两端&#xff0c;右侧向左移动直到两指针重合&#xff0c;看三数合有没有解&#xff0c;指针遇到相同数字跳过。 class Solution { public:vector<vector<int>> …

Spring Session

Session 共享问题 在 Web 项目开发中&#xff0c;Session 会话管理是一个很重要的部分&#xff0c;用于存储与记录用户的状态或相关的数据。 通常情况下 session 交由容器&#xff08;tomcat&#xff09;来负责存储和管理&#xff0c;但是如果项目部署在多台 tomcat 中&#…

【Unity3D小技巧】Unity3D中使用EventTrigger对3D物体的响应

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址QQ群:398291828大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。 一、前言 1-1、EventTrigger简介 EventTrigger是Unity中用于处理UI事件的一个组件。它允许我们为UI元…

什么是区块链,以及应用场景

一、引言 在当今数字化时代&#xff0c;区块链技术作为一种新兴的分布式账本技术&#xff0c;正逐渐引起广泛关注。它具有去中心化、不可篡改、透明性等特点&#xff0c;为解决传统中心化系统中的信任问题提供了新的思路。本文将介绍区块链的基本概念、工作原理以及其在各个领域…

【数据库】MySQL内置函数

本篇分享一些在MySQL中常见的一些内置函数&#xff0c;如日期函数&#xff0c;字符串函数和数学函数&#xff0c;以方便于操作数据库中的数据。 1.日期函数 我们先整体观察一下这些函数再讲解案例 日期函数使用起来都非常就简单 获得年月日&#xff1a; select current_dat…

甘特图介绍

甘特图&#xff08;Gantt chart&#xff09;是一种常用于项目管理和计划安排的图表类型&#xff0c;它以图形的方式展示项目的任务、活动或工作流的时间线。甘特图得名于它的发明者亨利劳伦斯甘特&#xff08;Henry Laurence Gantt&#xff09;&#xff0c;他在20世纪初开发了这…

C++ 面试模拟02

第一部分&#xff1a;基础知识 什么是拷贝构造函数和赋值运算符&#xff1f;它们之间有什么区别&#xff1f;在 C 中&#xff0c;const 关键字的作用是什么&#xff1f;有哪些常见用法&#xff1f;C 中的内存管理机制是怎样的&#xff1f;如何避免内存泄漏&#xff1f;虚函数&…

为解决bypy大文件上传报错—获取百度云文件直链并使用Aria2上传文件至服务器

问题描述 一方面组内的服务器的带宽比较小&#xff0c;另一方面使用bypy方式进行大文件(大于15G)上传时会报错&#xff08;虽然有时可以成功上传&#xff0c;但是不稳定&#xff09;&#xff1a; 解决方式 总体思路: 获得云盘需要下载文件的直链复制直链到服务器中使用自带…

24年蓝桥杯及攻防世界赛题-MISC-3

21 reverseMe 复制图片&#xff0c;在线ocr识别&#xff0c;https://ocr.wdku.net/&#xff0c;都不费眼睛。 22 misc_pic_again ┌──(holyeyes㉿kali2023)-[~/Misc/tool-misc/zsteg] └─$ zsteg misc_pic_again.png imagedata … text: “$$KaTeX parse error: Undefined…

Excel快速填充颜色,快捷键真香

大家好&#xff0c;这里是效率办公指南&#xff01; &#x1f3a8; 在Excel中工作时&#xff0c;我们经常需要对单元格进行颜色填充&#xff0c;以突出显示重要数据或增加视觉可读性。今天&#xff0c;我们将分享几种快速填充颜色的方法&#xff0c;帮助你提高工作效率&#x…

2024最新!!!iOS高级面试题,全!(一)

TCP,HTTP,HTTPS&#xff0c;,WebSokect 区别&#xff1a; IP协议&#xff08;网络层协议&#xff09; TCP&#xff1a;传输控制协议&#xff0c;主要解决数据如何在网络中传输&#xff0c;面向连接&#xff0c;可靠。&#xff08;传输层协议&#xff09; UDP&#xff1a;用户数…

Istio:微服务网格的强大工具,Istio介绍

什么是Istio&#xff1f; 在现代软件开发中&#xff0c;微服务架构已经成为构建可扩展、灵活系统的首选方法。然而&#xff0c;随着微服务数量的增加&#xff0c;服务间的通信、监控和管理变得越来越复杂。为了解决这些问题&#xff0c;服务网格&#xff08;Service Mesh&…

Golang使用ReverseProxy实现反向代理

目录 1.源码结构体 2.官方单机示例 3.使用示例 4.简单的http服务&#xff08;用于测试&#xff09; 1.源码结构体 type ReverseProxy struct {// Rewrite 必须是一个函数&#xff0c;用于将请求修改为要使用 Transport 发送的新请求。然后&#xff0c;其响应将原封不动地…

解决mybatis plus 中 FastjsonTypeHandler无法正确反序列化List类型的问题

由于是根据自动映射类型&#xff0c;我们设置的字段类型是List 也就是反序列化的时候也只是用 FastjsonTypeHandler中的 Override protected Object parse(String json) { return JSON.parseObject(json, type); } 反序列化方法&#xff0c;这是type为List 反序列后我们并没…

C++11: 声明和定义

声明与定义是C/C中两个核心的概念&#xff0c;也是C/C区别于其他语言独有的特性。它们对程序的编译和链接过程起着至关重要的作用。 一、C标准的描述 声明&#xff08;Declaration&#xff09;&#xff1a;声明告诉编译器某个实体&#xff08;如变量、函数、类等&#xff09;…

用ASR PRO离线语音芯片和月饼盒做一个会跑会跳会说话的机器狗

中秋节刚过&#xff0c;大家月饼盒应该还有&#xff0c;不要扔&#xff0c;可以做点小玩意。 机器狗的创意来自B站石桥北的视频&#xff0c;他使用了一块ESP32芯片和打印件加四个舵机实现&#xff0c;应该说是比较复杂的&#xff0c;需要有3D打印机打印外壳&#xff0c;还得会…

Linux标准IO-系统调用详解

1.1 系统调用 系统调用&#xff08;system call&#xff09;其实是 Linux 内核提供给应用层的应用编程接口&#xff08;API&#xff09;&#xff0c;是 Linux 应用层进入内核的入口。不止 Linux 系统&#xff0c;所有的操作系统都会向应用层提供系统调用&#xff0c;应用程序通…

【Harmony】轮播图特效,持续更新中。。。。

效果预览 swiper官网例子 Swiper 高度可变化 两边等长露出&#xff0c;跟随手指滑动 Swiper 指示器导航点位于 Swiper 下方 一、官网 例子 参考代码&#xff1a; // xxx.ets class MyDataSource implements IDataSource {private list: number[] []constructor(list: nu…