一条Redis命令是如何执行的?

一条Redis命令是如何执行的?

  • 源码结构
  • 核心数据结构
    • redisServer
    • redisClient
    • redisDb
    • redisObject
    • aeEventLoop
  • 核心流程
    • redis启动流程
      • main()
    • 主循环aeEventProcess执行过程
    • 命令执行的流程
      • 过程1(redis启动)
      • 过程2(客户端与服务端建立链接)
      • 过程3 (客户端发送命令给服务端)
      • 过程4 (写就绪将结果写回客户端)

Redis(Remote Dictionary Server)是一个开源的 内存数据库,它提供 高性能的键值(key-value)存储系统常用于缓存、消息队列、会话存储等应用场景。本文向大家分享redis基本概念和流程,希望能和大家一起从源码角度分析一条命令执行过程。

源码结构

基于redis1.0源码,有如下与主流程相关的源码文件:

核心数据结构

redisServer

redisServer是存储redis服务端运行的结构体,在启动的时候就会初始化完成。结构如下,它主要包含跟监听的socket有关的参数portfd;对应存储数据的redisDb列表;链接的客户端列表clients;事件循环*el

struct redisServer {int port;       // 服务端监听的端口int fd;         // 服务端起的socket对应的文件句柄redisDb *db;    // redis db的列表,一般实际生产环境只用一个
//  3-lineslist *clients;  // 服务端的列表
// 2 linesaeEventLoop *el; // 事件循环
//  36 lines
};

redisClient

redisClient是客户端在服务端存储的状态信息,每当一个客户端与服务端链接时,都会新创建redisClient结构体到redisServer->clients列表中。

typedef struct redisClient {int fd;          // 客户端发送命令和接收结果的socket文件句柄redisDb *db;     // 对应的db
//  1-linesds querybuf;    // 查询命令存储的缓冲区robj **argv;     // 查询命令转成的命令参数int argc;        // 参数个数
// 1-lineslist *reply;     // 命令执行完的回复的结果,是个列表int sentlen;     // 结果已经发送的长度
// 8-lines
} redisClient;

redisClient包含命令传输所使用的querybuf,命令在经过处理后会存放到argv中;然后比较重要的是*reply表示服务端给到客户端的回复的数据,这是个列表会在客户端写就绪的时候一个一个写回客户端,sentlen则是标识了传输的长度;然后就是对应的db与socket句柄fd。

redisDb

redisDb是redis的键值对存储的位置,主要包含两大块,一块存储数据,另一块存储过期信信息,dict结构实际上是两个哈希表,至于为什么有两个,这里是为了做渐进式rehash使用(后面会详细介绍),rehashidx用于表示rehash进度,iterators迭代器是表示遍历集合操作个数,表里面的元素就是entry,这里面包含key和value以及指向下一个元素的指针。

typedef struct redisDb {dict *dict;    // 字典1 存储数据dict *expires; // 字典2存储过期数据int id;        // db的id
} redisDb;typedef struct dict {dictType *type; // 类型,主要定义相关的函数void *privdata;dictht ht[2];  // 两个hash table,用于做渐进式rehash使用int rehashidx; /* rehash进度 rehashidx == -1表示不是正在rehash*/int iterators; /* number of iterators currently running */
} dict;typedef struct dictht {dictEntry **table;      // 存储数据的表unsigned long size;     // 大小unsigned long sizemask; // size-1,计算index的使用[1]unsigned long used;     // 已经使用的长度 
} dictht;typedef struct dictEntry {void *key;             // 键,在redis中一般是指向一个SDS类型的数据union {void *val;         // 值,在redis中一般指向redisObjectuint64_t u64;      // 特定情况下优化整数存储,比如过期     int64_t s64;       // 特定情况下优化整数存储} v;struct dictEntry *next; // 下一个entry
} dictEntry;

redisObject

redisObject是redis存储对象基本的表现形式,它可以存储类似SDS list set等数据结构,并且存储了一些信息用于内存管理,比如refcount这是一个整数字段,用于存储对象的引用计数。每当有一个新的指针指向这个对象时,引用计数会增加;当指针不再指向这个对象时,引用计数会减少。当引用计数降到 0 时,表示没有任何地方再使用这个对象,对象的内存可以被回收。lru在储对象的 LRU(最近最少使用)时间,这个时间戳是相对于服务器的 lruclock 的,用于实现缓存淘汰策略。当 Redis 需要释放内存时,它会根据这个时间戳来判断哪些对象是最近最少被使用的,从而决定淘汰哪些对象。

typedef struct redisObject {void *ptr;              // 指向具体数据的指针int refcount;           // 引用计数unsigned type:4;        // 类型unsigned notused:2;     // 未使用,可能是为了扩展/占位unsigned encoding:4;    // 编码方式 unsigned lru:22;        // 最近最少使用
} robj;

aeEventLoop

aeEventloop是redis事件模型基础数据,它主要包含文件事件和时间事件的两个链表。对于文件事件来说,包含文件句柄fd,事件类型mask,对应处理函数fileProc;对于时间事件来说包含id、执行时间(when_sec、when_ms)和对应执行函数timeProc 对应的源代码如下:

/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); // 文件事件回调函数
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); // 时间事件回调函数
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); // 事件结束时执行的函数/* File event structure */
typedef struct aeFileEvent {int fd; // 事件对应的文件句柄IDint mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */aeFileProc *fileProc; // 文件事件回调函数aeEventFinalizerProc *finalizerProc; // 事件结束时执行的函数void *clientData; // 对应客户端的扩展数据struct aeFileEvent *next; // 下一个文件事件(链表存储)
} aeFileEvent;/* Time event structure */
typedef struct aeTimeEvent {long long id; /* time event identifier. */long when_sec; /* seconds */long when_ms; /* milliseconds */aeTimeProc *timeProc; // 事件事件回调函数aeEventFinalizerProc *finalizerProc; // 事件结束时执行的函数void *clientData; // 事件结束时执行的函数struct aeTimeEvent *next;// 下一个事件(链表存储)
} aeTimeEvent;/* State of an event based program */
typedef struct aeEventLoop {long long timeEventNextId;aeFileEvent *fileEventHead;aeTimeEvent *timeEventHead;int stop;
} aeEventLoop;

核心流程

redis启动流程

在这里插入图片描述

main()

看源码,一般从main函数看起,redis启动的main函数位于redis.c中,可以看到启动时,首先初始化了配置initServerConfig(),然后初始化了server端服务initServer(),接下来注册处理函数为acceptHandler的文件事件,然后启动了redis的主循环开始处理事件了。

int main(int argc, char **argv) {initServerConfig(); // 初始化配置
// 9-lines 从文件中读取配置initServer(); // 初始化服务if (server.daemonize) daemonize(); // TODOredisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
// 5-lines,内存检查,加载rdbif (aeCreateFileEvent(server.el, server.fd, AE_READABLE,acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); // 创建一个读的文件事件,处理者是acceptHandlerredisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);aeMain(server.el); // 启动redis主循环aeDeleteEventLoop(server.el); // 主循环退出清理资源return 0;
}

然后我们看看initServer做了什么初始化,鉴于本文是阐述基本原理,因此注释掉了非主链路上的代码,可以看到它初始化了客户端列表、事件循环、db、创建了时间事件,将这几个核心的组件初始化了

static void initServer() {
// 5-lines ...server.clients = listCreate(); // 初始化客户端列表
// 4-linesserver.el = aeCreateEventLoop(); // 初始化事件循环server.db = zmalloc(sizeof(redisDb)*server.dbnum); // 为db分配内存
// 3-lines ...server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); // 初始化服务端的sever socket
// 4-lines ...for (j = 0; j < server.dbnum; j++) { // 初始化数据存储server.db[j].dict = dictCreate(&hashDictType,NULL);server.db[j].expires = dictCreate(&setDictType,NULL);server.db[j].id = j;}
// 8-lines ...aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);// 初始化时间事件
}

主循环aeEventProcess执行过程

redis在main函数中调用aeMain函数,aeMain函数则不停的循环调用aeEventProcess处理事件,redis是事件驱动的程序,主要包含文件事件和时间事件,在aeProcessEvents中处理这些事件。

void aeMain(aeEventLoop *eventLoop)
{eventLoop->stop = 0;while (!eventLoop->stop)aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
  1. 将文件事件装到不同的集合(可读、可写、异常)中
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 9-line 变量初始化和前置判断FD_ZERO(&rfds); // 清空fd集合FD_ZERO(&wfds); // 清空fd集合FD_ZERO(&efds); // 清空fd集合// 检查文件事件,将它们分别放到对应的集合中if (flags & AE_FILE_EVENTS) {while (fe != NULL) {if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds);if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds);if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds);if (maxfd < fe->fd) maxfd = fe->fd;numfd++;fe = fe->next;}}// ...
}
  1. 计算超时时间
    在调用select()函数的时候,在监听的fd没有就绪时,会阻塞住;这里还需要处理时间事件,因此我们需要给select()设置一个超时时间,以防阻塞的时候错过了执行时间事件。超时时间计算通过找到最近的一条时间事件的执行时间计算得到
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 42-lines ...接上文if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int retval;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop); // 遍历timeEvent的链表,拿到最近的执行时间事件if (shortest) {long now_sec, now_ms;// 计算时间差aeGetTime(&now_sec, &now_ms); // 拿到当前时间tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;// 计算秒差if (shortest->when_ms < now_ms) { // 比较毫秒位tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; // 计算微秒差tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}} else { //在某些情况下,事件循环需要立即返回,而不是等待事件的发生。这通常发生在非阻塞模式下,即使没有事件发生,事件循环也不应该阻塞等待。AE_DONT_WAIT 是一个标志,用于指示事件循环在这种非阻塞模式下运行。所以此处就进尽快返回ASAPif (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {// 设置为NULL,永久阻塞等待就绪tvp = NULL; /* wait forever */}}
}
  1. 执行文件事件
    拿到超时时间后就开始执行事件了,首先调用select(),传入事件集合(&rfds, &wfds, &efds),拿到就绪文件事件的个数,然后开始挨个检查就绪的文件事件执行,值的注意的是在redis1.0中调用的是select()系统调用,在后续的redis版本中调用的是epoll()相关函数。
retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);if (retval > 0) {fe = eventLoop->fileEventHead;while(fe != NULL) {int fd = (int) fe->fd;// 检查fd是不是在集合里面if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||(fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||(fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))){// 求maskint mask = 0;if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))mask |= AE_READABLE;if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))mask |= AE_WRITABLE;if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))mask |= AE_EXCEPTION;                    fe->fileProc(eventLoop, fe->fd, fe->clientData, mask); // 执行文件事件中的回调函数processed++; // 处理完成+1fe = eventLoop->fileEventHead; // 在事件循环中处理完一个事件后,文件事件列表可能会发生变化。这种变化可能是因为在处理事件的过程中,某些操作(如关闭文件描述符、修改事件订阅等)导致了文件事件列表的更新。                    FD_CLR(fd, &rfds); // 清除执行完毕的fdFD_CLR(fd, &wfds); // 清除执行完毕的fdFD_CLR(fd, &efds); // 清除执行完毕的fd} else {fe = fe->next; // 处理下一个}}}
  1. 执行时间事件
    时间事件的执行就相对简单一些,主要逻辑就是比较事件执行时间是否比当前时间大了,到达执行时间便执行;另外一个点是看这个事件是一次性的还是周期性,一次性的事件要删掉;另外下一次执行的时间点是回调函数返回的,然后写到事件的结构体中
if (flags & AE_TIME_EVENTS) { // 需要处理时间事件te = eventLoop->timeEventHead; // 取到第一个事件maxId = eventLoop->timeEventNextId-1; // 记录最大的IDwhile(te) {long now_sec, now_ms;long long id;if (te->id > maxId) {// 如果ID>maxID则认为这个事件是新加的不在此次循环处理,否则可能出现无限循环的情况te = te->next;continue;}aeGetTime(&now_sec, &now_ms); //拿到当前时间if (now_sec > te->when_sec || //比较秒(now_sec == te->when_sec && now_ms >= te->when_ms)) // 比较毫秒{int retval;id = te->id;retval = te->timeProc(eventLoop, id, te->clientData); //执行时间事件的回调函数if (retval != AE_NOMORE) {// 如果是不是一次性的aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); //添加下一次时间事件} else {aeDeleteTimeEvent(eventLoop, id); //如果是一次性的删除这个时间}te = eventLoop->timeEventHead; // 执行完一次时间事件后,列表可能发生变化,下一次需要从头开始处理} else {te = te->next;}}

命令执行的流程

了解完redis整体事件驱动的运行架构后,redis的一条命令执行经过四个过程:redis启动、客户端前来连接、客户端发送命令到服务端、服务端回复结果给客户端。 下面让我们详细看看:

过程1(redis启动)

上一章中,redis在启动的时候会通过anetTcpSever创建一个socket server,再调用aeCreateFileEvent注册一个readable事件,回调函数为acceptHander,对应文件的句柄就是server的fd。

int main(int argc, char **argv) {initServerConfig(); // 初始化配置
// 8-lines 从文件中读取配置initServer(); // 初始化服务if (server.daemonize) daemonize(); // TODOredisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
// 5-lines,内存检查,加载rdbif (aeCreateFileEvent(server.el, server.fd, AE_READABLE,acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); // 创建一个读的文件事件,处理者是acceptHandlerredisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);aeMain(server.el); // 启动redis主循环aeDeleteEventLoop(server.el); // 循环推出清理资源return 0;
}

过程2(客户端与服务端建立链接)

第一个事件中处理函数是acceptHander,顾名思义就是接收客户端了链接并进行进一步处理,首先执行了anetAccept()函数,拿到了客户端和服务端交互的文件句柄fd,接下来执行createClient()函数,创建客户端实例

static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// 6-lines ...cfd = anetAccept(server.neterr, fd, cip, &cport); // 核心是执行了accept,建立了连接拿到了与client交互的cfdif (cfd == AE_ERR) {redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);return;}redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);if ((c = createClient(cfd)) == NULL) { // 初始化客户端redisLog(REDIS_WARNING,"Error allocating resoures for the client");close(cfd); /* May be already closed, just ingore errors */return;}// 如果当前客户端数量超过了 `maxclients` 的设置,服务器会接受新的连接,发送错误消息,然后关闭连接。if (server.maxclients && listLength(server.clients) > server.maxclients) {char *err = "-ERR max number of clients reached\r\n";/* That's a best effort error message, don't check write errors */(void) write(c->fd,err,strlen(err));freeClient(c);return;}server.stat_numconnections++;
}

在createClient()中,初始化了redisClient的一些参数,最重要的是注册了一个文件事件,对应的执行函数是readQueryFromClient

static redisClient *createClient(int fd) {redisClient *c = zmalloc(sizeof(*c));
// 15-lines...listSetFreeMethod(c->reply,decrRefCount);  //设置了链表 c->reply 的释放方法为 decrRefCount 函数listSetDupMethod(c->reply,dupClientReplyValue);// 设置了链表 c->reply 的复制方法为 dupClientReplyValue 函数if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,readQueryFromClient, c, NULL) == AE_ERR) { // 创建了一个文件事件,执行函数为readQueryFromClientfreeClient(c);return NULL;}if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");return c;
}

过程3 (客户端发送命令给服务端)

接上文 当客户端发送命令到服务端时,数据到达服务端经过网卡、协议栈等一系列操作后,达到可读状态后,就会执行readQueryFromClient(),处理客户端传过来的命令,首先会执行read()方法从缓冲区中读取一块数据,将其追加到c->querybuf后面,根据redis协议进行querybuf的解析,并将其转换成sds的redisObject,存储到argv中,然后执行processCommand()处理命令,注意这里只是展示主流程的代码和说明,这里为了保证客户端输入能在各种情况下都work做了比较多的校验和错误处理;另外redis客户端和服务端交互的协议有两种一种是inline的、另外一种是bulk的,在querybuf转换成argv时,根据协议不同(bulklen==-1),走的也是不同的解析逻辑。

static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = (redisClient*) privdata;char buf[REDIS_IOBUF_LEN];int nread;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);nread = read(fd, buf, REDIS_IOBUF_LEN);
// 14-lines ... 读取buf检验处理if (nread) {c->querybuf = sdscatlen(c->querybuf, buf, nread); // 将buf追加到querybuf后面c->lastinteraction = time(NULL); // 更新最后一次交互时间} else {return;}again:if (c->bulklen == -1) { // inline协议/* Read the first line of the query */char *p = strchr(c->querybuf,'\n');size_t querylen;if (p) {
// 28-lines... 处理读取的buf//   这里将输入的参数存到argv中,argc++for (j = 0; j < argc; j++) {if (sdslen(argv[j])) {c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);c->argc++;} else {sdsfree(argv[j]);}}zfree(argv);// 执行processCommand处理命令if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;return;} else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {redisLog(REDIS_DEBUG, "Client protocol error");freeClient(c);return;}}//  15 lines ... bulk命令处理
}

接下来我们继续看看重头戏processCommand的处理过程,首先执行lookupCommand,从cmdTable中遍历找到符合要求的命令,然后进行一些认证和数据合法性校验后,执行cmd的proc函数执行命令,执行完毕后,清理命令执行的过程数据。

static int processCommand(redisClient *c) {
// 11 lines...cmd = lookupCommand(c->argv[0]->ptr); // 从表里面查找命令
// 46 lines ... cmd、argv、argc、内存、认证等校验dirty = server.dirty;cmd->proc(c); // 执行命令
// 4 lines.. 变更通知给连接的从服务器(slaves)和监控客户端(monitors)if (c->flags & REDIS_CLOSE) { freeClient(c);return 0;}resetClient(c); // 清理client命令相关字段return 1;
}static struct redisCommand cmdTable[] = {{"get",getCommand,2,REDIS_CMD_INLINE},{"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},{"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},{"del",delCommand,-2,REDIS_CMD_INLINE},{"exists",existsCommand,2,REDIS_CMD_INLINE},{"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}
}

让我们以get命令为例看看 getCommand()做了什么事,首先从DB里面去查找这个key,然后调用addReply,将结果回复加到回复队列中去,可以看到它回复了协议头、数据、协议尾三段数据。

static void getCommand(redisClient *c) {robj *o = lookupKeyRead(c->db,c->argv[1]);if (o == NULL) {addReply(c,shared.nullbulk);} else {if (o->type != REDIS_STRING) {addReply(c,shared.wrongtypeerr);} else {addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));addReply(c,o);addReply(c,shared.crlf);}}
}

让我们看看lookupKeyRead 做了什么,最终执行的是dict的方法dictFind,这个函数首先根据key算出在table中的位置,然后开始遍历entry链表,通过dictCompareHashKeys方法比较key是不是相等最终找到这个key取出返回。

static robj *lookupKeyRead(redisDb *db, robj *key) {expireIfNeeded(db,key); // 检查过期return lookupKey(db,key);
} static robj *lookupKey(redisDb *db, robj *key) {dictEntry *de = dictFind(db->dict,key); //找到keyreturn de ? dictGetEntryVal(de) : NULL; //找到key对应的value
}
dictEntry *dictFind(dict *ht, const void *key)
{dictEntry *he;unsigned int h;if (ht->size == 0) return NULL;h = dictHashKey(ht, key) & ht->sizemask; // 计算在哈希表的位置he = ht->table[h];while(he) {if (dictCompareHashKeys(ht, key, he->key)) // 比较entry和key是否相等return he;he = he->next;}return NULL;
}

具体是怎么回复结果的呢,addReply函数通过调用aeCreateFileEvent 创建了写入类型的文件事件,然后就是尾插法将要回复的obj添加到c->reply的尾部,等待fd写就绪时执行事件

static void addReply(redisClient *c, robj *obj) {if (listLength(c->reply) == 0 &&(c->replstate == REDIS_REPL_NONE ||c->replstate == REDIS_REPL_ONLINE) &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c, NULL) == AE_ERR) return; //创建了文件事件if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");incrRefCount(obj); // 引用+1
}

过程4 (写就绪将结果写回客户端)

当 socket 的发送缓冲区有足够空间,并且网络状态允许数据发送时,socket 变为写就绪状态时,这时候就会aeEventLoop->fileEvents中取出就绪的reply事件,执行sendReplyToClient()函数,这个函数会遍历c->reply列表,按照顺序一个一个通过调用write()方法写回给客户端,值的注意的是,Redis 限制了单次事件循环中可以写入的最大字节数(REDIS_MAX_WRITE_PER_EVENT),防止一个客户端占用所有的服务器资源,特别是当该客户端连接速度非常快(例如通过本地回环接口 loopback interface)并且发送了一个大请求(如 KEYS * 命令),如果c->reply全写完了,就干掉这个写入事件

static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = privdata;int nwritten = 0, totwritten = 0, objlen;robj *o;
// 4 lines ...while(listLength(c->reply)) {o = listNodeValue(listFirst(c->reply)); // 取出第一个objlen = sdslen(o->ptr); // 拿到长度if (objlen == 0) { // 长度为零删掉listDelNode(c->reply,listFirst(c->reply));continue;}if (c->flags & REDIS_MASTER) {/* Don't reply to a master */nwritten = objlen - c->sentlen;} else {// 从上一次发送的最后的位置(c—>sentlen),发送剩余长度的数据(objlen - c->sentlen)nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);if (nwritten <= 0) break;}c->sentlen += nwritten; // 更新已经发送的长度totwritten += nwritten; // 更新本次事件已经发送的长度// 如果已经发送的长度==要发送的对象长度,这个对象就发送完了,删掉if (c->sentlen == objlen) {listDelNode(c->reply,listFirst(c->reply));c->sentlen = 0;}// 对单个客户端单个事件发送的长度进行限制,因为redis时单线程,防止一个客户端有// 大量返回数据时,会阻塞主循环处理,导致无法服务其他客户端if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;}
// 9-lines... if (totwritten > 0) c->lastinteraction = time(NULL); // 更新最后交互时间/** 当Redis在事件循环中处理客户端连接的数据发送时,它会逐个取出回复列表中的数据进行发送。* 每发送完一个数据,就从列表中删除该数据。* 因此,如果回复列表的长度为0,说明所有的回复数据都已经发送完毕。*/if (listLength(c->reply) == 0) {c->sentlen = 0;aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/33327.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Android】Android Studio 使用Kotlin写代码时代码提示残缺问题解决

问题描述 Android Studio升级之后&#xff0c;从Android Studio 4.2升级到Android Studio Arctic Fox版本&#xff0c;因为项目比较老&#xff0c;使用的Gradle 版本是3.1.3&#xff0c;这个版本的Android Studio最低支持Gradle 3.1版本&#xff0c;应该算是比较合适的版本。 …

不翻墙安装yolov8环境下的RT-DETR并实现PCB表面缺陷检测

目录 一、新建conda环境二、安装yolov8环境1.克隆安装包2.安装依赖包3.测试模型 任务2&#xff1a;基于RT-DETR实现PKU-PCB表面缺陷检测数据准备 数据增强测试 总结 一、新建conda环境 创建并激活conda环境&#xff1a; 在conda创建一个名为yolov8的新环境&#xff0c;并在其中…

国际网络专线的开通流程

1. 选择服务商&#xff1a;首先&#xff0c;您需要选择一个可靠的服务商来提供国际网络专线服务。确保服务商具有良好的声誉和专业知识&#xff0c;以便为您提供高质量的网络连接和支持。 2. 评估需求&#xff1a;在与服务商沟通之前&#xff0c;您需要明确自己的网络需求。这…

dp经典问题:LCS问题

dp&#xff1a;LCS问题 最长公共子序列&#xff08;Longest Common Subsequence, LCS&#xff09;问题 是寻找两个字符串中最长的子序列&#xff0c;使得这个子序列在两个字符串中出现的相对顺序保持一致&#xff0c;但不要求连续。 力扣原题链接 1.定义 给定两个字符串 S1…

猫狗识别—视频识别

猫狗识别—视频识别 1. 导入所需的库&#xff1a;2. 创建Tkinter主窗口并设置标题&#xff1a;3. 设置窗口的宽度和高度&#xff1a;4. 创建一个Canvas&#xff0c;它将用于显示视频帧&#xff1a;5. 初始化一个视频流变量cap&#xff0c;用于存储OpenCV的视频捕获对象&#xf…

【速速收藏】适用于Linux系统的五个优秀PDF编辑器

PDF (Portable Document Format) 是便携文档格式的缩写&#xff0c;这是一种用于电子共享文档的标准格式&#xff0c;广泛应用于各种文档类型的存储和分发。然而&#xff0c;有时我们可能需要对PDF文档进行更改和编辑。本文将介绍五款在Linux平台上广受欢迎的PDF编辑器。 ​​…

陀螺仪LSM6DSV16X与AI集成(8)----MotionFX库解析空间坐标

陀螺仪LSM6DSV16X与AI集成.8--MotionFX库解析空间坐标 概述视频教学样品申请源码下载开启CRC串口设置开启X-CUBE-MEMS1设置加速度和角速度量程速率选择设置FIFO速率设置FIFO时间戳批处理速率配置过滤链初始化定义MotionFX文件卡尔曼滤波算法主程序执行流程lsm6dsv16x_motion_fx…

【分布式事务】Seata AT实战

目录 Seata 介绍 Seata 术语 Seata AT 模式 介绍 实战&#xff08;nacos注册中心&#xff0c;db存储&#xff09; 部署 Seata 实现 RM 实现 TM 可能遇到的问题 1. Seata 部署成功&#xff0c;服务启动成功&#xff0c;全局事务不生效 2. 服务启动报错 can not get …

[java]集合类stream的相关操作

1.对list中的map进行分组 下面例子中&#xff0c;根据高度height属性进行分组 List<Map<String, Float>>originalList new ArrayList<>();originalList.add(new HashMap<String,Float>() {{put("lng", 180.0f);put("lat",90f);…

C++使用Poco库封装一个FTP客户端类

0x00 Poco库中 Poco::Net::FTPClientSession Poco库中FTP客户端类是 Poco::Net::FTPClientSession , 该类的接口比较简单。 上传文件接口&#xff1a; beginUpload() , endUpload() 下载文件接口&#xff1a; beginDownload() , endDownload() 0x01 FTPCli类说明 FTPCli类…

CSS规则——font-face

font-face 什么是font-face&#xff1f; 想要让网页文字千变万化&#xff0c;仅靠font-family还不够&#xff0c;还要借助font-face&#xff08;是一个 CSS 规则&#xff0c;它允许你在网页上使用自定义字体&#xff0c;而不仅仅是用户系统中预装的字体。这意味着你可以通过提…

jemeter基本使用

后端关验签&#xff0c;设置请求头编码和token 配置编码和token

Linux安装minio及mc客户端(包含ARM处理器架构)

&#x1f353; 简介&#xff1a;java系列技术分享(&#x1f449;持续更新中…&#x1f525;) &#x1f353; 初衷:一起学习、一起进步、坚持不懈 &#x1f353; 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正&#x1f64f; &#x1f353; 希望这篇文章对你有所帮助,欢…

练习实践:ubuntu18.04安装、配置Nginx+PHP环境,两种配置方式,多站点

参考来源&#xff1a; https://help.aliyun.com/document_detail/464753.html https://www.cnblogs.com/laosan007/p/12803287.html https://blog.csdn.net/qq_55364077/article/details/132207083 【安装同版本7.2的php】 需要知道对应php和nginx的安装版本 需要安装php-fpm…

<sa8650>QCX Usecase 使用详解—如何在 QCX 框架中添加新的自定义Usecase/Pipeline

<sa8650>QCX Usecase 使用详解—如何在 QCX 框架中添加新的自定义Usecase/Pipeline 一、前言二、为 Usecase/Pipeline 创建新文件夹三、Create Usecase XML四、为 camxAutoo_Test 管道创建拓扑五、添加Usecase/Pipeline编译六、 使用 Qcarcam_Test 应用程序运行Usecase一、前…

【总线】AXI4第五课时:信号描述

大家好,欢迎来到今天的总线学习时间!如果你对电子设计、特别是FPGA和SoC设计感兴趣&#xff0c;那你绝对不能错过我们今天的主角——AXI4总线。作为ARM公司AMBA总线家族中的佼佼者&#xff0c;AXI4以其高性能和高度可扩展性&#xff0c;成为了现代电子系统中不可或缺的通信桥梁…

【高性能计算笔记】

第1章 - 高性能计算介绍 1. 概念&#xff1a; 高性能计算(High performance computing&#xff0c;缩写HPC)&#xff1a; 指通常使用很多处理器&#xff08;作为单个机器的一部分&#xff09;或者某一集群中组织的几台计算机&#xff08;作为单个计算资源操作&#xff09;的…

零门槛用AI,302.AI让人工智能变得简单易用

当下人工智能火爆&#xff0c;提到AI&#xff0c;几乎每个人都能说上几句&#xff0c;但是你真的会使用AI吗&#xff1f; 当涉及到如何实际使用AI时&#xff0c;许多人可能会觉得它太过高深莫测&#xff0c;从而产生一种距离感&#xff0c;不知如何开始。我和大家也一样&#x…

Android性能优化-内存优化

&#xff11;、为什么进行内存优化&#xff08;如果不进行内存优化&#xff09; APP运营内存限制&#xff0c;OOM导致APP崩溃 APP性能&#xff0c;流畅性&#xff0c;响应速度和体验 2、Android内存管理方式: Android系统内存分配与回收方式 APP内存限制机制 切换应用时&…

AGV选型要点及步骤,保证企业选择的AGV小车更实用

AGV AGV小车作为智能化物流仓储不可或缺的工具&#xff0c;在制造业得到了广泛的应用&#xff0c;市场需求呈现出井喷式增长。但是AGV市场还存在着很多问题&#xff0c;制造企业在产品选型时往往缺乏正确的引导。 AGV智能仓储 毫无疑问,我们的自动化物流系统已离不开AGV小车了,…