协程结构体定义
之前我们使用linux下协程函数实现了线程切换,使用的是ucontext_t结构体,和基于这个结构体的四个函数。现在我们要用这些工具来实现我们自己的一个线程结构体,并实现线程调度和线程切换、挂起。
首先我们来实现以下线程结构体:
struct thread_t {ucontext_t ctx;void (*func)();void* args;int state;char stack[1024 * 128]; //128kB栈空间
};
其中state有四种值,RUNNABLE,RUNING,SUSPEND,分别对应0,1,2,即就绪,运行,挂起这三种状态,对应操作系统下一个进程执行和终止之间的三种状态。
再写一个调度的结构体
struct scheduler {ucontext_t main;std::vector<thread_t> threads;int running_thread;scheduler():running_thread(-1) {};
};
调度器需要保存主函数上下文,需要调度的线程集合threads,用一个vector实现,和当前运行线程id;运行线程id初始时赋为-1,表示无线程正在运行。
这样线程结构体和线程调度器就已经实现和完成了。
接下来我们要实现下我们自己的线程创建函数,参数为调度器scheduler,执行函数func和执行函数的参数args
int thread_create(scheduler& myscheduler, void (*func)(), void* args) {thread_t *newthread = new thread_t();newthread->ctx.uc_link = &myscheduler.main;newthread->ctx.uc_stack.ss_sp = newthread->stack;newthread->ctx.uc_stack.ss_size = 1024*128;newthread->func = func;newthread->args = args;newthread->state = 0;myscheduler.threads.push_back(*newthread);return myscheduler.threads.size() - 1;
}
首先创建一个thread_t类型变量作为新线程,将其ctx变量的后继函数设定为调度器中主函数,栈空间和栈大小设置为其默认成员变量。对应参数赋值为给定参数方便后续使用。初始状态设置为就绪态,并将其放入调度器线程集合,线程id设置为当前线程集合大小-1.
线程挂起函数
int thread_yield(scheduler& myscheduler) {if (myscheduler.running_thread == -1) return 0;myscheduler.threads[myscheduler.running_thread].state = 2;setcontext(&myscheduler.main);return 1;
}
线程挂起函数首先判断调度器中当前运行线程id是否为-1,如果是的话就直接返回0,表示写成挂起失败。否则将正在运行线程id对应到调度器中线程集合中相应下标的元素,将其值置为2(挂起),将当前上下文设置为主函数,返回1;
线程恢复运行函数
int thread_resume(scheduler& myscheduler,int threadId) {if (threadId < 0 || threadId >= myscheduler.threads.size()) return -1;if (myscheduler.threads[threadId].state == 2) {// if (myscheduler.running_thread != -1) thread_yield(myscheduler);myscheduler.running_thread = threadId;myscheduler.threads[threadId].state = 1;swapcontext(&myscheduler.main,&myscheduler.threads[threadId].ctx);} else if (myscheduler.threads[threadId].state == 0) { // if (myscheduler.running_thread != -1) thread_yield(myscheduler);myscheduler.running_thread = threadId;myscheduler.threads[threadId].state = 1;getcontext(&myscheduler.threads[threadId].ctx);makecontext(&myscheduler.threads[threadId].ctx, myscheduler.threads[threadId].func, 1, myscheduler.threads[threadId].args);swapcontext(&myscheduler.main,&myscheduler.threads[threadId].ctx);}
}
线程恢复运行函数首先判断给定线程Id是否<0或者>调度器线程集合大小,如果是就说明不满足条件,直接返回。否则判断其状态,我们需要处理的有挂起态和就绪态两种状态,两种情况下都需要将当前运行线程(如果有的话)挂起,将需要运行的线程状态置为1。如果当前需要运行线程之前是挂起,直接切换栈空间即可。否则需要将取当前栈空间并用makecontext函数处理下,再进行切换。
线程全部结束判断函数
int scheduler_finished(scheduler& myscheduler) {for (int i = 0; i < myscheduler.threads.size(); i++) {if (myscheduler.threads[i].state != 2) return 0;}return 1;
}
判断调度器内部线程集合里线程状态是否全为0,是就说明全部执行完,返回0,否则返回1。
运行结果如下.
测试代码如下:
#include <iostream>
#include <ucontext.h>
#include <vector>struct thread_t {ucontext_t ctx;void (*func)();void* args;int state;char stack[1024 * 128]; //128kB栈空间
};struct scheduler {ucontext_t main;std::vector<thread_t> threads;int running_thread;scheduler():running_thread(-1) {};
};scheduler myscheduler;int thread_create(scheduler& myscheduler, void (*func)(), void* args) {thread_t *newthread = new thread_t();newthread->ctx.uc_link = &myscheduler.main;newthread->ctx.uc_stack.ss_sp = newthread->stack;newthread->ctx.uc_stack.ss_size = 1024*128;newthread->func = func;newthread->args = args;newthread->state = 0;myscheduler.threads.push_back(*newthread);return myscheduler.threads.size() - 1;
}int thread_yield(scheduler& myscheduler) {if (myscheduler.running_thread == -1) return 0;myscheduler.threads[myscheduler.running_thread].state = 2;swapcontext(&myscheduler.threads[myscheduler.running_thread].ctx, &myscheduler.main);return 1;
}int thread_resume(scheduler& myscheduler,int threadId) {if (threadId < 0 || threadId >= myscheduler.threads.size()) return -1;if (myscheduler.threads[threadId].state == 2) {//if (myscheduler.running_thread != -1) thread_yield(myscheduler);myscheduler.running_thread = threadId;myscheduler.threads[threadId].state = 1;swapcontext(&myscheduler.main,&myscheduler.threads[threadId].ctx);} else if (myscheduler.threads[threadId].state == 0) { //if (myscheduler.running_thread != -1) thread_yield(myscheduler);myscheduler.running_thread = threadId;myscheduler.threads[threadId].state = 1;getcontext(&myscheduler.threads[threadId].ctx);makecontext(&myscheduler.threads[threadId].ctx, myscheduler.threads[threadId].func, 1, myscheduler.threads[threadId].args);swapcontext(&myscheduler.main,&myscheduler.threads[threadId].ctx);}
}int scheduler_finished(scheduler& myscheduler) {for (int i = 0; i < myscheduler.threads.size(); i++) {if (myscheduler.threads[i].state != 2) return 0;}return 1;
}void thread1() {std::cout << "hello" << std::endl;
}void thread2() {int n = 10;thread_yield(myscheduler);while (n--)std::cout << "world" << std::endl;
}int main() {getcontext(&myscheduler.main);thread_create(myscheduler, &thread1, nullptr);thread_create(myscheduler, &thread2, nullptr);if (!scheduler_finished(myscheduler)) {thread_resume(myscheduler, 0);thread_resume(myscheduler, 1);thread_resume(myscheduler, 1);}return 0;
}
还缺个线程结束的判断,有时间再补上去吧,