Rust 1.39 からは async/await が安定化され、非同期処理がぐっと書きやすくなりました。
Futureトレイトを自分で実装したり、loop_fnで所有権を取り回したりmap_err
でエラー型を魔改造したり することはもうありません!
おもむろに await
していきましょう
この記事は Rust 1.46 と tokio 0.2.22 に基づいています
Rust での非同期処理
Rust では、非同期な計算は Future
トレイトとして抽象化されています。JavaScript などでは Promise
と呼ばれるものです。以前は非同期処理を扱うときに、場合によっては Future
トレイトを実装する必要があることがありましたが、現在では async
キーワードを使うことで簡潔に記述することができるようになりました。
async
キーワードを使い、
- 非同期関数
async fn
をつくる or -
async
ブロックをつくる
ことで非同期な計算 Future
を定義することができます。
Rust では JavaScript などと違い、Future
を実行するには、Future
を
-
await
する or - 非同期ランタイムに渡して非同期タスクを生成する
必要があります。非同期タスクを生成するには、Future
を
-
block_on
する1 or -
spawn
する
すれば OK です。
非同期ランタイムには大きくわけて tokio と async-std がありますが、これらを一緒に使うことは好ましくありません2。特に理由がない場合は tokio を使うのがいいでしょう。
簡単な使用例
早速 async/await を使ったコードを書いてみましょう
// Cargo.toml では、features フラグをこのように記載してください
// [dependencies]
// tokio = { version = "0.2", features = ["full"] }
// async 関数を作ります
async fn hello() -> String {
"hello, async fn".to_string()
}
#[tokio::main]
async fn main() {
// async 関数を実行して結果を待ち合わせます
let greeting: String = hello().await;
println!("{}", greeting);
// async ブロックを実行して結果を待ち合わせます
let world = async {
println!("hello, async block");
};
world.await;
}
hello
関数には async
キーワードがついていますね。これは、hello
関数は非同期に実行されることを表しています。非同期関数の内部では他の Future
を await
することもできますが、この例では単に String
を返しています。
hello
関数のシグネチャは、内部的には fn hello() -> impl Future<Output=String>
と同じです。つまり、戻り値は String
ではなく、実行結果が String
になるような Future
(トレイトを実装したオブジェクト)になります。
さて、hello
関数が Future
を返すことがわかりました。await
キーワードを使うことで、async
関数を実行完了まで待機することができます。
let greeting: String = hello().await;
では、hello()
の実行完了まで待機して、実行結果を取得しています。hello().await
の値は、Future
の実行結果(関連型の Output
) の型になります。
以下のコードでは、 async
ブロック world
を作って await
しています。async
ブロック内部でも、他の Future
を await
することができますが、この例では単に標準出力に文字列を出力しています。
world
も内部的には Future
トレイトを実装しているため、await
して結果を待ち合わせることができます。
async {
println!("hello, async block");
}.await;
Rustでの非同期処理 のセクションでは、Future
を実行するには block_on
するか spawn
しなければならない、と説明しました。しかし、コード simple-async には block_on
も spawn
も見つかりません。どういうことでしょうか。それは、#[tokio::main]
が block_on
に展開されるからです。
#[tokio::main]
async fn main() {
println!("hello");
}
というコードは、コンパイル時に
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
と展開されます。main
関数の処理全体が block_on
されているのがわかります。block_on
は Future
が完了するまでブロックします。
#[tokio::main]
というアノテーションをつけることで、システムで利用できる CPU のコア数分ワーカースレッドが用意され、非同期タスクをいい感じにロードバランシングして実行してくれます。3
Rust では、非同期ランタイムが非同期タスクをポーリングすることで計算が進みます。これを図示するとこのようになります:
Future
はステートマシンに変換され、実行されます。async
なコードの内部で別の Future
を .await
することで、複雑な Future
を構築することができます4。
simple-async が実行される様子を考えてみましょう。main
非同期関数の内部で hello
非同期関数と world
非同期ブロックを .await
することで、main
関数全体がひとつの巨大な Future
となります。これがステートマシンに変換されて実行されます:
この場合、hello
と world
は直列に実行されます。
let greeting: String = hello().await;
println!("{}", greeting);
というコードでは、hello().await
の行は Future
が解決するまで待機します。そのため、
use tokio::time::delay_for;
use std::time::Duration;
async fn hello() {
delay_for(Duration::from_millis(1000)).await;
println!("1 sec elapsed");
}
#[tokio::main]
async fn main() {
// async 関数を実行し、Future の実行が完了するまで待機します
hello().await;
// hello() が完了したあとに実行されます
println!("hello, world");
}
というコードでは、main
関数の実行が始まってから 1 秒後に
1 sec elapsed
hello, async fn
と表示されます。
println!("{}", greeting);
の行が実行されるのは、hello
が完了したあとです。
単に Future
を await
すると、並列処理を行うときに不都合が生じることがあります。
非同期タスクを生成する
先程のコード simple-async では、Future
を直列に実行していました。Future
を spawn
すると、非同期タスクをバックグラウンドで並列 (parallel) に実行されます。非同期タスクは tokio によっていずれかのワーカースレッドに割り当てられ、いい感じに実行されます。
use tokio::time::delay_for;
use std::time::Duration;
#[tokio::main]
async fn main() {
// async ブロックを実行します
tokio::spawn(async {
delay_for(Duration::from_millis(1000)).await;
println!("1 sec elapsed");
});
// 即座に実行されます
println!("hello, world");
// hack: async ブロックの実行完了まで待機します
delay_for(Duration::from_millis(2000)).await;
}
というコードでは、main
関数の実行が始まってからすぐに
hello, world
と表示され、さらに 1 秒経過してから
1 sec elapsed
と表示されます5。
// hack: async ブロックの実行完了まで待機します
delay_for(Duration::from_millis(2000)).await;
という行は、async
ブロックの実行完了まで待機するために入れました。spawn
されたタスクは完了状態まで実行されるとは限りません。この行を入れない場合、タスクが完了するよりも先にランタイムが終了し main
関数から抜けてしまうため、
1 sec elapsed
が表示されません。
tokio::spawn
は JoinHandle
を返します。これを使うことで、タスクの完了を待つことができます:
use tokio::time::{delay_for, Duration};
#[tokio::main]
async fn main() {
// async ブロックを実行します
let handle = tokio::spawn(async {
delay_for(Duration::from_millis(1000)).await;
println!("1 sec elapsed");
});
// 即座に実行されます
println!("hello, world");
// async ブロックの実行完了まで待機します
let _ = handle.await.unwrap();
}
spawn した場合に実行結果を取得する
実行に時間がかかる処理(ネットワークにアクセスする、ファイルを読み込むなど)をバックグラウンドで実行しておいて、あとで結果を取得するという使い方がよくされます。非同期タスクの実行結果はどのように取得するのでしょうか。
1 つめの方法として、JoinHandle
が利用できます。先程の例 spawn-join では handle.await
の値は利用しませんでしたが、handle
の実行結果は spawn
した非同期タスクの実行結果を T
とすると、Result<T, JoinError>
となります。以下の例では単に unwrap
しています:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
"hello world".to_string()
});
// handle の実行結果は async ブロックの値になります
let result: String = handle.await.unwrap();
println!("{}", result);
}
他のやり方として、チャネルを利用する方法があります。チャネルとは、非同期タスク間でデータを受け渡す仕組みのことです。非同期タスクは異なるスレッドで実行されることがありますが、チャネルを使うことで安全にデータを受け渡すことができます。ただひとつの値を受け渡すチャネルとして、oneshot
チャネル が利用できます:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async {
// チャネルを通してなにか値を渡します
tx.send("hello world".to_string()).unwrap();
});
let result = rx.await.unwrap();
println!("{}", result);
}
mpsc
チャネル という、複数の値を受け渡すチャネルもあります。
実行結果が 1 つの値になるときはどちらの方法も使えますが、基本的にはチャネルのほうが取り回しがしやすいでしょう。
複数の Future
の結果を待ち合わせる
同時に複数の Future
を実行したい場合はどのようにすればよいでしょうか。
2 つ以上の Future
を全て完了まで待ち合わせたい場合は、 tokio::join が使えます。
#[tokio::main]
async fn main() {
let fut1 = async {
// ここでいろいろ .await する
1
};
let fut2 = async {
// ここでいろいろ .await する
"hello".to_string()
};
// fut1, fut2 を組み合わせた Future をつくって .await します
let (res1, res2): (i32, String) = tokio::join!(fut1, fut2);
println!("{} {}", res1, res2);
}
tokio::join
は複数の Future
を 1 つの大きな Future
にまとめ、.await
するものです。fut1
と fut2
は並行 (concurrent) に実行されますが、並列 (parallel) に実行されるわけでないことに注意しましょう。fut1
と fut2
は代わりばんこにポーリングされます。実行の様子を図示するとこのようになります:
並列に実行したい場合は fut1
と fut2
を spawn
してから、JoinHandle
を join
します:
#[tokio::main]
async fn main() {
let fut1 = async {
// ここでいろいろ .await する
1
};
let fut2 = async {
// ここでいろいろ .await する
"hello".to_string()
};
let handle1 = tokio::spawn(fut1);
let handle2 = tokio::spawn(fut2);
// handle1, handle2 を組み合わせた Future をつくって .await します
if let (Ok(res1), Ok(res2)) = tokio::join!(handle1, handle2) {
println!("{} {}", res1, res2);
}
}
実行の様子はこのようになります:
2 つ以上の Future
を同時に実行し、いちばん早く完了したものの結果を取得したい場合は tokio::select が使えます。タイムアウトの処理を実装したいときに便利です:
use tokio::time::{delay_for, Duration};
#[tokio::main]
async fn main() {
// fut のほうが早く完了する
let delay = delay_for(Duration::from_millis(50));
let fut = async {
// 実行に 20ms かかります
delay_for(Duration::from_millis(20)).await;
1
};
tokio::select! {
_ = delay => {
println!("timeout");
}
v = fut => {
println!("result: {}", v);
}
}
// delay のほうが早く完了する
let delay = delay_for(Duration::from_millis(50));
let fut = async {
// 実行に 100ms かかります
delay_for(Duration::from_millis(100)).await;
1
};
tokio::select! {
_ = delay => {
println!("timeout");
}
v = fut => {
println!("result: {}", v);
}
}
}
tokio::select! {
v = fut => {
println!("result: {}", v);
}
}
の記述ですが、await
したい Future
は fut
の部分に記述します。fut.await
の結果は v
で束縛できます。v
の値は =>
のあとのブロックで利用することができます。
例: ネットワーク上の API にアクセスする
これまでの説明では簡単のためにあまりおもしろくない async
なコードを実行してきました。このセクションでは、非同期処理を使って GitHub API にアクセスして、リポジトリのスター数を調べてみます。コードと実行結果を示します。コードは GitHub/Kumassy/easy-going-async にあります。
コード:
use anyhow::{Context, Result};
use futures::future::try_join_all;
use reqwest::header;
use serde_json::Value;
use std::sync::Arc;
use tokio::task::JoinHandle;
/// リポジトリの API にアクセスし、スター数を返します
async fn get_star_count(client: &reqwest::Client, repo: String) -> Result<u64> {
let resp: Value = client
.get(&format!("https://api.github.com/repos/{}", repo))
.send()
.await? // API にアクセスします
.json()
.await?; // JSON をパースします
let count = resp
.get("stargazers_count")
.context("GitHub API error: stargazers_count is not found")?
.as_u64()
.context("GitHub API error: stargazers_count is not an integer")?;
Ok(count)
}
#[tokio::main]
async fn main() -> Result<()> {
// HTTP request header と user agent を設定します
let mut headers = header::HeaderMap::new();
headers.insert(
header::ACCEPT,
header::HeaderValue::from_static("application/vnd.github.v3+json"),
);
let client = reqwest::Client::builder()
.user_agent("rust reqwest")
.default_headers(headers)
.build()?;
// 複数の非同期タスクで共有するので、Arc で包みます
let client = Arc::new(client);
// 調べたいリポジトリのリストです
let repos = vec![
"rust-lang-nursery/failure".to_string(),
"rust-lang-nursery/lazy-static.rs".to_string(),
"rust-lang/libc".to_string(),
"bitflags/bitflags".to_string(),
"rust-lang/log".to_string(),
];
// spawn した非同期タスクのハンドラです
let handles: Vec<JoinHandle<Result<u64>>> = repos
.iter()
.map(|repo| {
// tokio::spawn は 'static を要求するため、clone しておきます
let client = client.clone();
let repo = repo.clone();
// client と repo を move します
tokio::spawn(async move {
// 非同期タスクの実行結果は Result<u64> です
get_star_count(&client, repo).await
})
})
.collect::<Vec<_>>();
// repos に対応するスター数です
let stars: Vec<u64> = try_join_all(handles) // Vec<Result<T>> を Result<Vec<T>> に変換してくれます
.await?
.into_iter()
.collect::<Result<Vec<u64>>>()?; // Vec<Result<T>> を Result<Vec<T>> に変換します
for (repo, star) in repos.iter().zip(stars) {
println!("{} has {} stars", repo, star);
}
Ok(())
}
実行結果:
rust-lang-nursery/failure has 1432 stars
rust-lang-nursery/lazy-static.rs has 989 stars
rust-lang/libc has 862 stars
bitflags/bitflags has 326 stars
rust-lang/log has 891 stars
get_star_count
は特定のリポジトリのスター数を返す非同期関数です。tokio ベースの HTTP クライアントでは、reqwest が人気なのでこちらを使いました。エラー処理を簡素化するため、anyhow を使っています。anyhow::Result<T>
は Result<T, anyhow::Error>
と同じです。
let resp: Value = client
.get(&format!("https://api.github.com/repos/{}", repo))
.send()
.await? // API にアクセスします
.json()
.await?; // JSON をパースします
というコードでは、GitHub API にアクセスして、JSON 形式のレスポンスをパースしています。send()
と json()
は Future
を返すため、.await
が必要です。次に、resp
から目的のスター数を取り出します:
let count = resp
.get("stargazers_count")
.context("GitHub API error: stargazers_count is not found")?
.as_u64()
.context("GitHub API error: stargazers_count does not integer")?;
ここでは serde_json::value::Value
から目的のデータを頑張って取得していますが、ほしいデータが散らばっていたりネストしたりしている場合は、serde-query が便利そうです。
context
はエラー処理のためのものです。get()
は Option
を返すのですが、context
を書くことで std::option::NoneError
を anyhow でうまく扱えるようになります。
// HTTP request header と user agent を設定します
let mut headers = header::HeaderMap::new();
headers.insert(
header::ACCEPT,
header::HeaderValue::from_static("application/vnd.github.v3+json"),
);
let client = reqwest::Client::builder()
.user_agent("rust reqwest")
.default_headers(headers)
.build()?;
// 複数の非同期タスクで共有するので、Arc で包みます
let client = Arc::new(client);
というコードでは、HTTP クライアントの設定をしています。client
は複数の非同期タスクで共有するため、std::sync::Arc
で包んでいます。
// spawn した非同期タスクのハンドラです
let handles: Vec<JoinHandle<Result<u64>>> = repos
.iter()
.map(|repo| {
// tokio::spawn は 'static を要求するため、clone しておきます
let client = client.clone();
let repo = repo.clone();
// client と repo を move します
tokio::spawn(async move {
// 非同期タスクの実行結果は Result<u64> です
get_star_count(&client, repo).await
})
})
.collect::<Vec<_>>();
というコードでは、非同期タスクを spawn
しています。tokio::spawn
は 'static
を要求するため、client
と repo
を clone()
し、move
しています。move
が必要ない場合もありますが、実践的なコードでは move
が必要になることが多いでしょう。
非同期タスクの実行結果は Result<u64>
ですが、非同期タスクのハンドラの実行結果は JoinError
になる可能性があります。したがって、handles
を await
すると、
Vec<Result<Result<u64, anyhow::Error>, JoinError>>
になる点に注意してください。
try_join_all
は実行結果が Result
となる Future
のリストを実行して、実行結果をひとつの Result
にまとめてくれるものです。?
を使うことで、この Result
を外しています。ドキュメントのコード例 がわかりやすいです。
let stars = try_join_all(handles) // Vec<Result<T>> を Result<Vec<T>> に変換してくれます
.await?
までのコードでは、
Vec<Result<u64, anyhow::Error>>
という型になるでしょう。Result
のリストを collect()
を使うことで ひとつの Result
にまとめ、?
を使ってこの Result
を外します:
let stars: Vec<u64> = try_join_all(handles) // Vec<Result<T>> を Result<Vec<T>> に変換してくれます
.await?
.into_iter()
.collect::<Result<Vec<u64>>>()?; // Vec<Result<T>> を Result<Vec<T>> に変換します
最後は stars
を整形して標準出力に出力して終わりです:
for (repo, star) in repos.iter().zip(stars) {
println!("{} has {} stars", repo, star);
}
まとめ
async
なコードの書き方や実行の仕方を紹介しました。async/await
キーワードが安定化されてから、非同期処理を簡単に書くことができるようになりました。async/await
を使って、Rust で非同期処理をどんどん書いていきましょう!
さらに詳しく
公式チュートリアルです。async/await に対応した tokio 0.2 向けに内容が更新され、オシャレになりました
https://tokio.rs/tokio/tutorial
非同期 Rust に関する詳しい解説記事です。日本語です。
https://tech-blog.optim.co.jp/entry/2019/11/08/163000
async/await を使ったコードはどのようにステートマシンに変換され、実行されるのでしょうか。また、非同期ランタイムはどのように poll
するタイミングを決めているのでしょうか。背景にある Generator
や、なぜ Pin
が必要になるのか、async
な API は poll
されるときに何をしているのかが詳しく解説されています。
https://cfsamson.github.io/books-futures-explained/introduction.html
block_on
や spawn
は裏で何をしているのでしょうか。Safe Rust のみを使い、簡単な非同期ランタイムを作って理解を深めてみましょう。
https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
https://stjepang.github.io/2020/01/31/build-your-own-executor.html
謝辞
この記事の一部は @__pandaman64__ 氏に助言をいただき執筆しました
-
block_on
は非同期タスクが完了するまでスレッドをブロックしてしまうのであまり出番がありませんが、後の説明のために紹介しています ↩ -
非同期ライブラリを使う際は tokio ベースなのか async-std ベースなのかを確認して使い分けてください: https://qiita.com/legokichi/items/53536fcf247143a4721c ↩
-
カスタマイズしたい場合は、tokio::runtime::Builder が利用できます。 ↩
-
以前は
and_then
やmap_err
などを使って手動で巨大なFuture
を練り上げていました。苦行でした。 ↩ -
Rust Playground では残念ながら同時に表示されます。ローカルの環境で実行してみてください。コードは GitHub/Kumassy/easy-going-async にあります ↩