【redis导读】redis作为一款高性能的内存数据库,面试服务端开发,redis是绕不开的话题,如果想提升自己的网络编程的水平和技巧,redis这款优秀的开源软件是很值得大家去分析和研究的。
笔者从大学毕业一直有分析redis源码的想法,但由于各种原因,一直没有付诸行动,今天抽空把redis4.0的源码做了一次深层次的剖析。redis作为一款高效的、支持高并发的内存型数据库,相信很多同学认为redis采用了非常复杂的网络通信架构,但实则不然!redis之所以性能高,一方面是因为redis4.0采用了单线程的模式(redis6.0不再是单线程模式),有效地避免了线程切换和同步所带来的性能开销;另一方面,redis键值对全部存储在内存中,redis自实现了一套高效的内存管理机制,数据的存取都是直接访问内存,无需进行磁盘IO访问。
1、前期准备工作
centos的终端上运行:
wget http://download.redis.io/releases/redis-4.0.11.tar.gz
tar -zxvf redis-4.0.11.tar.gz
cd redis-4.0.11
make -j 5
以上指令即完成redis源码的下载与编译。
gdb调试redis-server:
gdb redis-server
r
在redis编译目录下,再启一个终端,运行如下指令,把redis-client运行起来:
gdb redis-cli
r
set hello redis
这样就完成了redis的前期准备工作,可以往redis-server中更新键值对了。好,接下来开始剖析redis-server的服务端源码。
2、调试源码
redis-server也是作为一个独立的进程,既然是独立的进程,那么程序肯定有入口点,也即是main函数入口,全局搜索了下redis的源码,可以看到main函数入口位于server.c中。
int main(int argc, char **argv) {
......
//初始化服务端
initServer();
//设置一些回调函数
aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);
//aeMain开启事件循环
aeMain(server.el);
......
aeDeleteEventLoop(server.el);
return 0;
}
以上是server.c中main函数的主要执行流,只有一个主线程,初始化服务,设置回调,开始事件循环。那逐步开始拆解,先看看initServer()的执行流。
备注:initServer()接口中很多细节值得大家去学习,也是编写服务端程序容易被遗漏的细节。
/* Global vars */
struct redisServer server; /* Server global state */void setupSignalHandlers(void) {struct sigaction act;/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.* Otherwise, sa_handler is used. */sigemptyset(&act.sa_mask);act.sa_flags = 0;act.sa_handler = sigShutdownHandler;sigaction(SIGTERM, &act, NULL);sigaction(SIGINT, &act, NULL);......return;
}void initServer(void)
{int j;/*忽略SIGHUP、SIGPIPE信号,否则这两个信号容易把redis进程给挂掉*/signal(SIGHUP, SIG_IGN);signal(SIGPIPE, SIG_IGN);//设置指定信号处理函数。setupSignalHandlers();....../*全局redisServer对象,生命周期和整个进程保持一致redisServer对象保存了事件循环、客户端队列等成员变量*/server.pid = getpid();server.current_client = NULL;server.clients = listCreate();server.clients_to_close = listCreate();server.slaves = listCreate();server.monitors = listCreate();//clients_pending_write表示已连接客户端,但未注册写事件的队列server.clients_pending_write = listCreate();server.slaveseldb = -1; server.unblocked_clients = listCreate();server.ready_keys = listCreate();//还未给回复的客户端队列server.clients_waiting_acks = listCreate();server.get_ack_from_slaves = 0;server.clients_paused = 0;server.system_memory_size = zmalloc_get_memory_size();createSharedObjects();adjustOpenFilesLimit();/*根据配置的参数,给主evetLoop的各成员队列初始化指定大小的空间比如: 读、写回调函数的aeFileEvent队列typedef struct aeFileEvent {int mask;//可读、可写、异常aeFileProc *rfileProc;aeFileProc *wfileProc;void *clientData;}aeFileEvent;*///全局就一个redisServer,一个redisServer对应一个eventLoopserver.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);if (server.el == NULL) {serverLog(LL_WARNING,"Failed creating the event loop. 
Error message: '%s'",strerror(errno));exit(1);}server.db = zmalloc(sizeof(redisDb) * server.dbnum);//开启监听if (server.port != 0 &&listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)exit(1);if (server.unixsocket != NULL) {unlink(server.unixsocket); /* don't care if this fails */server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm, server.tcp_backlog);if (server.sofd == ANET_ERR) {serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);exit(1);}//将socket设置成非阻塞的anetNonBlock(NULL,server.sofd);}......//创建Redis定时器,用于执行定时任务if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {serverPanic("Can't create event loop timers.");exit(1);}/*1、为redisServer监听套接字设置连接建立成功回调函数acceptTcpHandler,只关注可读事件,监听套接字产生可读事件,说明连接建立成功。2、将监听socket绑定到IO复用模型上面去*/for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler, NULL) == AE_ERR){serverPanic("Unrecoverable error creating server.ipfd file event.");}}if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd, AE_READABLE,acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");/* 创建一个管道,用于主动唤醒被epoll_wait挂起的eventLoop */if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,moduleBlockedClientPipeReadable, NULL) == AE_ERR) {serverPanic("Error registering the readable event for the module ""blocked clients subsystem.");}......
}
基于上述的主流程,我们进一步剖析,如何将监听socket绑定到IO多路复用模型上?进一步剖析aeCreateFileEvent接口。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}aeFileEvent *fe = &eventLoop->events[fd];if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}static int aeApiAddEvent(aeEventLoop *eventLoop,int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0};int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;mask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE)ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;//从这里看redis使用epoll模型,将fd绑定到epfd上if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}
假设epoll模型检测到监听套接字有可读事件产生,那主Loop势必从epoll_wait接口返回,再根据事件类型,转调我们提前设置的回调函数acceptTcpHandler。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;char cip[NET_IP_STR_LEN];UNUSED(el);UNUSED(mask);UNUSED(privdata);while(max--) {//调用accept接口,生成一个客户端套接字cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);if (cfd == ANET_ERR) {if (errno != EWOULDBLOCK)serverLog(LL_WARNING,"Accepting client connection: %s", server.neterr);return;}serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);//acceptCommonHandler(cfd,0,cip);}
}#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {client *c;//创建cif ((c = createClient(fd)) == NULL) {serverLog(LL_WARNING,"Error registering fd event for the new client: %s (fd=%d)",strerror(errno),fd);close(fd); /* May be already closed, just ignore errors */return;}......
}//以客户端套接字创建一个client对象
client *createClient(int fd) {client *c = zmalloc(sizeof(client));if (fd != -1) {//将客户端套接字设置成非阻塞的anetNonBlock(NULL,fd);//关闭nagel算法anetEnableTcpNoDelay(NULL,fd);//设置TCP链接保活机制if (server.tcpkeepalive)anetKeepAlive(NULL,fd,server.tcpkeepalive);/*将客户端套接字绑定到epfd上,同时设置可读事件回调函数readQueryFromClient*/ if (aeCreateFileEvent(server.el, fd, AE_READABLE,readQueryFromClient, c) == AE_ERR){close(fd);zfree(c);return NULL;}}......
}
那接着看客户端套接字产生了可读事件,进而主Loop循环会执行到和当前客户端套接字相关的回调函数中来,一起看下readQueryFromClient的源码。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
{client *c = (client*)privdata;int nread, readlen;size_t qblen;UNUSED(el);UNUSED(mask);readlen = PROTO_IOBUF_LEN;if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= PROTO_MBULK_BIG_ARG){ssize_t remaining = (size_t)(c->bulklen+2) - sdslen(c->querybuf);if (remaining < readlen)readlen = remaining;}qblen = sdslen(c->querybuf);if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);nread = read(fd, c->querybuf + qblen, readlen);if (nread == -1) {if (errno == EAGAIN) {//说明当前接收缓冲区不够,没法读到最新的数据return;} else {//那说明真的出错了serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));freeClient(c);return;}} else if (nread == 0) {serverLog(LL_VERBOSE, "Client closed connection");freeClient(c);return;} else if (c->flags & CLIENT_MASTER){c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf + qblen, nread);}......if (!(c->flags & CLIENT_MASTER)) {processInputBuffer(c);} else {size_t prev_offset = c->reploff;processInputBuffer(c);size_t applied = c->reploff - prev_offset;if (applied) {replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);sdsrange(c->pending_querybuf, applied, -1);}}
}
processInputBuffer 判断接收到的字符串是不是以星号( * )开头,如果以*开头,设置 client 对象的 reqtype 字段值为 PROTO_REQ_MULTIBULK ,接着调用 processMultibulkBuffer 函数继续处理剩余的字符串。处理后的字符串被解析成 redis 命令,如果是具体的命令,那么redis会按照指定的规则去执行。
既然提到指令command,那么processInputBuffer 接口中肯定有和指令command处理相关的接口。
int processCommand(client *c) {//如果是quit指令,那么给客户端回应一个ok的应答replayif (!strcasecmp(c->argv[0]->ptr,"quit")) {addReply(c,shared.ok);c->flags |= CLIENT_CLOSE_AFTER_REPLY;return C_ERR;}//查找指令,执行对应的指令,出错了,给客户端回应一个错误信息c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);if (!c->cmd) {flagTransaction(c);sds args = sdsempty();int i;for (i=1; i < c->argc && sdslen(args) < 128; i++)args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",(char*)c->argv[0]->ptr, args);sdsfree(args);return C_OK;} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||(c->argc < -c->cmd->arity)) {flagTransaction(c);addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);return C_OK;}......
}
那继续看看addReply接口:
void addReply(client *c, robj *obj) {if (prepareClientToWrite(c) != C_OK) return;if (sdsEncodedObject(obj)) {if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)_addReplyObjectToList(c,obj);} else if (obj->encoding == OBJ_ENCODING_INT) {......} else {serverPanic("Wrong obj->encoding in addReply()");}
}
继续看prepareClientToWrite接口:
int prepareClientToWrite(client *c) {if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;if ((c->flags & CLIENT_MASTER) &&!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;if (c->fd <= 0) return C_ERR; if (!clientHasPendingReplies(c) &&!(c->flags & CLIENT_PENDING_WRITE) &&(c->replstate == REPL_STATE_NONE ||(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))){/*如果当前client没有CLIENT_PENDING_WRITE标记而且没有暂存的数据要发送,那么给它设置个CLIENT_PENDING_WRITE同时将当前client添加到redisServer的clients_pending_write链表中去*/ c->flags |= CLIENT_PENDING_WRITE;listAddNodeHead(server.clients_pending_write, c);}return C_OK;
}
还有接口_addReplyToBuffer:
/*最重要的一步,将客户端请求command执行的结果添加到cliet对应的buf缓冲区中去。
*/
int _addReplyToBuffer(client *c, const char *s, size_t len)
{size_t available = sizeof(c->buf) - c->bufpos;if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;/*如果client对应的replay链表长度大于0,那么将该应答指令添加到replay链表中去*/ if (listLength(c->reply) > 0) return C_ERR;if (len > available) return C_ERR;memcpy(c->buf + c->bufpos, s, len);c->bufpos += len;return C_OK;
}//_addReplyToBuffer返回C_ERR,那将replay添加到replay链表
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)_addReplyObjectToList(c, obj);
redis4.0最核心的部分就是这个主Loop循环:
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP);}
}
每次循环都会执行下beforesleep接口,beforesleep接口主要做了啥呢,可以看看beforesleep接口的实现:
void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop);/* Handle writes with pending output buffers. */handleClientsWithPendingWrites();......
}int handleClientsWithPendingWrites(void) {listIter li;listNode *ln;int processed = listLength(server.clients_pending_write);//先处理有数据需要发送的链表clients_pending_writelistRewind(server.clients_pending_write, &li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);//注销掉CLIENT_PENDING_WRITE标记c->flags &= ~CLIENT_PENDING_WRITE;listDelNode(server.clients_pending_write,ln);//直接往socket写数据if (writeToClient(c->fd, c, 0) == C_ERR) continue;//如果当前client对象有需要发送的replayif (clientHasPendingReplies(c)) {int ae_flags = AE_WRITABLE;if (server.aof_state == AOF_ON &&server.aof_fsync == AOF_FSYNC_ALWAYS){ae_flags |= AE_BARRIER;}/*如果tcp窗口太小,那么数据有可能发不出去,将client的fd可写事件添加到epoll模型上去并注册可写回调函数sendReplyToClient*/if (aeCreateFileEvent(server.el, c->fd, ae_flags,sendReplyToClient, c) == AE_ERR){freeClientAsync(c);}}}return processed;
}//sendReplyToClient也是调用writeToClient接口
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {UNUSED(el);UNUSED(mask);writeToClient(fd,privdata,1);
}
所以分析了这么多,可以发现redis的通信模型就是单线程外加一个主Loop循环:定义一个全局的redisServer对象,其中多个链表成员分别用于管理已连接的client对象集合、需要回复的client对象集合、有数据待发送的client对象集合;epoll模型监听listenSocket、acceptSocket的可读事件,客户端有请求指令发送过来时,redisServer解析指令、执行指令,并给client回复执行结果;如果tcp窗口太小,则给当前client的fd注册可写事件和可写回调函数sendReplyToClient,待TCP窗口满足发送数据要求时,sendReplyToClient再执行数据的发送。另外主Loop每次循环时都会主动检测待回复的reply链表、待发送链表clients_pending_write,如果有数据需要发送给客户端,逐个遍历发送。
3、实测验证
在centos7上做下实测,我们同时开启两个redis-cli,先后给redis-server发送指令
set hello world
此时看下redis-server的堆栈以及主线程:
redis处理客户端请求,并不是多线程并发处理,而是循环遍历去给pending client回复报文,逐一回应。