Edited at

Futureを拡張するとAlgebraic Effectsになる

この記事は私が作ったライブラリ「Effective Rust」の紹介記事です.このライブラリはAlgebraic EffectsをRustで扱うためのライブラリなのですが,実装しながら実はAlgebraic EffectsはFutureの素直な拡張ではないかという考えに至り,筆を執りました.この記事ではまずFutureの紹介をした後,Effective Rustを使ってみます.


Futureとポーリングモデル

Futureは「結果が未確定な計算」を表すオブジェクトです.同様な抽象化はC#のTaskやJavaScriptのPromiseなどと様々なプログラミング言語に導入されています.

Rustにおいても非同期プログラミングを支援するためにFutureの開発が進んできました.

最初はfuturestokioといったサードパーティライブラリとしての動きが主でしたが,2019年に入ってついにFutureがRust 1.36から標準ライブラリに入ることが決定しました.さらに,async/await構文も1.37での導入に向けて議論が進んでいます.async/awaitの導入によってFutureを用いた非同期プログラムを通常の同期的なプログラムと同様に書けるようになります.2019年の後半からはRustにおけるネットワークプログラミングなどがより書きやすくなっていくことが期待されます.

さて,Futureは以下のAPIを持ちます1

pub trait Future {

// 計算の最終結果の型
type Output;

// 計算を進める
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
// 計算が完了した
Ready(T),
// 計算が完了していない
Pending,
}

Futurepollメソッドを呼び出すことで計算が完了していたかどうか問い合わせることができます.完了していた場合はpollPoll::Readyに最終結果を包んで返し,まだ完了していないならばPoll::Pendingを返します.tokioなどの非同期ランタイムは生成されたFutureをそれぞれ完了するまで(Readyを返すまで)繰り返しpollを呼び出します.このように繰り返し問い合わせを行う実行モデルをポーリングモデルと呼びます.

さて,ここで問題となるのはPendingが帰ってきた場合いつ次のpollを呼び出せばよいのかです.結果がPendingということは処理がまだ実行中ですから,すぐに呼び直してもまたPendingとなってしまうのです.適切なタイミングまでpollの呼び出しを遅延させなければCPUの無駄遣いとなってしまいます.

ということは,Futureにはpollが再度呼び出し可能になった(処理が進行可能になった)ことを通知するための仕組みが必要です.そのためにRustはWakerAPIを用意しています.pollの第二引数cx: &mut Contextwakerを呼び出すとWakerが取得できます.

impl<'a> Context<'a> {

pub fn waker(&self) -> &'a Waker;
}

impl Waker
where
Self: Clone,
{
pub fn wake(self);
pub fn wake_by_ref(&self);
}

Wakerwakeメソッドによって該当するFutureが継続可能となったことを通知します.これにより,非同期ランタイムはwakeが呼び出されるまで待つことで効率的に処理を再開できます.逆に,wakeが呼び出されない場合はそのFutureは永遠に計算が進むことがありません.ですから,Future::pollには「Pendingを返す場合には必ずwakeがその後呼ばれるようにする」という契約が存在します.

最後に,ソケットからデータを非同期に受信する例でFutureの具体的な挙動を説明してみます.登場人物は


  1. 実行スレッド

  2. 監視スレッド

の2人です.実行スレッドはFuture::pollを呼びだすスレッドです.pollの中ではソケットに対してreadシステムコールを呼びますが,データがまだ届いていない場合はEWOULDBLOCKエラーを返します.このとき,pollPendingを返すことで結果がまだ未確定であることを示します.ですが,その前にwakeが呼び出されるように調整してあげなければいけません.

ここで登場するのが監視スレッドです.実行スレッドは監視スレッドにファイルディスクリプタとWakerを登録します.監視スレッドはepollkqueueなどのシステムコールを用いて登録されたファイルディスクリプタが利用可能となったかをチェックします.利用可能になると,監視スレッドはファイルディスクリプタに紐づけられたWakerwakeを呼びだします.これによって,実行スレッドが再度pollを呼びだすことができ,計算が進んでいきます.

Futureの実行モデルについてさらに詳細に知りたい方はtokioのドキュメントを読むことをおすすめします.tokioはRustの非同期ランタイムのデファクトスタンダードで,実行スレッドと監視スレッドの双方を提供しています2


エフェクトを追加する

さて,ここまではFutureの仕組みを解説してきました.Futureの実行モデルではPendingが返ってきたとき,処理を中断する必要があることは分かりますが,なぜそうなったのかは分かりません.では,中断の理由が分かるとしたらどうなるでしょうか.ここでは,処理の中断の原因をエフェクトと呼びます.IOはエフェクトの一つです.そのほかにも外部状態や例外などもエフェクトとして扱えます.それらエフェクトを統一的な方法で処理できるのがAlgebraic Effectsです.

まずはPoll構造体をエフェクトが扱えるように拡張しましょう.

pub enum Poll<T, E> {

// 計算が完了した
Ready(T),
// エフェクトが発生した
Effect(E),
// エフェクトの処理中
Pending,
}

エフェクトを発生したことを表すバリアントEffectを追加しました.型引数Eがエフェクトの型を表しています.Futureにもエフェクトを追加しましょう.ここではエフェクトを持つ(発生させる)計算ということで,トレイトの名前もEffectfulにしています3

pub trait Effectful {

/// 計算の最終結果の型
type Output;

/// 計算が持つエフェクトの型
type Effect;

/// 計算を進める
fn poll(self: Pin<&mut Self>, cx: &Context) -> Poll<Self::Output, Self::Effect>;
}


Effective Rust入門

このEffectfulトレイトを中心として計算を組み立てていくのがEffective Rustライブラリです.せっかくですから簡単なプログラムを書きながら入門してみましょう4

ここでは例外エフェクトを取り上げます.RustにはC++やJavaのような例外の仕組みは無く5Resultを使ってエラーの伝播をしています.しかし,Algebraic Effectsの仕組みを用いることでRustにも例外機構を実装することができます(しかもライブラリで!).早速プロジェクトの作成といきたいところですが,その前にRustのNightlyバージョンをインストールする必要があります.これはEffective RustがRustの最新機能(ジェネレータとnever type)を使って実装されているからです.インストールが終わったらプロジェクトを作りましょう.

$ rustup toolchain install nightly

...省略...
nightly-x86_64-unknown-linux-gnu updated - rustc 1.36.0-nightly (a3404557c 2019-05-03)
$ cargo new eff-tutorial
Created binary (application) `eff-tutorial` package

Cargo.toml[dependencies]セクションにEffective Rustを追加します.

[dependencies]

eff = "0.0.7"

それではsrc/main.rsを編集していきます.まずは以下のfeature宣言が必要です.これはコンパイラに最新の機能を利用することを伝えています.

#![feature(generators, never_type)]

必要な宣言をuseしましょう.

use eff::{eff, handler, perform, Effect, Effectful};

例外エフェクトNotAnIntegerを宣言します.Effectトレイトの実装によってエフェクトを宣言できます.Output関連型はそのエフェクトが解決した際の値の型を表します.

#[derive(Debug)]

struct NotAnInteger<'a>(&'a str);

impl Effect for NotAnInteger<'_> {
type Output = i32;
}

次にNotAnIntegerを発生させる関数sum_linesを定義してみましょう.sum_linesは引数から1行ずつ整数を読み込み,その和を返します.行が整数でなかった場合にはNotAnInteger例外を送出します.

エフェクトを持つ関数を宣言するにはeffマクロを使います.eff属性は発生するエフェクトを引数に取ります.

エフェクトを発生させるにはperform!マクロを用います.perform!の結果は発生したエフェクトのOutputの型の値です.NotAnIntegerエフェクトは整数に解決されるので,その値を代わりに和に使えます.

#[eff(NotAnInteger)]

fn sum_lines(s: &str) -> i32 {
let lines = s.split('\n');
let mut sum = 0;

for line in lines {
match line.parse::<i32>() { // 行を整数としてパース
Ok(x) => sum += x, // 整数の場合,sumに足す
Err(_e) => sum += perform!(NotAnInteger(line)), // 整数ではなかった場合,NotAnIntegerエフェクトを発生させてその結果を足す
}
}

sum
}

main関数でsum_linesを使ってみましょう.

fn main() {

let sum_comp = sum_lines("1\n2\nthree\n4\n5");
}

この状態で実行しても特に処理は行われません(for文にprintlnを入れるなどすると分かると思います).なぜかというと,effを付けて宣言した関数は本来の戻り値(i32)の代わりにEffectfulトレイトを実装した計算を返しており,それをpollするまで計算が進行しないからです.この挙動はイテレータやasync関数の計算が遅延されるのと同じです.

さて,NotAnIntegerエフェクトが発生したときの処理を書いてみましょう.エフェクトの処理を行うような関数をハンドラと呼びます.ハンドラはhandler!マクロによって定義します.

let handler = handler! {

x => x,
NotAnInteger(s), _k => panic!("not an integer: {}", s),
};

handler!matchに似ています.最初のアーム(x => x)は値ハンドラであり,大元の計算が正常に完了した際に実行されます.この値ハンドラは計算結果の値をそのまま返しています.

以降のアームはエフェクトのハンドラですが,今回はNotAnIntegerしか処理しないのでこれも一つだけです.エフェクトハンドラは2つ引数をとり,第一引数が発生したエフェクトとなります.第二引数については後で説明します.このハンドラではNotAnIntegerエフェクトが発生した場合すぐpanic!で強制終了しています.

このハンドラをsum_compに適用するにはEffectful::handleメソッドを使います.

let handled = sum_comp.handle(handler);

handledもやはりエフェクト付き計算を表しており,まだ実行はされません.結果の型はi32sum_compと変わりませんが,しかしhandledはもはやエフェクトを発生させることがありません.なぜならば,ハンドラによってNotAnIntegerの処理をしてしまったからです.これを表すために,handledEffect = !となります.

!はnever typeと呼ばれる型で,!型の値は存在しません.panic!や無限ループなどが!型と扱われ,!からどの型にも暗黙に変換することができます.発生するエフェクトが存在しないということを表すのにぴったりですね.

さて,ハンドラの導入も終わったので計算を実行してみます.Effectful::pollhandledに対して呼び出すことで計算が進みます.EffectfulFutureと同じ実行モデルを採用しているのでPendingが来たらwakeまで待たなければいけません.ですが,実はEffect = !のときはEffectful::block_onを呼び出すと計算が完了するまでこれらを自動で行ってくれます.使ってみましょう.

let sum = handled.block_on(); // 完了するまで計算を実行し,結果のi32を返す

println!("sum = {:?}", sum);

完全なコード例は以下の通りです.

#![feature(generators, never_type)]

use eff::{eff, handler, perform, Effect, Effectful};

#[derive(Debug)]
struct NotAnInteger<'a>(&'a str);

impl Effect for NotAnInteger<'_> {
type Output = i32;
}

#[eff(NotAnInteger)]
fn sum_lines(s: &str) -> i32 {
let lines = s.split('\n');
let mut sum = 0;

for line in lines {
match line.parse::<i32>() { // 行を整数としてパース
Ok(x) => sum += x, // 整数の場合,sumに足す
Err(_e) => sum += perform!(NotAnInteger(line)), // 整数ではなかった場合,NotAnIntegerエフェクトを発生させてその結果を足す
}
}

sum
}

fn main() {
// エフェクト付き計算を作成
let sum_comp = sum_lines("1\n2\nthree\n4\n5");

// ハンドラを定義
let handler = handler! {
x => x,
NotAnInteger(s), _k => panic!("not an integer: {}", s),
};

// ハンドラを導入
let handled = sum_comp.handle(handler);

// 計算を実行し結果表示
let sum = handled.block_on();
println!("sum = {:?}", sum);
}

これを実行するとthread 'main' panicked at 'not an integer: three'とプログラムがパニックするはずです.これは,引数の3行目がthreeと数値では無くNotAnIntegerが発生し,ハンドラでpanic!したからです.例えば3行目を数値の3に置き換えると15と表示して正常終了します.

次にハンドラの挙動を変えてみましょう.今度は例外発生時にpanic!するのではなくResultを返すようにしてみます.handlerの宣言を次のようにします.

let handler = handler! {

x => Ok(x),
NotAnInteger(s), _k => Err(format!("not an integer: {}", s)),
};

値ハンドラではOkで結果の値を包み,エフェクトハンドラではErrを返しています.ハンドラ以外はそのままにしてください.実行すると今度はsum = Err("not an integer: three")と表示され,プロセスは正常終了します.three3にするとsum = Ok(15)と表示されるので,今度は値ハンドラを通ったことが分かります.

さらに進んでみましょう.これまでは例外発生時に計算を中断していましたが,Algebraic Effectsではエフェクトハンドラから元の計算に処理を戻すことも可能です.ハンドラを以下のように書き直してみます.

let handler = handler! {

x => x,
NotAnInteger(s), k => {
eprintln!("not an integer: {}", s);
perform!(k.resume(0))
}
};

このハンドラはエフェクトハンドラの第二引数kを利用しています.kresumeメソッドは,元の計算(sum_lines)に処理を戻すような特別なエフェクトを生成します.そのエフェクトをperform!することで,ハンドラを中断して元の処理に戻ることができます.この際,resumeに渡した引数が元の計算におけるperform!の結果となります(sum += perform!(...)と足してたのを思い出しましょう).ですから,resumeの引数はNotAnInteger::Output = i32でなければいけません.このハンドラは0sum_linesに戻しており,つまり欠損値を0として処理しています.

実行してみましょう.

not an integer: three

sum = 12

three0として扱われていますね.

これまでの例で注目すべきなのは,大本の計算のsum_lines自体は同じなのにハンドラを書き換えることでsum_linesの振る舞い自体が変わっているということです.しかも計算を中断するのか,それともデフォルト値で穴埋めして継続するのかといった制御構造もハンドラによってコントロールすることができています.例えば,コマンドライン引数によってどのハンドラで処理を行うかスイッチするということだって可能です.これは,それぞれのハンドラがsum_linesというプログラムに対して別の解釈を与えていると考えることができ,Algebraic Effectsという仕組みの持つ特徴の一つです.

Effective Rustはまだ生まれたばかりのライブラリです.デザイン面で乗り越えるべきハードルもありますし,怪しいunsafeだっていっぱいです.ですが,自分はAlgebraic EffectsとそのRustへの応用に強い希望を抱いています.例えば次のようなことができると考えています.


  • ランタイムへの型安全なアクセス.Futureでは特定のランタイム下でのみ動作するような場合でも,実際に正しいランタイムが選ばれているかということを型でチェックすることはできませんでした.ランタイムを要求するエフェクトを用いれば正しいランタイムで実行されることを保証できます.

  • Dependency Injection.テスト時にはDBへのアクセスをモックするようなハンドラに切り替えるということもできるでしょう.

  • GC.GCヒープからメモリを確保するような処理をエフェクトとして表し,ハンドラでは使用量をチェックしてGCを実行します.GhostCellみたいなテクニックを組み合わせてスタックを走査しなくてもトレーシングができるようになるとうれしい.

  • ストリーミングパーサ.データが全て揃う前からパースを進めることができるパーサのことをストリーミングパーサと呼びます.次の処理に必要なデータだけを待つようにすれば実装できるはず.


  • tokioとのインテグレーション.Effectfulの構成から明らかにEffectful<Output = T, Effect = !>Future<Item = T>と同型です.ということは,エフェクトを持たない計算をtokioの洗練されたランタイムで動作させることもできておかしくありません.巨人の肩の上に乗っていくぞ!

興味を持たれた方はぜひEffective Rustを試してみてください.また,自分のAlgebraic Effects自体これをつくりながら理解していったので,理解が及んでいない部分もあると思います.何か意見・感想がありましたらぜひ記事のコメントやTwitterに投げてください.コントリビュータも募集中です!





  1. ここでは,標準ライブラリに入るFutureを表示しています.futuresクレイトで提供されるFutureは異なるシグネチャを持っています. 



  2. tokioの言葉では,実行スレッドがExecutorで監視スレッドがdriverとなります. 



  3. 実装上の都合により&Context&mut Contextではなく)を引数にとっています. 



  4. 例はConcurrent Programming with Effect Handlersからとらせて頂きました. 



  5. 正確にはpanic!が他言語の例外と同等の働きをします.