文章目录
- 修改xv6内核调度算法
- 1.实验环境
- 2.基于优先级的调度算法
- (1).基本实现思路
- (2).实现流程
- (3).一些问题
- 3.乐透调度算法
- (1).思路
- (2).实现流程
- (3).一些问题
- 总结
- 参考资料
修改xv6内核调度算法
1.实验环境
这一次的实验因为是在xv6内核中实现一些调度算法,因此我本次实验直接采用了Lab中使用的xv6内核实验环境:
2.基于优先级的调度算法
(1).基本实现思路
为了实现的便利,我首先决定实现的调度算法是优先级调度策略,这个策略的实现相对比较简单,我大概只需要在PCB中增加一个priority属性,在修改完调度函数之后,只需要在后续增加调整priority字段的操作即可。
(2).实现流程
那么首先么当然是修改proc.h当中的struct proc的定义:
struct proc {struct spinlock lock;// p->lock must be held when using these:enum procstate state; // Process statestruct proc *parent; // Parent processvoid *chan; // If non-zero, sleeping on chanint killed; // If non-zero, have been killedint xstate; // Exit status to be returned to parent's waitint pid; // Process ID// these are private to the process, so p->lock need not be held.uint64 kstack; // Virtual address of kernel stackuint64 sz; // Size of process memory (bytes)pagetable_t pagetable; // User page tablestruct trapframe *trapframe; // data page for trampoline.Sstruct context context; // swtch() here to run processstruct file *ofile[NOFILE]; // Open filesstruct inode *cwd; // Current directorychar name[16]; // Process name (debugging)#ifdef PrioSchedint priority; // Process priority(default 50)uint64 rtime; // Process running times#endif
};
对于优先级调度的两个参数,仅当PrioSched宏定义的时候才会生效,priority就是简单的优先级数值,默认为50,ctime是PCB的创建时间,在优先级相同的情况下优先执行ctime比较小的进程,因此在修改了struct proc之后,还可以修改proc.c的scheduler代码如下:
void scheduler(void) {struct proc *p;struct cpu *c = mycpu();c->proc = 0;for (;;) {// Avoid deadlock by ensuring that devices can interrupt.intr_on();int found = 0;#ifdef PrioSchedstruct proc* high_prio = 0;for (p = proc; p < &proc[NPROC]; p++) {acquire(&p->lock);if (p->state == RUNNABLE) {if (high_prio == 0) {high_prio = p;}else {acquire(&high_prio->lock);int is_lock = 1;if (p->priority > high_prio->priority) {release(&high_prio->lock);is_lock = 0;high_prio = p;}else if (p->priority == high_prio->priority) {if (p->rtime < high_prio->rtime) {release(&high_prio->lock);is_lock = 0;high_prio = p;}}if (is_lock) {release(&high_prio->lock);}}found = 1;}release(&p->lock);}if (found == 1) {acquire(&high_prio->lock);if (high_prio->state != RUNNABLE) {release(&high_prio->lock);continue;}high_prio->rtime++;high_prio->state = RUNNING;c->proc = high_prio;swtch(&c->context, &high_prio->context); c->proc = 0;release(&high_prio->lock);}#endif#ifdef DEFAULT_SCHEDfor (p = proc; p < &proc[NPROC]; p++) {acquire(&p->lock);if (p->state == RUNNABLE) {// Switch to chosen process. It is the process's job// to release its lock and then reacquire it// before jumping back to us.p->state = RUNNING;c->proc = p;swtch(&c->context, &p->context);// Process is done running for now.// It should have changed its p->state before coming back.c->proc = 0;found = 1;}release(&p->lock);}#endifif (found == 0) {intr_on();asm volatile("wfi");}}
}
在定义了PrioSched宏之后,scheduler的行为也会随之改变,CPU会依次查找整个进程表,以找到优先级最高的进程,并且之后进行上下文切换进行执行,它的修改是比较简单的,因此接下来我们要做的就是对进程初始化的一些函数进行修改。
所以这里首先修改的是allocproc,在这里增加对于优先级和rtime的初始化操作:
static struct proc *allocproc(void) {...
found:// add priority based scheduler args#ifdef PrioSchedp->priority = 50;p->rtime = 0;#endifp->pid = allocpid();// Allocate a trapframe page....
}
之后操作的就是fork函数,在最后添加了对应增加优先级参数的两行代码:
int fork(void) {...np->state = RUNNABLE;#ifdef PrioSchednp->priority = 25;np->rtime = 0;#endifrelease(&np->lock);return pid;
}
对应的,在freeproc函数里也要增加相应的操作:
static void freeproc(struct proc *p) {if (p->trapframe) kfree((void *)p->trapframe);p->trapframe = 0;if (p->pagetable) proc_freepagetable(p->pagetable, p->sz);p->pagetable = 0;p->sz = 0;p->pid = 0;p->parent = 0;p->name[0] = 0;p->chan = 0;p->killed = 0;p->xstate = 0;p->state = UNUSED;#ifdef PrioSchedp->priority = 0;p->rtime = 0;#endif
}
在完成了所有这些操作之后,实际上就完成了:
xv6内核成功启动,并且运行各种其中的程序也都是可以正常运行的,实际上这个代码应该是正确的了,因为在之前写的代码中实际上出现了很多次panic
(3).一些问题
我写的早期的scheduler代码实际上是这样的:
void scheduler(void) {struct proc *p;struct cpu *c = mycpu();c->proc = 0;for (;;) {// Avoid deadlock by ensuring that devices can interrupt.intr_on();int found = 0;#ifdef PrioSchedstruct proc* high_prio = 0;for (p = proc; p < &proc[NPROC]; p++) {acquire(&p->lock);if (p->state == RUNNABLE) {if (high_prio == 0) {high_prio = p;}else {if (p->priority > high_prio->priority) {high_prio = p;}else if (p->priority == high_prio->priority) {if (p->rtime < high_prio->rtime) {high_prio = p;}}}found = 1;}release(&p->lock);}if (found == 1) {acquire(&high_prio->lock);high_prio->rtime++;high_prio->state = RUNNING;c->proc = high_prio;swtch(&c->context, &high_prio->context); c->proc = 0;release(&high_prio->lock);}#endif...}
}
实际上最主要的区别就在于最后判断found是否为1准备进行上下文切换的时候没有再次判断已经获取到的high_prio进程目前是否是RUNNABLE状态,这导致了后续尝试启动系统的时候:只有编译时附加CPUS=1,即禁用多处理器情况下才能正常工作,而多处理器情况下都会直接在trap处理程序当中报错。
也就是最下方的报错,这里应该是因为panic或者printf没有保障线程安全所以打印发生了错乱:
这个bug困扰了我很久,我尝试定位了具体的错误位置,主要应该就在两个地方:
void kerneltrap() {...if ((which_dev = devintr()) == 0) {printf("scause %p\n", scause);printf("sepc=%p stval=%p\n", r_sepc(), r_stval());panic("kerneltrap");}// give up the CPU if this is a timer interrupt.if (which_dev == 2 && myproc() != 0 && myproc()->state == RUNNING) yield();...
}
首先是trap.c中的内核trap处理程序kerneltrap,这里会将scause寄存器保存的地址等信息全部打印出来,并且报一个panic,不难发现,报错信息里的panic貌似不是kerneltrap,仔细观察之后发现应该是sched lock,而这个在上一次的Lab当中实际上已经研究过了:
void sched(void) {int intena;struct proc *p = myproc();if (!holding(&p->lock)) panic("sched p->lock");if (mycpu()->noff != 1) panic("sched locks");if (p->state == RUNNING) panic("sched running");if (intr_get()) panic("sched interruptible");intena = mycpu()->intena;swtch(&p->context, &mycpu()->context);mycpu()->intena = intena;
}
实际上是程序调用sched()函数的时候在判断当前CPU的关中断操作栈的计数是否为1,也就是说实际上在两个CPU同时试图运行同一个进程的时候,会出现上述的一系列问题,这一系列问题目前暂时还不明确机制是什么样的,之后我应该还会继续研究相关的问题,不过至少在我意识到这个问题,加上代码之后,它就可以正常执行了。
不过实际上基于优先级的调度算法还需要考虑一些别的东西,比如设置进程优先级的系统调用等,这些我暂时都还没有实现。
3.乐透调度算法
(1).思路
这个调度算法实际上比优先级调度算法要更简单一点,每一个进程都有彩票的张数,每一次遍历进程表时,找到一个就绪态进程就尝试进行抽奖,如果抽出的数量小于某个进程拥有彩票的张数,那么就轮到当前这个进程进行调度,所以它的实现应该是非常简单的。
(2).实现流程
第一步还是给proc.h里的struct proc增加tickets字段:
struct proc {struct spinlock lock;// p->lock must be held when using these:enum procstate state; // Process statestruct proc *parent; // Parent processvoid *chan; // If non-zero, sleeping on chanint killed; // If non-zero, have been killedint xstate; // Exit status to be returned to parent's waitint pid; // Process ID// these are private to the process, so p->lock need not be held.uint64 kstack; // Virtual address of kernel stackuint64 sz; // Size of process memory (bytes)pagetable_t pagetable; // User page tablestruct trapframe *trapframe; // data page for trampoline.Sstruct context context; // swtch() here to run processstruct file *ofile[NOFILE]; // Open filesstruct inode *cwd; // Current directorychar name[16]; // Process name (debugging)#ifdef PrioSchedint priority; // Process priority(default 50)uint64 rtime; // Process running times#endif#ifdef LotterySchedint tickets; // Process tickets for Lottery Scheduler#endif
};
然后再在allocproc函数中增加对于乐透调度的彩票数初始化,这里初始化的规则是对于每个进程初始都分配一张彩票(同步也在freeproc中增加了对应代码):
static struct proc *allocproc(void) {...
found:// Add priority based scheduler args#ifdef PrioSchedp->priority = 50;p->rtime = 0;#endif#ifdef LotterySchedp->tickets = 1;#endifp->pid = allocpid();...
}
还需要实现两个函数:
int random(int max) {if(max <= 0) {return 1;}static int z1 = 12345; // 12345 for rest of zxstatic int z2 = 12345; // 12345 for rest of zxstatic int z3 = 12345; // 12345 for rest of zxstatic int z4 = 12345; // 12345 for rest of zxint b;b = (((z1 << 6) ^ z1) >> 13);z1 = (((z1 & 4294967294) << 18) ^ b);b = (((z2 << 2) ^ z2) >> 27);z2 = (((z2 & 4294967288) << 2) ^ b);b = (((z3 << 13) ^ z3) >> 21);z3 = (((z3 & 4294967280) << 7) ^ b);b = (((z4 << 3) ^ z4) >> 12);z4 = (((z4 & 4294967168) << 13) ^ b);// if we have an argument, then we can use itint rand = ((z1 ^ z2 ^ z3 ^ z4)) % max;if(rand < 0) {rand = rand * -1;}return rand;
}int totalTickets() {struct proc* p;int tickets = 0;for (p = proc; p < &proc[NPROC]; p++) {if (p->state == RUNNABLE) {tickets += p->tickets;}}return tickets;
}
一个用于生成乐透调度过程中的伪随机数(random),还有一个则是统计目前可以用来抽奖的所有进程的彩票总数,最后就是scheduler了:
void scheduler(void) {struct proc *p;struct cpu *c = mycpu();c->proc = 0;for (;;) {// Avoid deadlock by ensuring that devices can interrupt.intr_on();int found = 0;...#ifdef LotterySchedfor (p = proc; p < &proc[NPROC]; p++) {acquire(&p->lock);if (p->state == RUNNABLE) {// Switch to chosen process. It is the process's job// to release its lock and then reacquire it// before jumping back to us.int total = totalTickets();int prize = -1;if (total > 0 || prize <= 0) {prize = random(total);}prize -= p->tickets;if (prize >= 0) {release(&p->lock);continue;}if (p != 0) {p->state = RUNNING;c->proc = p;swtch(&c->context, &p->context);// Process is done running for now.// It should have changed its p->state before coming back.c->proc = 0;found = 1;}}release(&p->lock);}#else...}
}
最终也成功运行了,经过测试,乐透调度的实现应该也是正确的了
(3).一些问题
因为参考的一些实现是基于x86版的xv6内核,x86版的xv6内核在进程调度方面和risc-v版本的略有差异,其实主要体现在:risc-v版内核对每个PCB都有一个单独的自旋锁来保护,而x86版的内核只对整个进程表上一把大锁,所以这时候下面这种写法是合法的:
void scheduler(void) {struct proc *p;struct cpu *c = mycpu();c->proc = 0;for (;;) {// Avoid deadlock by ensuring that devices can interrupt.intr_on();int found = 0;...#ifdef LotterySched...if (prize >= 0) continue;...}
}
即一旦没有抽中,则立刻continue开始尝试对下一个进程进行遍历,但是在risc-v版本的xv6中如果这么做,就会在没有释放锁的情况下开始后续的遍历,所以这时候错误就显而易见了。
其实还有一个值得注意的点就是:我实现的totalTickets函数实际上也是基于对整个表上一把锁这个思路去实现的:
int totalTickets() {struct proc* p;int tickets = 0;for (p = proc; p < &proc[NPROC]; p++) {if (p->state == RUNNABLE) {tickets += p->tickets;}}return tickets;
}
在这里的实现当中完全是无锁的,对于整个进程表上锁的情况来说,这种写法完全是争取的,但是对于risc-v就无法保证了,但是我也思考过直接加锁的情况,实际上问题在于:如果在遍历的时候对每一个进程加锁,最后两个CPU可能会互相死锁,这个问题我可能没有办法解决,所以这样的写法虽然可能会导致没有进程可以被调度,但是至少不会出现死锁,后续可能还要针对这个问题进行改进
总结
这一次的调度算法实现实际上还是有一定的难度的,因为这一次相当于是在对xv6的关键操作进行修改,在实现第一个基于优先级的调度算法的时候我出现了相当多的问题,花费了一整个早上的时间进行debug,实际上对于内核的debug是相当困难的,有些情况可能断点都非常难命中。
不过好在最后还是实现了两个调度算法,后续的诸如多级反馈队列(MLFQ)之类的调度算法以后我应该还会继续尝试实现的。
参考资料
- [Medium]-Modifying riscv-xv6
- [GitHub]-tweaked-xv6
- [Medium]-xv6 -Implementing ps, nice system calls and priority scheduling
- [GitHub]-Customized-xv6-OS
- [GitHub]-xv6-Custom-Scheduling-Algorithm
- [GitBook]-Build a OS-XV6 CPU Scheduling