本章代码对应 commit :65ccec7b180cd56e645f359343a793e75a49eb98
概要
多线程并发执行需要调度器的辅助。调度器的作用是在合适的时刻选择线程执行,并在合适的时候切换线程,防止一个线程占用或多的资源或阻塞,从而实现 cpu 资源分配相对“公平”。本章我们将:
假想我们已经有了一个调度算法。
算法和数据结构是分离的,所以在实现调度器的时候不依赖于算法
创建线程池(thread pool)用于保存所有线程。
创建线程管理器(processor)通过调度算法管理 thread pool。
调度算法
Copy //in process/scheduler.rs
impl Scheduler {
pub fn new (max_time_slice : usize ) -> Scheduler { /* TODO */ }
pub fn push ( &mut self, tid : Tid ) { /* TODO */ }
pub fn pop ( &mut self) -> Option < Tid > { /* TODO */ }
pub fn tick ( &mut self) -> bool { /* TODO */ }
pub fn exit ( &mut self, tid : Tid ) { /* TODO */ }
}
这是本章我们使用的调度算法的对外接口。功能包括创建(new)、添加线程(push)、获取即将被调用的线程(pop)、提醒算法时钟周期的到来(tick)和退出线程时将线程从调度算法中移出(exit)。
线程池(ThreadPool)
线程的运行状态包括但不限于:等待运行、运行中、睡眠和等待退出。这里我们创建一个枚举类型作为进程的状态类型:
Copy // in process/mod.rs
pub type Tid = usize ;
pub type ExitCode = usize ;
// in process/structs.rs
use crate:: process :: { Tid , ExitCode };
#[derive( Clone )]
pub enum Status {
Ready ,
Running ( Tid ),
Sleeping ,
Exited ( ExitCode ),
}
Tid 是线程 id 。就像每个人的身份证号都是不一样的,每一个线程都有独一无二的 id ,这时线程的标识。
线程池是用于存放线程的容器,只需要包含线程的信息和线程调度算法。创建 process/thread_pool.rs :
Copy // in process/thread_pool.rs
use crate:: process :: scheduler :: Scheduler ;
use crate:: process :: structs ::* ;
use alloc :: { vec :: Vec , boxed :: Box };
struct ThreadInfo {
status : Status ,
present : bool ,
thread : Option < Box < Thread >>,
}
pub struct ThreadPool {
threads : Vec < Option < ThreadInfo >>, // 线程信号量的向量
scheduler : Box < Scheduler >, // 调度算法
}
Box 允许我们将一个值放在堆上而不是栈上,留在栈上的则是指向堆数据的指针。除了数据被储存在堆上而不是栈上之外,Box 没有额外的性能损失,不过也没有额外的功能。
让我们先来实现他的构造函数和向线程池中加入线程的功能:
Copy // in process/thread_pool.rs
use crate:: process :: Tid ;
impl ThreadPool {
pub fn new (size : usize , scheduler : Scheduler ) -> ThreadPool {
ThreadPool {
threads : {
let mut th = Vec :: new ();
th . resize_with (size, Default :: default);
th
},
scheduler : Box :: new (scheduler),
}
}
fn alloc_tid ( & self) -> Tid {
for (i, info) in self . threads . iter () . enumerate () {
if info . is_none () {
return i;
}
}
panic! ( "alloc tid failed !" );
}
pub fn add ( &mut self, _thread : Box < Thread >) {
let tid = self . alloc_tid ();
self . threads[tid] = Some ( ThreadInfo {
status : Status :: Ready ,
present : true ,
thread : Some (_thread),
});
self . scheduler . push (tid);
println! ( "tid to alloc: {}" , tid);
}
}
构造函数规定了线程的最大数量 size 和调度算法。由于线程数组是已经创建好的,但是默认内容为 None ,所以在添加线程的时候只需要将从 Vec 中找到一个未使用的位置,把新线程的信息传递过去就可以了。同时,不要忘记为调度算法传入线程 id 。
调度器
由于创建的调度器是全局的,需要考虑一些安全问题和异步问题。为此需要对成员进行一些包装,不过这不影响实现思路。创建 processor.rs :
Copy // in process/processor.rs
use core :: cell :: UnsafeCell ;
use alloc :: boxed :: Box ;
use crate:: process :: Tid ;
use crate:: process :: structs ::* ;
use crate:: process :: thread_pool :: ThreadPool ;
pub struct ProcessorInner {
pool : Box < ThreadPool >,
idle : Box < Thread >,
current : Option <( Tid , Box < Thread >)>,
}
pub struct Processor {
inner : UnsafeCell < Option < ProcessorInner >>,
}
unsafe impl Sync for Processor {}
一个实现了 Sync trait 的类型可以安全的在多个线程中拥有其值的引用。因为他是标记 trait ,所以不需要手动实现。 UnsafeCell 内的元素不严格区分 immutable 和 mutable 。
调度器接口实现
这里先列出功能简单明了的几个函数:
Copy // in process/processor.rs
impl Processor {
pub const fn new () -> Processor {
Processor {
inner : UnsafeCell :: new ( None ),
}
}
pub fn init ( & self, idle : Box < Thread >, pool : Box < ThreadPool > ) {
unsafe {
* self . inner . get () = Some ( ProcessorInner {
pool,
idle,
current : None ,
});
}
}
fn inner ( & self) -> &mut ProcessorInner {
unsafe { &mut * self . inner . get () }
. as_mut ()
. expect ( "Processor is not initialized" )
}
pub fn add_thread ( & self, thread : Box < Thread >) {
self . inner () . pool . add (thread);
}
}
在进行线程切换时,为了防止因为中断引起线程切换出错,需要关闭中断,之后再恢复到原先的中断状态。这里我们先实现三个与中断控制相关的函数:
Copy // in interrupt.rs
#[inline(always)]
pub fn enable_and_wfi () { // 使能中断并等待中断
unsafe {
asm! ( "csrsi sstatus, 1 << 1; wfi" :::: "volatile" );
}
}
#[inline(always)]
pub fn disable_and_store () -> usize { // 禁用中断并返回当前中断状态
let sstatus : usize ;
unsafe {
asm! ( "csrci sstatus, 1 << 1" : "=r" (sstatus) ::: "volatile" );
}
sstatus & ( 1 << 1 )
}
#[inline(always)]
pub fn restore (flags : usize ) { // 根据 flag 设置中断
unsafe {
asm! ( "csrs sstatus, $0" :: "r" (flags) :: "volatile" );
}
}
接下来开始实现三个与调度直接相关的重要函数。
其中调用了一些 ThreadPool 中尚未实现的函数,将于之后实现
run
这是整个调度过程最核心的函数,由 idle 线程调用。具体实现如下:
Copy // in process/processor.rs
use crate:: interrupt :: { disable_and_store, enable_and_wfi };
impl Processor {
pub fn run ( & self) -> ! {
let inner = self . inner ();
// 关闭中断,防止此时产生中断异常导致线程切换出错。
disable_and_store ();
// 循环从线程池中寻找可调度线程
loop {
// 如果存在需要被调度的线程
if let Some (thread) = inner . pool . acquire () {
inner . current = Some (thread);
// 切换至需要被调度的线程
inner . idle . switch_to ( &mut * inner . current . as_mut () . unwrap () . 1 );
// 上一个线程已经结束或时间片用完,切换回 idle 线程
let (tid, thread) = inner . current . take () . unwrap ();
println! ( "thread {} ran just now" , tid);
// 将上一个线程放回线程池中
inner . pool . retrieve (tid, thread);
} else {
// 开启中断并等待中断产生
enable_and_wfi ();
// 关闭中断,从线程池中寻找可调度线程
disable_and_store ();
}
}
}
}
tick
每产生一次时钟中断(即经过一个时钟周期),就需要通知线程池,让他通过调度算法判断是否需要切换线程:
Copy // in process/mod.rs
static CPU : Processor = Processor :: new ();
pub fn tick () {
CPU . tick ();
}
// in interrupt.rs
use crate:: process :: tick;
fn super_timer () {
clock_set_next_event ();
unsafe {
TICK = TICK + 1 ;
if TICK % 100 == 0 {
println! ( "100 ticks!" );
}
}
tick ();
}
// in process/processor.rs
use crate:: interrupt :: restore;
impl Processor {
pub fn tick ( & self) {
let inner = self . inner ();
if ! inner . current . is_none () {
if inner . pool . tick () {
let flags = disable_and_store ();
inner
. current
. as_mut ()
. unwrap ()
. 1
. switch_to ( &mut inner . idle);
// 恢复原先的中断状态
restore (flags);
}
}
}
}
inner.pool.tick
会通知线程池和调度算法已经过了一个时钟周期,同时返回一个布尔值:是否需要进行线程切换。如果需要切换至其他线程,则先切换至 idle 线程,然后由 idle 进行调度(回到 Processer.run
)。
exit
当线程任务完成之后,就可以通过 Processor.exit
结束自己(结束当前线程):
Copy // in process/processor.rs
impl Processor {
pub fn exit ( & self, code : usize ) -> ! {
let inner = self . inner ();
let tid = inner . current . as_ref () . unwrap () . 0 ;
// 通知线程池该线程即将退出
inner . pool . exit (tid, code);
// 切换至 idle 线程,进入调度
inner
. current
. as_mut ()
. unwrap ()
. 1
. switch_to ( &mut inner . idle);
loop {}
}
}
线程池接口实现
线程池接口的功能在前文已经提及,也可由函数名判断函数功能。具体实现也较为简单,所以直接给出实现:
Copy // in process/thread_pool.rs
impl ThreadPool {
pub fn acquire ( &mut self) -> Option <( Tid , Box < Thread >)> {
if let Some (tid) = self . scheduler . pop () {
let mut thread_info = self . threads[tid] . as_mut () . expect ( "thread not exist !" );
thread_info . status = Status :: Running (tid);
return Some ((tid, thread_info . thread . take () . expect ( "thread not exist " )));
} else {
return None ;
}
}
pub fn retrieve ( &mut self, tid : Tid , thread : Box < Thread > ) {
let mut thread_info = self . threads[tid] . as_mut () . expect ( "thread not exist !" );
if thread_info . present {
thread_info . thread = Some (thread);
thread_info . status = Status :: Ready ;
self . scheduler . push (tid);
}
}
pub fn tick ( &mut self) -> bool {
// 通知调度算法时钟周期加一,询问是否需要调度
self . scheduler . tick ()
}
pub fn exit ( &mut self, tid : Tid , code : usize ) {
self . threads[tid] = Some ( ThreadInfo {
status : Status :: Ready ,
present : false ,
thread : None ,
});
self . scheduler . exit (tid);
println! ( "exit code: {}" , code);
}
}
这里用到了 Option.take ,功能与所有权转移或浅拷贝相似
引入 Round Robin 调度算法
在 Cargo.toml 中加入:
Copy RoundRobinScheduler = { path = "crate/RoundRobinScheduler" }
创建 process/scheduler.rs :
Copy use crate:: process :: Tid ;
use RoundRobinScheduler :: RRScheduler ;
pub struct Scheduler {
scheduler : RRScheduler ,
}
impl Scheduler {
pub fn new (max_time_slice : usize ) -> Scheduler {
let s = Scheduler {
scheduler : RRScheduler :: new (max_time_slice),
};
s
}
pub fn push ( &mut self, tid : Tid ) {
self . scheduler . push (tid);
}
pub fn pop ( &mut self) -> Option < Tid > {
self . scheduler . pop ()
}
pub fn tick ( &mut self) -> bool {
self . scheduler . tick ()
}
pub fn exit ( &mut self, tid : Tid ) {
self . scheduler . exit (tid);
}
}
最后,由于 Processor.add_thread
需要 Box<Thread>
类型的参数,所以我们修改一下 struct Thread 的构造函数:
Copy // in process/structs.rs
use alloc :: boxed :: Box ;
use riscv :: register :: satp;
impl Thread {
pub fn new_idle () -> Box < Thread > {
unsafe {
Box :: new ( Thread {
context : Context :: null (),
kstack : KernelStack :: new (),
})
}
}
pub fn new_kernel (entry : extern "C" fn ( usize ) -> ! , arg : usize ) -> Box < Thread > {
unsafe {
let _kstack = KernelStack :: new ();
Box :: new ( Thread {
context : Context :: new_kernel_thread (entry, arg, _kstack . top (), satp :: read () . bits ()),
kstack : _kstack,
})
}
}
}
至此我们的调度器已经全部完成,让我们来测试一下他吧:
Copy // in process/mod.rs
mod structs;
mod scheduler;
mod processor;
mod thread_pool;
use structs :: Thread ;
use alloc :: boxed :: Box ;
use processor :: Processor ;
use thread_pool :: ThreadPool ;
use self :: scheduler :: Scheduler ;
pub type Tid = usize ;
pub type ExitCode = usize ;
static CPU : Processor = Processor :: new ();
pub fn tick () {
CPU . tick ();
}
pub fn init () {
println! ( "+------ now to initialize process ------+" );
let scheduler = Scheduler :: new ( 1 );
let thread_pool = ThreadPool :: new ( 100 , scheduler);
CPU . init (Thread :: new_idle (), Box :: new (thread_pool));
let thread0 = Thread :: new_kernel (hello_thread, 0 );
CPU . add_thread (thread0);
let thread1 = Thread :: new_kernel (hello_thread, 1 );
CPU . add_thread (thread1);
let thread2 = Thread :: new_kernel (hello_thread, 2 );
CPU . add_thread (thread2);
let thread3 = Thread :: new_kernel (hello_thread, 3 );
CPU . add_thread (thread3);
let thread4 = Thread :: new_kernel (hello_thread, 4 );
CPU . add_thread (thread4);
CPU . run ();
}
#[no_mangle]
pub extern "C" fn hello_thread (arg : usize ) -> ! {
println! ( "hello thread" );
println! ( "arg is {}" , arg);
for i in 0 .. 100 {
println! ( "{}{}{}{}{}{}{}{}" , arg, arg, arg, arg, arg, arg, arg, arg);
for j in 0 .. 1000 {
}
}
println! ( "end of thread {}" , arg);
CPU . exit ( 0 )
}
FIX runtime error: panicked at 'Processor is not initialized'
执行 make run
,发现kernel出现了运行时错误:
Copy ...
kernel_end:0x80c01000: kernel_size:0xc01000
+------ now to initialize process ------+
panicked at 'Processor is not initialized', /home/chyyuu/.rustup/toolchains/nightly-2019-03-05-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/option.rs:1038:5
qemu-system-riscv32: terminating on signal 15 from pid 31872 ()
从字面意思上看,是Processor
这个结构没有初始化!但仔细检查代码,在如下代码中有new
和记忆棒初始化的过程:
Copy // in process/mod.rs
...
static CPU : Processor = Processor :: new ();
pub fn init () {
println! ( "+------ now to initialize process ------+" );
let scheduler = Scheduler :: new ( 1 );
let thread_pool = ThreadPool :: new ( 100 , scheduler);
println! ( "+------ now to initialize processor ------+" );
CPU . init (Thread :: new_idle (), Box :: new (thread_pool));
...
所以,不应该是这部分的问题。再进一步检查,在init.rs
中的rust_main
函数在调用process_init()
之前,有一些unsafe代码,怀疑是它造成的???。试着把代码删除。再执行 make run
,发现我们的调度器已经能够自动切换线程,线程来回切换也可以正常恢复原先的工作环境,并且在线程结束后能够正常结束退出。