来聊聊redis集群数据迁移

写在文章开头

本文将是笔者对于redis源码分析的一个阶段的最后一篇,将从源码分析的角度让读者深入了解redis节点迁移的工作流程,希望对你有帮助。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

详解redis cluster数据迁移过程

节点基本结构定义

redis集群提供16384slot,我们可以按需分配给节点上,后续进行键值对存储时,我们就可以按照算法将键值对存到对应slot上的redis服务器上:
在这里插入图片描述

集群节点本质就是通过slots这个数组记录当前节点的所管理的情况,这里我们可以看到slots是一个char 数组,长度为REDIS_CLUSTER_SLOTS(16384)除8,这样做的原因是因为:

  1. char占1个字节,每个字节8位。
  2. 每个char可以记录8个slot的情况,如果是自己的slot则对应char的某一个位置记录为1:

我们以node-1为例,因为它负责0-5460的节点,所以它的slots0-5460都为1,对应的图解如下所示,可以看到笔者这里省略了后半部分,仅仅表示了0-15位置为1:

在这里插入图片描述

对此我们也给出这段redis中节点的定义,即位于cluster.h中的clusterNode这个结构体中,可以看slots这段定义:

typedef struct clusterNode {//......//记录集群负责的槽,总的为16384unsigned char slots[REDIS_CLUSTER_SLOTS/8]; //......
}

设置slot后续节点走向

以本文示例为例,我们希望后续节点2的数据全部存到节点1中,那么我们首先需要键入如下两条配置:

# 在节点1上执行,将节点2数据导入到节点1上CLUSTER SETSLOT 3 IMPORTING node2# 在节点2上执行,将自己的数据迁移到节点1CLUSTER SETSLOT 3 MIGRATING node1

这两条指最终都会被各自的服务端解析,并调用clusterCommand执行,我们以节点1导入为例,假设我们执行clusterCommand解析到setslot 关键字和importing关键字,即知晓要导入其他节点的数据。对应的节点1就会通过importing_slots_from数组标记自己将导入这个slot的数据,而节点2也会通过migrating_slots_to数组标记自己要将数据导出给其他节点的slot:

在这里插入图片描述

对此我们给出clusterCommand的执行流程,可以看到该函数解析出migrating或者importing关键字时就会将对的migrating_slots_to或者importing_slots_from数组对应slot位置的索引位置设置为当前上述命令传入的node id

void clusterCommand(redisClient *c) {//......if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {//处理迁出的逻辑//看看自己是否有迁出的slot,没有则报错if (server.cluster->slots[slot] != myself) {addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);return;}//查看自己是否知晓这个node id,如果没有则报错if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {addReplyErrorFormat(c,"I don't know about node %s",(char*)c->argv[4]->ptr);return;}//标记迁出到slot为传入的nodeserver.cluster->migrating_slots_to[slot] = n;} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {//处理迁入的逻辑//查看迁入的slot是否已经配置,如果有则报错if (server.cluster->slots[slot] == myself) {addReplyErrorFormat(c,"I'm already the owner of hash slot %u",slot);return;}//查看自己是否知晓要迁入数据的node的信息,如果不知道则报错if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {addReplyErrorFormat(c,"I don't know about node %s",(char*)c->argv[3]->ptr);return;}//标记迁入slot位置为传入的nodeidserver.cluster->importing_slots_from[slot] = n;} //......
}

后续的我们假设还是将set key value请求发送到节点2,因为上述命令的原因,节点会返回move/ask告知客户端这个键值对现在要存到节点1上。对应节点1收到这个key请求时,通过key计算得slot正是自己,它就会将这个键值对存储到自己的数据库中:

在这里插入图片描述

这里我们以节点1的角度查看这个问题,当客户端收到move指令后,继续向节点1发送指令,节点1通过收到指令调用processCommand,其内部调用getNodeByQuery获取当前key对应的slot,发现是自己则直接存储数据到当前节点的内存数据库中:

int processCommand(redisClient *c) {//......//如果开启了集群模式,且发送者不是master且参数带key则进入逻辑if (server.cluster_enabled &&!(c->flags & REDIS_MASTER) &&!(c->flags & REDIS_LUA_CLIENT &&server.lua_caller->flags & REDIS_MASTER) &&!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)){int hashslot;if (server.cluster->state != REDIS_CLUSTER_OK) {//......} else {int error_code;//查找键值对对应的slot和这个slot负责的节点clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);//如果为空且或者非自己,则转交出去给别人处理if (n == NULL || n != server.cluster->myself) {flagTransaction(c);clusterRedirectClient(c,n,hashslot,error_code);return REDIS_OK;}}}//......//将键值对存储到当前数据库中
}

我们以节点的视角再次直接步入getNodeByQuery查看这段逻辑,可以看到其内部会基于key计算slot然后将得到对应的node,如果发现这个node是自己且属于importing_slots_from,即说明是客户端通过move或者ask请求找到自己的,则进行进一步是否是多条指令执行且存在key找不到存储位置的情况,若存在则返回空,反之都是直接返回当前节点信息,即node2的新数据直接迁移过来:

clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {//......//遍历命令for (i = 0; i < ms->count; i++) {//.....//获取指令、参数个数、参数mcmd = ms->commands[i].cmd;margc = ms->commands[i].argc;margv = ms->commands[i].argv;//解析出key以及个数keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);for (j = 0; j < numkeys; j++) {//拿到keyrobj *thiskey = margv[keyindex[j]];//计算slotint thisslot = keyHashSlot((char*)thiskey->ptr,sdslen(thiskey->ptr));//.....//如果就是当前节点正在做迁出或者迁入,则migrating_slot/importing_slot设置为1if (n == myself &&server.cluster->migrating_slots_to[slot] != NULL){migrating_slot = 1;} else if (server.cluster->importing_slots_from[slot] != NULL) {importing_slot = 1;}} else {//.....
//.....}//.....}//如果设置了导入标识为1且标识为asking则步入这段逻辑,if (importing_slot &&(c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING)){	//当前指令有多个key且存在未命中的则返回空,反之返回自己if (multiple_keys && missing_keys) {if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;return NULL;} else {return myself;}}//.....//返回节点信息以本示例来说就是返回当前节点信息return n;
}

完成节点迁移

上述操作仅仅针对新节点的迁移,对于旧的节点我们就需要通过节点2键入CLUSTER GETKEYSINSLOT slot count要迁移的旧的keyslot,然后通过MIGRATE host port key dbid timeout [COPY | REPLACE]将数据迁移到节点1上。
这里我们补充一下MIGRATEcopy和replace的区别,前者是遇到重复直接报错,后者是迁移时直接覆盖。
最终这条指令回基于要迁移的key而生成一条RESTORE-ASKING key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]指令发送给导入的节点,以本文例子来说就是节点1:

在这里插入图片描述

这里我们给出MIGRATE 指令对应的处理函数migrateCommand,逻辑和我上文说的差不多,基于指令解析出replace或者copy等信息,然后用argv[3]即我们的key得出这个键值对的信息生成RESTORE指令将键值对转存给节点1:

/* 命令 MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {//......//解析拷贝和替代选项,前者重复会报错for (j = 6; j < c->argc; j++) {if (!strcasecmp(c->argv[j]->ptr,"copy")) {copy = 1;} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {replace = 1;} else {addReply(c,shared.syntaxerr);return;}}//......//查看要迁移的key是否存在吗,如果不存则直接报错返回if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {addReplySds(c,sdsnew("+NOKEY\r\n"));return;}/* Connect *///建立socket连接cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);//......//cmd初始化一个buf缓冲区rioInitWithBuffer(&cmd,sdsempty());/* Send the SELECT command if the current DB is not already selected. *///如果尚未选择当前DB,则发送SELECT命令。int select = cs->last_dbid != dbid; /* Should we emit SELECT? */if (select) {redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));}/* Create RESTORE payload and generate the protocol to call the command. *///获取key的过期时效expireat = getExpire(c->db,c->argv[3]);if (expireat != -1) {ttl = expireat-mstime();if (ttl < 1) ttl = 1;}//集群用RESTORE-ASKING发送key给目标if (server.cluster_enabled)redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE-ASKING",14));elseredisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));//填充key和value ttl等redisAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3]));redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));//......//迁移指令字符串写入缓冲区redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));//......//如果是replace发出 REPLACEif (replace)redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));//......
}

最后调整

最后我们只需在节点1和2都执行CLUSTER SETSLOT <SLOT> NODE <NODE ID> 完成slot指派,这指令最终就会走到clusterCommand中,节点1和节点2格子的处理逻辑为:

  1. 节点2看看迁移的key是否不存则且migrating_slots_to数据不为空,若符合要求说明迁移完成但状态未修改,直接将migrating_slots_to置空完成指派最后调整。
  2. 节点1查看节点id是否是自己且importing_slots_from是否有数据,若有则说明节点导入完成,直接将importing_slots_from置空。

void clusterCommand(redisClient *c) {//......else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {//处理setslot指令//......else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> 标记最终迁移的节点 */clusterNode *n = clusterLookupNode(c->argv[4]->ptr);//......//如果发现对应的key为0,且migrating_slots_to不为空,则说明迁出完成但状态还未修改,节点2会将migrating_slots_to设置为空if (countKeysInSlot(slot) == 0 &&server.cluster->migrating_slots_to[slot])server.cluster->migrating_slots_to[slot] = NULL;//如果是节点1则会看指令的nodeid是否是自己且importing_slots_from是否有数据,若有则说明导入成功直接将importing_slots_from设置为空if (n == myself &&server.cluster->importing_slots_from[slot]){//......server.cluster->importing_slots_from[slot] = NULL;}}//......
}

小结

自此我们将redis集群中的所有核心设计都分析完成,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

Redis的slot迁移:https://blog.csdn.net/Aquester/article/details/107935887

CLUSTER GETKEYSINSLOT:https://redis.io/docs/latest/commands/cluster-getkeysinslot/

RESTORE-ASKING:https://redis.io/docs/latest/commands/restore-asking/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/47860.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为OD机考题(HJ61 放苹果)

前言 经过前期的数据结构和算法学习&#xff0c;开始以OD机考题作为练习题&#xff0c;继续加强下熟练程度。 描述 把m个同样的苹果放在n个同样的盘子里&#xff0c;允许有的盘子空着不放&#xff0c;问共有多少种不同的分法&#xff1f; 注意&#xff1a;如果有7个苹果和3…

C语言 | Leetcode C语言题解之第275题H指数II

题目&#xff1a; 题解&#xff1a; int hIndex(int* citations, int citationsSize) {int left 0, right citationsSize - 1;while (left < right) {int mid left (right - left) / 2;if (citations[mid] > citationsSize - mid) {right mid - 1;} else {left mi…

Java 中的线程

创建线程的三种方式 方式一&#xff1a;继承Thread类 实现步骤&#xff1a; 继承Thread类并重写run()方法&#xff1b; 创建线程并启动。 代码实现&#xff1a; public class MyThread extends Thread {Overridepublic void run() {for(int i0; i<100; i) {System.out…

DB-GPT:LLM应用的集大成者

整体架构 架构解读 可以看到&#xff0c;DB-GPT把架构抽象为7层&#xff0c;自下而上分别为&#xff1a; 运行环境&#xff1a;支持本地/云端&单机/分布式等部署方式。顺便一提&#xff0c;RAY是蚂蚁深度参与的一个开源项目&#xff0c;所以对RAY功能的支持应该非常完善。…

Vue自定义指令与Vue插槽学习

文章目录 自定义指令1.指令介绍2.自定义指令3.自定义指令语法4.指令中的配置项 自定义指令-指令的值1.使用效果2.语法 插槽-默认插槽1.作用2.用处4.插槽的基本语法 插槽-具名插槽1.作用2.具名插槽语法3.v-slot的简写 插槽总结1.插槽分类2.作用3.场景4.使用步骤 自定义指令 1.指…

实现Nginx的反向代理和负载均衡

一、反向代理和负载均衡简介 1.1、反向代理 反向代理(reverse proxy)指:以代理服务器来接受Internet上的连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给Internet上请求连接的客户端。此时代理服务器对外就表现为一个反向代理服务器。 反向代…

【Android Compose】ListView效果

【Android Compose】ListView效果 1、Column、Row 和 Box2、LazyColumn和LazyRow3、Compose 中的状态4、ListView效果5、android-compose-codelabs Jetpack Compose 使用入门 Jetpack Compose 教程 Jetpack Compose 1、Column、Row 和 Box Compose 中的三个基本标准布局元素是 …

算法day05 master公式估算递归时间复杂度 归并排序 小和问题 堆排序

2.认识O(NlogN)的排序_哔哩哔哩_bilibili master公式 有这样一个数组&#xff1a;【0&#xff0c;4&#xff0c;2&#xff0c;3&#xff0c;3&#xff0c;1&#xff0c;2】&#xff1b;假设实现了这样一个sort()排序方法&#xff0c; 将数组二分成左右两等分&#xff0c;使用so…

linux、windows、macos,命令终端清屏

文章目录 LinuxWindowsmacOS 在Linux、Windows和macOS的命令终端中&#xff0c;清屏的命令或方法各不相同。以下是针对这三种系统的清屏方法&#xff1a; Linux clear命令&#xff1a;这是最常用的清空终端屏幕的命令之一。在终端中输入clear命令后&#xff0c;屏幕上的所有内容…

【计算机网络】TCP/IP——流量控制与拥塞控制

学习日期&#xff1a;2024.7.22 内容摘要&#xff1a;TCP的流量控制与拥塞控制 流量控制 一般来说&#xff0c;我们总是希望数据传输的快一些&#xff0c;但是如果数据传输的太快&#xff0c;接收方可能就来不及接收&#xff0c;这就会导致数据的丢失&#xff0c;流量控制正是…

Vue中渲染函数

why? 在绝大多数情况下&#xff0c;Vue 推荐使用模板语法来创建应用。然而在某些使用场景下&#xff0c;我们真的需要用到 JavaScript 完全的编程能力。这时渲染函数就派上用场了。 例如&#xff1a;下方要在多个模型上方设置对话框&#xff0c;如果使用Vue模板语法相对较困难…

小技巧:如何在已知PDF密码情况下去掉PDF的密码保护

第一步&#xff0c;用Edge打开你的pdf&#xff0c;输入密码进去 第二步&#xff0c;点击打印 第三步&#xff0c;选择导出PDF&#xff0c;选择彩印 第四步&#xff0c;选择导出位置&#xff0c;导出成功后打开发现没有密码限制了&#xff01;

什么是长效住宅IP?

长效住宅IP的定义 长效住宅IP&#xff0c;简而言之&#xff0c;是指长期稳定、非动态更换的住宅网络IP地址。这类IP地址通常由互联网服务提供商&#xff08;ISP&#xff09;分配给居民家庭用户&#xff0c;用于上网、网络通信等日常网络活动。与传统的动态IP相比&#xff0c;长…

数据结构day5

一、思维导图 二、课后练习 1、使用循环链表完成约瑟夫环问题 2、使用栈&#xff0c;完成进制转换&#xff08;输入&#xff1a;一个整数&#xff0c;进制数&#xff0c;输出&#xff1a;该数的对应的进制数&#xff09; //头文件 #ifndef DEC_TO_BIN_H #define DEC_TO_BIN_H…

【WAF剖析】10种XSS某狗waf绕过姿势,以及思路分析

原文&#xff1a;【WAF 剖析】10 种 XSS 绕过姿势&#xff0c;以及思路分析 xss基础教程参考&#xff1a;https://mp.weixin.qq.com/s/RJcOZuscU07BEPgK89LSrQ sql注入waf绕过文章参考&#xff1a; https://mp.weixin.qq.com/s/Dhtc-8I2lBp95cqSwr0YQw 复现 网站安全狗最新…

Electron 渲染进程直接调用主进程的API库@electron/remote引用讲解

背景 remote是个老库&#xff0c;早期Electron版本中有个remote对象&#xff0c;这个对象可以横跨所有进程&#xff0c;随意通信&#xff0c;后来官方认为不安全&#xff0c;被干掉了&#xff0c;之后有人利用Electron的IPC通信&#xff0c;底层通过Promise的await能力&#x…

Air780EP- AT开发-阿里云应用指南

简介 使用AT方式连接阿里云分为一机一密和一型一密两种方式&#xff0c;其中一机一密又包括HTTP认证二次连接和MQTT直连两种方式 关联文档和使用工具&#xff1a; AT固件获取在线加/解密工具阿里云平台 准备工作 Air780EP_全IO开发板一套&#xff0c;包括天线SIM卡&#xff0…

在Windows安装、部署Tomcat的方法

本文介绍在Windows操作系统中&#xff0c;下载、配置Tomcat的方法。 Tomcat是一个开源的Servlet容器&#xff0c;由Apache软件基金会的Jakarta项目开发和维护&#xff1b;其提供了执行Servlet和Java Server Pages&#xff08;JSP&#xff09;所需的所有功能。其中&#xff0c;S…

机械学习—零基础学习日志(高数09——函数图形)

零基础为了学人工智能&#xff0c;真的开始复习高数 函数图像&#xff0c;开始新的学习&#xff01; 幂函数 利用函数的性质&#xff0c;以幂函数为例&#xff0c;因为单调性相同&#xff0c;利用图中的2和3公式&#xff0c;求最值问题&#xff0c;可以直接将式子进行简化。这…

自监督学习在言语障碍及老年语音识别中的应用

近几十年来针对正常言语的自动语音识别&#xff08;ASR&#xff09;技术取得了快速进展&#xff0c;但准确识别言语障碍&#xff08;dysarthric&#xff09;和老年言语仍然是一项极具挑战性的任务。言语障碍是一种由多种运动控制疾病引起的常见言语障碍类型&#xff0c;包括脑瘫…