Redis如何避免数据丢失?——AOF

目录

AOF日志

1. 持久化——命令写入到AOF文件

写到用户缓冲区

AOF的触发入口函数——propagate

 具体的实现逻辑——feedAppendOnlyFile

从用户缓冲区写入到AOF文件(磁盘)

函数write、fsync、fdatasync

Redis的线程池

AOF文件的同步策略

触发的入口函数——flushAppendOnlyFile

2. AOF重写 

AOF 重写的2个触发时机

用户发送 bgrewriteaof 命令

在定时函数serverCron中触发

父子进程使用pipe进行通信 

两个缓冲区——重写缓冲 和 差异缓冲

重写缓冲

差异缓冲

rewriteAppendOnlyFileBackground的实现

执行重写过程的函数——rewriteAppendOnlyFile

父进程监听子进程结束, AOF 重写收尾

在定时函数serverCron中监听

主进程对AOF重写收尾——backgroundRewriteDoneHandler

3. Redis重启,AOF 文件加载


Redis是把数据储存在内存的键值数据库,而服务器一旦宕机,那内存中的数据将全部丢失。像MySQL那样,是有宕机后数据恢复机制的。那Redis也是有的,其有两种方式:AOF和RDB。该文章讲解AOF

AOF日志

MySQL是使用redo log(重做日志)来进行宕机恢复的。其是使用了写前日志(Write Ahead Log,WAL),即是在实际写数据前,先把修改的数据写到日志文件中,方便出故障时候进行恢复。

而AOF正好相反,是写后日志即是先执行命令把数据写到内存,之后再把该操作记录到日志中。这是个文本日志,不是二进制文件。

那该日志主要有3个操作:

  • AOF持久化(同步):客户端向Redis服务器发送命令,这些命令会被存储到AOF缓冲区中,并随后会持久化到磁盘文件中
  • AOF重写:随着写入的内容越来越多,就会占用大量的磁盘空间,并且Redis重启时候需要按照顺序执行AOF中的命令,这样时间就比较长,所以Redis 会定期重写 AOF 日志,以达到文件瘦身的效果和缩短重启恢复所需的时间。
  • 重启数据恢复:Redis重启后,通过AOF来进行数据恢复

1. 持久化——命令写入到AOF文件

写到用户缓冲区

首先,写入到AOF的命令是先存储在一个AOF缓冲区。

struct redisServer {.........sds aof_buf;      /* AOF buffer, written before entering the event loop */
};

客户端发送的命令转为RESP协议格式的字符串,然后追加到已有的字符串后面,这些都是存储在aof_buf中。

AOF的触发入口函数——propagate

单线程情况下,其函数被调用的流程:readQueryFromClient——>processInputBuffer——>processCommandAndResetClient——>processCommand——> call(client *c, int flags) ——>propagate

void call(client *c, int flags) {/* Call the command. */c->cmd->proc(c);........................// 入参的 flags 设置了 CMD_CALL_PROPAGATE 标识, 表示当前的命令需要传播// 同时对应的客户端内部的标识不是 CLIENT_PREVENT_PROP (客户端的命令阻止传播)if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP){int propagate_flags = PROPAGATE_NONE;if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);// 当前的客户端设置了需要强制同步传播,或者设置了 需要强制 AOF 传播if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;// 与客户端c的flags对比,若是符合条件,取消 命令传播标识的repl或者aofif (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL))propagate_flags &= ~PROPAGATE_REPL;if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF))propagate_flags &= ~PROPAGATE_AOF;//  命令传播标识 不为 none, 且当前的命令不是模块命令if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))// 处理aof 和 复制给副本            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);}..................................
}
// 将命令写到aof 文件,并将命令发送给副本
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags)
{if (!server.replication_allowed)return;// AOF 开启了, 同时命令传播标识为 需要 AOF 传播if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)feedAppendOnlyFile(cmd,dbid,argv,argc);     // 将当前的命令保存到 AOF 缓冲区..................  
}

 具体的实现逻辑——feedAppendOnlyFile

该函数就是把命令写入到aof缓冲区。

  1. 创建一个SDS对象buf,用户把命令写入到该对象。判断该命令使用的数据库号是否是用户选择的数据库号,若不是就需要在aof文件中添加选择数据库。
  2. 把命令写入到buf。
    1. 对于 EXPIREEXPIREAT 和 PEXPIRE 将其转换为 PEXPIREAT 特殊处理。
    2. 对于带 EXPX 参数的 SET 命令特殊处理,主要涉及过期时间的处理;
    3. 对于其它命令,调用 catAppendOnlyGenericCommand 按照 RESP 协议组装命令,并将其暂存至 buf;
  3. 如果启用 AOF 日志,则将 buf 中暂存的命令追加到 AOF缓冲区server.aof_buf。
  4. 如果存在正在重写 AOF 的子进程,则将命令追加到 AOF 重写缓冲区server.aof_rewrite_buf_blocks
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {sds buf = sdsempty();//该命令写入的数据库和用户选择的数据库不一致的话,需要在aof文件添加一段选择数据库的记录if (dictid != server.aof_selected_db) {char seldb[64];snprintf(seldb,sizeof(seldb),"%d",dictid);// 拼接出一个 select 数据库号 的语句, 这个语句是遵守 RESP 协议buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",(unsigned long)strlen(seldb),seldb);server.aof_selected_db = dictid;}//这三个命令, 在 AOF 保存的时候, 都会转为 expireat key 具体的过期时间 (单位毫秒) 的格式存入到 AOF 文件中if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||cmd->proc == expireatCommand) {// 转为过期对应的文本, 同时追加到 buf 中buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);} else if (cmd->proc == setCommand && argc > 3) {//带 EX、PX 参数的 SET 命令,特殊处理, set key value ex seconds, set key value px millisecondsrobj *pxarg = NULL;if (!strcasecmp(argv[3]->ptr, "px")) {    //过期时间是毫秒的pxarg = argv[4];}if (pxarg) {    //毫秒的robj *millisecond = getDecodedObject(pxarg);long long when = strtoll(millisecond->ptr,NULL,10);when += mstime();decrRefCount(millisecond);robj *newargs[5];newargs[0] = argv[0];newargs[1] = argv[1];newargs[2] = argv[2];newargs[3] = shared.pxat;newargs[4] = createStringObjectFromLongLong(when);// 往 buf 中追加 set 命令buf = catAppendOnlyGenericCommand(buf,5,newargs);// 创建的对象手动修改引用计数, 便于内存回收decrRefCount(newargs[4]);} else {    //秒过期的buf = catAppendOnlyGenericCommand(buf,argc,argv);}} else {// 其他的命令直接转为 RESP 协议的字符串进行追加buf = catAppendOnlyGenericCommand(buf,argc,argv);}//将组装好的命令追加到 aof_bufif (server.aof_state == AOF_ON)server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));// CHILD_TYPE_AOF表明后台正在进行重写,那么将命令再追加一份到重写缓冲区中,以便我们记录重写时 AOF 文件和当前数据库的差异if (server.child_type == CHILD_TYPE_AOF)aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));sdsfree(buf);
}

从用户缓冲区写入到AOF文件(磁盘)

函数write、fsync、fdatasync

  • write只是将数据保存到系统缓冲区或者用户缓冲区,还没有真正落入到磁盘中的
  • fsync是真正地把数据写入到磁盘,即是把缓冲区中的数据落入磁盘。
    • POSIX 标准定义的 fsync 函数在文件元数据(metadata,例如 st_sizest_atimest_mtime 等)变脏时,会将所有元数据同步到磁盘。由于每次同步都必定导致时间戳的改变,而且文件内容和文件元数据通常存储在磁盘上的不同位置,因此每次调用 fsync 至少需要两次随机磁盘 I/O
  • 为此,Linux 平台提供了一个 fdatasync 函数。该函数仅在必要时才将元数据同步到磁盘(文件读写时间戳等信息的改变不会实时落盘),大大降低了元数据同步的频率。
    • 举例来说,文件的尺寸(st_size)如果变化,是需要立即同步的,否则OS一旦崩溃,即使文件的数据部分已同步,由于metadata没有同步,依然读不到修改的内容。而最后访问时间(atime)/修改时间(mtime)是不需要每次都同步的,只要应用程序对这两个时间戳没有苛刻的要求,基本无伤大雅

 Redis 通过条件编译,将 Linux 平台的 redis_fsync 定义成了 fdatasync,而在其它类 Unix 平台上依旧是 fsync

#ifdef __linux__
#define redis_fsync fdatasync
#else
#define redis_fsync fsync
#endif

Redis的线程池

真正写入到磁盘的是使用fsync函数,那说明该函数是相对比较耗时的。Redis维护了一个线程池,就是用来处理一些比较耗时的操作。

那么,AOF缓冲区写入到AOF文件(存入到磁盘)过程中,会先通过write将数据写入到系统缓存,然后根据当前的AOF保存策略,决定是否需要把fsync函数的执行交给线程池。

AOF文件的同步策略

  • no:不进行同步,每个写命令执行完后,只是先把记录写到AOF文件中的内存缓冲区中,由操作系统决定合适将缓冲区内存写回磁盘。
  • always:每次write后,都会立即执行fsync,这种就是在主线程中执行fsync。
  • everysec:每次write后,不会立即执行fsync,理论是每秒执行一次fsync,同时内部将fysnc的执行交给线程池去处理

触发的入口函数——flushAppendOnlyFile

将缓冲区中的数据写入到aof文件的函数是flushAppendOnlyFile。

在Redis中有5处会调用该函数

  1. 通过命令动态关闭AOF功能时,会进行一次保存,即是发送命令将appendonly yes设置为appendonly no。
  2. 在Redis正常关闭之前,会执行该函数。
  3. 在事件循环中的beforesleep函数中会调用一次,这个是AOF功能的主要的保存入口
  4. Redis的定时函数serverCron(默认100毫秒执行一次)
  5. 定时函数serverCron,判断上次AOF写入状态,失败就执行一次该函数。

 定时函数serverCron关于这部分的代码。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {..................//上次的 AOF 写文件时, 没有执行, 将 aof_flush_postponed_start 设置为true, 表示需要延迟处理//存在延迟的AOF落盘操作,在这里完成if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)flushAppendOnlyFile(0);run_with_period(1000) {//上次的写文件失败,即是fync失败if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)flushAppendOnlyFile(0);}...................
}

存在延迟的AOF落盘操作

比如:主线程在执行flushAppendOnlyFile中调用write后,提交一个任务给后台线程,假设此时数据量很大,fsync需要执行较长时间。而主线程又执行到了flushAppendOnlyFile,而上一次的fsync函数还没有执行完,Redis会选择延迟执行,将Server成员变量aof_flush_postponed_start设置为当前时间,就结束该函数。

所以在执行定时任务时候,会判断该变量是否>0,若是,会再执行flushAppendOnlyFile,这个就是AOF同步延迟到定时函数处执行。

但是,延迟到定时任务处触发, 还是无法保证后台线程一定执行完上次的 fsync。所以该函数会根据当前的时间和变量储存的时间进行判断,若是在2s内,就不做任何处理,退出该函数;而大于2s,立即执行AOF缓冲区写入文件的逻辑。

flushAppendOnlyFile的实现

  1. 如果AOF缓冲区为空, 并且AOF策略是everysec,同步到磁盘的内容大小不等于当前AOF文件的内容大小,当前时间 > 上次AOF fsync的时间,同时当前没有正在运行的bio后台任务,则尝试执行fsync。
  2. 如果策略是everysec,且后台存在正在同步的bio线程,则判断aof_flush_postponed_start是否为0:
    1. 若是0,表示之前没有延迟落盘任务,所以就只记录当前的时间戳给aof_flush_postponed_start并退出。
    2. 若是不为0,但判断距离aof_flush_postponed_start是否已经过去2s,若是就增加server.aof_delayed_fsync 计数,强制后续的磁盘同步流程
  3. 调用aofWrite将AOF缓冲区中的数据写入到系统内核缓冲区(这时是还没有使用fsync),若是写入到系统的数据长度不等于当前 AOF 缓冲区的长度, 需要进行异常处理
  4. 如果aof_buf的总空间小于4kb,则清空buffer内容并重新使用该缓冲区,否则创建一个新的。
// AOF 缓冲区数据写入文件
// 当持久策略被设置为 everysec, 实际上会由后台线程进行处理, 那么当前这次刷新写入时, 后台可能有线程还在写入, 所以这时的操作会延迟写入 
//参数force 1:表示无视后台的 fsync, 直接写入, 0: 表示可以延迟, 一般 AOF 过程都是 0
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {ssize_t nwritten;int sync_in_progress = 0;mstime_t latency;//表示aof缓冲区中没有数据, 就可以结束了,但是Redis中有一些极端情况,不会结束,当前学习可以不用了解,后序熟悉该代码了再回头看if (sdslen(server.aof_buf) == 0) {if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.aof_fsync_offset != server.aof_current_size &&server.unixtime > server.aof_last_fsync &&!(sync_in_progress = aofFsyncInProgress())) {goto try_fsync;} else {return;}}// 持久策略为每秒 fsync 一次, 判断后台的线程池是否有线程在执行 fsync if (server.aof_fsync == AOF_FSYNC_EVERYSEC)sync_in_progress = aofFsyncInProgress();//该返回值为true,表示当前有BIO线程在执行 fsync // 持久策略为每秒 fsync 一次, 同时不需要强制写入文件if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {// 当前有 BIO 线程在执行 fsyncif (sync_in_progress) {if (server.aof_flush_postponed_start == 0) { // 0 表示当前没有延迟执行//当前有后台线程在执行 fsync, 那么先延长一下, 设置aof_flush_postponed_start 为当前时间server.aof_flush_postponed_start = server.unixtime;return;//若之前是偶延迟执行,然后又进入了该函数(一般是执行定时函数触发),那这时后台还在执行fsync,但是当前时间和上一次设置的延迟时间小于2s,可以接受,就暂时不做处理} else if (server.unixtime - server.aof_flush_postponed_start < 2) {/* We were already waiting for fsync to finish, but for less* than two seconds this is still ok. Postpone again. */return;}//到了这一步表示线程池中有请求 fsync 的任务, 同时上次延迟距离当前时间超过 2 秒了server.aof_delayed_fsync++;    // 延迟 fsync 的次数 + 1serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");}}if (server.aof_flush_sleep && sdslen(server.aof_buf)) {usleep(server.aof_flush_sleep);}latencyStartMonitor(latency);//调用 write 函数将缓冲区中的数据写入到文件 (此时还在系统缓存, 还没写入到磁盘nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));latencyEndMonitor(latency);if (sync_in_progress) {latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);} else if (hasActiveChildProcess()) {latencyAddSampleIfNeeded("aof-write-active-child",latency);} else {latencyAddSampleIfNeeded("aof-write-alone",latency);}latencyAddSampleIfNeeded("aof-write",latency);//将缓冲区中的数据 write 到系统后, 可以把延迟执行设置为 0   server.aof_flush_postponed_start = 0;                 // 写入到系统的数据长度不等于当前 AOF 缓冲区的长度, 进入异常处理if (nwritten != (ssize_t)sdslen(server.aof_buf)) {static time_t last_write_error_log = 0;if (nwritten == -1) {    // -1, 没有写入任何数据, 就直接失败了server.aof_last_write_errno = errno;}} else {// 大于 -1 但是不等于缓冲区的大小, 写入成功了一部分, if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {//写错误日志..............}} else {/* If the ftruncate() succeeded we can set nwritten to* -1 since there is no longer partial data into the AOF. */nwritten = -1;}server.aof_last_write_errno = ENOSPC;}// 同步策略为 alwaysif (server.aof_fsync == AOF_FSYNC_ALWAYS) {// 这种情况无法处理了, 已经告知客户端写入成功了, 但是当前写入失败了, 直接退出程序。serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");exit(1);} else {// 设置上一次写入状态为异常, 在定时器中会判断这个状态, 再次触发 flushAppendOnlyFile server.aof_last_write_status = C_ERR;if (nwritten > 0) {// 更新当前 aof 文件的大小, 同时将缓冲区中这部分大小的数据移除// 表示这部分写入成功了, 剩余部分下次调用继续server.aof_current_size += nwritten;sdsrange(server.aof_buf,nwritten,-1);}return; /* We'll try again on the next call... */}} else {    //写入成功if (server.aof_last_write_status == C_ERR) {server.aof_last_write_status = C_OK;}}server.aof_current_size += nwritten;    // 更新当前 AOF 文件的大小//如果当前 AOF 缓冲区足够小,小于4K,那么重用这个缓存,否则释放 AOF 缓冲区, 然后重新分配一个    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {sdsclear(server.aof_buf);} else {sdsfree(server.aof_buf);server.aof_buf = sdsempty();}try_fsync://aof缓冲区中没有数据,但是有一些特例情况的需要处理的..........
}

try_fsync部分的代码:

void flushAppendOnlyFile(int force) {if (sdslen(server.aof_buf) == 0) {if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.aof_fsync_offset != server.aof_current_size &&server.unixtime > server.aof_last_fsync && !(sync_in_progress = aofFsyncInProgress())) {goto try_fsync;} .......................}..........................
try_fsync:// no-appendfsync-on-rewrite (正在重写, 不执行 fsync) 被设置为 yes//判断是否有运行中的 bio 线程if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())return;if (server.aof_fsync == AOF_FSYNC_ALWAYS) {latencyStartMonitor(latency);//如果 AOF 落盘策略为 always,直接同步if (redis_fsync(server.aof_fd) == -1) {serverLog(LL_WARNING,"Can't persist AOF for fsync error when the ""AOF fsync policy is 'always': %s. Exiting...", strerror(errno));exit(1);}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-fsync-always",latency);server.aof_fsync_offset = server.aof_current_size;//更新 aof_fsync_offset 为当前的 AOF文件大小server.aof_last_fsync = server.unixtime;     // 上次 fsync 为当前的时间} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.unixtime > server.aof_last_fsync)) {// 当前没有请求 fsync 的任务在线程池中if (!sync_in_progress) {//提交一个任务,就是向线程池的任务链表中添加任务节点, 最终就是一个后台线程执行一次 redis_fsync 函数aof_background_fsync(server.aof_fd);server.aof_fsync_offset = server.aof_current_size;}server.aof_last_fsync = server.unixtime;}
}

2. AOF重写 

很容易想到的一个情况,文件越写越大。AOF文件是以追加形式逐一记录接受到的写命令的。当一个键值对被多条命令反复修改时,AOF文件会记录相应的多条命令。要是宕机后重启,对同一个key,就需要依次执行AOF文件中对该key的操作命令。但是我们只需要最新的对该key的操作。

所以就有了重写。重写的时候,是根据这个键值对的最新状态,为它生成对应的写入命令。这样一来,一个键值对在重写日志中只使用一条命令即可。在日志恢复时,只执行一条命令,就可以直接完成这个键值对的写入,也方便省时。

AOF 重写的2个触发时机

  • bgrewriteaof 命令被执行。
  • 定时器函数, 定时检查 AOF 文件, 如果满足配置文件里面设置的条件, 就触发AOF重写

用户发送 bgrewriteaof 命令

bgrewriteaof 命令方式对应的逻辑函数为 bgrewriteaofCommand。主要逻辑是:

  1. 如果正在执行重写中了,返回错误提示
  2. 如果正在执行RDB保存,就将 aof_rewrite_scheduled 属性设置为 true, 返回提示后, 结束。之后是通过定时器函数serverCron判断这个状态确定是否需要触发
  3. 否则,调用 rewriteAppendOnlyFileBackground 执行重写
struct redisCommand redisCommandTable[] = {   {"bgrewriteaof",bgrewriteaofCommand,1,"admin no-script",0,NULL,0,0,0,0,0,0},.................
};void bgrewriteaofCommand(client *c) {if (server.child_type == CHILD_TYPE_AOF) {addReplyError(c,"Background append only file rewriting already in progress");} else if (hasActiveChildProcess()) {server.aof_rewrite_scheduled = 1;addReplyStatus(c,"Background append only file rewriting scheduled");} else if (rewriteAppendOnlyFileBackground() == C_OK) {addReplyStatus(c,"Background append only file rewriting started");} else {addReplyError(c,"Can't execute an AOF background rewriting. ""Please check the server logs for more information.");}
}

在定时函数serverCron中触发

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {  .......................// 后台没有进程在 RDB 和 AOF, 同时通过 bgrewriteaof 命令设置了定时刷新重写 AOF  if (!hasActiveChildProcess() && server.aof_rewrite_scheduled) {rewriteAppendOnlyFileBackground();}if (hasActiveChildProcess() || ldbPendingChildren()){    // 后台有进程在 RDB 或者 AOF.......................} else {    // 当前后台 没有进程在 RDB 或者 AOF//省略了一些检查.............//达到了AOF重写的条件:开启了AOF && 后台没有RDB和AOF重写进行 &&// 配置了目前 AOF 文件大小超过上次重写的 AOF 文件的百分比 &&//当前的 AOF 文件大小超过了配置的需要触发重写的最小大小if (server.aof_state == AOF_ON && !hasActiveChildProcess() &&server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size){// 计算当前的文件增长的比例long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1;long long growth = (server.aof_current_size*100/base) - 100;// 超过了就调用 rewriteAppendOnlyFileBackground 进行重写if (growth >= server.aof_rewrite_perc) {rewriteAppendOnlyFileBackground();}}}...........................................
}

所以,都是集中到函数rewriteAppendOnlyFileBackground中处理的

在某时刻,需要AOF文件重写:

  • 那为了不阻塞主线程,那可以fork一个子进程来重写。fork出来的子进程,拥有了和父进程一样的内存数据,子进程就先把这些内存数据写入到一个AOF临时文件。
  • 但是在这个过程中,父进程还是能接受客户端的命令的,所以父子进程需要通讯,而Redis中父子进程是使用管道pipe进行通讯的。

父子进程使用pipe进行通信 

Redis是使用了3个管道。每个管道有2端,所以有6个fd。

struct redisServer {/* AOF pipes used to communicate between parent and child during rewrite. */int aof_pipe_write_data_to_child;int aof_pipe_read_data_from_parent;int aof_pipe_write_ack_to_parent;int aof_pipe_read_ack_from_child;int aof_pipe_write_ack_to_child;int aof_pipe_read_ack_from_parent;...................
}; int aofCreatePipes(void) {int fds[6] = {-1, -1, -1, -1, -1, -1};int j;//int pipe(int pipefd[2]); 成功:0;失败:-1,设置errno,函数调用成功返回r/w两个文件描述符if (pipe(fds) == -1) goto error; /* parent -> children data. */if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */if (pipe(fds+4) == -1) goto error; /* parent -> children ack. *//* Parent -> children data is non blocking. */if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;server.aof_pipe_write_data_to_child = fds[1];server.aof_pipe_read_data_from_parent = fds[0];server.aof_pipe_write_ack_to_parent = fds[3];server.aof_pipe_read_ack_from_child = fds[2];server.aof_pipe_write_ack_to_child = fds[5];server.aof_pipe_read_ack_from_parent = fds[4];server.aof_stop_sending_diff = 0;return C_OK;................
}
  1. aof_pipe_write_data_to_child 和 aof_pipe_read_data_from_parent, 主要是父进程将子进程重写过程中产生的命令同步给子进程(即是同步数据
  2. aof_pipe_write_ack_to_parent 和 aof_pipe_read_ack_from_child, 主要是用于子进程通知父进程停止同步变更命令
  3. aof_pipe_write_ack_to_child 和 aof_pipe_read_ack_from_parent, 主要用于父进程响应子进程的停止同步变更命令的请求

 我们要重点关注aof_pipe_write_data_to_child(写端) aof_pipe_read_data_from_parent(读端)。这两个是传输Redis内存数据的管道fd。

两个缓冲区——重写缓冲差异缓冲

重写缓冲

struct redisServer {list *aof_rewrite_buf_blocks; //重写缓冲区,注意是个链表  /* Hold changes during an AOF rewrite. */...................
};#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block *///AOF 重写缓存列表的节点定义
typedef struct aofrwblock {unsigned long used, free;    //used:已使用的空间,free:剩余的空阿金char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;void aofRewriteBufferReset(void) {if (server.aof_rewrite_buf_blocks)listRelease(server.aof_rewrite_buf_blocks);server.aof_rewrite_buf_blocks = listCreate();listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}

什么时候使用到重写缓冲区?那就是需要进行AOF重写的时候。

将缓冲区中的数据写入到aof的函数是flushAppendOnlyFile。那也是在该函数中,会使用到重写缓冲区。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {.............................if (server.child_type == CHILD_TYPE_AOF)    /// 如果后台正在进行重写aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));//将命令写入到 AOF 重写缓冲区
}/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {//获取缓冲区列表,是添加在尾部,所以获取尾部listNode *ln = listLast(server.aof_rewrite_buf_blocks);aofrwblock *block = ln ? ln->value : NULL;while(len) {/* If we already got at least an allocated block, try appending* at least some piece into it. */if (block) {    //表明重写缓冲列表已有数据//计算当前节点的剩余空间是否够len长度的数据写入unsigned long thislen = (block->free < len) ? block->free : len;if (thislen) {  /* The current block is not already full. */memcpy(block->buf+block->used, s, thislen);block->used += thislen;block->free -= thislen;s += thislen;len -= thislen;}}// len > 0, 说明还需要空间, 但是当前的节点没有空间了, 需要新建一个节点if (len) { /* First block to allocate, or need another block. */int numblocks;// 分配新的缓存节点, 同时放到列表的尾部block = zmalloc(sizeof(*block));block->free = AOF_RW_BUF_BLOCK_SIZE;block->used = 0;listAddNodeTail(server.aof_rewrite_buf_blocks,block);numblocks = listLength(server.aof_rewrite_buf_blocks);if (((numblocks+1) % 10) == 0) {int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :LL_NOTICE;serverLog(level,"Background AOF buffer size: %lu MB",aofRewriteBufferSize()/(1024*1024));}}}// 注册一个文件事件, 用来将缓冲区的数据写入到 aof_pipe_write_data_to_child 中, //然后在 Pipe 的作用下, 可以同步到 aof_pipe_read_data_from_parentif (!server.aof_stop_sending_diff &&aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0){//这里注意:注册的是 写事件, 写事件就绪的条件是内核空间的缓冲有空,就可以写aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,AE_WRITABLE, aofChildWriteDiffData, NULL);}
}

接着来看看管道fd的写事件回调函数aofChildWriteDiffData

那么当内核缓冲区空间有空闲,就会触发该管道fd的写事件,就会执行aofChildWriteDiffData通过该函数就把重写缓存中的数据写到了管道中,供子进程读取到子进程的差异缓冲中。

//事件回调函数, 把当前的 AOF 缓冲区同步到 aof_pipe_write_data_to_child, 在 Pipe 的作用下间接同步到 aof_pipe_read_data_from_parent
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {listNode *ln;aofrwblock *block;ssize_t nwritten;while(1) {ln = listFirst(server.aof_rewrite_buf_blocks);block = ln ? ln->value : NULL;// 停止同步 或者 重写缓冲区为空,  就需要删除这个 写事件if (server.aof_stop_sending_diff || !block) {aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);return;}if (block->used > 0) {// 把 block 的数据写入到 aof_pipe_write_data_to_childnwritten = write(server.aof_pipe_write_data_to_child,block->buf,block->used);if (nwritten <= 0) return;memmove(block->buf,block->buf+nwritten,block->used-nwritten);block->used -= nwritten;block->free += nwritten;}if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);}
}

差异缓冲

在子进程重写AOF过程中,子进程等待主进程把重写缓冲中的数据通过pipe发送到差异缓冲区。

struct redisServer {sds aof_child_diff;  //子进程的差异缓冲区  /* AOF diff accumulator child side. */te. */...................
};

子进程通过pipe将重写缓冲区中的数据同步到差异缓冲区的函数是aofReadDiffFromParent

ssize_t aofReadDiffFromParent(void) {char buf[65536]; /* Default pipe buffer size on most Linux systems. */ssize_t nread, total = 0;// 将 aof_pipe_read_data_from_parent 中的数据读取到 buf 中while ((nread =read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {// 把buf的数据拼接到aof_child_diff 中        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);  total += nread;}return total;
}

rewriteAppendOnlyFileBackground的实现

了解了Redis中关于AOF重写的两个缓冲区和父子进程通过pipe通讯,那对AOF重写的过程就好理解了。

其具体的细节步骤:

  1. 主进程fork出一个子进程,让子进程来进行AOF重写。fork出来的子进程,拥有了和父进程一样的内存数据 
  2. 子进程将内存中的数据写入到一个AOF临时文件中
  3. 在子进程重写期间,主进程还是会继续将新到达的命令追加写到原AOF,并将这些命令拷贝到重写缓冲,然后通过pipe管道发送给子进程的差异缓冲中。
  4. 子进程处理完内存数据后,就把差异缓冲中的数据追加到临时AOF文件中,之后就禁止主进程发新数据。
  5. 这时,若主进程中的重写缓存中还剩余数据,就把该数据追加到临时AOF文件中,再用临时AOF文件替换旧的AOF,结束。 
int rewriteAppendOnlyFileBackground(void) {pid_t childpid;if (hasActiveChildProcess()) return C_ERR;  //判断当前没有RDB和aof重写  if (aofCreatePipes() != C_OK) return C_ERR;    //创建 Pipe 通道, 用于父子进程之间通信//创建 AOF 子进程if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {char tmpfile[256];/* Child */redisSetProcTitle("redis-aof-rewrite");//将自己绑定给某个cpuredisSetCpuAffinity(server.aof_rewrite_cpulist);snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());//这个是重点,  重写AOFif (rewriteAppendOnlyFile(tmpfile) == C_OK) {//子进程重写完成的一些收尾工作, 基本不涉及主流程, 通知父进程过程中子进程修改了多少数据sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");exitFromChild(0);} else {exitFromChild(1);}} else {/* Parent */..............server.aof_rewrite_scheduled = 0;server.aof_rewrite_time_start = time(NULL);server.aof_selected_db = -1;// 清空 redisServer 的 repl_scriptcache_dict 字典和 repl_scriptcache_fifo 这个列表// 和主从复制相关replicationScriptCacheFlush();return C_OK;}return C_OK; /* unreached */
}

执行重写过程的函数——rewriteAppendOnlyFile

子进程执行的rewriteAppendOnlyFile就是真正的AOF重写过程。

这个流程步骤有点多:

  1. 打开aof临时文件,并命名;初始化差异缓冲server.aof_child_diff
  2. 若是启用了混合持久化,则调用rdbSaveRio将 RDB 数据写入 aof 临时文件;否则,调用 rewriteAppendOnlyFileRio() 进行普通的 aof 重写。其内部会遍历字典快照,删除无效数据后,将其封装为 RESP 数据写入临时文件。在遍历的过程中,还会周期性地从管道中拉取增量数据到 aof_child_diff
  3. 将I/O缓冲和内核缓冲中的剩余数据同步到磁盘
  4. 从管道中读取剩余的增量数据,持续一段时间
  5. 停止读取后,发送指令给管道让主进程停止向管道写入。然后等待主进程地 ACK;
  6. 此时父进程不会在同步差异命令过来了, 再做最后一次同步, 将 Pipe 通道中残留的数据同步过来,再次从管道中读取数据。
  7. 将差异缓冲中的数据追加到AOF临时文件中,并再次将AOF临时文件缓冲中的数据同步到磁盘中。
  8. 修改临时文件名,并确认写入成功
int rewriteAppendOnlyFile(char *filename) {rio aof;char tmpfile[256];char byte;// 1snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());FILE *fp = fopen(tmpfile,"w");//..................// 清空 aof_child_diff 的数据, 这个就是 AOF 子进程差异缓冲区server.aof_child_diff = sdsempty();rioInitWithFile(&aof,fp);      // 初始 rio 流, 也就是 IO 流, 用于写入数据到文件// 设定 fsync 触发条件if (server.aof_rewrite_incremental_fsync)rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);startSaving(RDBFLAGS_AOF_PREAMBLE);// 2if (server.aof_use_rdb_preamble) {int error;//混合持久化if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {errno = error;goto werr;}} else {//普通持久化if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;}// 3//fflush:是把C库中的缓冲调用write函数写到磁盘[其实是写到内核的缓冲区]。//fsync:是把内核缓冲刷到磁盘上。if (fflush(fp) == EOF) goto werr;if (fsync(fileno(fp)) == -1) goto werr;int nodata = 0;mstime_t start = mstime();// 4 .从管道中拉取剩余的增量数据,持续一段时间while(mstime()-start < 1000 && nodata < 20) {if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0){nodata++;continue;}nodata = 0; /* Start counting from zero, we stop on N *contiguous*timeouts. */aofReadDiffFromParent();    //从管道读数据到 差异缓冲aof_child_diff}// 5 通知主进程 停止发送增量数据if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)goto werr;// 等待主进程的 ACK,最多等 5sif (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||byte != '!') goto werr;// 此时父进程不会在同步差异命令过来了, 再做最后一次同步, 将 Pipe 通道中残留的数据同步过来// 再次从管道中读取差异数据aofReadDiffFromParent();//获取差异缓冲数据的内容大小size_t bytes_to_write = sdslen(server.aof_child_diff);const char *buf = server.aof_child_diff;long long cow_updated_time = mstime();long long key_count = dbTotalServerKeyCount();// 6 . 将差异缓冲数据写入 aof 文件while (bytes_to_write) {size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);// 将 aof_child_diff 中的数据写入到 aof 文件中if (rioWrite(&aof,buf,chunk_size) == 0)goto werr;bytes_to_write -= chunk_size;buf += chunk_size;/* Update COW info */long long now = mstime();if (now - cow_updated_time >= 1000) {sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");cow_updated_time = now;}}// 7 .将 aof 文件缓冲中的数据,同步到磁盘if (fflush(fp)) goto werr;if (fsync(fileno(fp))) goto werr;if (fclose(fp)) { fp = NULL; goto werr; }fp = NULL;//8 .重命名文件if (rename(tmpfile,filename) == -1) {unlink(tmpfile);stopSaving(0);return C_ERR;}stopSaving(1);return C_OK;werr:if (fp) fclose(fp);unlink(tmpfile);stopSaving(0);return C_ERR;
}

父进程监听子进程结束, AOF 重写收尾

在定时函数serverCron中监听

那是在哪进行监听呢?还是在定时函数serverCron中。定时地检查子进程的状态是否为结束了, 是的话, 执行结束逻辑。在下一次运行 serverCron定时函数时,调用 checkChildrenDone()完成 AOF 收尾工作。checkChildrenDone的核心内容backgroundRewriteDoneHandler函数

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {.................// 检查是否有 RDB 子进程或者 AOF 重写子进程结束了if (hasActiveChildProcess() || ldbPendingChildren()){run_with_period(1000) receiveChildInfo();checkChildrenDone();} else {...........}............................
}/* Receive info data from child. */
void receiveChildInfo(void) {if (server.child_info_pipe[0] == -1) return;size_t cow;monotime cow_updated;size_t keys;double progress;childInfoType information_type;/* Drain the pipe and update child info so that we get the final message. */while (readChildInfo(&information_type, &cow, &cow_updated, &keys, &progress)) {updateChildInfo(information_type, cow, cow_updated, keys, progress);}
}void checkChildrenDone(void) {int statloc = 0;pid_t pid;// wait3可以获取所有的进程是否有一个进程退出状态的, 有的话, 进行彻底的销毁,并返回其进程idif ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;int bysignal = 0;if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {bysignal = SIGUSR1;exitcode = 1;}if (pid == -1) {//打印日志} else if (pid == server.child_pid) {if (server.child_type == CHILD_TYPE_RDB) {backgroundSaveDoneHandler(exitcode, bysignal);} else if (server.child_type == CHILD_TYPE_AOF) {backgroundRewriteDoneHandler(exitcode, bysignal); //自己想哦买噶最终的清理逻辑} if (!bysignal && exitcode == 0) receiveChildInfo();    //获取子进程发送给父进程的信息resetChildState();} else {if (!ldbRemoveChild(pid)) {//打印日志}}/* start any pending forks immediately. */replicationStartPendingFork();}
}

主进程对AOF重写收尾——backgroundRewriteDoneHandler

主进程的backgroundRewriteDoneHandler中主要是4步骤:

  1. 打开子进程刚刚处理完的 aof 临时文件
  2. 将停止发送增量数据期间积累的数据追加到 临时AOF文件
  3.  重命名,替换旧的aof文件
  4. 最后,进行清除工作
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {if (!bysignal && exitcode == 0) {int newfd, oldfd;char tmpfile[256];long long now = ustime();mstime_t latency;latencyStartMonitor(latency);snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.child_pid);// 1 打开子进程刚刚处理完的 aof 临时文件newfd = open(tmpfile,O_WRONLY|O_APPEND);if (newfd == -1) { goto cleanup; }// 2 将停止发送增量数据期间积累的数据追加到 临时AOF文件if (aofRewriteBufferWrite(newfd) == -1) {close(newfd); goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {aof_background_fsync(newfd);} else if (server.aof_fsync == AOF_FSYNC_ALWAYS) {latencyStartMonitor(latency);if (redis_fsync(newfd) == -1) {close(newfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rewrite-done-fsync",latency);}// aof_fd 为当前的 AOF 文件的文件描述符, 等于 -1, 应该是 AOF 功能停用了// 这时为了下面的流程能走下去, 从配置文件中获取到配置的文件名, 尝试打开禁用前的文件if (server.aof_fd == -1) {/* AOF disabled */oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);} else {/* AOF enabled */oldfd = -1; /* We'll set this to the current AOF filedes later. */}latencyStartMonitor(latency);// 3  重命名,替换旧的aof文件if (rename(tmpfile,server.aof_filename) == -1) {close(newfd);if (oldfd != -1) close(oldfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rename",latency);if (server.aof_fd == -1) {/* AOF disabled, we don't need to set the AOF file descriptor* to this new file, so we can close it. */close(newfd);} else {/* AOF enabled, replace the old fd with the new one. */oldfd = server.aof_fd;server.aof_fd = newfd;server.aof_selected_db = -1; /* Make sure SELECT is re-issued */aofUpdateCurrentSize();server.aof_rewrite_base_size = server.aof_current_size;server.aof_fsync_offset = server.aof_current_size;server.aof_last_fsync = server.unixtime;/* Clear regular AOF buffer since its contents was just written to* the new AOF from the background rewrite buffer. */sdsfree(server.aof_buf);server.aof_buf = sdsempty();}server.aof_lastbgrewrite_status = C_OK;/* Change state from WAIT_REWRITE to ON if needed */if (server.aof_state == AOF_WAIT_REWRITE)server.aof_state = AOF_ON;/* Asynchronously close the overwritten AOF. */if (oldfd != -1) bioCreateCloseJob(oldfd);} else if (!bysignal && exitcode != 0) {server.aof_lastbgrewrite_status = C_ERR;} else {if (bysignal != SIGUSR1)server.aof_lastbgrewrite_status = C_ERR;}cleanup://清除工作aofClosePipes();aofRewriteBufferReset();aofRemoveTempFile(server.child_pid);server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;server.aof_rewrite_time_start = -1;/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */if (server.aof_state == AOF_WAIT_REWRITE)server.aof_rewrite_scheduled = 1;
}

重写失败的话,原来的AOF文件依然是可以使用的。在AOF重写过程中,新来的命令会被写入磁盘两次(主进程写入到旧AOF,子进程是追加到临时AOF),这就会浪费一定的磁盘空间(磁盘便宜大碗,没问题的)。只是在重写过程中,新的命令会被全部储存到子进程的差异缓冲区中,这可能会导致较高的内存占用。

3. Redis重启,AOF 文件加载

从main函数开始,其调用流程 main——>loadDataFromDisk——>loadAppendOnlyFile

其主要流程:

  1. 打开AOF文件
  2. 创建一个虚拟客户端,用于执行AOF中的命令
  3. 根据aof文件中的前导码,判断若是REDIS开头,就调用rdbLoadRio加载RDB的数据;否则将文件指针归零;
  4. 开始循环处理RESP格式的字符串
    1. 按照RESP协议读取命令的参数的个数
    2. 读取命令的每个参数
    3. 根据第一个参数,查询命令表,得到命令
    4. 执行命令
int main(int argc, char **argv) {.........if (!server.sentinel_mode) {    //非哨兵模式loadDataFromDisk();............}
}void loadDataFromDisk(void) {if (server.aof_state == AOF_ON) {loadAppendOnlyFile(server.aof_filename)   }...................
}int loadAppendOnlyFile(char *filename) {struct client *fakeClient;// 1  打开aof文件FILE *fp = fopen(filename,"r");struct redis_stat sb;int old_aof_state = server.aof_state;long loops = 0;off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {server.aof_current_size = 0;server.aof_fsync_offset = server.aof_current_size;fclose(fp);return C_ERR;}/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI* to the same file we're about to read. */server.aof_state = AOF_OFF;// 2  创建虚拟客户端fakeClient = createAOFClient();startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);// 3  根据是否有RDB前导码,再确定处理方式char sig[5]; /* "REDIS" */if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {/* No RDB preamble, seek back at 0 offset. */if (fseek(fp,0,SEEK_SET) == -1) goto readerr;} else {/* RDB preamble. Pass loading the RDB functions. */rio rdb;if (fseek(fp,0,SEEK_SET) == -1) goto readerr;rioInitWithFile(&rdb,fp);//加载rdb内容if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {goto readerr;}}// 4  循环处理Aof文件中剩下的所有命令while(1) {int argc, j;unsigned long len;robj **argv;char buf[128];sds argsds;struct redisCommand *cmd;/* Serve the clients from time to time */if (!(loops++ % 1000)) {loadingProgress(ftello(fp));processEventsWhileBlocked();processModuleLoadingProgressEvent(1);}if (fgets(buf,sizeof(buf),fp) == NULL) {if (feof(fp))break;elsegoto readerr;}if (buf[0] != '*') goto fmterr;if (buf[1] == '\0') goto readerr;// 4.1  按照resp协议读取命令的参数数量argc = atoi(buf+1);if (argc < 1) goto fmterr;argv = zmalloc(sizeof(robj*)*argc);fakeClient->argc = argc;fakeClient->argv = argv;// 4.2  循环读取命令的每个参数for (j = 0; j < argc; j++) {/* Parse the argument len. */char *readres = fgets(buf,sizeof(buf),fp);if (readres == NULL || buf[0] != '$') {fakeClient->argc = j; /* Free up to j-1. */freeFakeClientArgv(fakeClient);if (readres == NULL)goto readerr;elsegoto fmterr;}len = strtol(buf+1,NULL,10);/* Read it into a string object. */argsds = sdsnewlen(SDS_NOINIT,len);if (len && fread(argsds,len,1,fp) == 0) {sdsfree(argsds);fakeClient->argc = j; /* Free up to j-1. */freeFakeClientArgv(fakeClient);goto readerr;}argv[j] = createObject(OBJ_STRING,argsds);/* Discard CRLF. */if (fread(buf,2,1,fp) == 0) {fakeClient->argc = j+1; /* Free up to j. */freeFakeClientArgv(fakeClient);goto readerr;}}// 4.3  根据第一个参数,查询命令表,获取命令cmd = lookupCommand(argv[0]->ptr);if (cmd == server.multiCommand) valid_before_multi = valid_up_to;// 4.4 执行命令fakeClient->cmd = fakeClient->lastcmd = cmd;if (fakeClient->flags & CLIENT_MULTI &&fakeClient->cmd->proc != execCommand){queueMultiCommand(fakeClient);} else {cmd->proc(fakeClient);}/* Clean up. Command code may have changed argv/argc so we use the* argv/argc of the client instead of the local variables. */freeFakeClientArgv(fakeClient);fakeClient->cmd = NULL;if (server.aof_load_truncated) valid_up_to = ftello(fp);if (server.key_load_delay)debugDelay(server.key_load_delay);}if (fakeClient->flags & CLIENT_MULTI) {valid_up_to = valid_before_multi;goto uxeof;}..........................
}

goto部分的代码:

int loadAppendOnlyFile(char *filename) {...................................
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */fclose(fp);freeFakeClient(fakeClient);server.aof_state = old_aof_state;stopLoading(1);aofUpdateCurrentSize();server.aof_rewrite_base_size = server.aof_current_size;server.aof_fsync_offset = server.aof_current_size;return C_OK;readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */if (!feof(fp)) {if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));exit(1);}uxeof: /* Unexpected AOF end of file. */if (server.aof_load_truncated) {serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",(unsigned long long) valid_up_to);if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {if (valid_up_to == -1) {serverLog(LL_WARNING,"Last valid command offset is invalid");} else {serverLog(LL_WARNING,"Error truncating the AOF file: %s",strerror(errno));}} else {/* Make sure the AOF file descriptor points to the end of the* file after the truncate call. */if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",strerror(errno));} else {serverLog(LL_WARNING,"AOF loaded anyway because aof-load-truncated is enabled");goto loaded_ok;}}}if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);exit(1);fmterr: /* Format error. */if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);exit(1);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/13200.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript函数声明

JS函数声明 JS中的方法,多称为函数,函数的声明语法和JAVA中有较大区别 语法1&#xff1a;function 函数名 (参数列表){函数体} 语法2&#xff1a;var 函数名 function (参数列表){函数体} 函数说明 函数没有权限控制符不用声明函数的返回值类型,需要返回在函数体中直接return即…

Python | Leetcode Python题解之第92题反转链表II

题目&#xff1a; 题解&#xff1a; class Solution:def reverseBetween(self, head: ListNode, left: int, right: int) -> ListNode:# 设置 dummyNode 是这一类问题的一般做法dummy_node ListNode(-1)dummy_node.next headpre dummy_nodefor _ in range(left - 1):pre…

关于FIFO Generator IP和XPM_FIFO在涉及位宽转换上的区别

在Xilinx FPGA中&#xff0c;要实现FIFO的功能时&#xff0c;大部分时候会使用两种方法&#xff1a; FIFO Generator IP核XPM_FIFO原语 FIFO Generator IP核的优点是有图形化界面&#xff0c;配置参数非常直观&#xff1b;缺点是参数一旦固定&#xff0c;想要更改的化就只能重…

一次tomcat闪退处理

双击tomcat目录下bin目录中startup.bat 在我的电脑上是一闪而过&#xff0c;不能正常地启动tomcat软件 以记事本打开startup.bat文件&#xff0c;在文件的结尾处加上pause 然后再双击该bat执行&#xff0c;此时窗口就不会关闭&#xff0c;并会将错误信息打印在提示框中 可能是…

英伟达发布 VILA 视觉语言模型,实现多图像推理、增强型上下文学习,性能超越 LLaVA-1.5

前言 近年来&#xff0c;大型语言模型 (LLM) 的发展取得了显著的成果&#xff0c;并逐渐应用于多模态领域&#xff0c;例如视觉语言模型 (VLM)。VLM 旨在将 LLM 的强大能力扩展到视觉领域&#xff0c;使其能够理解和处理图像和文本信息&#xff0c;并完成诸如视觉问答、图像描…

一看就会的AOP事务

文章目录 AOPAOP简介AOP简介和作用AOP的应用场景为什么要学习AOP AOP入门案例思路分析代码实现AOP中的核心概念 AOP工作流程AOP工作流程AOP核心概念在测试类中验证代理对象 AOP切入点表达式语法格式通配符书写技巧 AOP通知类型AOP通知分类AOP通知详解 AOP案例案例-测量业务层接…

Linux bc命令(bc指令)(基本计算器)(任意精度计算语言:支持浮点数运算、变量赋值和自定义函数等)

文章目录 bc命令文档英文中文 Linux bc 命令详解bc 命令的基本用法启动 bc 环境进行基本计算退出 bc bc 中的数学功能执行高级数学计算平方根和指数函数对数函数 处理精度问题 变量和数组变量赋值和使用数组的使用 创建和使用自定义函数 bc 命令的高级用法在脚本中使用 bc基本脚…

Google I/O 大会 | 精彩看点一览

作者 / 开发者关系和开源总监 Timothy Jordan 2024 年 Google I/O 大会于北京时间 5 月 15 日 1:00am 在加利福尼亚的山景城以 Google 主题演讲直播拉开序幕。随后&#xff0c;在北京时间 4:30am 举行开发者主题演讲。大家可前往回看 "Google 主题演讲" 以及 "开…

AIGC时代已至,你准备好抓住机遇了吗?

一、行业前景 AIGC&#xff0c;即人工智能生成内容&#xff0c;是近年来人工智能领域中发展迅猛的一个分支。随着大数据、云计算、机器学习等技术的不断进步&#xff0c;AIGC已经取得了显著的成果&#xff0c;并且在广告、游戏、自媒体、教育、电商等多个领域实现了广泛应用。…

DolphinScheduler(海豚调度)- docker部署实战

1.官方文档 https://dolphinscheduler.apache.org/zh-cn/docs/3.2.1/guide/start/docker 2.docker环境安装 版本情况&#xff08;这个地方踩了不少坑&#xff09;&#xff1a;docker-26.1.2&#xff0c;docker-compose-v2.11.0。 具体可使用我上传的安装包&#xff0c;一键安…

MT3037 新月轩就餐

思路&#xff1a; 此题每道菜的价钱相同&#xff0c;想最小化付的钱即求最小区间长度可以满足“品尝到所有名厨手艺”。 使用双端队列存储元素&#xff0c;队尾不断向后遍历&#xff1a;头->尾 如果队头队尾&#xff0c;则队头往右移一格&#xff0c;直到区间不同元素数m…

Docker部署MaxKB详细步骤(window系统)

上面章节已经实现了ollama李现部署llama3&#xff0c;并实现了一些简单的问答&#xff0c;但是问答的界面是在命令提示符中&#xff0c;交互很不友好&#xff0c;也不方便局域网其他用户访问&#xff0c;所以这节用docker部署MaxKB实现网页访问llama3&#xff0c;首先电脑上需要…

分布式系统的一致性与共识算法(四)

Etcd与Raft算法 Raft保证读请求Linearizability的方法: 1.Leader把每次读请求作为一条日志记录&#xff0c;以日志复制的形式提交&#xff0c;并应用到状态机后&#xff0c;读取状态机中的数据返回(一次RTT、一次磁盘写)2.使用Leader Lease&#xff0c;保证整个集群只有一个L…

使用Flask-RESTful构建RESTful API

文章目录 安装Flask-RESTful导入模块和类创建一个资源类运行应用测试API总结 Flask是一个轻量级的Python web开发框架&#xff0c;而Flask-RESTful是一个基于Flask的扩展&#xff0c;专门用于构建RESTful API。它提供了一些帮助类和方法&#xff0c;使构建API变得更加简单和高效…

详细分析Vue3中的reactive(附Demo)

目录 1. 基本知识2. 用法3. Demo 1. 基本知识 reactive 是一个函数&#xff0c;用于将一个普通的 JavaScript 对象转换为响应式对象 当对象的属性发生变化时&#xff0c;Vue 会自动追踪这些变化&#xff0c;并触发相应的更新 Vue2没有&#xff0c;而Vue3中有&#xff0c;为啥…

公司邮箱是什么?公司邮箱和个人邮箱有什么不同?

公司邮箱是企业用来收发邮件的专业版电子邮箱&#xff0c;不同于个人邮箱的简单功能和有限的存储空间&#xff0c;公司邮箱的功能更加丰富&#xff0c;能够满足企业的日常办公和协作需求。本文将为您详细讲解公司邮箱和个人邮箱的区别&#xff0c;以供您选择更适合自己的邮箱类…

嵌入式——C51版本Keil环境搭建

&#x1f3ac; 秋野酱&#xff1a;《个人主页》 &#x1f525; 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 目标搭建流程下载与安装激活STC环境添加校验是否导入STC环境 目标 ● 了解C51版本Keil开发环境的概念和用途 ● 掌握C51版本Keil环…

2024年NOC大赛创客智慧(西瓜创客)Python复赛编程真题模拟试卷包含答案

NOC复赛python模拟题 1.编写一个程序&#xff0c;提示用户输人一个矩形的长度和宽度&#xff0c;并输出其面积, 2.试计算在区间 1 到 n的所有整数中,数字x(0≤x≤9)共出现了多少次?例如在 1到11 中&#xff0c;即在 1,2,3.45,6.7,8.9,10,11 中&#xff0c;数字 1出现了 4 次.…

鸿蒙生态融合进行时!菊风启动适配HarmonyOS NEXT,赋能原生应用实时

​​今日话题 鸿蒙HarmonyOS NEXT 自华为公开宣布鸿蒙 HarmonyOS NEXT 系统以来&#xff0c;该系统受到了业内广泛关注&#xff0c;和以往鸿蒙系统不同的是该系统底座完全由华为自研&#xff0c;摒弃了 Linux 内核和安卓 AOSP 代码&#xff0c;仅兼容鸿蒙内核及鸿蒙系统的应用…

Leetcode---1.两数之和 (详解加哈希表解释和使用)

文章目录 题目 [两数之和](https://leetcode.cn/problems/two-sum/)方法一&#xff1a;暴力枚举代码方法二&#xff1a;哈希表代码 哈希表哈希表的基本概念哈希函数&#xff08;Hash Function&#xff09;&#xff1a;冲突&#xff08;Collision&#xff09;&#xff1a;链地址…