Redis 命令处理过程

我们知道 Redis 是一个基于内存的高性能键值数据库, 它支持多种数据结构, 提供了丰富的命令, 可以用来实现缓存、消息队列、分布式锁等功能。
而在享受 Redis 带来的种种好处时, 是否曾好奇过 Redis 是如何处理我们发往它的命令的呢?

本文将以伪代码的形式简单分析一下 Redis 命令处理的过程, 探讨其背后的机制。

1 例子

set myKey myValue
ok

上面是一个简单的 Redis 命令执行过程:

  1. 用户借助 Redis 的客户端 (redis-cli, 或者各种第三方的客户端) 向 Redis 服务端发送了一个 set 命令
  2. Redis 服务端将其后面的 myKey 和 myValue 存储下来
  3. Redis 服务端再向客户端响应一个 ok 值, 表示处理成功。

下面我们就以这个为例子, 来分析一下 Redis 命令处理的过程。

备注:

  1. 下面的逻辑, 是按照 Redis 5.x 的版本进行分析的, 各个大版本之间可能会有出入
  2. 在伪代码分析过程中, 会将大部分无关的逻辑和异常场景进行省略
  3. 因为整个过程会涉及到大量 Redis 事件轮询的知识和以 set 为例, 会涉及 Redis String 编码的知识, 可以先看一下往期这 2 篇先了解一下

2 认识一下 redisServer 和 client

在真正进行分析前, 有 2 个对象需要先介绍一下, 可以说他们贯穿了整个命令处理的过程。

2.1 redisServer

redisServer 可以看做是整个 Redis 服务端运行时的上下文, 保存着整个 Redis 的配置和运行中产生的数据。

public class redisServer {// Tcp 连接对应的文件描述符 fd 存放的数组int[] ipfd = new int[16];// 所有存入到 Redis 中的数据, 都会存放到这里redisDb[] db = new redisDb[16];// 命令字典, 保存着 Redis 支持的所有命令// Redis 支持的所有命令和命令对应的执行函数, 都会存放到这里dict commands;// 一个双向链表, 所有连接到 Redis 的客户端, 都会存放到这里List<client> clients;// 向 Redis 发起请求的客户端, Redis 处理完成后, 如果需要响应客户端数据// 会将这个客户端先放到这里, 后面统一响应List<client> clients_pending_write;// 事件循环, Redis 的核心aeEventLoop el;
}
2.1.1 int[] ipfd
bind 127.0.0.1 

上面的配置应该很熟悉吧。
在 Redis 的配置文件中, 加上了这个配置, 就表示 Redis 只有本地可以访问, 因为他只会监听本地机器上的的连接, 当然也可以配置具体的 Ip 地址。

在 Redis 服务端启动后, 会和bind 指定的 Ip 地址 建立对应的 Tcp 连接, 同时会获取到一个文件描述符 fd (可以理解代表当前 Tcp 连接的唯一 Id, 持有这个文件描述符, 代表了持有了对应的端口的监听能力),
并将连接的 fd 存放在这个 ipfd 数组中, 最大支持 16 个连接。

2.1.2 redisDb[] db

Redis 本身默认支持 16 个数据库, 只是我们正常情况都是在使用 0 号数据库。 可以通过 select [0 到 15] 进行切换。
而这个 redisDb[] db 是一个长度为 16 的数组, 每个元素都是一个 redisDb 对象, 代表着一个 Redis 数据库。

redisDb 本身的定义很简单, 如下:

Alt 'redisDb 定义'

其中 dict 是字典的意思, 本身就是一个 key-value 的数据结构, 可以直接看做是一个 Map (JDK 1.7 的 HashMap), 本质是一个数组, 数组中的每个元素是一个 dictEntry。
当发送了 set myKey myValue 到服务端, myKey, myValue 就会以键值对的形式存储在 redisDb 中的 dict 中。

2.1.3 dict commands

首先它也是一个 dict, 也就是一个 Map, 一个 key-value 的映射属性, 具体的含义就是命令字典。

在平时中执行的 Redis 命令, 这个命令对应的执行函数就是存放在这里, 格式如: Map<命令的 key, redisCommand>。
当发送了 set myKey myValue 到服务端, 服务端就用通过 set 这个命令 key 到这里找到对应的 setCommand, 然后执行里面的函数。

2.1.4 List<client> clients

客户端双向链表。
Redis 本身是支持多个客户端的, Redis 接收到客户端的连接后, Redis 内部会将其封装为 client, 然后维护在这个双向链表。
具体的属性下面讲解。

2.1.5 List<client> clients_pending_write

待响应客户端双向链表。
服务端处理完客户端的请求后, 可能需要响应客户端结果, 也就是响应数据。
而 Redis 不是处理完就立即响应的, 而是先将响应结果放到客户端的输出缓存区, 然后再后面统一一起响应。
所以, 有数据需要响应的客户端, 会将其放到这个链表中。

2.1.6 aeEventLoop *el

事件轮询对象: 本质就是一个包装了多路复用的死循环。

大体的实现如下:
Alt 'aeEventLoop 定义'

2.2 client

client 是理解 Redis 命令处理过程的另一个重要对象, 他代表着一个客户端连接。
Redis 客户端连接到服务端时, 服务端将这个客户端的封装成这个对象。

client 本身的属性如下:

public class client {// 同 redisServer 的 ipfd// 当 Redis 接收到客户端的连接后, 会获取到一个代表这个客户端 Tcp 连接的文件描述符 fd, 然后存放到这个属性中int fd;// 当前客户端的是否已经经过了密码认证, 0 代表未认证, 1 代表已认证int authenticated;// 输入缓存区, 客户端发送过来的数据会先存放在这里sds querybuf;// 命令参数的个数, 一个客户端发送过来的命令, 会被 Redis 拆分成多个参数// 比如 set myKey myValue, 总共 3 个参数int argc;// 命令参数, 一个客户端发送过来的命令, 会被 Redis 拆分成多个参数// 比如 set myKey myValue, 就会被拆分成 3 个参数, 3 个参数会存放在这个数组中robj[] argv;// 一个数组, 固定输出缓冲区, 用来存放响应客户端的数据char[] buf = new char[16 * 1024];// 一个链表, 动态输出缓冲区, 同样是用来存放响应客户端的数据List<clientReplyBlock> reply;
}

下面介绍一下几个重要的属性。

2.2.1 sds querybuf

输入缓冲区。
客户端发送到服务端的数据, Redis 服务端收到了, 会先存放到这里。实现结构是一个 sds。 大体的定义如下:
Alt 'sds 定义'

2.2.2 robj[] argv

querybuf 中的数据进行解析后的数据存放的地方, 具体的属性是一个 redisObject 的数组。
而一个 sds 类型 redisObject 的结构如下:
Alt ‘sds 类型的 redisObject'

2.2.3 char[] buf

一个可以存放 16 * 1024 个字符的数组。 客户端发送的命令, Redis 服务端处理完成后, 需要进行响应, 而响应的内容会先存放到这里。
因为是一个长度固定的数组, 所以叫做固定输出缓冲区, 最多可以存放 16kb 的响应数据。

2.2.4 List<clientReplyBlock> reply

动态输出缓冲区
当 Redis 服务端响应客户端数据大于上面的 char[] buf 的容量时, 就先放到这里 (双向链表理论上没有大小限制)。

本质是一个 clientReplyBlock 的双向链表。
clientReplyBlock 的定义也很简单。如下, 可以简单的看做是一个 char[] 的封装。
Alt 'clientReplyBlock 定义'

可以看出来, Redis 的响应缓存区是由一个固定大小的 char 数组加一个动态变化的 char 数组链表共同构成的。
这么组织的好处是: 16kb 的固定 buffer, 基本满足大部分的情况的使用, 提前声明好可以避免频繁分配、回收内存。
动态的响应链表则是起到一个兜底的作用, 保证大数据量时的响应。而本身在需要时进行再分配内存, 使用后释放, 可以起到节省内存的作用。

到此, Redis 命令处理过程中的 2 个重要对象: redisServer 和 client 就介绍完了, 只需要大体知道 2 个对象里面有哪些属性, 大体是干什么的即可,
怎么实现等都可以不用深入, 在开始前先介绍这 2 个对象, 只是是为了后面的分析更加清晰。

3 Redis 服务端启动流程

./redis-server ./redis.conf --port 6666 --dbfilename dump.rdb

在服务器上可以通过上面的命令启动一个 Redis 服务端。
启动脚本 redis-server 后面紧跟的是 Redis 的配置文件, 再后面是用户想要指定的参数 (这里将端口修改为 6666)。

整个启动的过程如下:
Alt 'Redis 服务端启动流程'

  1. 通过脚本启动 Redis 服务端
  2. 创建一个 redisServer 对象, 这时 redisServer 里面所有的配置都是默认值, 比如监听的端口, 连接超时等
  3. 读取配置文件和命令行参数并覆盖掉 redisServer 里面的默认配置, 比如这里的端口, 默认为 6379, 通过命令行参数覆盖为 6666, 在这个过程, 还会将 server.c 里面写好的命令和命令对应的函数从一个静态数组中加载到 redisServer 的 commands 字典中
  4. 将 redisServer 里面的事件轮询 aeEventLoop 创建出来
  5. 和配置文件里面的 bind 地址 + 启动端口建立起 Tcp 连接, 可以得到对应连接的文件描述 fd, 可以理解为一个 Id
  6. 为每一个文件描述符, 也就是 Tcp 连接, 在事件轮询中注册一个可读的文件事件, 执行函数为 acceptTcpHandler (可以理解为告诉多路复用, 关心对应的 Tcp 连接的连接事件, 触发了执行 acceptTcpHandler 函数)
  7. 从磁盘中将 Redis 上次运行的数据加载到 redisServer 的 16 个 redisDb 中 (如果有的话)
  8. 设置事件轮询的阻塞前和阻塞后执行的函数
  9. 启动事件轮询, 进入一个死循环, 整个 Redis 服务端启动成功

大体的伪代码逻辑如下:

// server.c 
int main(int argc, char **argv) {// 1. redisServer 各个属性进行默认值设置initServerConfig();// 2. 解析命令行参数// 启动脚本的参数会通过 main 方法的 argv 传递进来, 这里会对这些参数进行解析处理parsingCommandLineArguments();// 3. 根据配置文件和命令行参数的配置覆盖掉 redisServer 的默认值// 内部会执行一个函数 populateCommandTable(), 将 Reids 所以支持的命令和对应的函数放到 redisServer.commands 中loadServerConfig()// 4. 初始化服务端// 4.1 创建事件轮询对象// 4.2 对所有绑定的 Ip 对应的 6666 端口(默认为 6379, 上面启动命令修改为了 6666) 开启 TCP 监听, 并得到对应的 Ip 文件描述符 fd, 存放到 redisServer 的 ipfd 中// 4.3 对 Redis 的 16 个数据库进行初始化// 4.4 向事件轮询注册 1 个时间事件: 1 毫秒执行一次, 执行函数 serverCron// 4.5 对每个 ipfd 向事件轮询注册 1 个可读的文件事件: 执行函数 acceptTcpHandler// 其他无法的省略initServer();// 5. 从磁盘中加载数据到 redisServer 的 redisDB 中 (AOF, RDB)loadDataFromDisk();// 6. 向事件轮询注册 阻塞前调用函数 beforeSleepaeSetBeforeSleepProc(server.el,beforeSleep);// 7. 向事件轮询注册 阻塞后调用函数 afterSleepaeSetAfterSleepProc(server.el,afterSleep);// 8. 启动事件轮询, 这时进入了死循环, 整个服务端启动aeMain(server.el);// 9. 从事件轮询中退出来,表示程序需要退出了, 删除事件轮询aeDeleteEventLoop(server.el);return 0;
}

启动后的 redisServer 的状态如下:
Alt 'Redis 服务端启动后 redisServer 的状态'

4 Redis 客户端连接服务端

Redis 服务端端启动后, 整个 Redis 就进入到事件轮询里面的死循环, 一直在执行下面的逻辑

Alt 'Redis 服务端启动后事件轮询中执行的死循环逻辑'

这时有个客户端通过 Ip + 端口连接到 Redis 服务端, 多路复用观察到有上游关心的可读事件, 会保留下这个连接请求事件。
这时 redisServer 的事件轮询执行到了 从多路复用中获取事件, 获取到了客户端的连接事件, 接着触发了 acceptTcpHandler 函数。

Alt 'Redis 服务端接收到客户端的连接'

触发的 acceptTcpHandler 函数的逻辑如下:

Alt 'Redis acceptTcpHandler 函数逻辑'

  1. 将连接到 Redis 服务端的客户端封装为 client, 代表当前的客户端
  2. 将封装后的 client, 放到 redisServer 的客户端双写链表 List<client> clients 中
  3. 向事件轮询为这个客户端注册一个可读的文件事件, 触发执行的函数为 readQueryFromClient

大体的伪代码逻辑如下:

// networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask){// 1. 获取客户端 Tcp 连接对应的文件描述符int cfd = anetTcpAccept();// 2. 创建 client createClient();// 3. 向事件轮询注册一个当前客户端的可读的文件事件, 执行函数为: readQueryFromClientregisterClientFdReadFilesEvent();// 4. 初始化 client 的属性, 比如选中的数据库默认为第一个, 输入和输出缓存区创建initClient();// 5. 将 client 添加到 redisServer 的 client 双向链表中linkClient();// 6. 最大客户端数量检查, 如果超过了, 就关闭这个连接 (默认为 10000) maxClientNumCheck();// 7. 保护模式检查, 默认开启 (protected-mode yes)// 开启保护模式时, 没有配置 bind Ip 和密码, 同时客户端的 Ip 地址不是 127.0.0.1 或 ::1, 就关闭这个连接protectedModeCheck();
}

接受了客户端的连接后的 redisServer 的状态如下:
Alt 'Redis 接收到客户端的连接后 redisServer 的状态'

5 客户端发送命令到服务端

Redis 的客户端和服务端之间的数据的传输, 都是遵循内部自定义的一套协议: RESP

5.1 RESP 协议

当用户在客户端输入对应的请求命令时, 比如 set myKey myValue, 客户端会将这个命令转换为 RESP 协议的格式, 然后发送到服务端。

RESP 介绍的具体介绍, 可以看一下这篇文章

所有的内容通过 \r\n 进行分割, 然后定义了几个标识符, 如下:
+ 标识后面是一个简单的字符串
$ 表示后面的内容是一个二进制安全的字符串, 后面会紧跟着一个数字, 表示字符串的长度
* 表示后面的内容是一个数组, 后面同样紧跟一个数字, 表示数组的长度
… 后面省略

比如:
set myKey myValue

  1. 三个字符串 (set + myKey + myValue), 那么转换后就是 3 个二进制安全的字符串, 所以开头就是 *3
  2. 跟后面的内容用 \r\n 分隔, 所以就是 *3\r\n
  3. 第一个字符串是 set, 长度 3, 所以就是 *3\r\n$3\r\nset\r\n
  4. 后面的 myKey 和 myValue 类似, 最终转换后的内容如下 *3\r\n$3\r\nset\r\n$5\r\nmyKey\r\n$7\r\nmyValue\r\n

5.2 请求类型

在 Redis 解析客户端的请求内容前, 还需要确定当前的请求的方式, 判断的逻辑如下:

// 请求内容以 * 开头, 那么请求类型为 mult bulk 请求, 否则是 inline 请求
if (c->querybuf[c->qb_pos] == '*') {c->reqtype = PROTO_REQ_MULTIBULK;
} else {c->reqtype = PROTO_REQ_INLINE;
}

可以看到 Redis 支持 2 种请求的类型 mult bulk 请求, 还是 inline 请求
2 者的区别也很简单, 以请求内容的开头是否为 * 开头进行区分。
以 * 开头的内容, 可以看出就是遵循 REST 协议的请求, 而其他的请求就是 inline 请求。

之所以有 inline 请求, 其实是为了兼容一下特殊的客户端, 比如 Linux 的 telnet 等。

在 Linux 可以通过 telnet Ip 端口 连接到服务端, 然后直接发送请求到服务端, 而这些请求是直接发送到服务端的, 没有中间转为 RESP 协议的。
所以 Redis 选择了兼容这些特殊的情况, 并将这些请求称为 inline 请求。

所以客户端发送命令到服务端的过程如下
Alt 'Redis 客户端按照 RESP 协议转换发送请求'

  1. Redis 客户端接收用户的输入请求
  2. 将这些请求按照 RESP 协议进行转换 (inline 请求, 不会有这一步)
  3. 将转换后的请求内容发送给 Redis 服务端

6 服务端接收到客户端发送的命令

在上面客户端连接时, 向事件轮询中为当前的客户端注册了一个可读的文件事件, 触发函数为 readQueryFromClient
而在客户端将请求发送到服务端后, 事件轮询从多路复用中获取到了这个文件事件后, 会执行里面的函数 readQueryFromClient 函数。

整个 redisQueryFromClient 可以拆分为 2 部分

  1. 请求参数处理
  2. 具体请求命令的执行

6.1 请求参数处理

在上面我们知道, 客户端向服务端发送了一段 RESP 格式的请求 *3\r\n$3\r\nset\r\n$5\r\nmyKey\r\n$7\r\nmyValue\r\n, 服务端会

  1. 将客户端发送过来的请求 *3\r\n$3\r\nset\r\n$5\r\nmyKey\r\n$7\r\nmyValue\r\n, 原封不动的存储到对应 client 的输入缓冲区 queryBuf
    Alt 'Redis redaQueryFromCLient - 存储请求'
  2. 存储在 client querybuf 的内容 *3\r\n$3\r\nset\r\n$5\r\nmyKey\r\n$7\r\nmyValue\r\n, 按照 RESP 协议解析为 3 个 embstr 编码的 redisObject (String 的三种编码有讲解), 然后存储到 client 的 argv 数组中。
    Alt 'Redis redaQueryFromCLient - 命令解析'
  3. 根据 client 的参数数组 argv 的第一个参数 (第一个参数一定是命令参数) 到 redisServer 的命令字典 commands 查找当前的命令
    Alt 'Redis redaQueryFromCLient  执行命令查询'
  4. 找到命令后, 当然是执行对应的命令里面的函数了

上面是 redisQueryFromClient 第一部分, 忽略请求命令的逻辑后的简化过程, 想要继续深入了解里面的其他判断可以看一下下面的伪代码

// networking.c
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {// 1. 先将客户端的请求数据读取到 client 的 querybuf 中putRequestContentIntoClientQueryBuffer();// 2. 如果 querybuf 中的数据超过了 1GB, 就关闭这个连接checkClientQueryBufferMoreThanOneGb();// 3. 临时暂停这次请求, 等待后面触发, 对应的状态有// 3.1 当前的 client 的为阻塞状态 (如果 querybuf 中的数据超过了 256MB, 就将这个 client 的状态设置为 REDIS_BLOCKED)// 3.2 当前有一个 lua 脚本在执行中// 3.3 当前的客户端是准备关闭状态// 3.4 客户端被暂停了temporaryPaurseThisRequest();// 4. 根据请求参数的第一个字符是否为 *, 确定当前的请求是 mult bulk 请求还是 inline 请求confirmThisRequestType();// 5. 根据请求类型, 对 querybuf 的参数进行解析, 然后存放到 argv parseRequestContentIntoClientArgvByRequestType();// 6. 命令处理processCommand();
}// server.c
int processCommand(client *c) {// 1. 模块过滤器, 前置处理// https://redis.io/resources/modules/moduleCallCommandFilters(c);// 2. argv[0] 为 quit (断开连接)// 将客户端的标识设置为 client_close_after_reply, 等待后面的处理, 然后返回ifQuitCommandHandle(c);// 3. 根据 argv[0], 也就是 key, 从 redisServer 的 commands 中找到对应的 redisCommand, 也就是执行命令c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);// 4. 命令 null 检查和命令需要的参数格个数和实际参数个数检查, 不符合就返回错误commandNullAndArgumentsNumberCheck(c->cmd, c->argc);// 5. 服务端需要密码认证, 同时当前的客户端未认证, 并且执行的命令不是 auth 命令, 返回错误requirePassCheckWhenCmdNotAuth(c->authenticated, c->cmd);// 6. 开启了集群模式, 根据 key 计算出对应的执行服务端节点, 如果当前的服务端不是执行的服务端节点, 通知客户端重定向redirectClientIfNeedByKeyWhenClusterEnabled();// 7. 如果设置了最大内存同时当前没有正在执行的 lua 脚本, 就尝试释放内存tryToReleaseMemoryWhenSetMaxMemoryAndNoLuaScriptTimeout();// 8. 当前是主节点, 磁盘检测失败, 执行的命令具备变更属性(新增, 修改, 删除等)或者是 ping 命令,  返回错误// 磁盘检测失败的场景// 8.1 开启了 RDB, 上一次 RDB 失败了,  同时配置了 RDB 失败停止写操作 (stop_writes_on_bgsave_error yes)// 8.2 开启了 AOF, 上一次 AOF 失败了pingAndWriteCommandsDeniedByDiskErrorByMaster();// 9. 主从复制配置检查// 配置了 repl_min_slaves_to_write 和 repl_min_slaves_max_lag// 当前需要有多少个心跳正常的从节点存活, 否则变更属性的命令不执行, 返回错误writeCommandsDeniedByMinSlavesNumberReply();// 10. 当前的客户端是从节点, 并且配置了 slave_read_only, 并且执行的命令具备变更属性, 返回错误writeCommandDeniedBySlaveReadOnly();// 11. 当前的客户端是一个订阅客户端 (subscribe), 执行的命令不是 subscribe, unsubscribe, psubscribe, punsubscribe, ping, 返回错误subscribeClientCanHandleCommandCheck();// 12. 从节点和主节点失去了联系或者正在执行复制中, 同时 slave-serve-stale-data 设置为了 no, 执行的命令不是 stale 属性(info, slaveof), 返回错误slaveSynchronizingOrConnectStateUnusualCheck();// 13. 服务端正在从磁盘中加载数据, 执行的命令不是 loading 属性(info, subscribe, unsubscribe, psubscribe, punsubscribe, publish) , 返回错误loadingFromDiskCheck();// 14. 当前正在执行 lua 脚本, 执行的命令不是 auth, replconf, shutdown, script, 返回错误luaScribtBusyCheck();// 15. 开启了事务, 执行的命令不是 exec, discard, multi, watch, 返回错误if (openTranscation() && commandIsNotExecAndDiscardAndMultiAndWatch()) {// 15.1 命令入队列queueMultiCommand()return C_OK;}// 17. 将要执行的命令, 发送给监控器// Redis 客户端可以成为服务端的监控器, 服务端执行的命令会同步传输给客户端sendCommandToMonitors();// 18. 对应 key 的命令函数执行, 后面会以 setCommand 为例进行分析c->cmd->proc(c);// 19. 如果需要,进行统计记录latencyAddSampleIfNeeded();// 20. 如果需要, 慢日志记录slowlogPushEntryIfNeeded();// 21. 命令传播, 如果有必要进行命令替换// aof 和 主从复制需要当前执行的命令进行数据处理// 一些随机性的命令, 不能直接就传播出去, 需要根据当前的执行结果进行替换, 比如 SPOP key [count], 从 set 中随机弹出若干个元素propagateCommand();
}

6.2 具体请求命令的执行

在 redisQueryFromClient 的逻辑中, 有一段代码

int processCommand(client *c) {......// 这一步就是具体的命令执行的地方, 以 set 命令为例, 了解一下 set 命令的执行过程c->cmd->proc(c);......
}

就是具体的请求命令的执行时机, 这里以 setCommand 为了, 这次直接看伪代码先

// t_string.c
void setCommand(client *c) {// 上面的 c->cmd->proc(c), 最终执行到的函数就是这个// SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] // 1. 根据参数计算超时时间robj *expire = calExpireTime(c->argv, c->argc);// 2. 尝试对 value 进行字符串的编码优化// 2.1 编码不是 embstr 和 raw, 就直接返回原数据, 不是字符串类型, 没必要优化// 2.2 value 长度小于 20, 同时可以转为整数// 2.2.1 没有配置最大内存, 同时内存回收策略不是 MAXMEMORY_FLAG_NO_SHARED_INTEGERS (涉及 lru/lfu 这 2 种模式的回收策略), // 转换的数字大于等于 0, 小于 10000, 返回共享整数池中返回这个数字, 都不满足, 新建一个整数// 2.2.2 原本的 reidsObject 的编码为 raw, 将入参的 redisObject 转为 int 编码, *ptr 修改为转换后的整数值// 2.2.3 原本的 reidsObject 的编码为 embstr, 重新创建一个新的 int 编码的 redisObject// 2.2 逻辑结束 下面为 2.2 不满足情况// 2.3 入参的 redisObject 内容长度小于等于 44, 重新创建一个 embstr 的字符串, 将入参的 redisObject 转为 embstr 编码,  *ptr 修改为转换后的整数值// 2.3 逻辑结束 下面为 2.3 不满足情况// 2.4 到了这里, 说明客户端传过来的 value 大于 44, 只能用 raw 编码, 但是 raw 编码还可以 尝试进行 trim 优化, 也就是去空格c->argv[2] = tryObjectEncoding(c->argv[2]);// 3. 将 key 和 value 存放到 当前客户端选择的 redisDb[] db 中putTheKeyAndValueToDb(c->db, c->argv[1], c->argv[2]);// 4. 如果设置了参数时间, 将更新 redisObject 的 expireTimesetExpireTimeIfNeed(c->db, c->argv[1], expire);// 5. 如果需要, 将当前的客户端放到 redisServer 的 pending_write_clients 中, 表明当前的客户端有数据需要响应putCurrentClientToClientsPendingWriteIfNeed();// 6. 将响应内容 +OK\r\n (响应结果也遵循 RESP 协议) 写入到客户端的 buf 中, 无法写入就写入到客户端的 replytryWriteResponseToBufOrReply();// 7. 当写入的内容是写入到 reply 中时, 需要检查当前客户端待响应的内容的大小是否超过了限制, 是的话, 关闭当前客户端checkClientOutputBufferLimitsWhenWriteToReply();
}

逻辑概括如下:

  1. 根据参数计算超时时间, Redis 的 set 命令支持很多种参数格式, 需要根据这些参数计算出一个当前 String 的过期时间 (如果有设置的话)
  2. 参数数组 argv[2], 一定是要存入到 Redis 的 value, 当前的 value 虽然已经是 redisObject 了, 但如果它是 embstr 和 raw, 尝试寻找更合适的编码 (这一部分都是 Redis String 编码的内容)
    Alt 'Redis setCommand  重编码尝试过程'
  3. 将处理好的 myKey 和 myValue 存到 redisServer 的 redisDb 数组中的第一个 (如果使用前, 通过 select 修改了使用的数据库, 那么存在对应的数据库, 默认为 0, 第一个)
    Alt 'Redis setCommand  保存对象过程'
  4. 如果有必要, 对 redisObject 的过期时间的进行更新
  5. 数据处理完了, 当前的命令如果有数据需要响应客户端时, 需要将当前客户端放到 redisServer 的待响应客户端双向链表 clients_pending_write 中, set 命令处理完需要响应一个 ok, 所以当前 client 需要加入这个链表
    Alt 'Redis setCommand  客户端加入待响应客户端双向链表'
  6. 如果有数据需要响应, 将响应的数据放到 client 的固定输出缓冲区 char buf[] 中, 如果无法直接存放进去, 则存放到动态输出缓冲区 List reply 中, set 回应的是 ok, 经过 RESP 协议后假设可以直接放到固定输出缓冲区
    Alt 'Redis setCommand  响应结果写入缓冲区'

服务端接收到客户端发送的命令并处理后, redisServer 的状态如下:
Alt 'Redis 服务端接收到客户端发送的命令后 redisServer 状态'

此时 client 的状态如下:
Alt 'Redis 服务端接收到客户端发送的命令后 client 状态'

7 服务端响应客户端

存放在 client 的输出缓冲区的数据, 是什么时候发送给客户端的呢?
在 Redis 里面是经过 2 个步骤实现的

  1. 为每一个待发送的客户端注册一个可写的文件事件, 执行函数为 sendReplyToClient
  2. 事件轮询获取这个可写事件并触发 sendReplyToClient 函数

7.1 为待发送的客户端注册发送数据的文件事件

Redis 服务端端启动后, 整个 Redis 就进入到事件轮询里面的死循环, 一直在执行下面的逻辑

!Alt 'Redis 服务端启动后事件轮询中执行的死循环逻辑'

而这次在阻塞前 beforesleep 函数执行 时, 在 beforesleep 函数中会:
遍历 redisServer 的待响应客户端双向链表 clients_pending_write 中的所有客户端,

  1. 将对应的客户端从双向链表删除
  2. 删除的客户端如果有数据要发送, 为他在多路复用各注册一个可写的文件事件, 触发函数 sendReplyToClient

Alt 'Redis 服务端注册发送数据文件事件'

对应的地方为 beforeSleep 函数逻辑如下:

// server.c
void beforeSleep(struct aeEventLoop *eventLoop) {......// 处理带有输出缓冲区的客户端handleClientsWithPendingWrites();......
}int handleClientsWithPendingWrites(void) {client *c// 1. 遍历 redisServer 的 clients_pending_write while(c = getNextNodeFromLinkList(server.clients_pending_write)) {// 将当前的 client 从 clients_pending_write 链表中删除removeTheClientFromeClientsPendingWrite(c);// 当前的客户端有数据需要发送 (client->buf 或 client->reply 不为空),// 向多路复用注册一个可写的文件事件, 执行函数为 sendReplyToClientregistFileEventForClientWhenClientHaveDataToWrite(c);}
}

7.2 触发发送数据的文件事件

事件轮询在执行完阻塞前函数后, 又进入到多路复用中获取文件事件, 这时会获取到刚刚注册的可写事件文件, 触发 sendReplyToClient 的逻辑, 过程如下:

Alt 'Redis 服务端响应客户端过程'

  1. 逐步将 client 的缓冲区推送给客户端 (单次推送数据有上限要求, 超过的需要到下次事件轮询再推送)
  2. client 推送数据完成, 将其对应的文件事件从多路复用中删除 (如果还有数据没推送, 事件不会被删除, 下次事件轮询还能触发, 推送剩下的)

具体的逻辑如下:

// networking.c
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {//  client 的输出缓冲区有数据需要发送while(clientHasPendingReplies(c)) {// client 的 buf 有数据需要发送if (clientBufHasDataToSend(c)) {writeDataToClient();} else {// 如果 client 的 reply 有数据, 获取链表的第一个节点, 将里面的数据发送给客户端, 同时从双写链表中删除这个节点writeDataToClientIfClientReplyHasData();}// 当前已经发送的数据达到了单次发送的上线 1024*64if (currentHaveSendMoreThanMaxLimit()) {// 没有设置最大内存, 当前发送数据停止if(noSetMaxMemory()) {break;}// 设置了最大内存, 当前已经使用的内存大小小于最大内存, 当前发送数据停止if (haveSetMaxMemoryAndCurrentUsedMemoryLessThanMaxMemory()) {break;}// 设置了最大内存了, 当前使用的内存大于等于最大内存了, 继续循环, 尽量多发送一些, 释放内存}}// 当前 client 没有数据需要发送了if (!clientHasPendingReplies(c)) {// 从事件轮询中删除当前的发送数据事件delCurrentFileEventFromEventLoop();}// client 还有数据, 那么不删除事件, 继续保留, 下次事件轮询执行, 继续执行

需要留意的是执行一个 sendReplyToClient 函数, 给这个客户端推送数据

  1. 每次个客户端推送数据最大为 1024 * 64, 超过了会停止这次推送, 将剩下的留到下次再继续推送 (伪代码里面表明了一些特殊情况了)

至此

set myKey myValue
ok

一个完整的流程就结束了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/176068.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos 显卡驱动安装(chatglm2大模型安装步骤一)

1.服务器配置 服务器系统:Centos7.9 x64 显卡:RTX3090 (24G) 2.安装环境 2.1 检查显卡驱动是否安装 输入命令:nvidia-smi(显示显卡信息) 如果有以下显示说明,已经有显卡驱动。否则需要重装。 2.2 下载显卡驱动 第一步:浏览器输入https://www.nvidia.cn/Downloa…

Python读取Ansible playbooks返回信息

一&#xff0e;背景及概要设计 当公司管理维护的服务器到达一定规模后&#xff0c;就必然借助远程自动化运维工具&#xff0c;而ansible是其中备选之一。Ansible基于Python开发&#xff0c;集合了众多运维工具&#xff08;puppet、chef、func、fabric&#xff09;的优点&#x…

nuxt、vue实现PDF和视频文件的上传、下载、预览

上传 上传页面 <el-form-item :label"(form.ququ3 1 ? 参培 : form.ququ3 2 ? 授课 : ) 证明材料" prop"ququ6"><PdfUpload v-model"form.ququ6" :fileType"[pdf, mp4, avi, ts]"></PdfUpload> </el-form-i…

亚马逊,shein,temu如何避免爆品评分低被强制下架

近期&#xff0c;一些Temu卖家反映产品下架问题&#xff0c;无论是日出千单的爆品还是其他商品&#xff0c;都有可能面临下架的风险。这其中最主要的原因之一是产品质量问题&#xff0c;导致消费者差评较多&#xff0c;评分降至4.2分或4.0分以下时&#xff0c;平台可能会强制下…

EfficientViT:具有级联群体注意力的内存高效Transformer

EfficientViT: Memory Efficient Vision Transformer with Cascaded Group Attention 1、介绍2、使用 Vision Transformer 加快速度2.1 内存效率2.2 计算效率2.3 参数效率 3、Efficient Vision Transformer3.1 EfficientViT 构建模块3.3 EfficientViT 网络架构 4、实验5、结论 …

YOLOv8独家原创改进: AKConv(可改变核卷积),即插即用的卷积,效果秒杀DSConv | 2023年11月最新发表

💡💡💡本文全网首发独家改进:可改变核卷积(AKConv),赋予卷积核任意数量的参数和任意采样形状,为网络开销和性能之间的权衡提供更丰富的选择,解决具有固定样本形状和正方形的卷积核不能很好地适应不断变化的目标的问题点,效果秒殺DSConv 1)AKConv替代标准卷积进行…

如何在vs2019及以后版本(如vs2022)上添加 添加ActiveX控件中的MFC类

有时候我们在MFC项目开发过程中&#xff0c;需要用到一些微软已经提供的功能&#xff0c;如VC使用EXCEL功能&#xff0c;这时候我们就能直接通过VS2019到如EXCEL.EXE方式&#xff0c;生成对应的OLE头文件&#xff0c;然后直接使用功能&#xff0c;那么&#xff0c;我们上篇文章…

【Docker】python flask 项目如何打包成 Docker images镜像 上传至阿里云ACR私有(共有)镜像仓库 集成Drone CI

一、Python环境编译 1、处理好venv环境 要生成正常的 requirements.txt 文件&#xff0c;我们就需要先将虚拟环境处理好 创建虚拟环境&#xff08;可选&#xff09;&#xff1a; 在项目目录中&#xff0c;你可以选择使用虚拟环境&#xff0c;这样你的项目依赖将被隔离在一个…

C++基础 -6-二维数组,数组指针

二维数组在内存中的存放方式和一维数组完全相同 下表把二维数组抽象成了行列形式方便理解 a[0]指向第一行首元素地址 a指向第一行的首地址 所以a地址和a[0]地址相同,因为起点相同 但a[0]1往右偏移 但a1往下方向偏移 方便理解 an控制行 a[0]n控制列(相当于*an) 数组指针指向二…

食材管家,轻松搞定!商户选择生鲜配送系统的原因

随着消费者对生鲜食品的需求不断增加&#xff0c;生鲜市场逐渐成为了电商领域中的热门行业。而生鲜配送系统&#xff0c;则是生鲜电商发展中不可或缺的一部分。本文将探讨商户选择生鲜配送系统的几个原因。 1. 提高效率 生鲜配送系统通过智能化的订单处理、路线规划和配送优化…

【MySQL系列】PolarDB入门使用

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

不用render_template函数,把html代码放在py文件里,不用单独写html文件

3.猜拳游戏&#xff1a;石头、剪刀、布的游戏 ##不用render_template函数&#xff0c;把html代码放在py文件里&#xff0c;不用单独写html文件 from flask import Flask, request import randomapp Flask(__name__)app.route(/) def index():#下面form标签虽然放在注释里&…

Scanner常用知识点

在Java中&#xff0c;Scanner类是用于读取用户输入的工具类&#xff0c;可以从多种输入源读取数据&#xff0c;如标准输入流、文件或字符串。以下是一些Scanner类的常用知识点&#xff1a; Scanner的初始化&#xff1a;在使用Scanner类之前&#xff0c;需要先将其导入到你的Ja…

Pycharm在debug问题解决方案

Pycharm在debug问题解决方案 前言一、Frames are not available二、查看变量时一直显示collecting data并显示不了任何内容 前言 Pycharm在debug时总是出现一些恼人的问题&#xff0c;以下是博主在训练中遇到的问题及在网上找到的可用解决方案&#xff1a; 一、Frames are not…

C语言基础篇5:指针(二)

接上篇&#xff1a;C语言基础篇5&#xff1a;指针(一) 4 指针作为函数参数 4.1 指针变量作为函数的参数 指针型变量可以作为函数的参数&#xff0c;使用指针作为函数的参数是将函数的参数声明为一个指针&#xff0c;前面提到当数组作为函数的实参时&#xff0c;值传递数组的地址…

搭建你自己的网盘-个人云存储的终极解决方案-nextcloud (一)

在当今数字化时代&#xff0c;我们越来越多地依赖云存储来保存和共享我们的个人和工作文件。而自己搭建网盘不仅可以提供更大的存储空间和更高的隐私保护&#xff0c;还可以让我们完全掌控我们的数据。 在之前我分享过一个文件共享站-Pingvin Share 。 但是今天我将带来一个文件…

西工大网络空间安全学院计算机系统基础实验零

首先&#xff0c;下载VMware17 Pro workstation。为什么要下载VMware17 Pro workstation呢&#xff1f;因为计算机系统基础实验有四个大部分&#xff1a;利用位运算实现诸如a*b&#xff0c;a/b&#xff0c;a*(2^4)等运算&#xff1b;C语言循环语句、switch语句等语句与汇编代码…

【开源】基于Vue+SpringBoot的大学生相亲网站

项目编号&#xff1a; S 048 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S048&#xff0c;文末获取源码。} 项目编号&#xff1a;S048&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询会员4…

OpenVINO异步Stable Diffusion推理优化方案

文章目录 Stable Diffusion 推理优化背景技术讲解&#xff1a;异步优化方案思路&#xff1a;异步推理优化原理OpenVINO异步推理Python API同步和异步实现方式对比 oneflow分布式调度优化优势&#xff1a;实现思路 总结&#xff1a; Stable Diffusion 推理优化 背景 2022年&am…

服务器中启动和停止项目

服务器中启动和停止项目 一、前言二、使用命令启动和关闭项目1、启动项目2、停止项目 三、使用可执行脚本启动和关闭项目1、启动项目2、停止项目 一、前言 在服务器上部署项目&#xff0c;一般就是将项目挂在后台&#xff0c;如果是微服务首选docker-compose&#xff0c;但如果…