9. 线程调度

本章代码对应 commit :65ccec7b180cd56e645f359343a793e75a49eb98

概要

多线程并发执行需要调度器的辅助。调度器的作用是在合适的时刻选择线程执行,并在合适的时候切换线程,防止一个线程占用或多的资源或阻塞,从而实现 cpu 资源分配相对“公平”。本章我们将:

  1. 假想我们已经有了一个调度算法。

    算法和数据结构是分离的,所以在实现调度器的时候不依赖于算法

  2. 创建线程池(thread pool)用于保存所有线程。

  3. 创建线程管理器(processor)通过调度算法管理 thread pool。

  4. 实现线程调度相关函数。

  5. 引入 Round Robin 调度算法。

调度算法

//in process/scheduler.rs
impl Scheduler {
    pub fn new(max_time_slice: usize) -> Scheduler { /* TODO */ }

    pub fn push(&mut self, tid: Tid) { /* TODO */ }

    pub fn pop(&mut self) -> Option<Tid> { /* TODO */ }

    pub fn tick(&mut self) -> bool { /* TODO */ }

    pub fn exit(&mut self, tid: Tid) { /* TODO */ }
}

这是本章我们使用的调度算法的对外接口。功能包括创建(new)、添加线程(push)、获取即将被调用的线程(pop)、提醒算法时钟周期的到来(tick)和退出线程时将线程从调度算法中移出(exit)。

线程池(ThreadPool)

线程的运行状态包括但不限于:等待运行、运行中、睡眠和等待退出。这里我们创建一个枚举类型作为进程的状态类型:

// in process/mod.rs

pub type Tid = usize;
pub type ExitCode = usize;

// in process/structs.rs

use crate::process::{ Tid, ExitCode };

#[derive(Clone)]
pub enum Status {
    Ready,
    Running(Tid),
    Sleeping,
    Exited(ExitCode),
}

Tid 是线程 id 。就像每个人的身份证号都是不一样的,每一个线程都有独一无二的 id ,这时线程的标识。

线程池是用于存放线程的容器,只需要包含线程的信息和线程调度算法。创建 process/thread_pool.rs

// in process/thread_pool.rs

use crate::process::scheduler::Scheduler;
use crate::process::structs::*;
use alloc::{ vec::Vec, boxed::Box };

struct ThreadInfo {
   status: Status,
   present: bool,
   thread: Option<Box<Thread>>,
}

pub struct ThreadPool {
    threads: Vec<Option<ThreadInfo>>, // 线程信号量的向量
    scheduler: Box<Scheduler>, // 调度算法
}

Box 允许我们将一个值放在堆上而不是栈上,留在栈上的则是指向堆数据的指针。除了数据被储存在堆上而不是栈上之外,Box 没有额外的性能损失,不过也没有额外的功能。

让我们先来实现他的构造函数和向线程池中加入线程的功能:

// in process/thread_pool.rs

use crate::process::Tid;

impl ThreadPool {
    pub fn new(size: usize, scheduler: Scheduler) -> ThreadPool {
        ThreadPool {
            threads: {
                let mut th = Vec::new();
                th.resize_with(size, Default::default);
                th
            },
            scheduler: Box::new(scheduler),
        }
    }

    fn alloc_tid(&self) -> Tid {
        for (i, info) in self.threads.iter().enumerate() {
            if info.is_none() {
                return i;
            }
        }
        panic!("alloc tid failed !");
    }

    pub fn add(&mut self, _thread: Box<Thread>) {
        let tid = self.alloc_tid();
        self.threads[tid] = Some(ThreadInfo{
            status: Status::Ready,
            present: true,
            thread: Some(_thread),
        });
        self.scheduler.push(tid);
        println!("tid to alloc: {}", tid);
    }
}

构造函数规定了线程的最大数量 size 和调度算法。由于线程数组是已经创建好的,但是默认内容为 None ,所以在添加线程的时候只需要将从 Vec 中找到一个未使用的位置,把新线程的信息传递过去就可以了。同时,不要忘记为调度算法传入线程 id 。

调度器

由于创建的调度器是全局的,需要考虑一些安全问题和异步问题。为此需要对成员进行一些包装,不过这不影响实现思路。创建 processor.rs

// in process/processor.rs
use core::cell::UnsafeCell;
use alloc::boxed::Box;
use crate::process::Tid;
use crate::process::structs::*;
use crate::process::thread_pool::ThreadPool;

pub struct ProcessorInner {
    pool: Box<ThreadPool>,
    idle: Box<Thread>,
    current: Option<(Tid, Box<Thread>)>,
}

pub struct Processor {
    inner: UnsafeCell<Option<ProcessorInner>>,
}

unsafe impl Sync for Processor {}

一个实现了 Sync trait 的类型可以安全的在多个线程中拥有其值的引用。因为他是标记 trait ,所以不需要手动实现。 UnsafeCell 内的元素不严格区分 immutable 和 mutable 。

调度器接口实现

这里先列出功能简单明了的几个函数:

// in process/processor.rs
impl Processor {
    pub const fn new() -> Processor {
        Processor {
            inner: UnsafeCell::new(None),
        }
    }

    pub fn init(&self, idle: Box<Thread>, pool: Box<ThreadPool> ) {
        unsafe {
            *self.inner.get() = Some(ProcessorInner{
                pool,
                idle,
                current: None,
            });
        }
    }

    fn inner(&self) -> &mut ProcessorInner {
        unsafe { &mut *self.inner.get() }
            .as_mut()
            .expect("Processor is not initialized")
    }

    pub fn add_thread(&self, thread: Box<Thread>) {
        self.inner().pool.add(thread);
    }
}

在进行线程切换时,为了防止因为中断引起线程切换出错,需要关闭中断,之后再恢复到原先的中断状态。这里我们先实现三个与中断控制相关的函数:

// in interrupt.rs

#[inline(always)]
pub fn enable_and_wfi() {    // 使能中断并等待中断
    unsafe {
        asm!("csrsi sstatus, 1 << 1; wfi" :::: "volatile");
    }
}

#[inline(always)]
pub fn disable_and_store() -> usize {    // 禁用中断并返回当前中断状态
    let sstatus: usize;
    unsafe {
        asm!("csrci sstatus, 1 << 1" : "=r"(sstatus) ::: "volatile");
    }
    sstatus & (1 << 1)
}

#[inline(always)]
pub fn restore(flags: usize) {    // 根据 flag 设置中断
    unsafe {
        asm!("csrs sstatus, $0" :: "r"(flags) :: "volatile");
    }
}

接下来开始实现三个与调度直接相关的重要函数。

其中调用了一些 ThreadPool 中尚未实现的函数,将于之后实现

run

这是整个调度过程最核心的函数,由 idle 线程调用。具体实现如下:

// in process/processor.rs
use crate::interrupt::{ disable_and_store, enable_and_wfi };

impl Processor {
    pub fn run(&self) -> !{
        let inner = self.inner();
        // 关闭中断,防止此时产生中断异常导致线程切换出错。
        disable_and_store();
        // 循环从线程池中寻找可调度线程
        loop {
            // 如果存在需要被调度的线程
            if let Some(thread) = inner.pool.acquire() {
                inner.current = Some(thread);
                // 切换至需要被调度的线程
                inner.idle.switch_to(&mut *inner.current.as_mut().unwrap().1);
                // 上一个线程已经结束或时间片用完,切换回 idle 线程
                let (tid, thread) = inner.current.take().unwrap();
                println!("thread {} ran just now", tid);
                // 将上一个线程放回线程池中
                inner.pool.retrieve(tid, thread);
            } else {
                // 开启中断并等待中断产生
                enable_and_wfi();
                // 关闭中断,从线程池中寻找可调度线程
                disable_and_store();
            }
        }
    }
}

tick

每产生一次时钟中断(即经过一个时钟周期),就需要通知线程池,让他通过调度算法判断是否需要切换线程:

// in process/mod.rs

static CPU: Processor = Processor::new();

pub fn tick() {
    CPU.tick();
}

// in interrupt.rs

use crate::process::tick;
fn super_timer() {
    clock_set_next_event();
    unsafe{
        TICK = TICK + 1;
        if TICK % 100 == 0 {
            println!("100 ticks!");
        }
    }
    tick();
}

// in process/processor.rs

use crate::interrupt::restore;

impl Processor {
    pub fn tick(&self) {
        let inner = self.inner();
        if !inner.current.is_none() {
            if inner.pool.tick() {
                let flags = disable_and_store();
                inner
                    .current
                    .as_mut()
                    .unwrap()
                    .1
                    .switch_to(&mut inner.idle);
                // 恢复原先的中断状态
                restore(flags);
            }
        }
    }
}

inner.pool.tick 会通知线程池和调度算法已经过了一个时钟周期,同时返回一个布尔值:是否需要进行线程切换。如果需要切换至其他线程,则先切换至 idle 线程,然后由 idle 进行调度(回到 Processer.run)。

exit

当线程任务完成之后,就可以通过 Processor.exit 结束自己(结束当前线程):

// in process/processor.rs
impl Processor {
   pub fn exit(&self, code: usize) -> ! {
       let inner = self.inner();
       let tid = inner.current.as_ref().unwrap().0;
       // 通知线程池该线程即将退出
       inner.pool.exit(tid, code);
       // 切换至 idle 线程,进入调度
       inner
           .current
           .as_mut()
           .unwrap()
           .1
           .switch_to(&mut inner.idle);
       loop {}
   }
}

线程池接口实现

线程池接口的功能在前文已经提及,也可由函数名判断函数功能。具体实现也较为简单,所以直接给出实现:

// in process/thread_pool.rs

impl ThreadPool {
    pub fn acquire(&mut self) -> Option<(Tid, Box<Thread>)> {
        if let Some(tid) = self.scheduler.pop() {
            let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
            thread_info.status = Status::Running(tid);
            return Some((tid, thread_info.thread.take().expect("thread not exist ")));
        } else {
            return None;
        }
    }

    pub fn retrieve(&mut self, tid: Tid, thread: Box<Thread> ) {
        let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
        if thread_info.present {
            thread_info.thread = Some(thread);
            thread_info.status = Status::Ready;
            self.scheduler.push(tid);
        }
    }

    pub fn tick(&mut self) -> bool {
        // 通知调度算法时钟周期加一,询问是否需要调度
        self.scheduler.tick()
    }

    pub fn exit(&mut self, tid: Tid, code: usize) {
        self.threads[tid] = Some(ThreadInfo{
            status: Status::Ready,
            present: false,
            thread: None,
        });
        self.scheduler.exit(tid);
        println!("exit code: {}", code);
    }
}

这里用到了 Option.take ,功能与所有权转移或浅拷贝相似

引入 Round Robin 调度算法

在 Cargo.toml 中加入:

RoundRobinScheduler = { path  = "crate/RoundRobinScheduler" }

创建 process/scheduler.rs

use crate::process::Tid;
use RoundRobinScheduler::RRScheduler;

pub struct Scheduler {
    scheduler: RRScheduler,
}

impl Scheduler {
    pub fn new(max_time_slice: usize) -> Scheduler {
        let s = Scheduler {
            scheduler: RRScheduler::new(max_time_slice),
        };
        s
    }

    pub fn push(&mut self, tid: Tid) {
        self.scheduler.push(tid);
    }

    pub fn pop(&mut self) -> Option<Tid> {
        self.scheduler.pop()
    }

    pub fn tick(&mut self) -> bool {
        self.scheduler.tick()
    }

    pub fn exit(&mut self, tid: Tid) {
        self.scheduler.exit(tid);
    }
}

最后,由于 Processor.add_thread 需要 Box<Thread> 类型的参数,所以我们修改一下 struct Thread 的构造函数:

// in process/structs.rs
use alloc::boxed::Box;
use riscv::register::satp;
impl Thread {
    pub fn new_idle() -> Box<Thread> {
        unsafe {
            Box::new(Thread {
                context: Context::null(),
                kstack: KernelStack::new(),
            })
        }
    }

    pub fn new_kernel(entry: extern "C" fn(usize) -> !, arg: usize) -> Box<Thread> {
        unsafe {
            let _kstack = KernelStack::new();
            Box::new(Thread {
                context: Context::new_kernel_thread(entry, arg, _kstack.top(), satp::read().bits()),
                kstack: _kstack,
            })
        }
    }
}

至此我们的调度器已经全部完成,让我们来测试一下他吧:

// in process/mod.rs

mod structs;
mod scheduler;
mod processor;
mod thread_pool;

use structs::Thread;
use alloc::boxed::Box;
use processor::Processor;
use thread_pool::ThreadPool;
use self::scheduler::Scheduler;

pub type Tid = usize;
pub type ExitCode = usize;

static CPU: Processor = Processor::new();

pub fn tick() {
    CPU.tick();
}

pub fn init() {
    println!("+------ now to initialize process ------+");
    let scheduler = Scheduler::new(1);
    let thread_pool = ThreadPool::new(100, scheduler);
    CPU.init(Thread::new_idle(), Box::new(thread_pool));
    let thread0 = Thread::new_kernel(hello_thread, 0);
    CPU.add_thread(thread0);
    let thread1 = Thread::new_kernel(hello_thread, 1);
    CPU.add_thread(thread1);
    let thread2 = Thread::new_kernel(hello_thread, 2);
    CPU.add_thread(thread2);
    let thread3 = Thread::new_kernel(hello_thread, 3);
    CPU.add_thread(thread3);
    let thread4 = Thread::new_kernel(hello_thread, 4);
    CPU.add_thread(thread4);
    CPU.run();
}

#[no_mangle]
pub extern "C" fn hello_thread(arg: usize) -> ! {
    println!("hello thread");
    println!("arg is {}", arg);
    for i in 0..100 {
        println!("{}{}{}{}{}{}{}{}", arg, arg, arg, arg, arg, arg, arg, arg);
        for j in 0..1000 {
        }
    }
    println!("end of thread {}", arg);
    CPU.exit(0)
}

FIX runtime error: panicked at 'Processor is not initialized'

执行 make run ,发现kernel出现了运行时错误:

...
kernel_end:0x80c01000: kernel_size:0xc01000 
+------ now to initialize process ------+
panicked at 'Processor is not initialized', /home/chyyuu/.rustup/toolchains/nightly-2019-03-05-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/option.rs:1038:5
qemu-system-riscv32: terminating on signal 15 from pid 31872 ()

从字面意思上看,是Processor这个结构没有初始化!但仔细检查代码,在如下代码中有new和记忆棒初始化的过程:

// in process/mod.rs
...
static CPU: Processor = Processor::new();
pub fn init() {
    println!("+------ now to initialize process ------+");
    let scheduler = Scheduler::new(1);
    let thread_pool = ThreadPool::new(100, scheduler);
    println!("+------ now to initialize processor ------+");
    CPU.init(Thread::new_idle(), Box::new(thread_pool));
    ...

所以,不应该是这部分的问题。再进一步检查,在init.rs中的rust_main函数在调用process_init()之前,有一些unsafe代码,怀疑是它造成的???。试着把代码删除。再执行 make run ,发现我们的调度器已经能够自动切换线程,线程来回切换也可以正常恢复原先的工作环境,并且在线程结束后能够正常结束退出。

Last updated