rust では、夏が終わる頃(?)に出そうな 1.38 (追記: 1.39 になりました) で、いよいよ async/await
が stable として使えるようになる模様です。
その後は、各種ライブラリの非同期API は async/await
ベース に統一されていくんだろうなと思います。
ただやはり現状は、非同期まわりは過度期で、ライブラリごとに古いAPIのみを使っているか、新しいAPIのサポートも入ってきているか、あるいは、async/await
が安定するまで対応を見送っているかまちまちだったりします。
rust で非同期処理の標準的な仕組みををつかうためには、以下2つのものが必要です。
いろいろなライブラリを組み合わせる場合、これらの互換性を気にしないといけないことになります。
Future
rustでは非同期に実行する処理をこの Future というtraitの値で表現する。 (他の言語での Task なり Promise なりの立ち位置)
- 0.1 系
- 長らくデファクトだった futures crate の 0.1系
- 現行のライブラリはまだこっちに依存しているものも多い
- 0.3 系
- await できる新しい Future。
- trait自体は既に標準ライブラリに入っている
- futures crate は、0.3系で、この新しい 標準のFuture を参照するように変更された
- ※標準の Future 以外の色々なユーティリティや型が futures 0.3 にはいっているので、新しいFutureを使う場合は futures 0.3系をつかうことになる
- 0.1系とは色々変わっている
-
async
内で使用するために、Futureを ポーリング するためのレシーバが&mut self
からPin<&mut Self>
に変わっていたり、ポーリング可能なことを通知するために Wakerを渡せるようになっていたり。また、エラーは関連型ではなくOutput=Result<T, E>
で表現するように変わった。 - この辺の解説は、日本語だと 井山梃子歴史館さんの Rustジェネレータ徹底解説 - 井山梃子歴史館 - BOOTH もくわしいです!
-
- await できる新しい Future。
futures には 0.1系と 0.3系 を相互に変換して、共存させるための仕組みが提供されているので、Future自体は別々のバージョンを混ぜてつかうのは難しくないとおもいます。
Compatibility Layer | Futures-rs
こちらも参考になります。
Rustのasync/awaitをスムーズに使うためのテクニック - Qiita
ランタイム
非同期な処理ということは、起動したスレッドをブロックしない代わりに、どこか別のスレッドで処理が走る、ということなので、イベントループ用スレッドなりスレッドプールなりが別口に必要です。
rust では、ここの実装をランタイムと呼んでいます。
javascriptやc#なんかは 非同期ランタイムをとくに意識せずに、暗黙のうちにその環境でどのように非同期処理が実行されるか選択されます。言語処理系と非同期はほぼきっても切れない関係なのです。ところがrustの場合、非同期処理に入るコードを書く人がが明示的に指定して起動することになってます。
-
rustasync/runtime
- tokio に依存しない、標準的なAPIの提案的な立ち位置のようすのランタイム実装。
- 実装として tokio を選択することもできるようになっているけど、現在、まだ tokio 0.1系を参照しているため、tokio 0.3系と併用することができない(?)
- また、tokio にはこのランタイムにはない、ファイルI/O や スレッドをブロックする処理のための機能が入っていたりする。
- tokio 0.1系
- 長らくデファクトだった、futures 0.1系を使ったランタイム。(&その他)
- tokio 0.3系
- futures 0.3 系をベースにした 新しい tokio
tokio や runtime の機能のなかには、それ自身のランタイムの上で走らせることを想定しているものがあり、依存しているライブラリが tokio のそれに依存していたりすると、runtimeとは併用できない、ということが起こったりします。
たとえば、tokio::spawn(..)
などを使っている部分があると、 runtime crateのランタイムでそれを走らせると実行時にコケてしまいます。
これを両ランタイムで動かすためには、たぶん、spawn (Executor
) の特定の実装に依存せずにフレキシブル取り替えられるようにライブラリ側に対応してもらう必要があるとおもいます。
redis を非同期APIで使う選択肢
さいきん、サーバ用途でrustをつかっていたんですが、redis を非同期に使う迄に色々と苦労してしまったので、以下、備考録です。
(やっぱり、redis の良いところは、よく使われているためにどの言語でもクライアントをつくる人手が多くて相対的に出来が良いところだなと思う)
1. darkredis
async/await
ベースの redis クライアントをスクラッチで実装してくれている crate を2コくらいみかけました。
なかでも 、darkredis はコネクションプールが入っていたりと、なかなか使いやすそうです。
ただ、Redis とのコネクションが 切れたりhalf-connection になってしまった場合、再接続するとかのケアはおそらくとくに入ってなさなさそうなので、考える必要がありそう。
また、ランタイムが runtime crate なので、tokio 0.3系のランタイムに依存している場合だと現状だと使えないと思います。
たとえば、tokio::spawn(..)
などに依存している部分があると、 runtime crateのランタイムで走らせると実行時にコケてしまいます……
( spawnの実装を取り替える以外の解決方法があれば知りたい)
- Future → 0.3系
- ランタイム →
runtime
-
runtime::net::TcpStream
を使っている
-
2. redis-rs の aio モジュール
mitsuhiko/redis-rs: Redis library for rust
Github の ☆ がとても多い redis-rs 。同期的にredisと通信するAPIが提供されてますが、実は、aio
ネームスペース以下に非同期のAPIも入っています。
非同期APIの使いかたは ドキュメントや test_async.rs などを参照のこと。
また、非同期にredis にアクセスするための Connection が2種類あるので注意です。
-
redis::Connection
- 非同期じゃないコネクション。
-
redis::aio::Connection
- ランタイム内で非同期にコネクションを確立。非同期にクエリすることができる
- ただし、 *複数のスレッドから同時に使うための排他制御はサポートされていない
-
redis::aio::SharedConnection
- 〃
- 内部的にクエリが channel で制御されていて、複数のスレッドから同じものを参照してつかうことができる。
redis-rs を 非同期に使う場合は、実質 SharedConnection を使うことになるとおもいます。
非同期ランタイムからばんばんクエリするためには、どっちみちコネクションを lock して使うなどの処理が必要になるはず。
尚、コネクションを非同期に確立するランタイムと、コネクションをつかうランタイムは同じにしないと実行時エラーになるので注意です。(僕は自前でコネクションを共有するコードを書こうとして地味にそこに嵌った)
Future 0.1系の API
- Future -> 0.1系
- ランタイム -> tokio 0.1系
- SharedConnection は
tokio_executor::spawn
を使っている
- SharedConnection は
SharedConnection
は、内部で tokio_executor::spawn
をしていますが、ここは tokio 0.1系のランタイム上で走らせていないと実行時エラーになるようです。
そのため、tokio 0.3系のランタイムやruntime上で redis-rs のSharedConnectionoを使用するのは現状だとできないということになりそうです。
おそらく、hyperのサーバのように、 特定の実装に依存せずに、Executor trait を外側から注入できるようにすれば良いのかな、と思いますが、その辺の対応はまだ入ってません。
追記:
PRを投げてみたところマージされました。
https://github.com/mitsuhiko/redis-rs/pull/229
3. tokio の I/O blocking スレッド上で 同期APIでredisと通信する案
実は tokio のランタイムは、スレッドプールを2種類もっています。
-
core_threads
- 通常のスレッドプール。デフォルトのスレッド数=コア数。ここで走る各タスクは CPU を占有する時間が短いことを想定している
-
blocking_threds
- CPUを占有してしまうタスクのため、core_threads とは別に隔離されたスレッド
- デフォルトのスレッド数 = 100
tokio_threadpool::blocking - Rust
However, it is common to want to perform a blocking operation while processing an asynchronous computation. Examples of blocking operation include:
- Performing synchronous file operations (reading and writing).
- Blocking on acquiring a mutex.
- Performing a CPU bound computation, like cryptographic encryption or decryption.
単にI/O完了を待つためだけの処理であれば、同期的な処理を このblocking用スレッド上で同期的に流してあげれば、外側からは非同期処理として扱うことができます。
非同期I/Oほどのスループットはでないとおもうけど、blockingスレッド数までは並列に実行できることが期待できます。
( 実は、tokio-fs の実装をみると、 std::fs
による同期的なI/O をこの blockingスレッドで実行していたりします。
( C# の I/O Completion Port Thread と似たものかな ?
blocking()
関数をつかうことで、任意の処理を blockingスレッド上で実行することができます。
以下は一例です。
let (tx, rx) = futures::channel::oneshot::channel(); // 非同期にspawnしたタスクから結果を受けとるためにチャンネルをつくる
let tx = Arc::new(Mutex::new(Some(tx)));
tokio::spawn(async move {
poll_fn(move |_| {
let key = key.clone(); // poll_fn のクロージャは複数回実行されるかもしれない(FnMut) なので、blocking に所有される前に cloneしないといけない
let handle = handle.clone();
let tx = tx.clone();
blocking(move || { // このクロージャ内に CPUを占有する処理を書く。ここ自体は一度しか実行されない
let mut conn = handle.lock().unwrap();
let result = redis::cmd("GET")
.arg(&key)
.query::<Option<Vec<u8>>>(&mut *conn);
let mut guard = tx.lock().unwrap();
guard.take().unwrap().send(result);
})
}).await;
});
let result = rx.await?;
僕がつくっていたサーバは、同時接続数が大量に必要なわけではなかったので、とりあえずこのやりかたを採用してみました。