126
81

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Rustのasync/awaitをスムーズに使うためのテクニック

Posted at

RustのFutureとasync/awaitは、「時間のかかる処理」をするときに、「処理が終わるまでOSスレッドをブロックする(同期Rust)」のではなく、「当該処理を中断して、そのOSスレッドを別のタスクの処理に使う(非同期Rust)」ことで、スレッド数よりも多くの処理を同時に行う仕組みです。

同期Rustと非同期Rustには以下のようなアナロジーが成立します。

同期Rust 非同期Rust
スレッド OSスレッド タスク
ランタイム OS自身 tokioやromio/juliexなどのランタイムライブラリ
すぐ終わる処理 fn(T) -> U fn(T) -> U
……の作成 fn() {} fn() {}
……の呼び出し f() f()
時間のかかる処理 fn(T) -> U fn(T) -> impl Future<Output = U>
……の作成 fn() {} async fn() {}
……の呼び出し f() f().await

同期Rustから非同期Rustへの移行は、おおよそこのアナロジーにしたがって行えることになります。 (ライブラリ・フレームワークも同様に移行される必要があります。)

とはいえ、単純置換で移行できるわけではありません。そこで以下でasync/awaitをスムーズに使うためのライブラリやテクニックを紹介します。

futures/futures01/futures-previewクレート

futuresは非同期プログラミングのベースになっているライブラリです。

futuresには主に2つのバージョンがあります。

  • 0.1系。async/awaitがなかった頃に作られた非同期処理ライブラリ。
  • 0.3系。async/awaitを念頭に置いて整理されている。

今ある非同期エコシステムは0.1系ベースのものが多いので、互換性を保つために以下のように書くのがおすすめです。

Cargo.toml
# futures01はfuturesの0.1系を指し続けるクレート。
# 0.3と併存させるために使う
futures01 = "0.1.28"
# 0.3系列はまだpreview版のため、別の名前で提供されている。
# そのうちfuturesにリネームされる
futures-preview = { version = "0.3.0-alpha.17", features = ["io-compat"] }
main.rs
use futures::prelude::*;

futuresクレートには、標準ライブラリの非同期版がいくつか同梱されています。

同期Rust 非同期Rust
Mutex std::sync::Mutex futures::lock::Mutex
チャネル std::sync::mpsc futures::channel::mpsc
futures::channel::oneshot
イテレータ std::iter::Iterator futures::stream::Stream

futuresの互換性

前節のコード例にある features = ["io-compat"] を使うと、futures 0.1系を使っているライブラリと相互運用することができます。

use futures::prelude::*; // TryFutureが入っている

let fut01 = async {
    // ...
}.boxed().compat(); // futures01::future::Futureに変換される
use futures::compat::*; // Future01CompatExtが入っている

let fut01 = ...;
let fut03 = fut01.compat(); // futures::future::Futureに変換される

ストリーム処理

ストリームを for で処理するための構文は将来的には入ることが期待されていますが、すぐには入らない予定です。 while let で処理するのがイディオマティックです。

let mut stream = ...;
// pin_utils::pin_mut!(stream); // 場合によっては必要
while let Some(item) = stream.next().await {
    // ...
}

runtimeクレート

runtimeは、非同期ランタイムをまとまった形で提供することを目指しているクレートです。非同期ランタイムの立ち上げを簡略化するための属性マクロも提供しています。

Cargo.toml
runtime = "0.3.0-alpha.6"
main.rs
#![feature(async_await)]

use runtime::prelude::*;

#[runtime::main]
async fn main() {
    // ...
}

#[runtime::test]
async fn test_foo() {
    // ...
}

タスクの起動や非同期ソケットの立ち上げ、タイマーなどのインターフェースが提供されています。これは裏側のランタイム実装 (tokioまたはromio/juliex) やタイマー実装 (futures-timer) に移譲されます。

// タスクの起動
let handle = runtime::spawn(async {
    // ...
});
handle.await;
use runtime::net::TcpListener;
// 非同期ソケット
let listener = TcpListener::bind("127.0.0.1:8080")?;
// ...
use std::time::Duration;
use runtime::time::Delay;
// タイマー
Delay::new(Duration_from_secs(1)).await
同期Rust 非同期Rust
main fn main() {} #[runtime::main] async fn main() {}
test #[test] fn test() {} #[runtime::test] async fn test() {}
bench #[bench] fn bench() {} #[runtime::bench] async fn bench() {}
スレッド起動 std::thread::spawn runtime::spawn
スレッドのjoin handle.join() handle.await
UDP std::net::UdpSocket runtime::net::UdpSocket
TCPリスナー std::net::TcpListener runtime::net::TcpListener
TCP接続 std::net::TcpStream runtime::net::TcpStream
スリープ std::thread::sleep(); runtime::time::Delay::new().await;

hyperとの相互運用

hyperはデフォルトで自前のtokioランタイムを起動します。runtimeクレートを使う場合にはデフォルトの rt featureを無効化しておいたほうが無難です。

Cargo.toml
hyper = { version = "0.12.33", default-features = false }
use futures::compat::Compat;
use hyper::server::Server;

// runtimeのspawnerとlistenerを使う
let listener = ...;
let incoming = listener.incoming().map_ok(Compat::new).compat();

let server = Server::builder(incoming)
    .executor(Compat::new(runtime::task::Spawner::new()))
    .serve(...)
    .compat();
server.await;

async-trait

async fnはトレイトやその実装内では使えません。トレイト内async fnの実現にはジェネリック関連型 (Generic Associated Type; GAT) と存在型 (existential type)が必要で、どちらも実装途上です。

また、トレイト内async fnが実装されたとしても、async fnを含むトレイトは関連型を含むためトレイトオブジェクト化が不可能になります。

では非同期関数を含むトレイトを作るにはどうすればいいかというと、 BoxFuture を返すようにすればよいです。

pub trait Foo {
    fn foo(&self) -> BoxFuture<'_, i32> {
        async { 42 }.boxed()
    }
}

impl Foo for () {
    fn foo(&self) -> BoxFuture<'_, i32> {
        async { 84 }.boxed()
    }
}

これはやや面倒です。複数の参照を取るメソッドになると話はさらにややこしくなります。

async-traitクレートにある async_trait 属性マクロを使うと、これをより自然に書くことができます。

#[async_trait]
pub trait Foo {
    async fn foo(&self) -> i32 {
        42
    }
}

#[async_trait]
impl Foo for () {
    async fn foo(&self) -> i32 {
        84
    }
}

まとめ

async/awaitへの移行を容易にする以下のテクニックを紹介しました。

  • futuresクレートの互換性
  • streamに対するfor構文の代替
  • runtimeクレート
  • hyperとの相互運用
  • async_trait マクロ
126
81
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
126
81

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?