写在文章开头
在面试时我们经常会问到这样一道题
你刚刚说redis是单线程的,那你能不能告诉我它是如何基于单个线程完成指令接收与连接接入的?
这时候我们经常会得到沉默,所以对于这道题,笔者会直接通过3.0.0
源码分析的角度来剖析一下redis
单线程的设计与实现。
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
详解redis的单线程模型
单线程处理核心任务
当我们通过./redis-server
启动redis
时,如果我们配置了后台启动,那么shell
进程线程就会调用系统函数即fork
方法创建一个子进程,再通过execve
方法将子进程主体替换成redis
可执行文件也就是我们的redis-server
,而子进程执行时会保持从父进程集成过来的标准输入和输出,最后redis就会调用main方法开始执行自己的启动逻辑了。
到这为止,我们不难看出,在启动阶段redis的启动并不是多线程的,它会根据我们的配置来决定启动逻辑,以我们上文所说的后台启动,它本质是通过父进程fork的方式完成创建与初始化的,这一点我们也可以直接从redis的main方法印证:
int main(int argc, char **argv) {//命令参数解析与初始化//......//如果配置后台启动,则调用daemonize从父进程中fork出来执行if (server.daemonize) daemonize();//......
}
我们步入daemonize
方法,可以看到其内部如果子进程fork
成功,后续的标准输入、输出、错误都会重定向到/dev/null
,由此后的各项工作也都是我们的redis server
的主线程进行负责处理:
void daemonize(void) {int fd;//fork返回0说明fork成功,创建新会话,然后父进程exit(0)直接退出if (fork() != 0) exit(0); /* parent exits */setsid(); /* create a new session *//* Every output goes to /dev/null. If Redis is daemonized but* the 'logfile' is set to 'stdout' in the configuration file* it will not log at all. *///将标准输入、输出、错误重定向写到/dev/null中,由此和终端分离if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {dup2(fd, STDIN_FILENO);dup2(fd, STDOUT_FILENO);dup2(fd, STDERR_FILENO);if (fd > STDERR_FILENO) close(fd);}
}
后续的主线程的socket
就会注册到epoll
中,通过非阻塞调用epoll
函数获取就绪的连接和指令完成与多个客户端的交互:
而上述所说这种工作模式,也就是我们的aeMain
方法,这里笔者也给出的对应的的代码实现,如下所示,aeMain的本质逻辑就是调用无限循环,在循环中调用aeApiPoll
即epoll
非阻塞轮询获取就绪的事件并交给对应的读写事件处理器(rfileProc/wfileProc)
进行处理:
//无限循环调用aeProcessEvents处理读写事件
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS);}
}int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{//......//通过epoll完成非阻塞调用numevents = aeApiPoll(eventLoop, tvp);//遍历拿到的事件将其交给读写处理器处理for (j = 0; j < numevents; j++) {//解析出该文件对应的类型aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;//如果事件fe是读事件则交给rfileProcif (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}//如果事件包含写标志,则交给wfileProc处理器处理if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}//......//返回处理事件数return processed; /* return the number of processed file/time events */
}
多线程执行IO事件
截至到上述的片段,redis
大体上我们可以认为是单线程执行,但是在3.0.0
之后源码中,为了避免某些IO任务对主线程的执行效率的影响,redis
还是创建了一些异步线程处理这些任务。
如下图所示,我们以aof
为例,redis
主线程会通过定时任务的方法serverCron
会按照用户的配置检查当前是否需要进行aof
写入,如果需要则通过bioCreateBackgroundJob
提交一个任务到AOF
异步刷盘的任务列表中,此时redis
创建的io线程
就会无限循环调用bioProcessBackgroundJobs
从该列表中取出自己绑定的任务进行异步消费,通过这种简单的多线程模式,保证了耗时的IO
操作不会阻塞主线程:
这里我们先给出对应的事件宏定义,可以看到事件总数为REDIS_BIO_NUM_OPS
即2,然后0是文件关闭事件,1的AOF
异步刷盘事件,通过这样的顺序完成了事件的类型码和总量的定义:
/* Background job opcodes */
#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define REDIS_BIO_NUM_OPS 2
对应的这些线程的初始化工作我们可以在main
方法调用的initServer
中可以看到这样一段调用,其内部的调用bioInit
本质就是完成上述IO
任务的线程的创建:
void initServer(void) {int j;//......//创建bio任务线程bioInit();
}
bioInit
它会初始化2个线程以及栈大小(最大不会超过4M),为每个线程各自分配一个队列,分配队列这一步就会按照循环遍历得到的值进行分配,遍历时用REDIS_BIO_NUM_OPS
作为范围控制,遍历到0的处理文件关闭事件,1则是AOF刷盘事件。
完成事件类型队列分配之后,redis会为每个线程分配消费任务的方法指针bioProcessBackgroundJobs
,后续的线程的任务消费和处理都是调用这个方法执行的:
void bioInit(void) {pthread_attr_t attr;pthread_t thread;size_t stacksize;int j;//循环2次,刚刚好对应2个事件即0是文件关闭事件、1是aof刷盘事件for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {//互斥数组初始化pthread_mutex_init(&bio_mutex[j],NULL);//条件数组初始化pthread_cond_init(&bio_condvar[j],NULL);//bio任务数组初始化,每个数组元素都是一个任务列表bio_jobs[j] = listCreate();//表示每种任务列表待处理的任务数为0bio_pending[j] = 0;}//设置线程最大的栈属性大小,默认为1,若小于REDIS_THREAD_STACK_SIZE即4M则乘2pthread_attr_init(&attr);pthread_attr_getstacksize(&attr,&stacksize);if (!stacksize) stacksize = 1; while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;pthread_attr_setstacksize(&attr, stacksize);//创建线程并,为每一个线程分配一个任务列表for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {//循环两次 j为0即代表文件关闭事件、1是aof刷盘事件,这个arg会作为事件类型绑定到线程pthread上void *arg = (void*)(unsigned long) j;//调用pthread_create完成线程属性初始化和事件类型的绑定if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");exit(1);}bio_threads[j] = thread;}
}
这里我们也给出bioProcessBackgroundJobs
逻辑可以看到,每个线程调用该方法时,会在无限循环中根据任务的type
按需消费处理:
void *bioProcessBackgroundJobs(void *arg) {struct bio_job *job;//每个线程都会根据自己传入的arg决定任务的type,0为文件关闭事件、1为aof刷盘事件unsigned long type = (unsigned long) arg;sigset_t sigset;//......//按照类型到bio_jobs取任务执行while(1) {listNode *ln;/* The loop always starts with the lock hold. */if (listLength(bio_jobs[type]) == 0) {pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);continue;}//取出自己需要处理的类型的队列任务ln = listFirst(bio_jobs[type]);job = ln->value;/* It is now possible to unlock the background system as we know have* a stand alone job structure to process.*/pthread_mutex_unlock(&bio_mutex[type]);//线程按照自己的类型进行消费if (type == REDIS_BIO_CLOSE_FILE) {close((long)job->arg1);} else if (type == REDIS_BIO_AOF_FSYNC) {aof_fsync((long)job->arg1);} else {redisPanic("Wrong job type in bioProcessBackgroundJobs().");}//完成后释放任务对象zfree(job);//线程解锁 任务移除pthread_mutex_lock(&bio_mutex[type]);listDelNode(bio_jobs[type],ln);bio_pending[type]--;}
}
了解的任务消费的源码之后,我们再来看看任务的投递的逻辑,我们以aof文件刷盘的任务为例,从定时任务函数serverCron
,其内部会判断rdb
或者aof
的pid
不为-1,若不为-1则说明这两个其中一个重写任务完成了,直接内部逻辑,获取当前子进程的pid
是否是aof
子进程的,如果是则步入backgroundRewriteDoneHandler
方法进行任务提交到任务队列中:
这里我们直接从serverCron为入口查看redis定时任务方法逻辑可以看到其内部会查看rdb_child_pid
或者aof_child_pid
是否为-1,判断是否有持久化任务完成了,若发现aof_child_pid
为-1且wait3获取到的pid也为aof的则调用backgroundRewriteDoneHandler
提交异步刷盘任务:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {int j;REDIS_NOTUSED(eventLoop);REDIS_NOTUSED(id);REDIS_NOTUSED(clientData);//......//检查后台的aof重写进程是否结束,若结束的步入循环if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {int statloc;pid_t pid;//获取当前子进程pidif ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {//......if (pid == server.rdb_child_pid) {//......} else if (pid == server.aof_child_pid) {//如果pid 为aof_child_pid则调用backgroundRewriteDoneHandler提交任务到aof队列中backgroundRewriteDoneHandler(exitcode,bysignal);} else {redisLog(REDIS_WARNING,"Warning, detected child with unmatched pid: %ld",(long)pid);}updateDictResizePolicy();}} else {//......}//......
}
步入backgroundRewriteDoneHandler
可以看到,如果AOF
刷盘策略是AOF_FSYNC_EVERYSEC
即异步刷盘则会调用aof_background_fsync
进行文件刷盘,而该方法内部的逻辑就是调用我们上文的所说的提交后台任务方法bioCreateBackgroundJob
:
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {//......if (server.aof_fd == -1) {/* AOF disabled, we don't need to set the AOF file descriptor* to this new file, so we can close it. */close(newfd);} else {/* AOF enabled, replace the old fd with the new one. */oldfd = server.aof_fd;server.aof_fd = newfd;if (server.aof_fsync == AOF_FSYNC_ALWAYS)aof_fsync(newfd);else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)//如果是异步刷盘则将任务提交到对应的队列中aof_background_fsync(newfd);server.aof_selected_db = -1; /* Make sure SELECT is re-issued */aofUpdateCurrentSize();server.aof_rewrite_base_size = server.aof_current_size;/* Clear regular AOF buffer since its contents was just written to* the new AOF from the background rewrite buffer. */sdsfree(server.aof_buf);server.aof_buf = sdsempty();}server.aof_lastbgrewrite_status = REDIS_OK;//......} else if (!bysignal && exitcode != 0) {//......} else {//......}//......
}//调用bioCreateBackgroundJob提交任务到AOF刷盘队列中
void aof_background_fsync(int fd) {bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
小结
自此我们把redis
中主线程和IO任务的线程都以图解和源码印证的方式分析完成了,以笔者的理解,设计者所说的redis是单线程的
本质上的意思是说,对于核心的连接建立和指令处理是通过单个线程高效完成,而其余的一些非核心的IO耗时逻辑还是需要多线程来完成,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
参考
Linux内核学习笔记(4)-- wait、waitpid、wait3 和 wait4:https://www.cnblogs.com/tongye/p/9558320.html