2019独角兽企业重金招聘Python工程师标准>>>
协程,又被称为用户级线程,是在应用层被调度,可以减少因为调用系统调用而阻塞的线程切换的时间.目前有很多协程的实现,由于微信内部大量使用了其直研的的libco协程库,所以我选择了腾讯开源的libco协程库进行研究,学习协程的基本思想.
1,基本原理
协程实质上可以看成是子程序、函数。一个线程上面可以运行多个协程,但是同一时间只能运行一个协程,协程在线程上的切换,是由于遇到阻塞的操作,或者主动让出线程使用权。比如,有10个协程,当前线程正在运行协程1,然后协程1执行一个recv的阻塞操作,协程的调度器能够检测到这个操作,将协程1切换出去,将协程2调度进来执行。如果没有协程的调度器,此时协程1将会由于调用recv这个系统调用且数据未到达而阻塞,进行休眠,此时操作系统将会发生线程切换,调度其他线程执行,而线程切换非常耗时,高达几十微秒(同事测试是20us),即便新执行的线程是用户任务相关的,用户任务也会多了几十微秒的线程切换的消耗。而如果使用协程,协程之间的切换只需要几百纳秒(同事测试为0.35us,即350纳秒),耗时很少。这就是协程发挥优势的地方。
下面讲解libco的源码部分,有一篇文章:C++开源协程库libco-原理与应用.pdf,非常深入的讲解了libco的原理,而且不枯燥,十分推荐读者先看看这篇文章。
由于libco是非对称的协程机制,如果从当前协程A切换到协程B,而协程B又没有切换到下一个协程,在协程B执行结束之后,会返回到协程A执行。
2,libco基本框架
libco中的基本框架如下(引自C/C++协程库libco:微信怎样漂亮地完成异步化改造):
协程接口层实现了协程的基本源语。co_create、co_resume等简单接口负责协程创建于恢复。co_cond_signal类接口可以在协程间创建一个协程信号量,可用于协程间的同步通信。
系统函数Hook层负责主要负责系统中同步API到异步执行的转换。对于常用的同步网络接口,Hook层会把本次网络请求注册为异步事件,然后等待事件驱动层的唤醒执行。
事件驱动层实现了一个简单高效的异步网路框架,里面包含了异步网络框架所需要的事件与超时回调。对于来源于同步系统函数Hook层的请求,事件注册与回调实质上是协程的让出与恢复执行。
本文通过讲解接口层的几个主要函数,使读者对libco协程的框架和原理有一个大概的认识,下一篇文章将会讲解libco如何处理事件循环等。
下面我们从几个主要的协程函数一一分析。
3,主要函数源码解析
- co_create 首先来开一下协程创建的函数,源码如下:
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{if( !co_get_curr_thread_env() ) {co_init_curr_thread_env();}stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );*ppco = co;return 0;
}
void co_init_curr_thread_env()
{pid_t pid = GetPid(); g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) );stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ];env->iCallStackSize = 0;struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );self->cIsMain = 1;env->pending_co = NULL;env->occupy_co = NULL;coctx_init( &self->ctx );env->pCallStack[ env->iCallStackSize++ ] = self;stCoEpoll_t *ev = AllocEpoll();SetEpoll( env,ev );
}
co_create()的第一行判断是当前线程初始化环境变量的判断,如果没进行环境初始化,那么调用co_init_curr_thread_env() 进行环境初始化,会生成当前环境g_arrCoEnvPerThread[ GetPid() ]的第一个协程 env->pCallStack,其 cIsMain 标志位 1,iCallStackSize表示协程层数,目前只有1层,AllocEpoll()函数中初始化当前环境env的 pstActiveList,pstTimeoutList 这两个列表,这两个列表分别记录了活动协程和超时协程。环境初始化操作在一个线程中只会进行一次。在初始化完成之后,会调用co_create_env()创建一个新的协程,新协程的结构体中的env这个域始终指向当前协程环境g_arrCoEnvPerThread[ GetPid() ]。新协程创建之后,并没有做什么操作。
- co_resume
co_resume()函数是切换协程的函数,也可以称为是启动协程的函数。co_resume()函数的第一行是获取当前线程的协程环境env,第二行获取当前正在执行的协程,也即马上要被切换出去的协程。接下来判断待切换的协程co是否已经被切换过,如果没有,那么为co准备上下文,cStart字段设置为1。这里为co准备的上下文,就是在coctx_make()函数里面,这个函数将函数指针CoRoutineFunc赋值给co->ctx的reg[0],将来上下文切换的时候,就能切换到reg[0]所指向的地址去执行.准备好co的上下文之后,然后将待切换的协程co入栈,置于协程环境env的协程栈的顶端,表明当前最新的协程是co。注意,这并不是说协程栈中只有栈顶才是co,可能栈中某些位置也存了co。最后,调用co_swap(),该函数将协程上下文环境切换为co的上下文环境,并进入co指定的函数内执行,之前被切换出去的协程被挂起,直到co主动yield,让出cpu,才会恢复被切换出去的协程执行.注意,这里的所有的协程都是在当前协程执行的,也就是说,所有的协程都是串行执行的,调用co_resume()之后,执行上下文就跳到co的代码空间中去了。因为co_swap()要等co主动让出cpu才会返回,而co的协程内部可能会resume新的协程继续执行下去,所以co_swap()函数调用可能要等到很长时间才能返回。void co_resume( stCoRoutine_t *co ) {stCoRoutineEnv_t *env = co->env;stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];if( !co->cStart ){coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );co->cStart = 1;}env->pCallStack[ env->iCallStackSize++ ] = co;co_swap( lpCurrRoutine, co ); }
在co_swap()函数代码中,由于libco不是共享栈的模式,即pending_co->cIsShareStack为0,所以执行了if分支,接下来执行coctx_swap(),这是一段汇编源码,内容就是从curr的上下文跳转到pending_co的上下文中执行,通过回调CoRoutineFunc()函数实现,此时当前线程的cpu已经开始执行pending_co协程中的代码,直到pending_co主动让出cpu,才接着执行coctx_swap()下面的代码,由于update_occupy_co为NULL,下面的if语句没有执行,所以相当于coctx_swap()下面没有代码,直接返回到curr协程中.void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co) {stCoRoutineEnv_t* env = co_get_curr_thread_env();//get curr stack spchar c;curr->stack_sp= &c;if (!pending_co->cIsShareStack){env->pending_co = NULL;env->occupy_co = NULL;}else {env->pending_co = pending_co;//get last occupy co on the same stack memstCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;//set pending co to occupy thest stack mem;pending_co->stack_mem->occupy_co = pending_co;env->occupy_co = occupy_co;if (occupy_co && occupy_co != pending_co){save_stack_buffer(occupy_co);}}//swap contextcoctx_swap(&(curr->ctx),&(pending_co->ctx) );//stack buffer may be overwrite, so get again;stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();stCoRoutine_t* update_occupy_co = curr_env->occupy_co;stCoRoutine_t* update_pending_co = curr_env->pending_co;if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co){//resume stack bufferif (update_pending_co->save_buffer && update_pending_co->save_size > 0){memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);}} }
- co_yield
co_yield()与co_yield_ct()的功能是一样的,都是使得当前协程让出cpu.
co_yield_env()函数中的第二行获取当前执行的协程,也即当前协程环境的协程栈的栈顶,函数的第一行获取协程栈的次顶,也即上一次被切换的协程last,从这里也可以看出,libco的协程让出cpu,只能让给上一次被切换出去的协程.最后一行是co_swap()函数,前面讲到,该函数会进入last协程的上下文去执行代码,也就是回到上次co_resume()函数内部的co_swap()的地方,继续往下走.void co_yield_env( stCoRoutineEnv_t *env ) {stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];env->iCallStackSize--;co_swap( curr, last); }
当协程正常结束的时候,会继续执行CoRoutineFunc()函数,将协程的cEnd设置为1,表示已经结束,并执行一次co_yield_env(),让出cpu,切换回上一次被让出的协程继续执行.
这里有一点我之前不太理解,怀疑会发生栈溢出的地方,那就是在调用co_yield_env(),进入co_swap()之后,调用coctx_swap(),切换到上一次的last协程的上下文,那么当前协程的co_swap()函数里面的变量,都是在栈空间上面的,切换到last协程的上下文之后,那些变量依然在栈空间上面,不会被销毁,直到回到了main函数的协程,还是没有被销毁。其实这是个误区,这些变量其实不是在栈空间上面,而是在CPU的通用寄存器里面,当调用coctx_swap()之后,这些寄存器变量就会保存到当前协程的栈空间中去,其实是我们之前co_create()函数malloc出来的一片堆空间。这是因为cpu的工作寄存器数量较多,而局部变量较少,而co_swap()函数的变量都是局部变量,直接存放在cpu的工作寄存器中,而coctx_swap()的作用就是将CPU的各个通用寄存器保存到coctx_t结构的regs[1] ~ regs[6]的位置,然后将last协程的coctx_t结构的regs[1]~regs[6]的内容加载到当前的通用寄存器中,并将执行cpu的执行顺序切换到last协程中去执行。 - co_release
co_release()的功能比较简单,就是释放资源void co_release( stCoRoutine_t *co ) {if( co->cEnd ){free( co );} }
- co_self
co_self()函数是获取当前正在执行的协程,只要获取到当前协程环境的线程栈顶的协程即可。stCoRoutine_t *co_self() {return GetCurrThreadCo(); } stCoRoutine_t *GetCurrThreadCo( ) {stCoRoutineEnv_t *env = co_get_curr_thread_env();if( !env ) return 0;return GetCurrCo(env); } stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env ) {return env->pCallStack[ env->iCallStackSize - 1 ]; }
- co_enable_hook_sys
libco封装了系统调用,在系统调用,比如send/recv/condition_wait等函数前面加了一层hook,有了这层hook就可以在系统调用的时候不让线程阻塞而产生线程切换,co_enable_hook_sys()函数允许协程hook,当然也可以不允许hook,直接使用原生的系统调用。void co_enable_hook_sys() {stCoRoutine_t *co = GetCurrThreadCo();if( co ){co->cEnableSysHook = 1;} }