来聊聊去中心化Redis集群节点如何完成通信

写在文章开头

今天我们来聊点有意思的,关于redis中集群间通信的设计与实现,本文将从源码的角度分析redis集群节点如何利用Gossip协议完成节点间的通信与传播,希望对你有帮助。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

详解Redis集群节点通信的设计与实现

详解Gossip协议

在此之前我们先简单介绍一下Gossip协议,该协议是分布式集群的一种通信协议,我们都知道管理集群的方式有中心化和去中心化两种方式,中心化的方式是通过第一个第三方的管理中心,例如zookeeper等来维护一份集群节点的信息、状态。

在这里插入图片描述

redis采用的是去中心化的方式实现集群节点通信,即通过Gossip协议进行节点通信,让各个节点之间两两通信,广播与自己保持交流的节点,由此将节点串联起来构成一张关系网。

在这里插入图片描述

我们以一个简单的场景为例介绍一下Gossip协议,默认情况下我们的当前有3个节点的集群,各个节点彼此按照通信要求发送自己的信息和与自己保持交流的节点,由此将有限的资源共享出去构成一个集群。

此时,我们需要横向扩展一个节点4,我们只需配置/redis-cli --cluster add-node 新节点IP:新节点端口 任意存活节点IP:任意存活节点端口 ,这个存活节点后续和其他节点通信时,就会将当前新添加的节点4发送出去,由此其他节点收到这个消息并存储下来,经过各个节点的不断反复通信,这个集群中的各个节点就会拥有集群中所有节点的信息。

在这里插入图片描述

集群消息协定

任何通信都是需要按照协议规范进行,redis集群也一样,为了保证节点间通信的规范,redis要求集群节点通信的消息的类型可以是以下几种:

  1. ping消息,用来向其他节点发送节点信息。
  2. 回复pingpong消息。
  3. 如果当前节点中存在新添加的节点,则通过meet格式的消息发送给其他节点。
  4. 如果节点出现故障,则发送fail消息告知集群其他节点。

对此我们给出消息的宏定义代码,位于cluster.h中:

//集群中的ping
#define CLUSTERMSG_TYPE_PING 0          /* Ping */
//集群中的pong
#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
//想加入集群的节点
#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
//某个节点有故障
#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */

集群节点消息体

后续集群都会通过clusterMsg来表示一条消息,它记录消息长度以及发送节点名称、负责的slots以及节点端口号等信息:

typedef struct {char sig[4];        /* Siganture "RCmb" (Redis Cluster message bus). *///消息总长度uint32_t totlen;    /* Total length of this message *///......//消息类型uint16_t type;      /* Message type *///......//发送节点的名称char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node *///发送节点负责的slotsunsigned char myslots[REDIS_CLUSTER_SLOTS/8];//......char notused1[32];  /* 32 bytes reserved for future usage. *///节点端口uint16_t port;      /* Sender TCP base port *///......//记录消息的消息体union clusterMsgData data;
} clusterMsg;

这里我们对这个消息体clusterMsgData进行展开说明一下,可以看到他用一段共用体维护各种类型消息的结构,这其中我们只需要了解的是ping消息,从注释可以看到ping消息这个结构体可以发送pingmeetpong等类型消息,ping消息类型其内部用clusterMsgDataGossip数组维护,这一点这个消息可以包含多个节点信息存于数组中:

union clusterMsgData {//可以发送ping meet pong的消息,该结构体内部有clusterMsgDataGossip数组,这意味这个结构体可以存放多个节点的消息struct {/* Array of N clusterMsgDataGossip structures */clusterMsgDataGossip gossip[1];} ping;//......
};

步入clusterMsgDataGossip即可看到这个结构体存储的是需要发送给它人的节点名称、ping和收到ping的时间以及端口号等信息:

typedef struct {char nodename[REDIS_CLUSTER_NAMELEN];//节点名称uint32_t ping_sent; //发送ping的事件uint32_t pong_received;//收到pong的事件char ip[REDIS_IP_STR_LEN];  //广播的节点ipuint16_t port;          //节点与客户端进行通信的端口//......
} clusterMsgDataGossip;

我们来简单小结一下,假设我们的某个节点向其他节点发送ping消息告知自己维护的节点信息和状态,那么对应的消息格式大体如下图所示:

在这里插入图片描述

详解集群节点ping流程

集群节点的指向流程也是交由redis的时间事件serverCron执行,它会每个100ms执行一次集群的定任务clusterCron方法,其内部会检查这个定时任务是否执行了10次,一旦执行10次(也就是100ms*10即每1秒)后就会随机从当前节点维护的其他节点信息字典表中抽取5个节点,找到最早发送pong给当前节点发送一条ping消息:

在这里插入图片描述

对此我们给出定时执行的serverCron函数,可以看到其内部每100ms执行一次集群定时任务clusterCron

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {//......//100ms执行一次集群的函数 run_with_period(100) {if (server.cluster_enabled) clusterCron();}//......
}

我们步入clusterCron即可看到

void clusterCron(void) {//......// 每10次即每过去1s执行一次这段逻辑if (!(iteration % 10)) {int j;//随机选出5个节点for (j = 0; j < 5; j++) {de = dictGetRandomKey(server.cluster->nodes);clusterNode *this = dictGetVal(de);/* Don't ping nodes disconnected or with a ping currently active. *///断连、或者自己、或者正在握手的节点不处理if (this->link == NULL || this->ping_sent != 0) continue;if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))continue;//选择最早收到pong的节点    if (min_pong_node == NULL || min_pong > this->pong_received) {min_pong_node = this;min_pong = this->pong_received;}}//向最早收到pong的调用clusterSendPing发送消息if (min_pong_node) {redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);}}//......
}

步入clusterSendPing即可看到我们所说的核心逻辑,即按照公式计算出要发送给最早回复pong的节点对应节点数,然后封装成消息发送出去:

void clusterSendPing(clusterLink *link, int type) {//......//我们希望添加的最大节点数,集群总是减去自己和正在握手的int freshnodes = dictSize(server.cluster->nodes)-2;//......//计算wantedwanted = floor(dictSize(server.cluster->nodes)/10);if (wanted < 3) wanted = 3;if (wanted > freshnodes) wanted = freshnodes;//....../* Populate the header. *///设置ping消息头,构建端口号、slot等信息if (link->node && type == CLUSTERMSG_TYPE_PING)link->node->ping_sent = mstime();clusterBuildMessageHdr(hdr,type);/* Populate the gossip fields */int maxiterations = wanted*3;//基于maxiterations进行循环随机抽取自己维护的节点信息并组装while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {dictEntry *de = dictGetRandomKey(server.cluster->nodes);clusterNode *this = dictGetVal(de);clusterMsgDataGossip *gossip;int j;//如果是自己则跳过if (this == myself) continue;//故障节点不发送if (maxiterations > wanted*2 &&!(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))continue;//....freshnodes--;//组装当前节点的名称、ip、端口等信息存到hdr所指向的消息结构体//指向gossip某个索引位置设置名称、ip、端口等gossip = &(hdr->data.ping.gossip[gossipcount]);memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);gossip->ping_sent = htonl(this->ping_sent);gossip->pong_received = htonl(this->pong_received);memcpy(gossip->ip,this->ip,sizeof(this->ip));gossip->port = htons(this->port);gossip->flags = htons(this->flags);gossip->notused1 = 0;gossip->notused2 = 0;gossipcount++;}//......//创建一个发送事件提交给redis发送出去clusterSendMessage(link,buf,totlen);zfree(buf);
}

节点解析ping消息并回复

每个集群的节点都会定时检查和对端链接的连接是否断开,如果断开的非阻塞向其发送连接,并注册一个处理器clusterReadHandler处理对端的ping等消息,所以我们上文的ping消息实际上就是通过这个函数进行解析读取:

在这里插入图片描述

对此我们给出这段源码的入口即可集群的定时任务clusterCron方法,可以看到其内部会便利当前节点通信的节点,查看连接是否为空,若为空则发起连接并注册clusterReadHandler处理消息:

void clusterCron(void) {//......di = dictGetSafeIterator(server.cluster->nodes);//遍历与当前节点保持通信的节点while((de = dictNext(di)) != NULL) {clusterNode *node = dictGetVal(de);//如果连接为空则非阻塞发起连接,然后注册clusterReadHandler处理对端节点的消息if (node->link == NULL) {int fd;mstime_t old_ping_sent;clusterLink *link;fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);//......//创建链接对应存储数据的空间link = createClusterLink(node);link->fd = fd;node->link = link;//为这个链接注册clusterReadHandler处理发送的消息aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);//......}}}

步入clusterReadHandler即可看到redis服务端解析消息存储到buf并通过clusterProcessPacket解析的逻辑:

void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {//......while(1) { /* Read as long as there is data to read. *///......//hdr指向link->rcvbufhdr = (clusterMsg*) link->rcvbuf;//读取消息到buf即link->rcvbuf中nread = read(fd,buf,readlen);//......if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {//调用clusterProcessPacket解析这个连接的消息,即 link->rcvbufif (clusterProcessPacket(link)) {sdsfree(link->rcvbuf);link->rcvbuf = sdsempty();} else {return; /* Link no longer valid. */}}}
}

clusterProcessPacket即是该方法的核心所在,它会将对端节点发送的消息进行解析与处理,这里我们就以收到pong消息为例说明一下流程,假设回复pong的是master节点,它会更新收到这条网络连接pong响应时间,然后解析报文内容,如果发现有个节点不在我们的节点列表中,将其存入node字典表中:

在这里插入图片描述

int clusterProcessPacket(clusterLink *link) {//....../* Perform sanity checks *///消息完整性校验//....../* Check if the sender is a known node. *///检查发送节点是否是已知节点sender = clusterLookupNode(hdr->sender);//......//....../* PING, PONG, MEET:消息处理逻辑 */if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||type == CLUSTERMSG_TYPE_MEET){//......//如果收到pong则更新pong_received为当前时间if (link->node && type == CLUSTERMSG_TYPE_PONG) {link->node->pong_received = mstime();link->node->ping_sent = 0;//......}//......//如果当前节点则调用clusterProcessGossipSection查看是否有新的节点,若有则发起握手并记录if (sender) clusterProcessGossipSection(hdr,link);} else if (type == CLUSTERMSG_TYPE_FAIL) {//......}//......    return 1;
}

步入clusterProcessGossipSection即可看到该函数会遍历消息中的节点一旦发现该节点是新添加节点则调用clusterStartHandshake其存入nodes字典表中:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {uint16_t count = ntohs(hdr->count);//解析当前节点gossip消息内容clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);//遍历nodewhile(count--) {//......//打印当前节点信息redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",g->nodename,g->ip,ntohs(g->port),ci);node = clusterLookupNode(g->nodename);if (node) {//已知节点处理,如果不可通信才握手重连//......} else {//未知节点则发起握手,若握手建立通信成功则将其存入nodes字典中//......if (sender &&!(flags & REDIS_NODE_NOADDR) &&!clusterBlacklistExists(g->nodename)){clusterStartHandshake(g->ip,ntohs(g->port));}}//走到下一个节点g++;}
}

我们给出clusterStartHandshake中将其存入serverclusternodes字典表的逻辑:

int clusterStartHandshake(char *ip, int port) {//......//如果处于握手中,则说明之前已经发现并进行通信了,直接返回if (clusterHandshakeInProgress(norm_ip,port)) {errno = EAGAIN;return 0;}//基于消息创建node结构其,并调用clusterAddNode将其存入server.cluster->nodes字典表中n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);memcpy(n->ip,norm_ip,sizeof(n->ip));n->port = port;clusterAddNode(n);return 1;
}

小结

自此我们将Redis集群节点通信的全流程的核心部分都分析完成了,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/47856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM/Linux嵌入式面经(十六):蔚来嵌入式一二三面面经

文章目录 static作用,局部static和全局static区别TCP三次握手Linux虚拟内存指针引用区别C++内存分区new/delete和malloc/free区别职业规划为什么选择蔚来介绍一下项目然后问我有没有内核级别开发经验,我说没有什么情况进入内核态一、主动式二、被动式三、其他方式注意事项示例…

MAVSKD-Java开源库mavsdk_server库macOS平台编译

1.下载源码 2.使用IDEA打开,进行mavsdk_server目录,使用gradle进行编译 3.开始编译时会自动下载依赖 4.下载完成后,会自动编译 5.编译成功 6.成功生成AAR文件

2024算力基础设施安全架构设计与思考(免费下载)

算网安全体系是将数据中心集群、算力枢纽、一体化大数据中心三个层级的安全需求进行工程化解耦&#xff0c;从国家安全角度统筹设计&#xff0c;通过安全 服务化方式&#xff0c;依托威胁情报和指挥协同通道将三层四级安全体系串联贯通&#xff0c;达成一体化大数据安全目标。 …

文件IO(Ubuntu)

文件IO 目的 将数据写入文件中 与标准IO的区别 &#xff08;为什么要学习文件IO&#xff09; 标准IO只能操作普通文件和特殊的管道文件 文件IO能操作几乎所有的的文件 缓存区的目的 标准IO有缓存区 文件IO没有缓存区 根据右图描述 标准IO 文件IO buffer缓存区 有缓存区…

数据库管理的艺术(MySQL):DDL、DML、DQL、DCL及TPL的实战应用(上:数据定义与控制)

文章目录 DDL数据定义语言1、创建数据库2、创建表3、修改表结构4、删除5、数据类型 列的约束主键约束&#xff08;primary key&#xff09;唯一约束&#xff08;unique key&#xff09;非空约束检查约束&#xff08;check&#xff09;外键约束&#xff08;foreign key&#xff…

水域救援装备的详细简介_鼎跃安全

水域救援行动需要救援人员配备全面、专业的装备&#xff0c;以应对各种复杂的水域环境和救援任务。水域救援套装应运而生&#xff0c;它集合了水域救援所需的各类关键装备&#xff0c;为救援人员提供全方位的保护和辅助&#xff0c;确保数援行动的高效与安全。 水域救援头盔&am…

S参数入门

一、说明 S参数全称为散射参数&#xff0c;主要用来作为描述线性无源互联结构的一种行为模型&#xff0c;来源于网络分析方法。网络分析法是一种频域方法&#xff0c;在一组离散的频率点上&#xff0c;通过在输入和输出端口得到的参量完全描述线性时不变系统&#xff08;定义参…

npm install 出现canvas错误

npm install canvas2.8.0 --ignore-scripts只要是&#xff1a;npm ERR! Failed at the XXXX.X.X install script 这种错误 都可以&#xff1a;npm install XXXX.X.X --ignore-scripts进行更改 https://blog.csdn.net/YXWik/article/details/119039561

PyTorch 深度学习实践-循环神经网络基础篇

视频指路 参考博客笔记 参考笔记二 文章目录 上课笔记基于RNNCell实现总代码 基于RNN实现总代码 含嵌入层的RNN网络嵌入层的作用含嵌入层的RNN网络架构总代码 其他RNN扩展基本注意力机制自注意力机制&#xff08;Self-Attention&#xff09;自注意力计算多头注意力机制&#xf…

浏览器的卡顿与react的解决思路

以下内容是阅读过程中结合自己的思考而诞生的产物&#xff0c;不一定准确&#xff0c;但相反的&#xff0c;可能个人对实际情况有很大的误解。 仅做参考&#xff0c;欢迎指正。 前面提到浏览器显示的其实是渲染流程最后渲染出来的一张图片&#xff0c;而一个行为引起的副作用需…

系统架构师(每日一练4)

每日一练 1.在网络操作系统环境中&#xff0c;若用户UserA的文件或文件夹被共享后&#xff0c;则()。 答案与解析 A.UserA 的安全性与未共享时相比将会有所提高 B.UserA 的安全性与未共享时相比将会有所下降 C.UserA 的可靠性与未共享时相比将会有所提高 D.UserA 的方便性与未…

redis笔记和测试

redis是用c语言写的,放不频繁更新的数据&#xff08;用户数据。课程数据&#xff09; Redis 中&#xff0c;"穿透"通常指的是缓存穿透&#xff08;Cache Penetration&#xff09;问题&#xff0c;这是指一种恶意或非法请求直接绕过缓存层&#xff0c;直接访问数据库或…

Nginx(详解)

1. 什么是Nginx&#xff1f; Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器&#xff0c;在BSD-like 协议下发行。其特点是占有内存少&#xff0c;并发能力强&#xff0c;事实上nginx的并发能力在同类型的网页服务器中表…

使用ElementUI和element-china-area-data库实现省市区三级联动组件封装

在前端开发中&#xff0c;省市区三级联动是一个常见的需求。今天我们将使用Vue.js和ElementUI组件库&#xff0c;结合element-china-area-data库&#xff0c;来实现一个省市区三级联动的组件。这个组件不仅可以提高用户体验&#xff0c;还能大大简化我们的代码。接下来&#xf…

解决用PicGo为typora配置github图床失败的问题

问题 用PicGo给typora配置图床之&#xff0c;试了好几次&#xff0c;验证图片上传选项时一直都是success:false。 解决办法 安装了github-plus插件后&#xff0c;PicGo的图床设置里会出现github-plus&#xff0c;按照在GitHub选项里的信息在github-plus里再设置一遍&#xf…

swift小知识点(二)

1、 Swift 枚举 Swift 中使用 enum 关键词来创建枚举并且把它们的整个定义放在一对大括号内&#xff1a; enum enumname {// 枚举定义放在这里 } 如下事例&#xff1a; // 定义枚举 enum DaysofaWeek {case Sundaycase Mondaycase TUESDAYcase WEDNESDAYcase THURSDAYcase…

elementUI在手机端使用遇到的问题总结

之前的博客有写过用vue2elementUI封装手机端选择器picker组件&#xff0c;支持单选、多选、远程搜索多选&#xff0c;最终真机调试的时候发现有很多细节样式需要调整。此篇博客记录下我调试过程中遇到的问题和解决方法。 一、手机真机怎么连电脑本地代码调试&#xff1f; 1.确…

C++版OpenCV_02_几何变换

几何变换&#xff0c;持续更新 2.1 仿射变换2.2 投影变换2.3 极坐标变换 几何变换&#xff1a; 仿射变换&#xff1a;平移、放大缩小、旋转、计算仿射矩阵&#xff08;方程法、矩阵法&#xff09;、插值&#xff08;最近邻、双线性插值&#xff09;、投影变换极坐标变换 2.1 仿…

Blender4.2版本正式上线,新版本的5个主要功能!

​Blender刚刚推出了备受瞩目的 Blender 4.2 版本&#xff0c;这款软件专为那些在视觉特效、动画制作、游戏开发和可视化设计领域工作的艺术家们量身打造。作为最新的长期稳定更新&#xff0c;Blender 4.2 不仅稳定可靠&#xff0c;还引入了备受期待的“Eevee Next”实时渲染引…

Milvus 实践(1) --- 文本-图片交互式search搭建及原理

目录 背景 训练素材 download torchvision 简介 python代码 执行结果 模型训练 模型训练参数 训练模型 注意事项 模型加载 录入vectorDB 使用预加载的模型参数对图片进行编码 录入milvus 查询效果 查询编码 milvus search 模型适用列表 总结 背景 应该说Mi…