写在文章开头
今天我们来聊点有意思的,关于redis中集群间通信的设计与实现,本文将从源码的角度分析redis
集群节点如何利用Gossip
协议完成节点间的通信与传播,希望对你有帮助。
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
详解Redis集群节点通信的设计与实现
详解Gossip协议
在此之前我们先简单介绍一下Gossip
协议,该协议是分布式集群的一种通信协议,我们都知道管理集群的方式有中心化和去中心化两种方式,中心化的方式是通过第一个第三方的管理中心,例如zookeeper
等来维护一份集群节点的信息、状态。
而redis
采用的是去中心化的方式实现集群节点通信,即通过Gossip协议
进行节点通信,让各个节点之间两两通信,广播与自己保持交流的节点,由此将节点串联起来构成一张关系网。
我们以一个简单的场景为例介绍一下Gossip协议,默认情况下我们的当前有3个节点的集群,各个节点彼此按照通信要求发送自己的信息和与自己保持交流的节点,由此将有限的资源共享出去构成一个集群。
此时,我们需要横向扩展一个节点4,我们只需配置/redis-cli --cluster add-node 新节点IP:新节点端口 任意存活节点IP:任意存活节点端口
,这个存活节点后续和其他节点通信时,就会将当前新添加的节点4发送出去,由此其他节点收到这个消息并存储下来,经过各个节点的不断反复通信,这个集群中的各个节点就会拥有集群中所有节点的信息。
集群消息协定
任何通信都是需要按照协议规范进行,redis
集群也一样,为了保证节点间通信的规范,redis
要求集群节点通信的消息的类型可以是以下几种:
ping
消息,用来向其他节点发送节点信息。- 回复
ping
的pong
消息。 - 如果当前节点中存在新添加的节点,则通过
meet
格式的消息发送给其他节点。 - 如果节点出现故障,则发送fail消息告知集群其他节点。
对此我们给出消息的宏定义代码,位于cluster.h
中:
//集群中的ping
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
//集群中的pong
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
//想加入集群的节点
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
//某个节点有故障
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
集群节点消息体
后续集群都会通过clusterMsg
来表示一条消息,它记录消息长度以及发送节点名称、负责的slots以及节点端口号等信息:
typedef struct {char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). *///消息总长度uint32_t totlen; /* Total length of this message *///......//消息类型uint16_t type; /* Message type *///......//发送节点的名称char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node *///发送节点负责的slotsunsigned char myslots[REDIS_CLUSTER_SLOTS/8];//......char notused1[32]; /* 32 bytes reserved for future usage. *///节点端口uint16_t port; /* Sender TCP base port *///......//记录消息的消息体union clusterMsgData data;
} clusterMsg;
这里我们对这个消息体clusterMsgData
进行展开说明一下,可以看到他用一段共用体维护各种类型消息的结构,这其中我们只需要了解的是ping
消息,从注释可以看到ping消息这个结构体可以发送ping
、meet
、pong
等类型消息,ping消息类型其内部用clusterMsgDataGossip
数组维护,这一点这个消息可以包含多个节点信息存于数组中:
union clusterMsgData {//可以发送ping meet pong的消息,该结构体内部有clusterMsgDataGossip数组,这意味这个结构体可以存放多个节点的消息struct {/* Array of N clusterMsgDataGossip structures */clusterMsgDataGossip gossip[1];} ping;//......
};
步入clusterMsgDataGossip
即可看到这个结构体存储的是需要发送给它人的节点名称、ping
和收到ping
的时间以及端口号等信息:
typedef struct {char nodename[REDIS_CLUSTER_NAMELEN];//节点名称uint32_t ping_sent; //发送ping的事件uint32_t pong_received;//收到pong的事件char ip[REDIS_IP_STR_LEN]; //广播的节点ipuint16_t port; //节点与客户端进行通信的端口//......
} clusterMsgDataGossip;
我们来简单小结一下,假设我们的某个节点向其他节点发送ping
消息告知自己维护的节点信息和状态,那么对应的消息格式大体如下图所示:
详解集群节点ping流程
集群节点的指向流程也是交由redis
的时间事件serverCron
执行,它会每个100ms
执行一次集群的定任务clusterCron
方法,其内部会检查这个定时任务是否执行了10次,一旦执行10次(也就是100ms*10即每1秒)后就会随机从当前节点维护的其他节点信息字典表中抽取5个节点,找到最早发送pong给当前节点发送一条ping
消息:
对此我们给出定时执行的serverCron
函数,可以看到其内部每100ms
执行一次集群定时任务clusterCron
:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {//......//100ms执行一次集群的函数 run_with_period(100) {if (server.cluster_enabled) clusterCron();}//......
}
我们步入clusterCron即可看到
void clusterCron(void) {//......// 每10次即每过去1s执行一次这段逻辑if (!(iteration % 10)) {int j;//随机选出5个节点for (j = 0; j < 5; j++) {de = dictGetRandomKey(server.cluster->nodes);clusterNode *this = dictGetVal(de);/* Don't ping nodes disconnected or with a ping currently active. *///断连、或者自己、或者正在握手的节点不处理if (this->link == NULL || this->ping_sent != 0) continue;if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))continue;//选择最早收到pong的节点 if (min_pong_node == NULL || min_pong > this->pong_received) {min_pong_node = this;min_pong = this->pong_received;}}//向最早收到pong的调用clusterSendPing发送消息if (min_pong_node) {redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);}}//......
}
步入clusterSendPing
即可看到我们所说的核心逻辑,即按照公式计算出要发送给最早回复pong
的节点对应节点数,然后封装成消息发送出去:
void clusterSendPing(clusterLink *link, int type) {//......//我们希望添加的最大节点数,集群总是减去自己和正在握手的int freshnodes = dictSize(server.cluster->nodes)-2;//......//计算wantedwanted = floor(dictSize(server.cluster->nodes)/10);if (wanted < 3) wanted = 3;if (wanted > freshnodes) wanted = freshnodes;//....../* Populate the header. *///设置ping消息头,构建端口号、slot等信息if (link->node && type == CLUSTERMSG_TYPE_PING)link->node->ping_sent = mstime();clusterBuildMessageHdr(hdr,type);/* Populate the gossip fields */int maxiterations = wanted*3;//基于maxiterations进行循环随机抽取自己维护的节点信息并组装while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {dictEntry *de = dictGetRandomKey(server.cluster->nodes);clusterNode *this = dictGetVal(de);clusterMsgDataGossip *gossip;int j;//如果是自己则跳过if (this == myself) continue;//故障节点不发送if (maxiterations > wanted*2 &&!(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))continue;//....freshnodes--;//组装当前节点的名称、ip、端口等信息存到hdr所指向的消息结构体//指向gossip某个索引位置设置名称、ip、端口等gossip = &(hdr->data.ping.gossip[gossipcount]);memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);gossip->ping_sent = htonl(this->ping_sent);gossip->pong_received = htonl(this->pong_received);memcpy(gossip->ip,this->ip,sizeof(this->ip));gossip->port = htons(this->port);gossip->flags = htons(this->flags);gossip->notused1 = 0;gossip->notused2 = 0;gossipcount++;}//......//创建一个发送事件提交给redis发送出去clusterSendMessage(link,buf,totlen);zfree(buf);
}
节点解析ping消息并回复
每个集群的节点都会定时检查和对端链接的连接是否断开,如果断开的非阻塞向其发送连接,并注册一个处理器clusterReadHandler
处理对端的ping等消息,所以我们上文的ping
消息实际上就是通过这个函数进行解析读取:
对此我们给出这段源码的入口即可集群的定时任务clusterCron
方法,可以看到其内部会便利当前节点通信的节点,查看连接是否为空,若为空则发起连接并注册clusterReadHandler
处理消息:
void clusterCron(void) {//......di = dictGetSafeIterator(server.cluster->nodes);//遍历与当前节点保持通信的节点while((de = dictNext(di)) != NULL) {clusterNode *node = dictGetVal(de);//如果连接为空则非阻塞发起连接,然后注册clusterReadHandler处理对端节点的消息if (node->link == NULL) {int fd;mstime_t old_ping_sent;clusterLink *link;fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);//......//创建链接对应存储数据的空间link = createClusterLink(node);link->fd = fd;node->link = link;//为这个链接注册clusterReadHandler处理发送的消息aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);//......}}}
步入clusterReadHandler
即可看到redis
服务端解析消息存储到buf并通过clusterProcessPacket
解析的逻辑:
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {//......while(1) { /* Read as long as there is data to read. *///......//hdr指向link->rcvbufhdr = (clusterMsg*) link->rcvbuf;//读取消息到buf即link->rcvbuf中nread = read(fd,buf,readlen);//......if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {//调用clusterProcessPacket解析这个连接的消息,即 link->rcvbufif (clusterProcessPacket(link)) {sdsfree(link->rcvbuf);link->rcvbuf = sdsempty();} else {return; /* Link no longer valid. */}}}
}
而clusterProcessPacket
即是该方法的核心所在,它会将对端节点发送的消息进行解析与处理,这里我们就以收到pong
消息为例说明一下流程,假设回复pong
的是master
节点,它会更新收到这条网络连接pong
响应时间,然后解析报文内容,如果发现有个节点不在我们的节点列表中,将其存入node
字典表中:
int clusterProcessPacket(clusterLink *link) {//....../* Perform sanity checks *///消息完整性校验//....../* Check if the sender is a known node. *///检查发送节点是否是已知节点sender = clusterLookupNode(hdr->sender);//......//....../* PING, PONG, MEET:消息处理逻辑 */if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||type == CLUSTERMSG_TYPE_MEET){//......//如果收到pong则更新pong_received为当前时间if (link->node && type == CLUSTERMSG_TYPE_PONG) {link->node->pong_received = mstime();link->node->ping_sent = 0;//......}//......//如果当前节点则调用clusterProcessGossipSection查看是否有新的节点,若有则发起握手并记录if (sender) clusterProcessGossipSection(hdr,link);} else if (type == CLUSTERMSG_TYPE_FAIL) {//......}//...... return 1;
}
步入clusterProcessGossipSection
即可看到该函数会遍历消息中的节点一旦发现该节点是新添加节点则调用clusterStartHandshake
其存入nodes
字典表中:
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {uint16_t count = ntohs(hdr->count);//解析当前节点gossip消息内容clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);//遍历nodewhile(count--) {//......//打印当前节点信息redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",g->nodename,g->ip,ntohs(g->port),ci);node = clusterLookupNode(g->nodename);if (node) {//已知节点处理,如果不可通信才握手重连//......} else {//未知节点则发起握手,若握手建立通信成功则将其存入nodes字典中//......if (sender &&!(flags & REDIS_NODE_NOADDR) &&!clusterBlacklistExists(g->nodename)){clusterStartHandshake(g->ip,ntohs(g->port));}}//走到下一个节点g++;}
}
我们给出clusterStartHandshake
中将其存入server
的cluster
的nodes
字典表的逻辑:
int clusterStartHandshake(char *ip, int port) {//......//如果处于握手中,则说明之前已经发现并进行通信了,直接返回if (clusterHandshakeInProgress(norm_ip,port)) {errno = EAGAIN;return 0;}//基于消息创建node结构其,并调用clusterAddNode将其存入server.cluster->nodes字典表中n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);memcpy(n->ip,norm_ip,sizeof(n->ip));n->port = port;clusterAddNode(n);return 1;
}
小结
自此我们将Redis集群节点通信的全流程的核心部分都分析完成了,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。