概念的な部分を中心に解説しているので、細かい仕様や実装等は割愛させていただいております!
Futureトレイト
実装・基本概念
Futureトレイトが実装されている型は"将来的に値が入ることを期待されている型"になります。
定義は以下のようになっています。
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
OutputはFutureが返す値の型です。
例えばusize型を指定されていたのならばusizeが将来的に返されることを想定されているという意味になります。
そしてFutureトレイトに実装されていなければならない関数として、pollがあります。
この戻り値Pollは以下のようなEnum型になっています。
まだ値が準備できていない状態をさすPendingと値が準備できて実際にその値を持つReadyの2つから構成されています。
enum Poll<T> {
Ready(T),
Pending,
}
poll が Pending を返す場合、別の処理を行うか、後で再度 poll する必要があります。
この仕組みが Rust の非同期処理の土台になっています!
FutureはLazy
FutureトレイトはLazyです。
つまり、Futureトレイトを実装した型を置いておいただけではなにもしません。
executorにpollされて初めて実行が進むのです。
ここで大事になるのがpollの第二引数で渡しているContextです。
contextといいつつ、大事なのは腹持ちしているwakerです。
pub struct Context<'a> {
waker: &'a Waker,
}
impl<'a> Context<'a> {
pub fn waker(&self) -> &Waker { ... }
}
Waker の役割はこのFutureをもう一回 poll してほしい! とexecutorに知らせることです。
wake()が呼ばれると、そのタスクの再実行がスケジュールされます。
ざっくりとした流れは以下のようになっています。
- executorがpoll() を呼ぶ
- まだなら
- cx.waker()を保存 or I/O/タイマーに登録
- Pendingを返す
- waker.wake()でexecutorにまた実行してほしい!と懇願する
- executorがそのタスクをキューに積んで、時がきたら再度poll
- 準備が整ったらReadyを返して完了
このやりとりでpollがReadyを返してくれるまでwake() <=> poll()がループします。
Futureだけだと不便
Future型で取得した値を用いてそのあと別のFuture型を返す処理をするコードが以下になります。
impl Futureを返すのでその値を得るにはチェーンしていかなければならないです。
よってネストをどんどん深くして行く必要があってかなり参ってしまいます。
しかも、先ほど述べたようにFutureはLazyなのでこの関数を実行しただけではなにも起きません。
関数を実行する側でpollを実行するexecutorを明示的に構える必要も出てきます。
use futures::{future, Future, TryFutureExt};
use reqwest::Client;
use serde_json::Value;
fn call_api(client: Client)
-> impl Future<Output = Result<(), reqwest::Error>>
{
client
.get("https://httpbin.org/json")
.send()
.and_then(|resp| future::ready(resp.error_for_status()))
.and_then(|resp| resp.json::<Value>())
.and_then(move |json| {
let title = json.get("slideshow")
.and_then(|s| s.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("untitled")
.to_string();
client
.post("https://httpbin.org/post")
.body(title)
.send()
.and_then(|resp| future::ready(resp.error_for_status()))
.map_ok(|_| ())
})
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let rt = Runtime::new()?;
let client = Client::new();
rt.block_on(call_api(client))?;
Ok(())
}
async, awaitの登場
ここで登場するのがasync, awaitというおなじみの文法です。
これを使うと上のコードがかなりすっきりします。
async fn call_api(client: Client) -> Result<(), reqwest::Error> {
let resp = client
.get("https://httpbin.org/json")
.send()
.await?
.error_for_status()?;
let json: Value = resp.json().await?;
let title = json
.get("slideshow")
.and_then(|s| s.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("untitled")
.to_string();
client
.post("https://httpbin.org/post")
.body(title)
.send()
.await?
.error_for_status()?;
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
call_api(client).await?;
Ok(())
}
先ほどでてきていたexecutorが明示的にでてきていません。
では先ほどまで行っていた処理はどこに隠蔽されているのでしょうか?
実はそれはawaitに隠蔽されています。
先ほど繰り返しになってしまいますがfoo().awaitはざっくりと以下のことを行っています。
- foo()のFutureを作る
- poll してPendingならWakerを登録して戻る
- wake されたらまた poll
- Ready(val) になったら値を取り出す
つまりは.awaitはただのFutureの糖衣構文だということです。
executorはどちらにしろ必要じゃん、なんで明示されてないの?
と思うかもしれません。
これはmainがtokio::mainで実行されているからです。
軽くtokioについても触れておきます。
tokioランタイム
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
call_api(client).await?;
Ok(())
}
このようにするとmain関数内のexecutorとして、tokioクレートのexecutorが使用されることが暗に約束されます。
そのため、先ほどのように明示的に指定する必要がなくなって、コードも完結にかけていたのですね。
main関数のasync化
tokioのえらいところはそれだけではありません。
一番強力なのはmain関数をasyncでかけるようにすることです。
Rustはエントリポイントであるmain関数は必ず同期関数である必要があります。
そのため、コンパイル時には fn main() -> impl Future<Output<T>> の形にしなければならないのです。
脱糖すると以下のようになります。
fn main() {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async_main()) // async_main は#[tokio::main] によって生成される非同期関数
}
tokio はこの変換をコンパイル時に自動で行ってくれるため、開発者は同期化の手間を意識せずに async/await の構文を使えます。
終わりに
RustのFutureトレイトの内部実装や非同期プログラミングについて気になったので調べてみたらおもしろかったのでまとめました。
自分はもう一段階レイヤ下げた並行実行やスレッド関係の話が結構好きなので時間ある時にその記事もかけたらなと思います。
間違いがありましたらご指摘下さいー