先来看协程调度器的结构体中处理调度的部分
typedef struct _nty_schedule {...nty_coroutine_queue ready; // 就绪队列 (优先级最高)nty_coroutine_queue defer; // (暂时没用到)nty_coroutine_link busy; // 忙碌链表(暂时没用到)nty_coroutine_rbtree_sleep sleeping; // 超时红黑树 (优先级居中)nty_coroutine_rbtree_wait waiting; // 挂起红黑树(优先级最低)} nty_schedule;
- 新创建的协程会被加入到就绪队列
- 注册了epoll事件的协程会被加入到超时红黑树和挂起红黑树中,两者并不冲突,事件被处理过的协程会被移除红黑树和epoll,不会重复调度
- 一个协程在注册了epoll事件或函数后会被立即挂起,由调度器管理,而调度器并不能实现完全的异步,不能在事件触发后才恢复对应挂起的协程,调度器只能减少同步posix api的阻塞时间,将时间分片
- 那么一个协程被恢复上下文后会怎样呢?执行完这个协程(函数)的代码,然后就结束了(不过一般是死循环),如果这个协程中调用多个函数,则该协程会被多次注册到调度器中(由poll_inner函数实现),也就是说,通过重写socket接口,或其他接口,使其调用poll_inner(),可以实现在一个协程中将不同事件注册到调度器
调度器运行过程:
void nty_schedule_run(void) {nty_schedule *sched = nty_coroutine_get_sched();if (sched == NULL) return ;while (!nty_schedule_isdone(sched)) {// 1. expired --> sleep rbtreenty_coroutine *expired = NULL;while ((expired = nty_schedule_expired(sched)) != NULL) {nty_coroutine_resume(expired);}// 2. ready queuenty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);while (!TAILQ_EMPTY(&sched->ready)) {nty_coroutine *co = TAILQ_FIRST(&sched->ready);TAILQ_REMOVE(&co->sched->ready, co, ready_next);if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {nty_coroutine_free(co);break;}nty_coroutine_resume(co);if (co == last_co_ready) break;}// 3. wait rbtreenty_schedule_epoll(sched);while (sched->num_new_events) {int idx = --sched->num_new_events;struct epoll_event *ev = sched->eventlist+idx;int fd = ev->data.fd;int is_eof = ev->events & EPOLLHUP;if (is_eof) errno = ECONNRESET;nty_coroutine *co = nty_schedule_search_wait(fd);if (co != NULL) {if (is_eof) {co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);}nty_coroutine_resume(co);}is_eof = 0;}}nty_schedule_free(sched);return ;
}
- 调度器做的事情就很简单了,不断处理就绪队列、超时红黑树、挂起红黑树中的协程,也就是恢复它们的上下文
- 而一个协程中的函数在被恢复上下文后就与调度器无关了,继续执行协程中下一个函数会再次将该协程注册到调度器,前提是该函数被poll_inner函数重写过
- 需要注意的是,一个协程本身就注册了函数或事件,而函数是可以嵌套的,所以实现了一个协程可以调用不同的函数,也就是复用,只需要修改协程的成员变量,并将挂载着新函数的协程加入调度器,如此一来,可以实现让调度器调度单个可能造成线程阻塞的函数
推荐学习 https://xxetb.xetslk.com/s/p5Ibb