# 9. 线程调度

> 本章代码对应 commit ：65ccec7b180cd56e645f359343a793e75a49eb98

## 概要

多线程并发执行需要调度器的辅助。调度器的作用是在合适的时刻选择线程执行，并在合适的时候切换线程，防止一个线程占用或多的资源或阻塞，从而实现 cpu 资源分配相对“公平”。本章我们将：

1. 假想我们已经有了一个调度算法。

   > 算法和数据结构是分离的，所以在实现调度器的时候不依赖于算法
2. 创建线程池（thread pool）用于保存所有线程。
3. 创建线程管理器（processor）通过调度算法管理 thread pool。
4. 实现线程调度相关函数。
5. 引入 Round Robin 调度算法。

## 调度算法

```rust
//in process/scheduler.rs
impl Scheduler {
    pub fn new(max_time_slice: usize) -> Scheduler { /* TODO */ }

    pub fn push(&mut self, tid: Tid) { /* TODO */ }

    pub fn pop(&mut self) -> Option<Tid> { /* TODO */ }

    pub fn tick(&mut self) -> bool { /* TODO */ }

    pub fn exit(&mut self, tid: Tid) { /* TODO */ }
}
```

这是本章我们使用的调度算法的对外接口。功能包括创建（new）、添加线程（push）、获取即将被调用的线程（pop）、提醒算法时钟周期的到来（tick）和退出线程时将线程从调度算法中移出（exit）。

## 线程池（ThreadPool）

线程的运行状态包括但不限于：等待运行、运行中、睡眠和等待退出。这里我们创建一个枚举类型作为进程的状态类型：

```rust
// in process/mod.rs

pub type Tid = usize;
pub type ExitCode = usize;

// in process/structs.rs

use crate::process::{ Tid, ExitCode };

#[derive(Clone)]
pub enum Status {
    Ready,
    Running(Tid),
    Sleeping,
    Exited(ExitCode),
}
```

Tid 是线程 id 。就像每个人的身份证号都是不一样的，每一个线程都有独一无二的 id ，这时线程的标识。

线程池是用于存放线程的容器，只需要包含线程的信息和线程调度算法。创建 **process/thread\_pool.rs** ：

```rust
// in process/thread_pool.rs

use crate::process::scheduler::Scheduler;
use crate::process::structs::*;
use alloc::{ vec::Vec, boxed::Box };

struct ThreadInfo {
   status: Status,
   present: bool,
   thread: Option<Box<Thread>>,
}

pub struct ThreadPool {
    threads: Vec<Option<ThreadInfo>>, // 线程信号量的向量
    scheduler: Box<Scheduler>, // 调度算法
}
```

> [Box](https://github.com/rust-lang/book/blob/master/src/ch15-01-box.md) 允许我们将一个值放在堆上而不是栈上，留在栈上的则是指向堆数据的指针。除了数据被储存在堆上而不是栈上之外，Box 没有额外的性能损失，不过也没有额外的功能。

让我们先来实现他的构造函数和向线程池中加入线程的功能：

```rust
// in process/thread_pool.rs

use crate::process::Tid;

impl ThreadPool {
    pub fn new(size: usize, scheduler: Scheduler) -> ThreadPool {
        ThreadPool {
            threads: {
                let mut th = Vec::new();
                th.resize_with(size, Default::default);
                th
            },
            scheduler: Box::new(scheduler),
        }
    }

    fn alloc_tid(&self) -> Tid {
        for (i, info) in self.threads.iter().enumerate() {
            if info.is_none() {
                return i;
            }
        }
        panic!("alloc tid failed !");
    }

    pub fn add(&mut self, _thread: Box<Thread>) {
        let tid = self.alloc_tid();
        self.threads[tid] = Some(ThreadInfo{
            status: Status::Ready,
            present: true,
            thread: Some(_thread),
        });
        self.scheduler.push(tid);
        println!("tid to alloc: {}", tid);
    }
}
```

构造函数规定了线程的最大数量 size 和调度算法。由于线程数组是已经创建好的，但是默认内容为 None ，所以在添加线程的时候只需要将从 Vec 中找到一个未使用的位置，把新线程的信息传递过去就可以了。同时，不要忘记为调度算法传入线程 id 。

## 调度器

由于创建的调度器是全局的，需要考虑一些安全问题和异步问题。为此需要对成员进行一些包装，不过这不影响实现思路。创建 **processor.rs** ：

```rust
// in process/processor.rs
use core::cell::UnsafeCell;
use alloc::boxed::Box;
use crate::process::Tid;
use crate::process::structs::*;
use crate::process::thread_pool::ThreadPool;

pub struct ProcessorInner {
    pool: Box<ThreadPool>,
    idle: Box<Thread>,
    current: Option<(Tid, Box<Thread>)>,
}

pub struct Processor {
    inner: UnsafeCell<Option<ProcessorInner>>,
}

unsafe impl Sync for Processor {}
```

> 一个实现了 [Sync trait](https://kaisery.gitbooks.io/trpl-zh-cn/content/ch16-04-extensible-concurrency-sync-and-send.html) 的类型可以安全的在多个线程中拥有其值的引用。因为他是标记 trait ，所以不需要手动实现。 [UnsafeCell](https://www.rust-lang.org/zh-CN/documentation.html) 内的元素不严格区分 immutable 和 mutable 。

## 调度器接口实现

这里先列出功能简单明了的几个函数：

```rust
// in process/processor.rs
impl Processor {
    pub const fn new() -> Processor {
        Processor {
            inner: UnsafeCell::new(None),
        }
    }

    pub fn init(&self, idle: Box<Thread>, pool: Box<ThreadPool> ) {
        unsafe {
            *self.inner.get() = Some(ProcessorInner{
                pool,
                idle,
                current: None,
            });
        }
    }

    fn inner(&self) -> &mut ProcessorInner {
        unsafe { &mut *self.inner.get() }
            .as_mut()
            .expect("Processor is not initialized")
    }

    pub fn add_thread(&self, thread: Box<Thread>) {
        self.inner().pool.add(thread);
    }
}
```

在进行线程切换时，为了防止因为中断引起线程切换出错，需要关闭中断，之后再恢复到原先的中断状态。这里我们先实现三个与中断控制相关的函数：

```rust
// in interrupt.rs

#[inline(always)]
pub fn enable_and_wfi() {    // 使能中断并等待中断
    unsafe {
        asm!("csrsi sstatus, 1 << 1; wfi" :::: "volatile");
    }
}

#[inline(always)]
pub fn disable_and_store() -> usize {    // 禁用中断并返回当前中断状态
    let sstatus: usize;
    unsafe {
        asm!("csrci sstatus, 1 << 1" : "=r"(sstatus) ::: "volatile");
    }
    sstatus & (1 << 1)
}

#[inline(always)]
pub fn restore(flags: usize) {    // 根据 flag 设置中断
    unsafe {
        asm!("csrs sstatus, $0" :: "r"(flags) :: "volatile");
    }
}
```

接下来开始实现三个与调度直接相关的重要函数。

> 其中调用了一些 ThreadPool 中尚未实现的函数，将于之后实现

### run

这是整个调度过程最核心的函数，由 idle 线程调用。具体实现如下：

```rust
// in process/processor.rs
use crate::interrupt::{ disable_and_store, enable_and_wfi };

impl Processor {
    pub fn run(&self) -> !{
        let inner = self.inner();
        // 关闭中断，防止此时产生中断异常导致线程切换出错。
        disable_and_store();
        // 循环从线程池中寻找可调度线程
        loop {
            // 如果存在需要被调度的线程
            if let Some(thread) = inner.pool.acquire() {
                inner.current = Some(thread);
                // 切换至需要被调度的线程
                inner.idle.switch_to(&mut *inner.current.as_mut().unwrap().1);
                // 上一个线程已经结束或时间片用完，切换回 idle 线程
                let (tid, thread) = inner.current.take().unwrap();
                println!("thread {} ran just now", tid);
                // 将上一个线程放回线程池中
                inner.pool.retrieve(tid, thread);
            } else {
                // 开启中断并等待中断产生
                enable_and_wfi();
                // 关闭中断，从线程池中寻找可调度线程
                disable_and_store();
            }
        }
    }
}
```

### tick

每产生一次时钟中断（即经过一个时钟周期），就需要通知线程池，让他通过调度算法判断是否需要切换线程：

```rust
// in process/mod.rs

static CPU: Processor = Processor::new();

pub fn tick() {
    CPU.tick();
}

// in interrupt.rs

use crate::process::tick;
fn super_timer() {
    clock_set_next_event();
    unsafe{
        TICK = TICK + 1;
        if TICK % 100 == 0 {
            println!("100 ticks!");
        }
    }
    tick();
}

// in process/processor.rs

use crate::interrupt::restore;

impl Processor {
    pub fn tick(&self) {
        let inner = self.inner();
        if !inner.current.is_none() {
            if inner.pool.tick() {
                let flags = disable_and_store();
                inner
                    .current
                    .as_mut()
                    .unwrap()
                    .1
                    .switch_to(&mut inner.idle);
                // 恢复原先的中断状态
                restore(flags);
            }
        }
    }
}
```

`inner.pool.tick` 会通知线程池和调度算法已经过了一个时钟周期，同时返回一个布尔值：是否需要进行线程切换。如果需要切换至其他线程，则先切换至 idle 线程，然后由 idle 进行调度（回到 `Processer.run`）。

### exit

当线程任务完成之后，就可以通过 `Processor.exit` 结束自己（结束当前线程）：

```rust
// in process/processor.rs
impl Processor {
   pub fn exit(&self, code: usize) -> ! {
       let inner = self.inner();
       let tid = inner.current.as_ref().unwrap().0;
       // 通知线程池该线程即将退出
       inner.pool.exit(tid, code);
       // 切换至 idle 线程，进入调度
       inner
           .current
           .as_mut()
           .unwrap()
           .1
           .switch_to(&mut inner.idle);
       loop {}
   }
}
```

## 线程池接口实现

线程池接口的功能在前文已经提及，也可由函数名判断函数功能。具体实现也较为简单，所以直接给出实现：

```rust
// in process/thread_pool.rs

impl ThreadPool {
    pub fn acquire(&mut self) -> Option<(Tid, Box<Thread>)> {
        if let Some(tid) = self.scheduler.pop() {
            let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
            thread_info.status = Status::Running(tid);
            return Some((tid, thread_info.thread.take().expect("thread not exist ")));
        } else {
            return None;
        }
    }

    pub fn retrieve(&mut self, tid: Tid, thread: Box<Thread> ) {
        let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
        if thread_info.present {
            thread_info.thread = Some(thread);
            thread_info.status = Status::Ready;
            self.scheduler.push(tid);
        }
    }

    pub fn tick(&mut self) -> bool {
        // 通知调度算法时钟周期加一，询问是否需要调度
        self.scheduler.tick()
    }

    pub fn exit(&mut self, tid: Tid, code: usize) {
        self.threads[tid] = Some(ThreadInfo{
            status: Status::Ready,
            present: false,
            thread: None,
        });
        self.scheduler.exit(tid);
        println!("exit code: {}", code);
    }
}
```

> 这里用到了 [Option.take](https://doc.rust-lang.org/std/option/enum.Option.html#method.take) ，功能与所有权转移或浅拷贝相似

## 引入 Round Robin 调度算法

在 Cargo.toml 中加入：

```rust
RoundRobinScheduler = { path  = "crate/RoundRobinScheduler" }
```

创建 **process/scheduler.rs** ：

```rust
use crate::process::Tid;
use RoundRobinScheduler::RRScheduler;

pub struct Scheduler {
    scheduler: RRScheduler,
}

impl Scheduler {
    pub fn new(max_time_slice: usize) -> Scheduler {
        let s = Scheduler {
            scheduler: RRScheduler::new(max_time_slice),
        };
        s
    }

    pub fn push(&mut self, tid: Tid) {
        self.scheduler.push(tid);
    }

    pub fn pop(&mut self) -> Option<Tid> {
        self.scheduler.pop()
    }

    pub fn tick(&mut self) -> bool {
        self.scheduler.tick()
    }

    pub fn exit(&mut self, tid: Tid) {
        self.scheduler.exit(tid);
    }
}
```

最后，由于 `Processor.add_thread` 需要 `Box<Thread>` 类型的参数，所以我们修改一下 **struct Thread** 的构造函数：

```rust
// in process/structs.rs
use alloc::boxed::Box;
use riscv::register::satp;
impl Thread {
    pub fn new_idle() -> Box<Thread> {
        unsafe {
            Box::new(Thread {
                context: Context::null(),
                kstack: KernelStack::new(),
            })
        }
    }

    pub fn new_kernel(entry: extern "C" fn(usize) -> !, arg: usize) -> Box<Thread> {
        unsafe {
            let _kstack = KernelStack::new();
            Box::new(Thread {
                context: Context::new_kernel_thread(entry, arg, _kstack.top(), satp::read().bits()),
                kstack: _kstack,
            })
        }
    }
}
```

至此我们的调度器已经全部完成，让我们来测试一下他吧：

```rust
// in process/mod.rs

mod structs;
mod scheduler;
mod processor;
mod thread_pool;

use structs::Thread;
use alloc::boxed::Box;
use processor::Processor;
use thread_pool::ThreadPool;
use self::scheduler::Scheduler;

pub type Tid = usize;
pub type ExitCode = usize;

static CPU: Processor = Processor::new();

pub fn tick() {
    CPU.tick();
}

pub fn init() {
    println!("+------ now to initialize process ------+");
    let scheduler = Scheduler::new(1);
    let thread_pool = ThreadPool::new(100, scheduler);
    CPU.init(Thread::new_idle(), Box::new(thread_pool));
    let thread0 = Thread::new_kernel(hello_thread, 0);
    CPU.add_thread(thread0);
    let thread1 = Thread::new_kernel(hello_thread, 1);
    CPU.add_thread(thread1);
    let thread2 = Thread::new_kernel(hello_thread, 2);
    CPU.add_thread(thread2);
    let thread3 = Thread::new_kernel(hello_thread, 3);
    CPU.add_thread(thread3);
    let thread4 = Thread::new_kernel(hello_thread, 4);
    CPU.add_thread(thread4);
    CPU.run();
}

#[no_mangle]
pub extern "C" fn hello_thread(arg: usize) -> ! {
    println!("hello thread");
    println!("arg is {}", arg);
    for i in 0..100 {
        println!("{}{}{}{}{}{}{}{}", arg, arg, arg, arg, arg, arg, arg, arg);
        for j in 0..1000 {
        }
    }
    println!("end of thread {}", arg);
    CPU.exit(0)
}
```

### FIX runtime error:  panicked at 'Processor is not initialized'

执行 `make run` ，发现kernel出现了运行时错误：

```
...
kernel_end:0x80c01000: kernel_size:0xc01000 
+------ now to initialize process ------+
panicked at 'Processor is not initialized', /home/chyyuu/.rustup/toolchains/nightly-2019-03-05-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/option.rs:1038:5
qemu-system-riscv32: terminating on signal 15 from pid 31872 ()
```

从字面意思上看，是`Processor`这个结构没有初始化！但仔细检查代码，在如下代码中有`new`和记忆棒初始化的过程：

```rust
// in process/mod.rs
...
static CPU: Processor = Processor::new();
pub fn init() {
    println!("+------ now to initialize process ------+");
    let scheduler = Scheduler::new(1);
    let thread_pool = ThreadPool::new(100, scheduler);
    println!("+------ now to initialize processor ------+");
    CPU.init(Thread::new_idle(), Box::new(thread_pool));
    ...
```

所以，不应该是这部分的问题。再进一步检查，在`init.rs`中的`rust_main`函数在调用`process_init()`之前，有一些unsafe代码，怀疑是它造成的？？？。试着把代码删除。再执行 `make run` ，发现我们的调度器已经能够自动切换线程，线程来回切换也可以正常恢复原先的工作环境，并且在线程结束后能够正常结束退出。
