set命令,在我们看来很简单,set zhangsan lisi,redis给我们返回一个 ok,就完事了。那redis的服务端是怎么处理这条简单的命令的?是不是像我们看起来的这么简单。今天这篇文章就来聊聊这个问题。
在上一篇文章中,我们聊了redis IO多路复用的事件驱动框架。我们大致了解了redis是如何接收连接,如何将客户端的连接行为封装成事件并结合IO多路复用实现了对客户端连接的监听
这一篇,我们聊聊当内核监听到客户端连接事件后,具体是如何处理连接事件的,我们用set命令来举例子。
文章目录
- 1、事件回调函数执行
- 2、处理客户端连接
- 3、处理读事件
- 4、读事件对应命令解析
- 5、获取命令执行函数
- 6、命令执行函数填充
- 7、set命令执行函数
- 8、客户端命令响应
1、事件回调函数执行
上一篇的最后,我们讲到了redis通过epoll_wait函数从内核轮询就绪的事件,获取到事件后,开始执行回调处理函数。
这部分代码在ae.c的aeProcessEvents方法中
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{......//eventLoop->maxfd != -1,这个表达式代表有IO事件发生if (eventLoop->maxfd != -1 ||//(flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)),这个表达式代表有紧急的时间事件发生((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {......//调用多路复用API/* Call the multiplexing API, will return only on timeout or when* some event fires. *///轮询获取就绪的事件numevents = aeApiPoll(eventLoop, tvp);......for (j = 0; j < numevents; j++) {//获取就绪事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];......//如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数if (!invert && fe->mask & mask & AE_READABLE) {fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}......}}......
}
2、处理客户端连接
接下来,流程就到了执行注册函数,也就是acceptTcpHandler方法,我们看一下acceptTcpHandler方法的实现。主要逻辑是:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {......while(max--) {//创建已连接套接字cfdcfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);......acceptCommonHandler(cfd,0,cip);}
}
可以看到,主要逻辑在acceptCommonHandler中
static void acceptCommonHandler(int fd, int flags, char *ip) {client *c;//创建客户端if ((c = createClient(fd)) == NULL) {......}......
}
acceptCommonHandler中,针对就绪的连接事件,首先创建一个客户端client,注意一下这个结构体,后面的命令执行、命令返回都需要用到这个结构体,很重要.
static void acceptCommonHandler(int fd, int flags, char *ip) {client *c;//创建客户端if ((c = createClient(fd)) == NULL) {......}......
}
继续进入createClient方法,可以看到在createClient方法中,又创建了一个读事件,执行函数是readQueryFromClient
client *createClient(int fd) {client *c = zmalloc(sizeof(client));......if (fd != -1) {......//这里给已连接事件注册的事件类型是:AE_READABLE,//这是因为无论客户端发送的请求是读或写操作,对于 server 来说,都是要读取客户端的请求并解析处理if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR){close(fd);zfree(c);return NULL;}}......
}
3、处理读事件
至此,连接事件处理函数的主要逻辑就过完了,其中最主要的两块逻辑,一块是创建客户端,一块是注册一个读事件。读事件注册好了,接下来,就等着客户端发送具体的执行命令。
假设此时,我们在客户端执行了set zhangsan lisi
首先redis客户端先进行处理,使用RESP协议将这条命令发给redis服务端,服务端收到这条命令后,走事件处理的逻辑,之后触发回调函数,我们的这个例子里,回调函数就是readQueryFromClient。
我们看一下readQueryFromClient的逻辑,其实不管是写还是读,对于redis来说都需要先读,之后在命令执行时再区分写还是读。
//处理读事件的函数
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {client *c = (client*) privdata;......//为读缓冲区分配空间c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);//调用read函数读取数据nread = read(fd, c->querybuf+qblen, readlen);......//进一步处理读取内容processInputBufferAndReplicate(c);
}
可以看到,主要就是调用read函数读取客户端数据,然后调用processInputBufferAndReplicate处理函数
我们进入processInputBufferAndReplicate看一下
void processInputBufferAndReplicate(client *c) {//当前客户端不属于主从复制中的主节点if (!(c->flags & CLIENT_MASTER)) {processInputBuffer(c);} else {......if (applied) {//将主节点接收到的命令同步给从节点replicationFeedSlavesFromMasterStream(server.slaves,c->pending_querybuf, applied);sdsrange(c->pending_querybuf,applied,-1);}}
}
4、读事件对应命令解析
我们进入processInputBuffer方法看一下。这个方法的主要逻辑有两部分。
一部分是processMultibulkBuffer函数的执行。
一部分是processCommand函数的执行。
void processInputBuffer(client *c) {server.current_client = c;/* Keep processing while there is something in the input buffer */while(c->qb_pos < sdslen(c->querybuf)) {......//如果命令以"*"开头,说明是RESP协议的请求,RESP协议是redis客户端和服务器端的通信协议if (!c->reqtype) {if (c->querybuf[c->qb_pos] == '*') {c->reqtype = PROTO_REQ_MULTIBULK;} else {//不是RESP协议,那就是管道命令.比如:telnet命令c->reqtype = PROTO_REQ_INLINE;}}if (c->reqtype == PROTO_REQ_INLINE) {//如果不是RESP协议,执行该函数if (processInlineBuffer(c) != C_OK) break;} else if (c->reqtype == PROTO_REQ_MULTIBULK) {//如果是RESP协议,执行该函数if (processMultibulkBuffer(c) != C_OK) break;} else {serverPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetClient(c);} else {/* Only reset the client when the command was executed. *///调用processCommand执行命令if (processCommand(c) == C_OK) {......}......}}......
}
我看这部分源码的时候,忽略了processMultibulkBuffer,但其实这部分代码很重要。里面的主要逻辑是解析RESP协议的内容,比如我们执行了一条set命令 set zhangsan lisi,此时redis客户端会将这条命令以RESP协议的形式发送到redis的服务端。到了服务端,就要靠processMultibulkBuffer进行解析。如果忽略了这部分代码,后面的逻辑就看不明白。
所以,我们先看processMultibulkBuffer方法.这个方法中,我们主要就看我保留的这部分逻辑,这部分就是在解析RESP协议,比如:set zhangsan lisi,最终会被解析成三个RedisObject结构体,存储到c->argv数组中,argv数组是一个RedisObject数组。后面set命令真正执行时,会从该数组中获取到数据执行。
int processMultibulkBuffer(client *c) {......if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {/* Not enough data (+2 == trailing \r\n) */break;} else {//解析RESP协议,获取到RESP协议中的具体指令,赋值到client->argv变量上,后面lookupCommand函数会用解析到的指令名称查询真正的执行函数,比如:set的执行函数是setCommand......c->argv[c->argc++] =createStringObject(c->querybuf+c->qb_pos,c->bulklen);c->qb_pos += c->bulklen+2;......c->bulklen = -1;c->multibulklen--;}}......
}
processMultibulkBuffer方法执行完,我们就获取到了客户端想要执行的指令以及对应的键值对数据。接下来,我们返回主流程,继续看processInputBuffer的处理逻辑,解析完命令后,后面就开始处理命令。这里面有两段逻辑最重要。
第一段:根据前面解析RESP协议得到的c->argv数组获取到指令名称,比如:set或者get,通过命令的名称从lookupCommand函数中获取到执行命令的函数
5、获取命令执行函数
int processCommand(client *c) {//是否为quit命令if (!strcasecmp(c->argv[0]->ptr,"quit")) {//quit命令直接退出......}......//在全局变量server的commands成员变量中查找相关的命令,存储命令的数据结构是一个hash表c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);....../* Exec the command *///如果客户端有CLIENT_MULTI标记,并且当前命令不是exec、discard、multi和watch命令,将命令入队保存,等待后续一起处理if (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){//将命令入队queueMultiCommand(c);addReply(c,shared.queued);} else {//调用call函数执行命令call(c,CMD_CALL_FULL);......}return C_OK;
}
这里岔开话题,说一下lookupCommand函数,这个函数的主要作用就是根据命令名称获取到执行命令的函数。里面的具体实现是从一个server.commands成员变量中获取数据.server.commands是一个hash结构,lookupCommand的出参是redisCommand结构体。
struct redisCommand *lookupCommand(sds name) {return dictFetchValue(server.commands, name);
}
6、命令执行函数填充
server.commands的初始化操作是在main函数的initServerConfig()函数中,先创建hash,然后填充hash
void initServerConfig(void) {......//创建hash结构server.commands = dictCreate(&commandTableDictType,NULL);......//填充hash结构populateCommandTable();......
}
populateCommandTable函数就是将redisCommandTable中的数据放入server.commands哈希结构中
void populateCommandTable(void) {int j;int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);for (j = 0; j < numcommands; j++) {struct redisCommand *c = redisCommandTable+j;......//哈希结构的key是RedisCommand中的name,value是RedisCommand结构体retval1 = dictAdd(server.commands, sdsnew(c->name), c);......}
}
填充hash结构,就是将redisCommandTable数组中的数据填充到hash结构中.
redisCommandTable数组是RedisCommand结构体的集合
struct redisCommand redisCommandTable[] = {//第一个参数是命令的名称//第二个参数是命令的实现函数......{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}......
}
可以看到redisCommandTable中有很多我们熟悉的命令,比如:set、get命令。
7、set命令执行函数
现在server.commands哈希结构体中有了数据,假如我们执行的是set命令,所以通过lookupCommand获取命令执行函数,就是setCommand,接着调用call函数,执行该命令,所以我们进入setCommand函数继续看逻辑。
void setCommand(client *c) {int j;robj *expire = NULL;int unit = UNIT_SECONDS;int flags = OBJ_SET_NO_FLAGS;......for (j = 3; j < c->argc; j++) {char *a = c->argv[j]->ptr;robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];if ((a[0] == 'n' || a[0] == 'N') &&(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&!(flags & OBJ_SET_XX)){flags |= OBJ_SET_NX;}}......setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
首先判断set命令的具体指令,我们知道,除了set命令,还有setNX,setEX。之后就调用setGenericCommand函数执行具体的逻辑
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {......//如果有NX选项,就查找key是否存在,如果key存在,直接返回nullif ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)){addReply(c, abort_reply ? abort_reply : shared.nullbulk);return;}//开始设置键值对setKey(c->db,key,val);......//如果客户端设置了过期时间.这里需要处理过期的逻辑if (expire) setExpire(c,c->db,key,mstime()+milliseconds);notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);//调用addReply函数响应客户端addReply(c, ok_reply ? ok_reply : shared.ok);
}
setGenericCommand函数的逻辑中,比较重要的有两块逻辑:
一块是setKey,存储键值对
一块是addReply,响应客户端。
我们先看setKey的执行逻辑
setKey的入参,有三个,第一个是库名,默认是0库,第二个是set命令的key,第三个是set命令的value值
void setKey(redisDb *db, robj *key, robj *val) {//查找key是否存在if (lookupKeyWrite(db,key) == NULL) {//不存在新增dbAdd(db,key,val);} else {//存在覆盖dbOverwrite(db,key,val);}......
}
进入lookupKeyWrite函数,主要是查找key是否存在,查找key的逻辑是在dictFind函数中,该函数入参有两个,第一个是全局hash函数,第二个是key值
robj *lookupKey(redisDb *db, robj *key, int flags) {//从redis库的全局hash结构中查找dictEntry *de = dictFind(db->dict,key->ptr);if (de) {robj *val = dictGetVal(de);......return val;} else {return NULL;}
}
看到这里,其实我们就知道,redis的set命令也用到了hash结构,用来加速命令的执行。如果未找到该key,就直接返回null.然后就执行键值对的新增命令。
新增命令的具体逻辑,在dictAddRaw函数中。
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{long index;dictEntry *entry;dictht *ht;//全局hash表是否在进行rehashif (dictIsRehashing(d)) _dictRehashStep(d);//key已经存在,直接返回if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)return NULL;如果在进行rehash,使用1表,否则使用0表。这块是一个面试题,rehash的过程中,会用到两个hash表ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];//下面就开始真正的新增操作了//先初始化一个Entry节点entry = zmalloc(sizeof(*entry));//这两行代码是在将新的entry添加到现有链表中。注意ht->table[index],这代表一个元素。//如果index这个位置上有元素A,那新建entry的下一个元素就是A,同时index的位置上的节点变更为新建的entryentry->next = ht->table[index];ht->table[index] = entry;//hash表的节点数增加1ht->used++;//将key值设置到Entry中,这个函数是一个宏,用gdb打断点的话,宏里面直接走的else逻辑,也就是(entry)->key = (_key_);一个简单的赋值操作dictSetKey(d, entry, key);return entry;
}
8、客户端命令响应
至此,我们就跟完了一条完整的set命令执行过程。
set命令之后,然后就是响应客户端的操作,逻辑在addReply方法中,主要就是执行prepareClientToWrite函数
void addReply(client *c, robj *obj) {//执行prepareClientToWrite函数if (prepareClientToWrite(c) != C_OK) return;......
}
prepareClientToWrite函数封装了clientInstallWriteHandler函数,其中主要的逻辑就是将客户端插入到server.clients_pending_write列表中。很显然,这不是真正的响应客户端操作,因为我们知道读取客户端数据,用的是read函数,那响应客户端数据,盲猜也得是write之类的函数。所以,一定有另外的地方处理clients_pending_write列表.
处理的逻辑在server.c文件的beforeSleep函数中
void beforeSleep(struct aeEventLoop *eventLoop) {......//注册写事件处理逻辑handleClientsWithPendingWrites();......
}
handleClientsWithPendingWrites没有入参。可以看到该函数的主要逻辑就是从clients_pending_write链表中获取节点,然后进行处理,这里面会调用write函数
int handleClientsWithPendingWrites(void) {listIter li;listNode *ln;//从clients_pending_write列表中获取数据int processed = listLength(server.clients_pending_write);//获取待写回的客户端列表,这个li是一个迭代器的指针listRewind(server.clients_pending_write,&li);//遍历每一个待写回的客户端while((ln = listNext(&li))) {......//获取到待响应的客户端client *c = listNodeValue(ln);//调用writeToClient将当前客户端的输出缓冲区数据写回,该函数封装了和read方法对应的write方法if (writeToClient(c->fd,c,0) == C_ERR) continue;......}}
beforeSleep函数的触发逻辑是在server.c的main方法中
首先通过aeSetBeforeSleepProc函数将beforeSleep注册到EventLoop结构体中。
int main(int argc, char **argv) {......//redis启动过程中会将server.c文件中的beforeSleep函数注册到事件循环框架结构体的beforesleep变量处aeSetBeforeSleepProc(server.el,beforeSleep);......
}
之后,随着事件循环不断的执行,beforeSleep函数也会不断的执行。
void aeMain(aeEventLoop *eventLoop) {......while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)//执行beforeSleep函数eventLoop->beforesleep(eventLoop);///......不断的调用内核,获取就绪的事件进行处理aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);}
}
以上,就是一个完整的set命令的执行过程。
文章参考了极客时间的<redis源码剖析与实战>,课程中没有我描述的这么详细,算是对课程做了一个补充