ではここからランタイムの動作を見ていきましょう。最初に、ランタイムそのものはいつ起動しているのかを見ていきます。
そもそもランタイムはいくつも動作させるものではありませんよね?なので、最初に必要になったときだけランタイムを起動して、以後起動したランタイムを参照するようにしたいです。そういった用途では、once_cell
というライブラリのLazy
が使えます。次のコードはランタイムの定義です。
#![allow(unused_variables)]
fn main() {
use std::thread;
use once_cell::sync::Lazy;
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
thread::Builder::new()
.name("async-std/runtime".to_string())
.spawn(|| abort_on_panic(|| RUNTIME.run()))
.expect("cannot start a runtime thread");
Runtime::new()
});
}
RUNTIME という static 変数を定義しています。この変数は最初に参照されたときに、Lazy::new
の引数のクロージャが呼び出されます。つまり次の部分です。
#![allow(unused_variables)]
fn main() {
|| {
thread::Builder::new()
.name("async-std/runtime".to_string())
.spawn(|| abort_on_panic(|| RUNTIME.run()))
.expect("cannot start a runtime thread");
Runtime::new()
}
}
そして、2度目以降の参照では RUNTIME は初期化時のクロージャの返り値として見られます。つまり、最初の参照では、ランタイムの動作用のスレッドが起動され、2度目以降の参照では動作している、Runtime
への参照になるということです。Runtime への何かしらの処理(非同期タスクの登録など)はすべてこの RUNTIME 変数から行われるため複数のランタイムを起動してしまうこともありません。また、ランタイムは必要になるまで起動されないので、無駄にリソースを食いつぶすこともありません。
では、起動時の非同期ランタイムの持つ情報は初期状態でどの様になっているのでしょうか?次のコードは Runtime のコンストラクタです。
#![allow(unused_variables)]
fn main() {
pub fn new() -> Runtime {
let cpus = num_cpus::get().max(1);
let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();
let stealers = processors.iter().map(|p| p.worker.stealer()).collect();
Runtime {
reactor: Reactor::new().unwrap(),
injector: Injector::new(),
stealers,
sched: Mutex::new(Scheduler {
processors,
machines: Vec::new(),
progress: false,
polling: false,
}),
}
}
}
次にランタイム用スレッドでは実際にどのような処理が行われているかを見ていきましょう。コードで言うとRUNTIME.run()
の部分です。ここを見ていきましょう!
コードは多少簡略化していますが、次のようになっています。このコードは一気に読むには少し多いので、ポイントに絞って簡略化したコードをもとに説明していきます。小分けの説明をした後にこのコードに戻ってくるとスルスルっと理解できるはずです。
#![allow(unused_variables)]
fn main() {
pub fn run(&self) {
scope(|s| {
let mut idle = 0;
let mut delay = 0;
loop {
for m in self.make_machines() {
idle = 0;
s.builder()
.name("async-std/machine".to_string())
.spawn(move |_| {
abort_on_panic(|| {
m.run(self);
})
})
.expect("cannot start a machine thread");
}
if idle > 10 {
delay = (delay * 2).min(10_000);
} else {
idle += 1;
delay = 1000;
}
thread::sleep(Duration::from_micros(delay));
}
})
.unwrap();
}
}
次のコードはランタイムの動作を一時的に止めるsleep
処理のところのみを取り出しました。
#![allow(unused_variables)]
fn main() {
if idle > 10 {
delay = (delay * 2).min(10_000);
} else {
idle += 1;
delay = 1000;
}
thread::sleep(Duration::from_micros(delay));
}
ランタイムは無限ループで動作しています。そして、新しく Machine を生成するべきかを毎回判断しています。そのため、新しく machine を作る必要がない状態が続いた場合 cup を無駄に消費し続けることになります。なので、毎回ループの最後にスリープ処理をはさみ、スリープ時間は何もしなかった回数(idel)に応じて増加していくという方式をとっています。
次に残りの部分を見ていきましょう。
#![allow(unused_variables)]
fn main() {
pub fn run(&self) {
loop {
for m in self.make_machines() {
idle = 0;
s.builder()
.name("async-std/machine".to_string())
.spawn(move |_| {
abort_on_panic(|| {
m.run(self);
})
})
.expect("cannot start a machine thread");
}
}
}
}
まず、make_machines
で必要な個数分の machine のリスト返します。そして、その個数分の非同期タスク実行用のスレッドを起動します。その中で、実行すべき非同期タスクの見つけ、実行しています。
ここまでで、ランタイムの起動時の説明は以上です。
次からは必要になる machine 数を判定するmake_machines
と実際にタスクを処理していくMachine::run
の動作を見ていきましょう。
make_machines を呼び出すことで、必要なときに必要な文だけ Machine(os thread)を起動させることが出来ます。また、必要なくなった Machine が持っている processor(実行権限)を奪い、他の Machine に割り当てることで不必要にリソースを使わなくて済むようにしています。
#![allow(unused_variables)]
fn main() {
fn make_machines(&self) -> Vec<Arc<Machine>> {
let mut sched = self.sched.lock().unwrap();
let mut to_start = Vec::new();
for m in &mut sched.machines {
if !m.progress.swap(false, Ordering::SeqCst) {
let opt_p = m.processor.try_lock().and_then(|mut p| p.take());
if let Some(p) = opt_p {
*m = Arc::new(Machine::new(p));
to_start.push(m.clone());
}
}
}
if !sched.polling && !sched.progress {
if let Some(p) = sched.processors.pop() {
let m = Arc::new(Machine::new(p));
to_start.push(m.clone());
sched.machines.push(m);
}
sched.progress = false;
}
to_start
}
}
では今から async-std ランタイムの心臓部であるMachine::run
のコードを呼んでいきます。コード行数としては 100 行を超えるため、最初は面を喰らうかもしれません。ただ一つ一つの処理では難しいことはしていません。初見では理解できなくても数回読んでみることで理解できるはずです。なので共に頑張って読んでいきましょう!
#![allow(unused_variables)]
fn main() {
fn run(&self, rt: &Runtime) {
const YIELDS: u32 = 3;
const SLEEPS: u32 = 10;
let mut fails = 0;
loop {
self.progress.store(true, Ordering::SeqCst);
if let Steal::Success(task) = self.find_task(rt) {
task.run();
fails = 0;
continue;
}
fails += 1;
if fails <= YIELDS {
thread::yield_now();
continue;
}
if fails <= YIELDS + SLEEPS {
let opt_p = self.processor.lock().take();
thread::sleep(Duration::from_micros(10));
*self.processor.lock() = opt_p;
continue;
}
let mut sched = rt.sched.lock().unwrap();
let m = match sched
.machines
.iter()
.position(|elem| ptr::eq(&**elem, self))
{
None => break,
Some(pos) => sched.machines.swap_remove(pos),
};
sched.polling = true;
drop(sched);
rt.reactor.poll(None).unwrap();
sched = rt.sched.lock().unwrap();
sched.polling = false;
sched.machines.push(m);
sched.progress = true;
fails = 0;
}
let opt_p = self.processor.lock().take();
if let Some(p) = opt_p {
let mut sched = rt.sched.lock().unwrap();
sched.processors.push(p);
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
}
}
}
ここまででランタイムの大まかな処理は終わりです。
長いことお疲れさまでした!