この記事は私が作ったライブラリ「Effective Rust」の紹介記事です.このライブラリはAlgebraic EffectsをRustで扱うためのライブラリなのですが,実装しながら実はAlgebraic EffectsはFuture
の素直な拡張ではないかという考えに至り,筆を執りました.この記事ではまずFuture
の紹介をした後,Effective Rustを使ってみます.
Future
とポーリングモデル
Future
は「結果が未確定な計算」を表すオブジェクトです.同様な抽象化はC#のTask
やJavaScriptのPromise
などと様々なプログラミング言語に導入されています.
Rustにおいても非同期プログラミングを支援するためにFuture
の開発が進んできました.
最初はfutures
やtokio
といったサードパーティライブラリとしての動きが主でしたが,2019年に入ってついにFuture
がRust 1.36から標準ライブラリに入ることが決定しました.さらに,async/await
構文も1.37での導入に向けて議論が進んでいます.async/await
の導入によってFuture
を用いた非同期プログラムを通常の同期的なプログラムと同様に書けるようになります.2019年の後半からはRustにおけるネットワークプログラミングなどがより書きやすくなっていくことが期待されます.
さて,Future
は以下のAPIを持ちます1.
pub trait Future {
// 計算の最終結果の型
type Output;
// 計算を進める
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
// 計算が完了した
Ready(T),
// 計算が完了していない
Pending,
}
Future
はpoll
メソッドを呼び出すことで計算が完了していたかどうか問い合わせることができます.完了していた場合はpoll
はPoll::Ready
に最終結果を包んで返し,まだ完了していないならばPoll::Pending
を返します.tokio
などの非同期ランタイムは生成されたFuture
をそれぞれ完了するまで(Ready
を返すまで)繰り返しpoll
を呼び出します.このように繰り返し問い合わせを行う実行モデルをポーリングモデルと呼びます.
さて,ここで問題となるのはPending
が帰ってきた場合いつ次のpoll
を呼び出せばよいのかです.結果がPending
ということは処理がまだ実行中ですから,すぐに呼び直してもまたPending
となってしまうのです.適切なタイミングまでpoll
の呼び出しを遅延させなければCPUの無駄遣いとなってしまいます.
ということは,Future
にはpoll
が再度呼び出し可能になった(処理が進行可能になった)ことを通知するための仕組みが必要です.そのためにRustはWaker
APIを用意しています.poll
の第二引数cx: &mut Context
のwaker
を呼び出すとWaker
が取得できます.
impl<'a> Context<'a> {
pub fn waker(&self) -> &'a Waker;
}
impl Waker
where
Self: Clone,
{
pub fn wake(self);
pub fn wake_by_ref(&self);
}
Waker
のwake
メソッドによって該当するFuture
が継続可能となったことを通知します.これにより,非同期ランタイムはwake
が呼び出されるまで待つことで効率的に処理を再開できます.逆に,wake
が呼び出されない場合はそのFuture
は永遠に計算が進むことがありません.ですから,Future::poll
には「Pending
を返す場合には必ずwake
がその後呼ばれるようにする」という契約が存在します.
最後に,ソケットからデータを非同期に受信する例でFuture
の具体的な挙動を説明してみます.登場人物は
- 実行スレッド
- 監視スレッド
の2人です.実行スレッドはFuture::poll
を呼びだすスレッドです.poll
の中ではソケットに対してread
システムコールを呼びますが,データがまだ届いていない場合はEWOULDBLOCK
エラーを返します.このとき,poll
はPending
を返すことで結果がまだ未確定であることを示します.ですが,その前にwake
が呼び出されるように調整してあげなければいけません.
ここで登場するのが監視スレッドです.実行スレッドは監視スレッドにファイルディスクリプタとWaker
を登録します.監視スレッドはepoll
・kqueue
などのシステムコールを用いて登録されたファイルディスクリプタが利用可能となったかをチェックします.利用可能になると,監視スレッドはファイルディスクリプタに紐づけられたWaker
のwake
を呼びだします.これによって,実行スレッドが再度poll
を呼びだすことができ,計算が進んでいきます.
Future
の実行モデルについてさらに詳細に知りたい方はtokio
のドキュメントを読むことをおすすめします.tokio
はRustの非同期ランタイムのデファクトスタンダードで,実行スレッドと監視スレッドの双方を提供しています2.
エフェクトを追加する
さて,ここまではFuture
の仕組みを解説してきました.Future
の実行モデルではPending
が返ってきたとき,処理を中断する必要があることは分かりますが,なぜそうなったのかは分かりません.では,中断の理由が分かるとしたらどうなるでしょうか.ここでは,処理の中断の原因をエフェクトと呼びます.IOはエフェクトの一つです.そのほかにも外部状態や例外などもエフェクトとして扱えます.それらエフェクトを統一的な方法で処理できるのがAlgebraic Effectsです.
まずはPoll
構造体をエフェクトが扱えるように拡張しましょう.
pub enum Poll<T, E> {
// 計算が完了した
Ready(T),
// エフェクトが発生した
Effect(E),
// エフェクトの処理中
Pending,
}
エフェクトを発生したことを表すバリアントEffect
を追加しました.型引数E
がエフェクトの型を表しています.Future
にもエフェクトを追加しましょう.ここではエフェクトを持つ(発生させる)計算ということで,トレイトの名前もEffectful
にしています3.
pub trait Effectful {
/// 計算の最終結果の型
type Output;
/// 計算が持つエフェクトの型
type Effect;
/// 計算を進める
fn poll(self: Pin<&mut Self>, cx: &Context) -> Poll<Self::Output, Self::Effect>;
}
Effective Rust入門
このEffectful
トレイトを中心として計算を組み立てていくのがEffective Rustライブラリです.せっかくですから簡単なプログラムを書きながら入門してみましょう4.
ここでは例外エフェクトを取り上げます.RustにはC++やJavaのような例外の仕組みは無く5,Result
を使ってエラーの伝播をしています.しかし,Algebraic Effectsの仕組みを用いることでRustにも例外機構を実装することができます(しかもライブラリで!).早速プロジェクトの作成といきたいところですが,その前にRustのNightlyバージョンをインストールする必要があります.これはEffective RustがRustの最新機能(ジェネレータとnever type)を使って実装されているからです.インストールが終わったらプロジェクトを作りましょう.
$ rustup toolchain install nightly
...省略...
nightly-x86_64-unknown-linux-gnu updated - rustc 1.36.0-nightly (a3404557c 2019-05-03)
$ cargo new eff-tutorial
Created binary (application) `eff-tutorial` package
Cargo.toml
の[dependencies]
セクションにEffective Rustを追加します.
[dependencies]
eff = "0.0.7"
それではsrc/main.rs
を編集していきます.まずは以下のfeature
宣言が必要です.これはコンパイラに最新の機能を利用することを伝えています.
#![feature(generators, never_type)]
必要な宣言をuse
しましょう.
use eff::{eff, handler, perform, Effect, Effectful};
例外エフェクトNotAnInteger
を宣言します.Effect
トレイトの実装によってエフェクトを宣言できます.Output
関連型はそのエフェクトが解決した際の値の型を表します.
#[derive(Debug)]
struct NotAnInteger<'a>(&'a str);
impl Effect for NotAnInteger<'_> {
type Output = i32;
}
次にNotAnInteger
を発生させる関数sum_lines
を定義してみましょう.sum_lines
は引数から1行ずつ整数を読み込み,その和を返します.行が整数でなかった場合にはNotAnInteger
例外を送出します.
エフェクトを持つ関数を宣言するにはeff
マクロを使います.eff
属性は発生するエフェクトを引数に取ります.
エフェクトを発生させるにはperform!
マクロを用います.perform!
の結果は発生したエフェクトのOutput
の型の値です.NotAnInteger
エフェクトは整数に解決されるので,その値を代わりに和に使えます.
#[eff(NotAnInteger)]
fn sum_lines(s: &str) -> i32 {
let lines = s.split('\n');
let mut sum = 0;
for line in lines {
match line.parse::<i32>() { // 行を整数としてパース
Ok(x) => sum += x, // 整数の場合,sumに足す
Err(_e) => sum += perform!(NotAnInteger(line)), // 整数ではなかった場合,NotAnIntegerエフェクトを発生させてその結果を足す
}
}
sum
}
main
関数でsum_lines
を使ってみましょう.
fn main() {
let sum_comp = sum_lines("1\n2\nthree\n4\n5");
}
この状態で実行しても特に処理は行われません(for文にprintln
を入れるなどすると分かると思います).なぜかというと,eff
を付けて宣言した関数は本来の戻り値(i32
)の代わりにEffectful
トレイトを実装した計算を返しており,それをpoll
するまで計算が進行しないからです.この挙動はイテレータやasync
関数の計算が遅延されるのと同じです.
さて,NotAnInteger
エフェクトが発生したときの処理を書いてみましょう.エフェクトの処理を行うような関数をハンドラと呼びます.ハンドラはhandler!
マクロによって定義します.
let handler = handler! {
x => x,
NotAnInteger(s), _k => panic!("not an integer: {}", s),
};
handler!
はmatch
に似ています.最初のアーム(x => x
)は値ハンドラであり,大元の計算が正常に完了した際に実行されます.この値ハンドラは計算結果の値をそのまま返しています.
以降のアームはエフェクトのハンドラですが,今回はNotAnInteger
しか処理しないのでこれも一つだけです.エフェクトハンドラは2つ引数をとり,第一引数が発生したエフェクトとなります.第二引数については後で説明します.このハンドラではNotAnInteger
エフェクトが発生した場合すぐpanic!
で強制終了しています.
このハンドラをsum_comp
に適用するにはEffectful::handle
メソッドを使います.
let handled = sum_comp.handle(handler);
handled
もやはりエフェクト付き計算を表しており,まだ実行はされません.結果の型はi32
でsum_comp
と変わりませんが,しかしhandled
はもはやエフェクトを発生させることがありません.なぜならば,ハンドラによってNotAnInteger
の処理をしてしまったからです.これを表すために,handled
のEffect = !
となります.
!
はnever typeと呼ばれる型で,!
型の値は存在しません.panic!
や無限ループなどが!
型と扱われ,!
からどの型にも暗黙に変換することができます.発生するエフェクトが存在しないということを表すのにぴったりですね.
さて,ハンドラの導入も終わったので計算を実行してみます.Effectful::poll
をhandled
に対して呼び出すことで計算が進みます.Effectful
もFuture
と同じ実行モデルを採用しているのでPending
が来たらwake
まで待たなければいけません.ですが,実はEffect = !
のときはEffectful::block_on
を呼び出すと計算が完了するまでこれらを自動で行ってくれます.使ってみましょう.
let sum = handled.block_on(); // 完了するまで計算を実行し,結果のi32を返す
println!("sum = {:?}", sum);
完全なコード例は以下の通りです.
#![feature(generators, never_type)]
use eff::{eff, handler, perform, Effect, Effectful};
#[derive(Debug)]
struct NotAnInteger<'a>(&'a str);
impl Effect for NotAnInteger<'_> {
type Output = i32;
}
#[eff(NotAnInteger)]
fn sum_lines(s: &str) -> i32 {
let lines = s.split('\n');
let mut sum = 0;
for line in lines {
match line.parse::<i32>() { // 行を整数としてパース
Ok(x) => sum += x, // 整数の場合,sumに足す
Err(_e) => sum += perform!(NotAnInteger(line)), // 整数ではなかった場合,NotAnIntegerエフェクトを発生させてその結果を足す
}
}
sum
}
fn main() {
// エフェクト付き計算を作成
let sum_comp = sum_lines("1\n2\nthree\n4\n5");
// ハンドラを定義
let handler = handler! {
x => x,
NotAnInteger(s), _k => panic!("not an integer: {}", s),
};
// ハンドラを導入
let handled = sum_comp.handle(handler);
// 計算を実行し結果表示
let sum = handled.block_on();
println!("sum = {:?}", sum);
}
これを実行するとthread 'main' panicked at 'not an integer: three'
とプログラムがパニックするはずです.これは,引数の3行目がthree
と数値では無くNotAnInteger
が発生し,ハンドラでpanic!
したからです.例えば3行目を数値の3
に置き換えると15
と表示して正常終了します.
次にハンドラの挙動を変えてみましょう.今度は例外発生時にpanic!
するのではなくResult
を返すようにしてみます.handler
の宣言を次のようにします.
let handler = handler! {
x => Ok(x),
NotAnInteger(s), _k => Err(format!("not an integer: {}", s)),
};
値ハンドラではOk
で結果の値を包み,エフェクトハンドラではErr
を返しています.ハンドラ以外はそのままにしてください.実行すると今度はsum = Err("not an integer: three")
と表示され,プロセスは正常終了します.three
を3
にするとsum = Ok(15)
と表示されるので,今度は値ハンドラを通ったことが分かります.
さらに進んでみましょう.これまでは例外発生時に計算を中断していましたが,Algebraic Effectsではエフェクトハンドラから元の計算に処理を戻すことも可能です.ハンドラを以下のように書き直してみます.
let handler = handler! {
x => x,
NotAnInteger(s), k => {
eprintln!("not an integer: {}", s);
perform!(k.resume(0))
}
};
このハンドラはエフェクトハンドラの第二引数k
を利用しています.k
のresume
メソッドは,元の計算(sum_lines
)に処理を戻すような特別なエフェクトを生成します.そのエフェクトをperform!
することで,ハンドラを中断して元の処理に戻ることができます.この際,resume
に渡した引数が元の計算におけるperform!
の結果となります(sum += perform!(...)
と足してたのを思い出しましょう).ですから,resume
の引数はNotAnInteger::Output = i32
でなければいけません.このハンドラは0
をsum_lines
に戻しており,つまり欠損値を0
として処理しています.
実行してみましょう.
not an integer: three
sum = 12
three
が0
として扱われていますね.
これまでの例で注目すべきなのは,大本の計算のsum_lines
自体は同じなのにハンドラを書き換えることでsum_lines
の振る舞い自体が変わっているということです.しかも計算を中断するのか,それともデフォルト値で穴埋めして継続するのかといった制御構造もハンドラによってコントロールすることができています.例えば,コマンドライン引数によってどのハンドラで処理を行うかスイッチするということだって可能です.これは,それぞれのハンドラがsum_lines
というプログラムに対して別の解釈を与えていると考えることができ,Algebraic Effectsという仕組みの持つ特徴の一つです.
Effective Rustはまだ生まれたばかりのライブラリです.デザイン面で乗り越えるべきハードルもありますし,怪しいunsafe
だっていっぱいです.ですが,自分はAlgebraic EffectsとそのRustへの応用に強い希望を抱いています.例えば次のようなことができると考えています.
- ランタイムへの型安全なアクセス.
Future
では特定のランタイム下でのみ動作するような場合でも,実際に正しいランタイムが選ばれているかということを型でチェックすることはできませんでした.ランタイムを要求するエフェクトを用いれば正しいランタイムで実行されることを保証できます. - Dependency Injection.テスト時にはDBへのアクセスをモックするようなハンドラに切り替えるということもできるでしょう.
- GC.GCヒープからメモリを確保するような処理をエフェクトとして表し,ハンドラでは使用量をチェックしてGCを実行します.
GhostCell
みたいなテクニックを組み合わせてスタックを走査しなくてもトレーシングができるようになるとうれしい. - ストリーミングパーサ.データが全て揃う前からパースを進めることができるパーサのことをストリーミングパーサと呼びます.次の処理に必要なデータだけを待つようにすれば実装できるはず.
-
tokio
とのインテグレーション.Effectful
の構成から明らかにEffectful<Output = T, Effect = !>
はFuture<Item = T>
と同型です.ということは,エフェクトを持たない計算をtokio
の洗練されたランタイムで動作させることもできておかしくありません.巨人の肩の上に乗っていくぞ!
興味を持たれた方はぜひEffective Rustを試してみてください.また,自分のAlgebraic Effects自体これをつくりながら理解していったので,理解が及んでいない部分もあると思います.何か意見・感想がありましたらぜひ記事のコメントやTwitterに投げてください.コントリビュータも募集中です!