来聊聊去中心化Redis集群节点如何完成通信

写在文章开头

今天我们来聊点有意思的,关于redis中集群间通信的设计与实现,本文将从源码的角度分析redis集群节点如何利用Gossip协议完成节点间的通信与传播,希望对你有帮助。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

详解Redis集群节点通信的设计与实现

详解Gossip协议

在此之前我们先简单介绍一下Gossip协议,该协议是分布式集群的一种通信协议,我们都知道管理集群的方式有中心化和去中心化两种方式,中心化的方式是通过第一个第三方的管理中心,例如zookeeper等来维护一份集群节点的信息、状态。

在这里插入图片描述

redis采用的是去中心化的方式实现集群节点通信,即通过Gossip协议进行节点通信,让各个节点之间两两通信,广播与自己保持交流的节点,由此将节点串联起来构成一张关系网。

在这里插入图片描述

我们以一个简单的场景为例介绍一下Gossip协议,默认情况下我们的当前有3个节点的集群,各个节点彼此按照通信要求发送自己的信息和与自己保持交流的节点,由此将有限的资源共享出去构成一个集群。

此时,我们需要横向扩展一个节点4,我们只需配置/redis-cli --cluster add-node 新节点IP:新节点端口 任意存活节点IP:任意存活节点端口 ,这个存活节点后续和其他节点通信时,就会将当前新添加的节点4发送出去,由此其他节点收到这个消息并存储下来,经过各个节点的不断反复通信,这个集群中的各个节点就会拥有集群中所有节点的信息。

在这里插入图片描述

集群消息协定

任何通信都是需要按照协议规范进行,redis集群也一样,为了保证节点间通信的规范,redis要求集群节点通信的消息的类型可以是以下几种:

  1. ping消息,用来向其他节点发送节点信息。
  2. 回复pingpong消息。
  3. 如果当前节点中存在新添加的节点,则通过meet格式的消息发送给其他节点。
  4. 如果节点出现故障,则发送fail消息告知集群其他节点。

对此我们给出消息的宏定义代码,位于cluster.h中:

//集群中的ping
#define CLUSTERMSG_TYPE_PING 0          /* Ping */
//集群中的pong
#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
//想加入集群的节点
#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
//某个节点有故障
#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */

集群节点消息体

后续集群都会通过clusterMsg来表示一条消息,它记录消息长度以及发送节点名称、负责的slots以及节点端口号等信息:

typedef struct {char sig[4];        /* Siganture "RCmb" (Redis Cluster message bus). *///消息总长度uint32_t totlen;    /* Total length of this message *///......//消息类型uint16_t type;      /* Message type *///......//发送节点的名称char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node *///发送节点负责的slotsunsigned char myslots[REDIS_CLUSTER_SLOTS/8];//......char notused1[32];  /* 32 bytes reserved for future usage. *///节点端口uint16_t port;      /* Sender TCP base port *///......//记录消息的消息体union clusterMsgData data;
} clusterMsg;

这里我们对这个消息体clusterMsgData进行展开说明一下,可以看到他用一段共用体维护各种类型消息的结构,这其中我们只需要了解的是ping消息,从注释可以看到ping消息这个结构体可以发送pingmeetpong等类型消息,ping消息类型其内部用clusterMsgDataGossip数组维护,这一点这个消息可以包含多个节点信息存于数组中:

union clusterMsgData {//可以发送ping meet pong的消息,该结构体内部有clusterMsgDataGossip数组,这意味这个结构体可以存放多个节点的消息struct {/* Array of N clusterMsgDataGossip structures */clusterMsgDataGossip gossip[1];} ping;//......
};

步入clusterMsgDataGossip即可看到这个结构体存储的是需要发送给它人的节点名称、ping和收到ping的时间以及端口号等信息:

typedef struct {char nodename[REDIS_CLUSTER_NAMELEN];//节点名称uint32_t ping_sent; //发送ping的事件uint32_t pong_received;//收到pong的事件char ip[REDIS_IP_STR_LEN];  //广播的节点ipuint16_t port;          //节点与客户端进行通信的端口//......
} clusterMsgDataGossip;

我们来简单小结一下,假设我们的某个节点向其他节点发送ping消息告知自己维护的节点信息和状态,那么对应的消息格式大体如下图所示:

在这里插入图片描述

详解集群节点ping流程

集群节点的指向流程也是交由redis的时间事件serverCron执行,它会每个100ms执行一次集群的定任务clusterCron方法,其内部会检查这个定时任务是否执行了10次,一旦执行10次(也就是100ms*10即每1秒)后就会随机从当前节点维护的其他节点信息字典表中抽取5个节点,找到最早发送pong给当前节点发送一条ping消息:

在这里插入图片描述

对此我们给出定时执行的serverCron函数,可以看到其内部每100ms执行一次集群定时任务clusterCron

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {//......//100ms执行一次集群的函数 run_with_period(100) {if (server.cluster_enabled) clusterCron();}//......
}

我们步入clusterCron即可看到

void clusterCron(void) {//......// 每10次即每过去1s执行一次这段逻辑if (!(iteration % 10)) {int j;//随机选出5个节点for (j = 0; j < 5; j++) {de = dictGetRandomKey(server.cluster->nodes);clusterNode *this = dictGetVal(de);/* Don't ping nodes disconnected or with a ping currently active. *///断连、或者自己、或者正在握手的节点不处理if (this->link == NULL || this->ping_sent != 0) continue;if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))continue;//选择最早收到pong的节点    if (min_pong_node == NULL || min_pong > this->pong_received) {min_pong_node = this;min_pong = this->pong_received;}}//向最早收到pong的调用clusterSendPing发送消息if (min_pong_node) {redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);}}//......
}

步入clusterSendPing即可看到我们所说的核心逻辑,即按照公式计算出要发送给最早回复pong的节点对应节点数,然后封装成消息发送出去:

void clusterSendPing(clusterLink *link, int type) {//......//我们希望添加的最大节点数,集群总是减去自己和正在握手的int freshnodes = dictSize(server.cluster->nodes)-2;//......//计算wantedwanted = floor(dictSize(server.cluster->nodes)/10);if (wanted < 3) wanted = 3;if (wanted > freshnodes) wanted = freshnodes;//....../* Populate the header. *///设置ping消息头,构建端口号、slot等信息if (link->node && type == CLUSTERMSG_TYPE_PING)link->node->ping_sent = mstime();clusterBuildMessageHdr(hdr,type);/* Populate the gossip fields */int maxiterations = wanted*3;//基于maxiterations进行循环随机抽取自己维护的节点信息并组装while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {dictEntry *de = dictGetRandomKey(server.cluster->nodes);clusterNode *this = dictGetVal(de);clusterMsgDataGossip *gossip;int j;//如果是自己则跳过if (this == myself) continue;//故障节点不发送if (maxiterations > wanted*2 &&!(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))continue;//....freshnodes--;//组装当前节点的名称、ip、端口等信息存到hdr所指向的消息结构体//指向gossip某个索引位置设置名称、ip、端口等gossip = &(hdr->data.ping.gossip[gossipcount]);memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);gossip->ping_sent = htonl(this->ping_sent);gossip->pong_received = htonl(this->pong_received);memcpy(gossip->ip,this->ip,sizeof(this->ip));gossip->port = htons(this->port);gossip->flags = htons(this->flags);gossip->notused1 = 0;gossip->notused2 = 0;gossipcount++;}//......//创建一个发送事件提交给redis发送出去clusterSendMessage(link,buf,totlen);zfree(buf);
}

节点解析ping消息并回复

每个集群的节点都会定时检查和对端链接的连接是否断开,如果断开的非阻塞向其发送连接,并注册一个处理器clusterReadHandler处理对端的ping等消息,所以我们上文的ping消息实际上就是通过这个函数进行解析读取:

在这里插入图片描述

对此我们给出这段源码的入口即可集群的定时任务clusterCron方法,可以看到其内部会便利当前节点通信的节点,查看连接是否为空,若为空则发起连接并注册clusterReadHandler处理消息:

void clusterCron(void) {//......di = dictGetSafeIterator(server.cluster->nodes);//遍历与当前节点保持通信的节点while((de = dictNext(di)) != NULL) {clusterNode *node = dictGetVal(de);//如果连接为空则非阻塞发起连接,然后注册clusterReadHandler处理对端节点的消息if (node->link == NULL) {int fd;mstime_t old_ping_sent;clusterLink *link;fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);//......//创建链接对应存储数据的空间link = createClusterLink(node);link->fd = fd;node->link = link;//为这个链接注册clusterReadHandler处理发送的消息aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);//......}}}

步入clusterReadHandler即可看到redis服务端解析消息存储到buf并通过clusterProcessPacket解析的逻辑:

void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {//......while(1) { /* Read as long as there is data to read. *///......//hdr指向link->rcvbufhdr = (clusterMsg*) link->rcvbuf;//读取消息到buf即link->rcvbuf中nread = read(fd,buf,readlen);//......if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {//调用clusterProcessPacket解析这个连接的消息,即 link->rcvbufif (clusterProcessPacket(link)) {sdsfree(link->rcvbuf);link->rcvbuf = sdsempty();} else {return; /* Link no longer valid. */}}}
}

clusterProcessPacket即是该方法的核心所在,它会将对端节点发送的消息进行解析与处理,这里我们就以收到pong消息为例说明一下流程,假设回复pong的是master节点,它会更新收到这条网络连接pong响应时间,然后解析报文内容,如果发现有个节点不在我们的节点列表中,将其存入node字典表中:

在这里插入图片描述

int clusterProcessPacket(clusterLink *link) {//....../* Perform sanity checks *///消息完整性校验//....../* Check if the sender is a known node. *///检查发送节点是否是已知节点sender = clusterLookupNode(hdr->sender);//......//....../* PING, PONG, MEET:消息处理逻辑 */if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||type == CLUSTERMSG_TYPE_MEET){//......//如果收到pong则更新pong_received为当前时间if (link->node && type == CLUSTERMSG_TYPE_PONG) {link->node->pong_received = mstime();link->node->ping_sent = 0;//......}//......//如果当前节点则调用clusterProcessGossipSection查看是否有新的节点,若有则发起握手并记录if (sender) clusterProcessGossipSection(hdr,link);} else if (type == CLUSTERMSG_TYPE_FAIL) {//......}//......    return 1;
}

步入clusterProcessGossipSection即可看到该函数会遍历消息中的节点一旦发现该节点是新添加节点则调用clusterStartHandshake其存入nodes字典表中:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {uint16_t count = ntohs(hdr->count);//解析当前节点gossip消息内容clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);//遍历nodewhile(count--) {//......//打印当前节点信息redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",g->nodename,g->ip,ntohs(g->port),ci);node = clusterLookupNode(g->nodename);if (node) {//已知节点处理,如果不可通信才握手重连//......} else {//未知节点则发起握手,若握手建立通信成功则将其存入nodes字典中//......if (sender &&!(flags & REDIS_NODE_NOADDR) &&!clusterBlacklistExists(g->nodename)){clusterStartHandshake(g->ip,ntohs(g->port));}}//走到下一个节点g++;}
}

我们给出clusterStartHandshake中将其存入serverclusternodes字典表的逻辑:

int clusterStartHandshake(char *ip, int port) {//......//如果处于握手中,则说明之前已经发现并进行通信了,直接返回if (clusterHandshakeInProgress(norm_ip,port)) {errno = EAGAIN;return 0;}//基于消息创建node结构其,并调用clusterAddNode将其存入server.cluster->nodes字典表中n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);memcpy(n->ip,norm_ip,sizeof(n->ip));n->port = port;clusterAddNode(n);return 1;
}

小结

自此我们将Redis集群节点通信的全流程的核心部分都分析完成了,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/47856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MAVSKD-Java开源库mavsdk_server库macOS平台编译

1.下载源码 2.使用IDEA打开,进行mavsdk_server目录,使用gradle进行编译 3.开始编译时会自动下载依赖 4.下载完成后,会自动编译 5.编译成功 6.成功生成AAR文件

2024算力基础设施安全架构设计与思考(免费下载)

算网安全体系是将数据中心集群、算力枢纽、一体化大数据中心三个层级的安全需求进行工程化解耦&#xff0c;从国家安全角度统筹设计&#xff0c;通过安全 服务化方式&#xff0c;依托威胁情报和指挥协同通道将三层四级安全体系串联贯通&#xff0c;达成一体化大数据安全目标。 …

文件IO(Ubuntu)

文件IO 目的 将数据写入文件中 与标准IO的区别 &#xff08;为什么要学习文件IO&#xff09; 标准IO只能操作普通文件和特殊的管道文件 文件IO能操作几乎所有的的文件 缓存区的目的 标准IO有缓存区 文件IO没有缓存区 根据右图描述 标准IO 文件IO buffer缓存区 有缓存区…

数据库管理的艺术(MySQL):DDL、DML、DQL、DCL及TPL的实战应用(上:数据定义与控制)

文章目录 DDL数据定义语言1、创建数据库2、创建表3、修改表结构4、删除5、数据类型 列的约束主键约束&#xff08;primary key&#xff09;唯一约束&#xff08;unique key&#xff09;非空约束检查约束&#xff08;check&#xff09;外键约束&#xff08;foreign key&#xff…

水域救援装备的详细简介_鼎跃安全

水域救援行动需要救援人员配备全面、专业的装备&#xff0c;以应对各种复杂的水域环境和救援任务。水域救援套装应运而生&#xff0c;它集合了水域救援所需的各类关键装备&#xff0c;为救援人员提供全方位的保护和辅助&#xff0c;确保数援行动的高效与安全。 水域救援头盔&am…

S参数入门

一、说明 S参数全称为散射参数&#xff0c;主要用来作为描述线性无源互联结构的一种行为模型&#xff0c;来源于网络分析方法。网络分析法是一种频域方法&#xff0c;在一组离散的频率点上&#xff0c;通过在输入和输出端口得到的参量完全描述线性时不变系统&#xff08;定义参…

PyTorch 深度学习实践-循环神经网络基础篇

视频指路 参考博客笔记 参考笔记二 文章目录 上课笔记基于RNNCell实现总代码 基于RNN实现总代码 含嵌入层的RNN网络嵌入层的作用含嵌入层的RNN网络架构总代码 其他RNN扩展基本注意力机制自注意力机制&#xff08;Self-Attention&#xff09;自注意力计算多头注意力机制&#xf…

redis笔记和测试

redis是用c语言写的,放不频繁更新的数据&#xff08;用户数据。课程数据&#xff09; Redis 中&#xff0c;"穿透"通常指的是缓存穿透&#xff08;Cache Penetration&#xff09;问题&#xff0c;这是指一种恶意或非法请求直接绕过缓存层&#xff0c;直接访问数据库或…

Nginx(详解)

1. 什么是Nginx&#xff1f; Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器&#xff0c;在BSD-like 协议下发行。其特点是占有内存少&#xff0c;并发能力强&#xff0c;事实上nginx的并发能力在同类型的网页服务器中表…

elementUI在手机端使用遇到的问题总结

之前的博客有写过用vue2elementUI封装手机端选择器picker组件&#xff0c;支持单选、多选、远程搜索多选&#xff0c;最终真机调试的时候发现有很多细节样式需要调整。此篇博客记录下我调试过程中遇到的问题和解决方法。 一、手机真机怎么连电脑本地代码调试&#xff1f; 1.确…

Blender4.2版本正式上线,新版本的5个主要功能!

​Blender刚刚推出了备受瞩目的 Blender 4.2 版本&#xff0c;这款软件专为那些在视觉特效、动画制作、游戏开发和可视化设计领域工作的艺术家们量身打造。作为最新的长期稳定更新&#xff0c;Blender 4.2 不仅稳定可靠&#xff0c;还引入了备受期待的“Eevee Next”实时渲染引…

LabVIEW在CRIO中串口通讯数据异常问题

排查与解决步骤 检查硬件连接&#xff1a; 确保CRIO的串口模块正确连接&#xff0c;并且电缆无损坏。 确认串口模块在CRIO中被正确识别和配置。 验证串口配置&#xff1a; 在LabVIEW项目中&#xff0c;检查CRIO目标下的串口配置&#xff0c;确保波特率、数据位、停止位和校验…

用EXCEL和python 计算马尔可夫链转移矩阵

目录 目标&#xff1a;用EXCEL和python 计算马尔可夫链转移矩阵 1 用EXCEL计算 1.1 马尔可夫链的基本应用 1.2 具体计算 2 用python计算马尔可夫转移矩阵 2.1 py代码 2.2 运行结果 3 上面2者计算结果相同 目标&#xff1a;用EXCEL和python 计算马尔可夫链转移矩阵 1 用…

【信号频率估计】MVDR算法及MATLAB仿真

目录 一、MVDR算法1.1 简介1.2 原理1.3 特点1.3.1 优点1.3.2 缺点 二、算法应用实例2.1 信号的频率估计2.2 MATLAB仿真代码 三、参考文献 一、MVDR算法 1.1 简介 最小方差无失真响应&#xff08;Mininum Variance Distortionless Response&#xff0c;MVDR&#xff09;算法最…

Dify中的高质量索引模式实现过程

思考在什么情况下会使用到高质量索引模式呢?第1种情况是在知识库中上传文档,文档被拆分为段落后需要进行编码(增加);第2种情况是在召回测试的时候,需要对query进行编码(查询);第3种情况是当文档中的段落增加和更新时需要进行编码(增加和更新)。索引模式是针对知识库…

大数据开发之Hadoop

大数据开发之Hadoop Hadoop的发展Hadoop的三个功能组件一、HDFS 分布式文件系统 1、HDFS的基础架构2、HDFS基础操作命令3、HDFS WEB浏览&#xff1a;4、Big Data Tools插件5、使用NFS网关功能将HDFS挂载到本地系统6、HDFS数据存储7、NameNode 元数据8、SecondaryNameNode的作用…

用DrissionPage过某里滑块分析

最近我又在找工作了&#xff0c;悲哀啊~&#xff0c;面试官给了一道题&#xff0c;要求如下&#xff1a; 爬虫机试&#xff1a;https://detail.1688.com/offer/643272204627.html 过该链接的滑动验证码&#xff0c;拿到正确的商品信息页html&#xff0c;提取出商品维度的信息&a…

Golang|Shopee一面

1、一个有环的链表&#xff0c;如何确认链表有环&#xff0c;环的长度。 LeetCode 142。原题为判断链表是否有环&#xff0c;如果有环找到环的起点。本题修改为求环的长度&#xff0c;基本思路一致&#xff0c;依然为双指针。当快慢指针相遇之后&#xff0c;如果寻找环的起点&…

Java | Leetcode Java题解之第258题各位相加

题目&#xff1a; 题解&#xff1a; class Solution {public int addDigits(int num) {while (num > 10) {int sum 0;while (num > 0) {sum num % 10;num / 10;}num sum;}return num;} }

[Doris]阿里云搭建Doris,测试环境1FE 1BE

首先&#xff1a;阿里云的国内服务器千万不要用容器搭建&#xff0c;或者自己Dockfile构建镜像。两种方式都不得行&#xff0c;压根拉不到github的镜像&#xff0c;开了镜像加速器也拉不到&#xff0c;不要折腾了&#xff0c;极其愚蠢。 背景&#xff1a;现在测试环境&#xff…