Posted at

Rustのasync/awaitをスムーズに使うためのテクニック

RustのFutureとasync/awaitは、「時間のかかる処理」をするときに、「処理が終わるまでOSスレッドをブロックする(同期Rust)」のではなく、「当該処理を中断して、そのOSスレッドを別のタスクの処理に使う(非同期Rust)」ことで、スレッド数よりも多くの処理を同時に行う仕組みです。

同期Rustと非同期Rustには以下のようなアナロジーが成立します。

同期Rust
非同期Rust

スレッド
OSスレッド
タスク

ランタイム
OS自身
tokioやromio/juliexなどのランタイムライブラリ

すぐ終わる処理
fn(T) -> U
fn(T) -> U

……の作成
fn() {}
fn() {}

……の呼び出し
f()
f()

時間のかかる処理
fn(T) -> U
fn(T) -> impl Future<Output = U>

……の作成
fn() {}
async fn() {}

……の呼び出し
f()
f().await

同期Rustから非同期Rustへの移行は、おおよそこのアナロジーにしたがって行えることになります。 (ライブラリ・フレームワークも同様に移行される必要があります。)

とはいえ、単純置換で移行できるわけではありません。そこで以下でasync/awaitをスムーズに使うためのライブラリやテクニックを紹介します。


futures/futures01/futures-previewクレート

futuresは非同期プログラミングのベースになっているライブラリです。

futuresには主に2つのバージョンがあります。


  • 0.1系。async/awaitがなかった頃に作られた非同期処理ライブラリ。

  • 0.3系。async/awaitを念頭に置いて整理されている。

今ある非同期エコシステムは0.1系ベースのものが多いので、互換性を保つために以下のように書くのがおすすめです。


Cargo.toml

# futures01はfuturesの0.1系を指し続けるクレート。

# 0.3と併存させるために使う
futures01 = "0.1.28"
# 0.3系列はまだpreview版のため、別の名前で提供されている。
# そのうちfuturesにリネームされる
futures-preview = { version = "0.3.0-alpha.17", features = ["io-compat"] }


main.rs

use futures::prelude::*;


futuresクレートには、標準ライブラリの非同期版がいくつか同梱されています。

同期Rust
非同期Rust

Mutex
std::sync::Mutex
futures::lock::Mutex

チャネル
std::sync::mpsc
futures::channel::mpsc

futures::channel::oneshot

イテレータ
std::iter::Iterator
futures::stream::Stream


futuresの互換性

前節のコード例にある features = ["io-compat"] を使うと、futures 0.1系を使っているライブラリと相互運用することができます。

use futures::prelude::*; // TryFutureが入っている

let fut01 = async {
// ...
}.boxed().compat(); // futures01::future::Futureに変換される

use futures::compat::*; // Future01CompatExtが入っている

let fut01 = ...;
let fut03 = fut01.compat(); // futures::future::Futureに変換される


ストリーム処理

ストリームを for で処理するための構文は将来的には入ることが期待されていますが、すぐには入らない予定です。 while let で処理するのがイディオマティックです。

let mut stream = ...;

// pin_utils::pin_mut!(stream); // 場合によっては必要
while let Some(item) = stream.next().await {
// ...
}


runtimeクレート

runtimeは、非同期ランタイムをまとまった形で提供することを目指しているクレートです。非同期ランタイムの立ち上げを簡略化するための属性マクロも提供しています。


Cargo.toml

runtime = "0.3.0-alpha.6"



main.rs

#![feature(async_await)]

use runtime::prelude::*;

#[runtime::main]
async fn main() {
// ...
}

#[runtime::test]
async fn test_foo() {
// ...
}


タスクの起動や非同期ソケットの立ち上げ、タイマーなどのインターフェースが提供されています。これは裏側のランタイム実装 (tokioまたはromio/juliex) やタイマー実装 (futures-timer) に移譲されます。

// タスクの起動

let handle = runtime::spawn(async {
// ...
});
handle.await;

use runtime::net::TcpListener;

// 非同期ソケット
let listener = TcpListener::bind("127.0.0.1:8080")?;
// ...

use std::time::Duration;

use runtime::time::Delay;
// タイマー
Delay::new(Duration_from_secs(1)).await

同期Rust
非同期Rust

main
fn main() {}
#[runtime::main] async fn main() {}

test
#[test] fn test() {}
#[runtime::test] async fn test() {}

bench
#[bench] fn bench() {}
#[runtime::bench] async fn bench() {}

スレッド起動
std::thread::spawn
runtime::spawn

スレッドのjoin
handle.join()
handle.await

UDP
std::net::UdpSocket
runtime::net::UdpSocket

TCPリスナー
std::net::TcpListener
runtime::net::TcpListener

TCP接続
std::net::TcpStream
runtime::net::TcpStream

スリープ
std::thread::sleep();
runtime::time::Delay::new().await;


hyperとの相互運用

hyperはデフォルトで自前のtokioランタイムを起動します。runtimeクレートを使う場合にはデフォルトの rt featureを無効化しておいたほうが無難です。


Cargo.toml

hyper = { version = "0.12.33", default-features = false }


use futures::compat::Compat;

use hyper::server::Server;

// runtimeのspawnerとlistenerを使う
let listener = ...;
let incoming = listener.incoming().map_ok(Compat::new).compat();

let server = Server::builder(incoming)
.executor(Compat::new(runtime::task::Spawner::new()))
.serve(...)
.compat();
server.await;


async-trait

async fnはトレイトやその実装内では使えません。トレイト内async fnの実現にはジェネリック関連型 (Generic Associated Type; GAT) と存在型 (existential type)が必要で、どちらも実装途上です。

また、トレイト内async fnが実装されたとしても、async fnを含むトレイトは関連型を含むためトレイトオブジェクト化が不可能になります。

では非同期関数を含むトレイトを作るにはどうすればいいかというと、 BoxFuture を返すようにすればよいです。

pub trait Foo {

fn foo(&self) -> BoxFuture<'_, i32> {
async { 42 }.boxed()
}
}

impl Foo for () {
fn foo(&self) -> BoxFuture<'_, i32> {
async { 84 }.boxed()
}
}

これはやや面倒です。複数の参照を取るメソッドになると話はさらにややこしくなります。

async-traitクレートにある async_trait 属性マクロを使うと、これをより自然に書くことができます。

#[async_trait]

pub trait Foo {
async fn foo(&self) -> i32 {
42
}
}

#[async_trait]
impl Foo for () {
async fn foo(&self) -> i32 {
84
}
}


まとめ

async/awaitへの移行を容易にする以下のテクニックを紹介しました。


  • futuresクレートの互換性

  • streamに対するfor構文の代替

  • runtimeクレート

  • hyperとの相互運用


  • async_trait マクロ