Lock in xv6
xv6中实现了2种最简单的锁,自旋锁和睡眠锁,使用 __sync_lock_test_and_set
进行原子操作的加锁,如果 lk->locked
之前为0,我们把它设置为1就表示我们获得了锁,否则循环等待,并且我们没有改变它本身的值。在释放锁的时候使用函数 __sync_lock_release
原子的把 lk->locked
设置为0。
如果在持有锁的情况下发生了中断,然后又调用了该函数,就会发生死锁,因此,我们需要在中断情况下释放锁,xv6更加的保守,直接在获取锁的时候。禁止了该CPU的中断。acquire
调用push_off
并且release
调用pop_off
来跟踪当前CPU上锁的嵌套级别。当计数达到零时,pop_off
恢复最外层临界区域开始时存在的中断使能状态,调用intr_off
和intr_on
函数执行RISC-V指令分别用来禁用和启用中断。
由于编译期或者CPU为了更高的性能可能会重排指令,让代码中后面的语句更先执行,这种CPU的排序规则称为内存模型(memory model)。在这里,我们不能让临界区的代码跑出临界区,不然临界区就毫无意义,因此我们需要在临界区设置屏障,禁止重排指令超过这个屏障
- __sync_synchronize(); 它告诉编译器和CPU不要跨障碍重新排序
load
或store
指令。
在使用多个锁的时候我们可以依靠获取多个锁的顺序相同来避免死锁。
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
};
void
acquire(struct spinlock *lk)
{
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0) {
__sync_fetch_and_add(&(lk->nts), 1);
}
__sync_synchronize();
lk->cpu = mycpu();
}
// Release the lock.
void
release(struct spinlock *lk)
{
if(!holding(lk))
panic("release");
lk->cpu = 0;
__sync_synchronize();
__sync_lock_release(&lk->locked);
pop_off();
}
如果需要长时间持有锁,比如文件IO,像上面的自旋锁就会极大的浪费性能,这时候就需要用到睡眠锁,在持有锁的时候 yield
,让出CPU。自旋锁最适合短的临界区域;睡眠锁对于冗长的操作效果很好。
睡眠锁的实现也比较简单,是利用一个互斥锁保护睡眠锁,能够原子的释放锁和获取锁,最后在获取不到锁的时候 sleep,让出当前CPU,当释放锁的时候会调用wakeup唤醒有可能在等待的锁,重新把该进程设置为 RUNNABLE。 其中wakeup和sleep通过第一个参数 lk
,即一个地址来判断是否是一对,或者说,sleep和wakeup通过这个地址来进行交流
void acquiresleep(struct sleeplock *lk)
{
acquire(&lk->lk);
while (lk->locked) {
sleep(lk, &lk->lk);
}
lk->locked = 1;
lk->pid = myproc()->pid;
release(&lk->lk);
}
void releasesleep(struct sleeplock *lk)
{
acquire(&lk->lk);
lk->locked = 0;
lk->pid = 0;
wakeup(lk);
release(&lk->lk);
}
Process in xv6
xv6中的进程资源是固定的,是内核中的一个数组。最多为 NPROC,进程结构体如下,描述了一个最简单的进程应该具备的所有资源。进程的状态分为UNUSED, SLEEPING, RUNNABLE, RUNNING, ZOMBIE。
struct proc {
struct spinlock lock;
// p->lock must be held when using these:
enum procstate state; // Process state
struct proc *parent; // Parent process
void *chan; // If non-zero, sleeping on chan
int killed; // If non-zero, have been killed
int xstate; // Exit status to be returned to parent's wait
int pid; // Process ID
// these are private to the process, so p->lock need not be held.
uint64 kstack; // Virtual address of kernel stack
uint64 sz; // Size of process memory (bytes)
pagetable_t pagetable; // User page table
struct trapframe *trapframe; // data page for trampoline.S
struct context context; // swtch() here to run process
struct file *ofile[NOFILE]; // Open files
struct inode *cwd; // Current directory
char name[16]; // Process name (debugging)
};
进程相关的函数在 proc.c中,其中包括进程分配,进行释放等,把进程的资源给释放掉。这里就不谈论这些资源的释放和分配了。
进程调度
1. 上下文切换
上下文切换是保存一些必要的寄存器在结构体中,然后从另一个结构体中读取上下文信息。swtch
只保存被调用方保存的寄存器(callee-saved registers),调用方保存的寄存器(caller-saved registers)通过调用C代码保存在栈上(如果需要)。它不保存程序计数器。但swtch
保存ra
寄存器,该寄存器保存调用swtch
的返回地址。当swtch
返回时,它返回到由ra
寄存器指定的指令,即新线程以前调用swtch
的指令。
swtch
保存的寄存器是内核线程的寄存器,存到进程的上下文 p->context
用户线程的寄存器被保存在trampframe里面。
# 把当前寄存器保存在old,加载new到现在的寄存器上
# void swtch(struct context *old, struct context *new);
swtch:
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
....
# 存储寄存器
ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
....
# 加载寄存器
ret
2. 调度器
调度器(scheduler)在每个CPU上以一个特殊线程的形式存在,每个线程都运行scheduler
函数。此函数负责选择下一个要运行的进程。然后调用swtch切换上下文。在swtch的时候,必须持有锁。否则如果2个CPU同时运行该进程,会出错。
void scheduler(void)
{
struct proc *p;
struct cpu *c = mycpu();
c->proc = 0;
for(;;){
intr_on();
int nproc = 0;
for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state != UNUSED) {
nproc++;
}
if(p->state == RUNNABLE) {
p->state = RUNNING;
c->proc = p; // 并不会释放锁,由进程自己释放锁,然后在进入swtch的时候加锁
swtch(&c->context, &p->context);
c->proc = 0;
}
release(&p->lock);
}
if(nproc <= 2) { // only init and sh exist
intr_on();
asm volatile("wfi");
}
}
}
还有一个和调度器配合的协作程序 sched
,该程序负责从其他进程swtch到内核的调度进程,
void sched(void)
{
int intena;
struct proc *p = myproc();
if(!holding(&p->lock))
panic("sched p->lock");
if(mycpu()->noff != 1)
panic("sched locks");
if(p->state == RUNNING)
panic("sched running");
if(intr_get())
panic("sched interruptible");
intena = mycpu()->intena;
swtch(&p->context, &mycpu()->context);
mycpu()->intena = intena;
}
以图中程序为例,程序 ls
是以前运行过但是被切换出去了的程序,即处于RUNNABLE的程序,程序 CC
是当前正在运行的程序。当定时器中断发生的时候,process CC 陷入内核态,在 usertrap
中被动让出CPU,调用swtch
切换到调度线程,最后由调度线程切换到之前被挂起的 ls
,返回用户空间。xv6并没有直接从用户空间进行切换线程的实现。都是从内核中切换线程。用户态的上下文都保存在trampframe里面,内核进程的上下文保存在 p->context
里面。
上面说的是从用户进程被定时中断中断的时候,如果是内核进程被中断呢,其实也很好理解,也就是少了从用户态陷入内核态这一步而已。
还有一种情况是在创建新进程的时候,需要指定返回地址为 forkret
,其会释放进程锁,从 usertrapret
返回用户空间。而且这时候返回的进程由于复制了tramframe,会回到和父进程相同的地方,不过没有返回值,因此返回值为0,父进程返回子进程PID
3. 定时器中断
最后再简单介绍一下定时器中断,这是xv6之所以能够调度的基础。定时器中断来自附加到每个RISC-V CPU上的时钟硬件。Xv6对该时钟硬件进行编程,以定期中断每个CPU。
RISC-V要求定时器中断在机器模式而不是管理模式下进行。RISC-V机器模式无需分页即可执行,并且有一组单独的控制寄存器,因此在机器模式下运行普通的xv6内核代码是不实际的。因此,xv6处理定时器中断完全不同于之前的陷阱机制。
定时中断的代码一开始并不会直接挑到usertrap
,它会直接跳到 timervec
,其中 ` csrw sip, a1会引起一个软中断,最终挑转到
usertrap`,并且引起的代码为 2。
timervec:
# start.c has set up the memory that mscratch points to:
# scratch[0,8,16] : register save area.
# scratch[24] : address of CLINT's MTIMECMP register.
# scratch[32] : desired interval between interrupts.
csrrw a0, mscratch, a0
sd a1, 0(a0)
sd a2, 8(a0)
sd a3, 16(a0)
# schedule the next timer interrupt
# by adding interval to mtimecmp.
ld a1, 24(a0) # CLINT_MTIMECMP(hart)
ld a2, 32(a0) # interval
ld a3, 0(a1)
add a3, a3, a2
sd a3, 0(a1)
# raise a supervisor software interrupt.
li a1, 2
csrw sip, a1
ld a3, 16(a0)
ld a2, 8(a0)
ld a1, 0(a0)
csrrw a0, mscratch, a0
mret
计时器中断是无法被禁止的,但是跳转到软中断是可以被禁止的。
进程退出
当进程死亡的时候,由父进程进行善后,子进程退出的时候,父进程可能有3种情况
- 调用了wait正在等待 ,这时候它会开开心心的收拾掉残局,然后退出,剩下的进程交给init进程处理。
- 可能在忙其他的,这时候不着急,设置为zombie状态等父进程来清理,
- 父进程已经退出,父进程在退出前会把子进程全部交付给init进程,所以并无大碍。
但是父进程在子进程退出后如果并没有善后,即没有调用wait,那么子进程的部分资源永远不会释放,子进程会成为僵尸进程。
进程除了主动调用 exit 退出以外,还可以设置 p->killed
,但是p->killed
会延迟生效,只会在中断 usertrap
中生效。
Process in real world
等待补充
xv6调度器实现了一个简单的调度策略:它依次运行每个进程。这一策略被称为轮询调度(round robin)。真实的操作系统实施更复杂的策略,例如,允许进程具有优先级。其思想是调度器将优先选择可运行的高优先级进程,而不是可运行的低优先级进程。这些策略可能变得很复杂,因为常常存在相互竞争的目标:例如,操作系统可能希望保证公平性和高吞吐量。此外,复杂的策略可能会导致意外的交互,例如优先级反转(priority inversion)和航队(convoys)。当低优先级进程和高优先级进程共享一个锁时,可能会发生优先级反转,当低优先级进程持有该锁时,可能会阻止高优先级进程前进。当许多高优先级进程正在等待一个获得共享锁的低优先级进程时,可能会形成一个长的等待进程航队;一旦航队形成,它可以持续很长时间。为了避免此类问题,在复杂的调度器中需要额外的机制。