C++实现Raft算法之更多的细节(clerk与RPC)

本篇细节讲解的是clerk和RPC原理的讲解

clerk

clerk相当于是一个外部的客户端,其作用就是向整个raft集群发起命令并接收响应。

clerk需要与kvServer建立网络链接,那么既然已经实现了已经简单的RPC,那么使用RPC来完成这个过程。

clerk本身的过程非常简单,需要注意的是,对于RPC返回对端不是leader的话,就需要另外再调用另一个kvServer的RPC重试,直到遇到leader。

clerk调用代码如下,调用本身非常简单,client.put和client.get即可:

int main() {// 创建一个Clerk对象 clientClerk client;// 初始化Clerk对象,加载配置文件 "test.conf"client.Init("test.conf");// 获取当前时间并记录为开始时间auto start = now();// 定义一个整数变量 count,初始化为 500int count = 500;// 将 count 的值赋给 tmp 变量int tmp = count;// 进入一个循环,循环执行 500 次(从 tmp 为 499 到 0)while (tmp--) {// 调用 client 的 Put 方法,将键 "x" 的值设置为 tmp 的字符串表示client.Put("x", std::to_string(tmp));// 调用 client 的 Get 方法获取键 "x" 对应的值,赋值给 get1 变量std::string get1 = client.Get("x");// 打印获取到的值 get1,格式化输出字符串std::printf("get return :{%s}\r\n", get1.c_str());}// 程序正常结束return 0;
}

可以再看看client.Init函数,这个函数的作用是连接了所有raftKvServer节点,方式依然是通过RPC的方式,这个是raft节点之间相互连接的过程是一样的:

//这是个Clerk::Init方法的实现
//主要功能是从配置文件中加载Raft节点的IP和端口信息,之后创建与每个节点的连接
// 初始化客户端,加载配置文件并连接 Raft 节点
void Clerk::Init(std::string configFileName) {// 创建一个 MprpcConfig 对象,用于加载配置文件MprpcConfig config;// 加载配置文件,读取配置文件中的内容config.LoadConfigFile(configFileName.c_str());// 创建一个 vector,用来存储每个 Raft 节点的 IP 和端口std::vector<std::pair<std::string, short>> ipPortVt;// 遍历所有可能的 Raft 节点,直到没有更多节点为止for (int i = 0; i < INT_MAX - 1; ++i) {// 构造节点名称 "node0", "node1", ..., "nodeN"std::string node = "node" + std::to_string(i);// 从配置中读取该节点的 IP 地址std::string nodeIp = config.Load(node + "ip");// 从配置中读取该节点的端口号字符串std::string nodePortStr = config.Load(node + "port");// 如果该节点的 IP 地址为空,说明没有更多节点,跳出循环if (nodeIp.empty()) {break;}// 将该节点的 IP 地址和端口号(转换为 short 类型)添加到 vector 中// atoi 用于将字符串转换为整型,如果失败返回 0,这里可以考虑自己实现一个更安全的转换方法ipPortVt.emplace_back(nodeIp, atoi(nodePortStr.c_str())); // 这里注意:atoi 并不检查错误情况}// 遍历刚才保存的所有 Raft 节点的 IP 和端口信息,进行连接操作for (const auto &item : ipPortVt) {// 获取每个节点的 IP 和端口std::string ip = item.first; short port = item.second;// 2024-01-04 todo:bug fix// 使用当前节点的 IP 和端口创建一个新的 raftServerRpcUtil 对象// raftServerRpcUtil 是一个类,负责与指定 IP 和端口的 Raft 服务器建立通信auto* rpc = new raftServerRpcUtil(ip, port);// 将创建的 rpc 对象存储到 m_servers 向量中// 使用智能指针 (shared_ptr) 来管理生命周期,确保对象在不再使用时被正确销毁m_servers.push_back(std::shared_ptr<raftServerRpcUtil>(rpc));}
}

再看看put函数:

//代码实现了Clerk::PutAppend方法,处理了向Raft集群中的领导节点发送PutAppend请求,并且具有错误重试机制。
// PutAppend 方法:向 Raft 集群的领导节点发送 Put 或 Append 操作。
// key:键,value:值,op:操作类型(Put 或 Append)
void Clerk::PutAppend(std::string key, std::string value, std::string op) { // 增加请求 ID,用于区分不同的请求m_requestId++;  // 每次请求都会递增请求 ID,确保请求的唯一性// 保存当前请求的 requestIdauto requestId = m_requestId;// 获取当前认为的 Raft 领导节点的 ID,初始化为 m_recentLeaderIdauto server = m_recentLeaderId;// 进入一个循环,不断尝试发送请求,直到成功while (true) {// 创建 PutAppend 请求参数对象 args,设置请求的各个字段raftKVRpcProctoc::PutAppendArgs args;args.set_key(key);          // 设置 keyargs.set_value(value);      // 设置 valueargs.set_op(op);            // 设置操作类型:Put 或 Appendargs.set_clientid(m_clientId); // 设置客户端 IDargs.set_requestid(requestId); // 设置请求 ID// 创建 PutAppend 回复对象 reply,用于存储响应结果raftKVRpcProctoc::PutAppendReply reply;// 向 Raft 节点发起 PutAppend 请求,使用当前的领导节点(server)bool ok = m_servers[server]->PutAppend(&args, &reply);// 如果请求失败或返回的是 ErrWrongLeader(非领导节点),需要重试if (!ok || reply.err() == ErrWrongLeader) {// 打印调试信息,说明请求失败,尝试切换到新的领导节点重试DPrintf("【Clerk::PutAppend】原以为的leader:{%d}请求失败,向新leader{%d}重试  ,操作:{%s}", server, server + 1, op.c_str());// 如果请求失败(rpc 失败),打印相应的失败原因if (!ok) {DPrintf("重试原因 ,rpc失败 ,");}// 如果返回的错误是 ErrWrongLeader,说明当前节点不是领导节点,需要切换到新的领导节点if (reply.err() == ErrWrongLeader) {DPrintf("重试原因:非leader");}// 选择下一个 Raft 节点进行重试,使用模运算循环切换server = (server + 1) % m_servers.size();  // 选择下一个节点进行重试continue;  // 继续进行下一次请求}// 如果请求成功且返回的错误为 OK,说明操作已成功if (reply.err() == OK) {// 保存当前成功的领导节点 ID,以便下次直接联系该节点m_recentLeaderId = server;return;  // 成功后返回,不再重试}}
}

这里可以注意。

m_requestId++; m_requestId每次递增。

m_recentLeaderId; m_recentLeaderId是每个clerk初始化的时候随机生成的。

这两个变量的作用是为了维护上一篇所述的“线性一致性”的概念。

server = (server+1)%m_servers.size(); 如果失败的话就让clerk循环节点进行重试。

RPC

项目使用到的RPC高度依赖protobuf

RPC是一种使得分布式系统中的不同模块之间能够透明地进行远程调用的技术,使得开发者可以更方便地构建分布式系统,而不用过多关注底层通信细节,调用另一台机器的方法会表现的像调用本地的方法一样

那么无论对外表现如何,只要设计多个主机之间的通信,必不可少的就是网络通讯这一步

我们看看一次RPC请求到底干了什么?

首先看下【准备:请求参数、返回参数(这里返回参数的值没有意义)、调用哪个方法】这一步,这一步需要发起者自己完成,如下:

在填充完请求值和返回值之后,就可以实际调用方法了。

我们点进去看看:

void FiendServiceRpc_Stub::GetFriendsList(::PROTOBUF_NAMESPACE_ID::RpcController* controller,const ::fixbug::GetFriendsListRequest* request,::fixbug::GetFriendsListResponse* response,::google::protobuf::Closure* done) {channel_->CallMethod(descriptor()->method(0),controller, request, response, done);
}

可以看到这里相当于是调用了channel_->CallMethod方法,只是第一个参数变成了descriptor()->method(0),其他参数都是我们传进去的参数没有改变,而这个descriptor()->method(0)存在的目的其实就是为了表示我们到底是调用的哪个方法。

到这里远端调用的东西就齐活了:方法、请求参数、响应参数

还记得在最开始生成stub的我们写的是:fixbug::FiendServiceRpc_Stub stub(new MprpcChannel(ip, port, true));,因此这个channel_本质上是我们自己实现的MprpcChannel类,而channel_->CallMethod本质上就是调用的MprpcChannel的CallMethod方法

我们简单看下这个CallMethod方法干了什么?按照

这样的方式将所需要的参数来序列化,序列化之后再通过send函数循环发送即可

可能的改进:在代码中send_rpc_str.insert(0, std::string((char *)&header_size, 4));我们可以看到头部长度固定是4个字节,那么这样的设计是否合理?如果不合理如何改进呢?

到了这一步,所有的报文已经发送到了对端,即接收RPC的一方,那么此时应该在对端进行:这一系列的步骤。

这一系列步骤的主要函数发生在:RpcProvider::OnMessage

我们看下这个函数干了什么?

首先根据上方序列化的规则进行反序列化,解析出相关的参数。

然后根据你要调用的方法名去找到实际的方法调用即可。

相关函数是在NotifyService函数中中提前注册好了,因此这里可以找到然后调用。

在这个过程中使用了protobuf提供的closure绑定了一个回调函数用于在实际调用完方法之后进行反序列化相关操作。

为啥这么写就算注册完反序列化的回调了呢?肯定是protobuf为我们提供了相关的功能,在后面代码流程中也会看到相对应的过程。

google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider, const muduo::net::TcpConnectionPtr &, google::protobuf::Message *>(this, &RpcProvider::SendRpcResponse,conn, response);

真正执行本地方法是在 service->CallMethod(method, nullptr, request, response, done);,为什么这个方法就可以调用到本地的方法呢?

这个函数会因为多态实际调用生成的pb.cc文件中的CallMethod方法。

void FiendServiceRpc::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,::PROTOBUF_NAMESPACE_ID::RpcController* controller,const ::PROTOBUF_NAMESPACE_ID::Message* request,::PROTOBUF_NAMESPACE_ID::Message* response,::google::protobuf::Closure* done)

 我们看下这个函数干了什么?

switch(method->index()) {case 0:GetFriendsList(controller,::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::fixbug::GetFriendsListRequest*>(request),::PROTOBUF_NAMESPACE_ID::internal::DownCast<::fixbug::GetFriendsListResponse*>(response),done);break;default:GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";break;
}

这个函数和上面讲过的FiendServiceRpc_Stub::GetFriendsList方法有似曾相识的感觉。都是通过xxx->index来调用实际的方法。

正常情况下校验会通过,即触发case 0。

然后会调用我们在FriendService中重写的GetFriendsList方法。

// 重写基类方法
void GetFriendsList(::google::protobuf::RpcController *controller,const ::fixbug::GetFriendsListRequest *request,::fixbug::GetFriendsListResponse *response,::google::protobuf::Closure *done) {uint32_t userid = request->userid();std::vector<std::string> friendsList = GetFriendsList(userid);response->mutable_result()->set_errcode(0);response->mutable_result()->set_errmsg("");for (std::string &name: friendsList) {std::string *p = response->add_friends();*p = name;}done->Run();
}

这个函数逻辑比较简单:调用本地的方法,填充返回值response。

然后调用回调函数done->Run();,还记得我们前面注册了回调函数吗?

google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,const muduo::net::TcpConnectionPtr &,google::protobuf::Message *>(this,&RpcProvider::SendRpcResponse,conn, response);

在回调真正执行之前,我们本地方法已经触发了并填充完返回值了。

此时回看原来的图,我们还需要序列化返回结果和将序列化后的数据发送给对端。

done->Run()实际调用的是:RpcProvider::SendRpcResponse。

这个方法比较简单,不多说了。

到这里,RPC提供方的流程就结束了。

从时间节点上来说,此时应该对端来接收返回值了,还在 MprpcChannel::CallMethod部分:

/*
从时间节点来说,这里将请求发送过去之后rpc服务的提供者就会开始处理,返回的时候就代表着已经返回响应了
*/
// 接收rpc请求的响应值
char recv_buf[1024] = {0};
int recv_size = 0;
if (-1 == (recv_size = recv(m_clientFd, recv_buf, 1024, 0)))
{close(m_clientFd); m_clientFd = -1;char errtxt[512] = {0};sprintf(errtxt, "recv error! errno:%d", errno);controller->SetFailed(errtxt);return;
}
// 反序列化rpc调用的响应数据
// std::string response_str(recv_buf, 0, recv_size); // bug:出现问题,recv_buf中遇到\0后面的数据就存不下来了,导致反序列化失败
// if (!response->ParseFromString(response_str))
if (!response->ParseFromArray(recv_buf, recv_size))
{char errtxt[1050] = {0};sprintf(errtxt, "parse error! response_str:%s", recv_buf);controller->SetFailed(errtxt);return;
}

将接受到的数据按照情况实际序列化成response即可。

这里就可以看出现在的RPC是不支持异步的,因为在MprpcChannel::CallMethod方法中发送完数据后就会一直等待着去接收。

protobuf库中充满了多态,因此推荐大家阅读的时候采用debug的方式。

注:因为目前RPC的网络通信采用的是muduo,muduo支持函数回调,即在对端发送信息来之后就会调用注册好的函数,函数注册代码在:

m_muduo_server->setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/62796.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于C#+SQLite开发数据库应用的示例

SQLite数据库&#xff0c;小巧但功能强大&#xff1b;并且是基于文件型的数据库&#xff0c;驱动库就是一个dll文件&#xff0c;有些开发工具 甚至不需要带这个dll&#xff0c;比如用Delphi开发&#xff0c;用一些三方组件&#xff1b;数据库也是一个文件&#xff0c;虽然是个文…

C++之异常智能指针其他

C之异常&智能指针&其他 异常关于函数异常声明异常的优劣 智能指针auto_ptrunique_ptrshared_ptrweak_ptr定制删除器 智能指针的历史与boost库 特殊类单例模式饿汉和懒汉的优缺点 C四种类型转换CIO流结语 异常 try括起来的的代码块中可能有throw一个异常&#xff08;可…

前端跳转路由的时候,清掉缓存

清除路由缓存的方法 ‌使用 $router.push() 方法‌&#xff1a;在跳转路由时&#xff0c;可以通过传递一个包含 replace: true 属性的对象来实现清除路由缓存。例如&#xff1a; this.$router.push({ path: "/new-route", replace: true }); ‌使用 $router.replace…

SpringBoot -拦截器Interceptor、过滤器 Filter 及设置

Spring Boot拦截器&#xff08;Interceptor&#xff09;的概念 - 在Spring Boot中&#xff0c;拦截器是一种AOP的实现方式。它主要用于<font style"color:#DF2A3F;">拦截请求</font>&#xff0c;在请求处理之前和之后执行特定的代码逻辑。与过滤器不同的…

Ubuntu 20.04 Server版连接Wifi

前言 有时候没有网线口插网线或者摆放电脑位置不够时&#xff0c;需要用Wifi联网。以下记录Wifi联网过程。 环境&#xff1a;Ubuntu 20.04 Server版&#xff0c;无UI界面 以下操作均为root用户&#xff0c;如果是普通用户&#xff0c;请切换到root用户&#xff0c;或者在需要权…

Java项目实战II基于微信小程序的亿家旺生鲜云订单零售系统的设计与实现(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 随着移动互联网技术的不断…

多线程安全单例模式的传统解决方案与现代方法

在多线程环境中实现安全的单例模式时&#xff0c;传统的双重检查锁&#xff08;Double-Checked Locking&#xff09;方案和新型的std::once_flag与std::call_once机制是两种常见的实现方法。它们在实现机制、安全性和性能上有所不同。 1. 传统的双重检查锁方案 双重检查锁&am…

Javaweb梳理21——Servlet

Javaweb梳理21——Servlet 21 Servlet21.1 简介21.3 执行流程21.4 生命周期4.5 方法介绍21.6 体系结构21.7 urlPattern配置21.8 XML配置 21 Servlet 21.1 简介 Servlet是JavaWeb最为核心的内容&#xff0c;它是Java提供的一门动态web资源开发技术。使用Servlet就可以实现&…

MySQL 主从同步一致性详解

MySQL主从同步是一种数据复制技术&#xff0c;它允许数据从一个数据库服务器&#xff08;主服务器&#xff09;自动同步到一个或多个数据库服务器&#xff08;从服务器&#xff09;。这种技术主要用于实现读写分离、提升数据库性能、容灾恢复以及数据冗余备份等目的。下面将详细…

点云3DHarris角点检测算法推导

先回顾2D的Harris角点检测算法推导 自相关矩阵是Harris角点检测算法的核心之一&#xff0c;它通过计算图像局部区域的梯度信息来描述该区域的特征。在推导Harris角点检测算法中的自相关矩阵时&#xff0c;我们首先需要了解自相关矩阵的基本思想和数学背景。 参考 [经典角点检…

在 CentOS 上安装 Docker:构建容器化环境全攻略

一、引言 在当今的软件开发与运维领域&#xff0c;Docker 无疑是一颗璀璨的明星。它以轻量级虚拟化的卓越特性&#xff0c;为应用程序的打包、分发和管理开辟了崭新的高效便捷之路。无论是开发环境的快速搭建&#xff0c;还是生产环境的稳定部署&#xff0c;Docker 都展现出了…

Unity-Particle System属性介绍(一)基本属性

什么是ParticleSystem 粒子系统是Unity中用于模拟大量粒子的行为的组件。每个粒子都有一个生命周期&#xff0c;包括出生、运动、颜色变化、大小变化和死亡等。粒子系统可以用来创建烟雾、火焰、水、雨、雪、尘埃、闪电和其他各种视觉效果。 开始 在项目文件下创建一个Vfx文件…

.NET8/.NETCore 依赖注入:自动注入项目中所有接口和自定义类

.NET8/.NETCore 依赖接口注入&#xff1a;自动注入项目中所有接口和自定义类 目录 自定义依赖接口扩展类&#xff1a;HostExtensions AddInjectionServices方法GlobalAssemblies 全局静态类测试 自定义依赖接口 需要依赖注入的类必须实现以下接口。 C# /// <summary>…

Brain.js(二):项目集成方式详解——npm、cdn、下载、源码构建

Brain.js 是一个强大且易用的 JavaScript 神经网络库&#xff0c;适用于前端和 Node.js 环境&#xff0c;帮助开发者轻松实现机器学习功能。 在前文Brain.js&#xff08;一&#xff09;&#xff1a;可以在浏览器运行的、默认GPU加速的神经网络库概要介绍-发展历程和使用场景中&…

使用pyQT完成简单登录界面

import sysfrom PyQt6.QtGui import QMovie,QPixmap from PyQt6.QtWidgets import QApplication, QWidget, QLabel, QPushButton,QLineEdit#封装我的窗口类 class MyWidget(QWidget):#构造函数def __init__(self):#初始化父类super().__init__()# 设置窗口大小self.resize(330,…

理解 Python PIL库中的 convert(‘RGB‘) 方法:为何及如何将图像转换为RGB模式

理解 Python PIL库中的 convert(RGB) 方法&#xff1a;为何及如何将图像转换为RGB模式 在图像处理中&#xff0c;保持图像数据的一致性和可操作性是至关重要的。Python的Pillow库&#xff08;继承自PIL, Python Imaging Library&#xff09;提供了强大的工具和方法来处理图像&…

avcodec_alloc_context3,avcodec_open2,avcodec_free_context,avcodec_close

avcodec_alloc_context3 是创建编解码器上下文&#xff0c;需要使用 avcodec_free_context释放 需要使用avcodec_free_context 释放 /** * Allocate an AVCodecContext and set its fields to default values. The * resulting struct should be freed with avcodec_free_co…

linux安装部署mysql资料

安装虚拟机 等待检查完成 选择中文 软件选择 网络和主机名 开始安装 设置root密码 ADH-password 创建用户 等待安装完成 重启 接受许可证 Centos 7 64安装完成 安装mysql开始 Putty连接指定服务器 在 opt目录下新建download目录 将mysql文件传到该目录下 查看linux服务器的…

vscode 怎么下载 vsix 文件?

参考&#xff1a;https://marketplace.visualstudio.com/items?itemNameMarsCode.marscode-extension 更好的办法&#xff1a;直接去相关插件的 github repo 下载老版本 https://github.com/VSCodeVim/Vim/releases?page5 或者&#xff0c;去 open-vsx.org 下载老版本 点击这…

医院管理系统

私信我获取源码和万字论文&#xff0c;制作不易&#xff0c;感谢点赞支持。 医院管理系统 摘要 随着信息互联网信息的飞速发展&#xff0c;医院也在创建着属于自己的管理系统。本文介绍了医院管理系统的开发全过程。通过分析企业对于医院管理系统的需求&#xff0c;创建了一个计…