> For the complete documentation index, see [llms.txt](https://xy-plus.gitbook.io/rcore-step-by-step/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://xy-plus.gitbook.io/rcore-step-by-step/xian-cheng-tiao-du.md).

# 9. 线程调度

> 本章代码对应 commit ：65ccec7b180cd56e645f359343a793e75a49eb98

## 概要

多线程并发执行需要调度器的辅助。调度器的作用是在合适的时刻选择线程执行，并在合适的时候切换线程，防止一个线程占用或多的资源或阻塞，从而实现 cpu 资源分配相对“公平”。本章我们将：

1. 假想我们已经有了一个调度算法。

   > 算法和数据结构是分离的，所以在实现调度器的时候不依赖于算法
2. 创建线程池（thread pool）用于保存所有线程。
3. 创建线程管理器（processor）通过调度算法管理 thread pool。
4. 实现线程调度相关函数。
5. 引入 Round Robin 调度算法。

## 调度算法

```rust
//in process/scheduler.rs
impl Scheduler {
    pub fn new(max_time_slice: usize) -> Scheduler { /* TODO */ }

    pub fn push(&mut self, tid: Tid) { /* TODO */ }

    pub fn pop(&mut self) -> Option<Tid> { /* TODO */ }

    pub fn tick(&mut self) -> bool { /* TODO */ }

    pub fn exit(&mut self, tid: Tid) { /* TODO */ }
}
```

这是本章我们使用的调度算法的对外接口。功能包括创建（new）、添加线程（push）、获取即将被调用的线程（pop）、提醒算法时钟周期的到来（tick）和退出线程时将线程从调度算法中移出（exit）。

## 线程池（ThreadPool）

线程的运行状态包括但不限于：等待运行、运行中、睡眠和等待退出。这里我们创建一个枚举类型作为进程的状态类型：

```rust
// in process/mod.rs

pub type Tid = usize;
pub type ExitCode = usize;

// in process/structs.rs

use crate::process::{ Tid, ExitCode };

#[derive(Clone)]
pub enum Status {
    Ready,
    Running(Tid),
    Sleeping,
    Exited(ExitCode),
}
```

Tid 是线程 id 。就像每个人的身份证号都是不一样的，每一个线程都有独一无二的 id ，这时线程的标识。

线程池是用于存放线程的容器，只需要包含线程的信息和线程调度算法。创建 **process/thread\_pool.rs** ：

```rust
// in process/thread_pool.rs

use crate::process::scheduler::Scheduler;
use crate::process::structs::*;
use alloc::{ vec::Vec, boxed::Box };

struct ThreadInfo {
   status: Status,
   present: bool,
   thread: Option<Box<Thread>>,
}

pub struct ThreadPool {
    threads: Vec<Option<ThreadInfo>>, // 线程信号量的向量
    scheduler: Box<Scheduler>, // 调度算法
}
```

> [Box](https://github.com/rust-lang/book/blob/master/src/ch15-01-box.md) 允许我们将一个值放在堆上而不是栈上，留在栈上的则是指向堆数据的指针。除了数据被储存在堆上而不是栈上之外，Box 没有额外的性能损失，不过也没有额外的功能。

让我们先来实现他的构造函数和向线程池中加入线程的功能：

```rust
// in process/thread_pool.rs

use crate::process::Tid;

impl ThreadPool {
    pub fn new(size: usize, scheduler: Scheduler) -> ThreadPool {
        ThreadPool {
            threads: {
                let mut th = Vec::new();
                th.resize_with(size, Default::default);
                th
            },
            scheduler: Box::new(scheduler),
        }
    }

    fn alloc_tid(&self) -> Tid {
        for (i, info) in self.threads.iter().enumerate() {
            if info.is_none() {
                return i;
            }
        }
        panic!("alloc tid failed !");
    }

    pub fn add(&mut self, _thread: Box<Thread>) {
        let tid = self.alloc_tid();
        self.threads[tid] = Some(ThreadInfo{
            status: Status::Ready,
            present: true,
            thread: Some(_thread),
        });
        self.scheduler.push(tid);
        println!("tid to alloc: {}", tid);
    }
}
```

构造函数规定了线程的最大数量 size 和调度算法。由于线程数组是已经创建好的，但是默认内容为 None ，所以在添加线程的时候只需要将从 Vec 中找到一个未使用的位置，把新线程的信息传递过去就可以了。同时，不要忘记为调度算法传入线程 id 。

## 调度器

由于创建的调度器是全局的，需要考虑一些安全问题和异步问题。为此需要对成员进行一些包装，不过这不影响实现思路。创建 **processor.rs** ：

```rust
// in process/processor.rs
use core::cell::UnsafeCell;
use alloc::boxed::Box;
use crate::process::Tid;
use crate::process::structs::*;
use crate::process::thread_pool::ThreadPool;

pub struct ProcessorInner {
    pool: Box<ThreadPool>,
    idle: Box<Thread>,
    current: Option<(Tid, Box<Thread>)>,
}

pub struct Processor {
    inner: UnsafeCell<Option<ProcessorInner>>,
}

unsafe impl Sync for Processor {}
```

> 一个实现了 [Sync trait](https://kaisery.gitbooks.io/trpl-zh-cn/content/ch16-04-extensible-concurrency-sync-and-send.html) 的类型可以安全的在多个线程中拥有其值的引用。因为他是标记 trait ，所以不需要手动实现。 [UnsafeCell](https://www.rust-lang.org/zh-CN/documentation.html) 内的元素不严格区分 immutable 和 mutable 。

## 调度器接口实现

这里先列出功能简单明了的几个函数：

```rust
// in process/processor.rs
impl Processor {
    pub const fn new() -> Processor {
        Processor {
            inner: UnsafeCell::new(None),
        }
    }

    pub fn init(&self, idle: Box<Thread>, pool: Box<ThreadPool> ) {
        unsafe {
            *self.inner.get() = Some(ProcessorInner{
                pool,
                idle,
                current: None,
            });
        }
    }

    fn inner(&self) -> &mut ProcessorInner {
        unsafe { &mut *self.inner.get() }
            .as_mut()
            .expect("Processor is not initialized")
    }

    pub fn add_thread(&self, thread: Box<Thread>) {
        self.inner().pool.add(thread);
    }
}
```

在进行线程切换时，为了防止因为中断引起线程切换出错，需要关闭中断，之后再恢复到原先的中断状态。这里我们先实现三个与中断控制相关的函数：

```rust
// in interrupt.rs

#[inline(always)]
pub fn enable_and_wfi() {    // 使能中断并等待中断
    unsafe {
        asm!("csrsi sstatus, 1 << 1; wfi" :::: "volatile");
    }
}

#[inline(always)]
pub fn disable_and_store() -> usize {    // 禁用中断并返回当前中断状态
    let sstatus: usize;
    unsafe {
        asm!("csrci sstatus, 1 << 1" : "=r"(sstatus) ::: "volatile");
    }
    sstatus & (1 << 1)
}

#[inline(always)]
pub fn restore(flags: usize) {    // 根据 flag 设置中断
    unsafe {
        asm!("csrs sstatus, $0" :: "r"(flags) :: "volatile");
    }
}
```

接下来开始实现三个与调度直接相关的重要函数。

> 其中调用了一些 ThreadPool 中尚未实现的函数，将于之后实现

### run

这是整个调度过程最核心的函数，由 idle 线程调用。具体实现如下：

```rust
// in process/processor.rs
use crate::interrupt::{ disable_and_store, enable_and_wfi };

impl Processor {
    pub fn run(&self) -> !{
        let inner = self.inner();
        // 关闭中断，防止此时产生中断异常导致线程切换出错。
        disable_and_store();
        // 循环从线程池中寻找可调度线程
        loop {
            // 如果存在需要被调度的线程
            if let Some(thread) = inner.pool.acquire() {
                inner.current = Some(thread);
                // 切换至需要被调度的线程
                inner.idle.switch_to(&mut *inner.current.as_mut().unwrap().1);
                // 上一个线程已经结束或时间片用完，切换回 idle 线程
                let (tid, thread) = inner.current.take().unwrap();
                println!("thread {} ran just now", tid);
                // 将上一个线程放回线程池中
                inner.pool.retrieve(tid, thread);
            } else {
                // 开启中断并等待中断产生
                enable_and_wfi();
                // 关闭中断，从线程池中寻找可调度线程
                disable_and_store();
            }
        }
    }
}
```

### tick

每产生一次时钟中断（即经过一个时钟周期），就需要通知线程池，让他通过调度算法判断是否需要切换线程：

```rust
// in process/mod.rs

static CPU: Processor = Processor::new();

pub fn tick() {
    CPU.tick();
}

// in interrupt.rs

use crate::process::tick;
fn super_timer() {
    clock_set_next_event();
    unsafe{
        TICK = TICK + 1;
        if TICK % 100 == 0 {
            println!("100 ticks!");
        }
    }
    tick();
}

// in process/processor.rs

use crate::interrupt::restore;

impl Processor {
    pub fn tick(&self) {
        let inner = self.inner();
        if !inner.current.is_none() {
            if inner.pool.tick() {
                let flags = disable_and_store();
                inner
                    .current
                    .as_mut()
                    .unwrap()
                    .1
                    .switch_to(&mut inner.idle);
                // 恢复原先的中断状态
                restore(flags);
            }
        }
    }
}
```

`inner.pool.tick` 会通知线程池和调度算法已经过了一个时钟周期，同时返回一个布尔值：是否需要进行线程切换。如果需要切换至其他线程，则先切换至 idle 线程，然后由 idle 进行调度（回到 `Processer.run`）。

### exit

当线程任务完成之后，就可以通过 `Processor.exit` 结束自己（结束当前线程）：

```rust
// in process/processor.rs
impl Processor {
   pub fn exit(&self, code: usize) -> ! {
       let inner = self.inner();
       let tid = inner.current.as_ref().unwrap().0;
       // 通知线程池该线程即将退出
       inner.pool.exit(tid, code);
       // 切换至 idle 线程，进入调度
       inner
           .current
           .as_mut()
           .unwrap()
           .1
           .switch_to(&mut inner.idle);
       loop {}
   }
}
```

## 线程池接口实现

线程池接口的功能在前文已经提及，也可由函数名判断函数功能。具体实现也较为简单，所以直接给出实现：

```rust
// in process/thread_pool.rs

impl ThreadPool {
    pub fn acquire(&mut self) -> Option<(Tid, Box<Thread>)> {
        if let Some(tid) = self.scheduler.pop() {
            let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
            thread_info.status = Status::Running(tid);
            return Some((tid, thread_info.thread.take().expect("thread not exist ")));
        } else {
            return None;
        }
    }

    pub fn retrieve(&mut self, tid: Tid, thread: Box<Thread> ) {
        let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
        if thread_info.present {
            thread_info.thread = Some(thread);
            thread_info.status = Status::Ready;
            self.scheduler.push(tid);
        }
    }

    pub fn tick(&mut self) -> bool {
        // 通知调度算法时钟周期加一，询问是否需要调度
        self.scheduler.tick()
    }

    pub fn exit(&mut self, tid: Tid, code: usize) {
        self.threads[tid] = Some(ThreadInfo{
            status: Status::Ready,
            present: false,
            thread: None,
        });
        self.scheduler.exit(tid);
        println!("exit code: {}", code);
    }
}
```

> 这里用到了 [Option.take](https://doc.rust-lang.org/std/option/enum.Option.html#method.take) ，功能与所有权转移或浅拷贝相似

## 引入 Round Robin 调度算法

在 Cargo.toml 中加入：

```rust
RoundRobinScheduler = { path  = "crate/RoundRobinScheduler" }
```

创建 **process/scheduler.rs** ：

```rust
use crate::process::Tid;
use RoundRobinScheduler::RRScheduler;

pub struct Scheduler {
    scheduler: RRScheduler,
}

impl Scheduler {
    pub fn new(max_time_slice: usize) -> Scheduler {
        let s = Scheduler {
            scheduler: RRScheduler::new(max_time_slice),
        };
        s
    }

    pub fn push(&mut self, tid: Tid) {
        self.scheduler.push(tid);
    }

    pub fn pop(&mut self) -> Option<Tid> {
        self.scheduler.pop()
    }

    pub fn tick(&mut self) -> bool {
        self.scheduler.tick()
    }

    pub fn exit(&mut self, tid: Tid) {
        self.scheduler.exit(tid);
    }
}
```

最后，由于 `Processor.add_thread` 需要 `Box<Thread>` 类型的参数，所以我们修改一下 **struct Thread** 的构造函数：

```rust
// in process/structs.rs
use alloc::boxed::Box;
use riscv::register::satp;
impl Thread {
    pub fn new_idle() -> Box<Thread> {
        unsafe {
            Box::new(Thread {
                context: Context::null(),
                kstack: KernelStack::new(),
            })
        }
    }

    pub fn new_kernel(entry: extern "C" fn(usize) -> !, arg: usize) -> Box<Thread> {
        unsafe {
            let _kstack = KernelStack::new();
            Box::new(Thread {
                context: Context::new_kernel_thread(entry, arg, _kstack.top(), satp::read().bits()),
                kstack: _kstack,
            })
        }
    }
}
```

至此我们的调度器已经全部完成，让我们来测试一下他吧：

```rust
// in process/mod.rs

mod structs;
mod scheduler;
mod processor;
mod thread_pool;

use structs::Thread;
use alloc::boxed::Box;
use processor::Processor;
use thread_pool::ThreadPool;
use self::scheduler::Scheduler;

pub type Tid = usize;
pub type ExitCode = usize;

static CPU: Processor = Processor::new();

pub fn tick() {
    CPU.tick();
}

pub fn init() {
    println!("+------ now to initialize process ------+");
    let scheduler = Scheduler::new(1);
    let thread_pool = ThreadPool::new(100, scheduler);
    CPU.init(Thread::new_idle(), Box::new(thread_pool));
    let thread0 = Thread::new_kernel(hello_thread, 0);
    CPU.add_thread(thread0);
    let thread1 = Thread::new_kernel(hello_thread, 1);
    CPU.add_thread(thread1);
    let thread2 = Thread::new_kernel(hello_thread, 2);
    CPU.add_thread(thread2);
    let thread3 = Thread::new_kernel(hello_thread, 3);
    CPU.add_thread(thread3);
    let thread4 = Thread::new_kernel(hello_thread, 4);
    CPU.add_thread(thread4);
    CPU.run();
}

#[no_mangle]
pub extern "C" fn hello_thread(arg: usize) -> ! {
    println!("hello thread");
    println!("arg is {}", arg);
    for i in 0..100 {
        println!("{}{}{}{}{}{}{}{}", arg, arg, arg, arg, arg, arg, arg, arg);
        for j in 0..1000 {
        }
    }
    println!("end of thread {}", arg);
    CPU.exit(0)
}
```

### FIX runtime error:  panicked at 'Processor is not initialized'

执行 `make run` ，发现kernel出现了运行时错误：

```
...
kernel_end:0x80c01000: kernel_size:0xc01000 
+------ now to initialize process ------+
panicked at 'Processor is not initialized', /home/chyyuu/.rustup/toolchains/nightly-2019-03-05-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/option.rs:1038:5
qemu-system-riscv32: terminating on signal 15 from pid 31872 ()
```

从字面意思上看，是`Processor`这个结构没有初始化！但仔细检查代码，在如下代码中有`new`和记忆棒初始化的过程：

```rust
// in process/mod.rs
...
static CPU: Processor = Processor::new();
pub fn init() {
    println!("+------ now to initialize process ------+");
    let scheduler = Scheduler::new(1);
    let thread_pool = ThreadPool::new(100, scheduler);
    println!("+------ now to initialize processor ------+");
    CPU.init(Thread::new_idle(), Box::new(thread_pool));
    ...
```

所以，不应该是这部分的问题。再进一步检查，在`init.rs`中的`rust_main`函数在调用`process_init()`之前，有一些unsafe代码，怀疑是它造成的？？？。试着把代码删除。再执行 `make run` ，发现我们的调度器已经能够自动切换线程，线程来回切换也可以正常恢复原先的工作环境，并且在线程结束后能够正常结束退出。


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://xy-plus.gitbook.io/rcore-step-by-step/xian-cheng-tiao-du.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
