LoginSignup
176
123

More than 3 years have passed since last update.

Rust でお気楽非同期プログラミング

Last updated at Posted at 2020-09-20

Rust 1.39 からは async/await が安定化され、非同期処理がぐっと書きやすくなりました。
Futureトレイトを自分で実装したりloop_fnで所有権を取り回したりmap_errでエラー型を魔改造したり することはもうありません!
おもむろに await していきましょう

この記事は Rust 1.46 と tokio 0.2.22 に基づいています

Rust での非同期処理

Rust では、非同期な計算は Future トレイトとして抽象化されています。JavaScript などでは Promise と呼ばれるものです。以前は非同期処理を扱うときに、場合によっては Future トレイトを実装する必要があることがありましたが、現在では async キーワードを使うことで簡潔に記述することができるようになりました。

async キーワードを使い、

  • 非同期関数 async fn をつくる or
  • async ブロックをつくる

ことで非同期な計算 Future を定義することができます。

Rust では JavaScript などと違い、Future を実行するには、Future

  • await する or
  • 非同期ランタイムに渡して非同期タスクを生成する

必要があります。非同期タスクを生成するには、Future

  • block_on する1 or
  • spawn する

すれば OK です。

非同期ランタイムには大きくわけて tokio と async-std がありますが、これらを一緒に使うことは好ましくありません2。特に理由がない場合は tokio を使うのがいいでしょう。

簡単な使用例

早速 async/await を使ったコードを書いてみましょう

simple-async
// Cargo.toml では、features フラグをこのように記載してください
// [dependencies]
// tokio = { version = "0.2", features = ["full"] }

// async 関数を作ります
async fn hello() -> String {
    "hello, async fn".to_string()
}

#[tokio::main]
async fn main() {
    // async 関数を実行して結果を待ち合わせます
    let greeting: String = hello().await;
    println!("{}", greeting);

    // async ブロックを実行して結果を待ち合わせます
    let world = async {
        println!("hello, async block");
    };
    world.await;
}

Playgroundで実行する

hello 関数には async キーワードがついていますね。これは、hello 関数は非同期に実行されることを表しています。非同期関数の内部では他の Futureawait することもできますが、この例では単に String を返しています。
hello 関数のシグネチャは、内部的には fn hello() -> impl Future<Output=String> と同じです。つまり、戻り値は String ではなく、実行結果が String になるような Future (トレイトを実装したオブジェクト)になります。

さて、hello 関数が Future を返すことがわかりました。await キーワードを使うことで、async 関数を実行完了まで待機することができます。

let greeting: String = hello().await;

では、hello() の実行完了まで待機して、実行結果を取得しています。hello().await の値は、Future の実行結果(関連型の Output) の型になります。

以下のコードでは、 async ブロック world を作って await しています。async ブロック内部でも、他の Futureawait することができますが、この例では単に標準出力に文字列を出力しています。
world も内部的には Future トレイトを実装しているため、await して結果を待ち合わせることができます。

async {
    println!("hello, async block");
}.await;

Rustでの非同期処理 のセクションでは、Future を実行するには block_on するか spawn しなければならない、と説明しました。しかし、コード simple-async には block_onspawn も見つかりません。どういうことでしょうか。それは、#[tokio::main]block_on に展開されるからです。

#[tokio::main]
async fn main() {
    println!("hello");
}

というコードは、コンパイル時に

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

と展開されます。main 関数の処理全体が block_on されているのがわかります。block_onFuture が完了するまでブロックします。

#[tokio::main]

というアノテーションをつけることで、システムで利用できる CPU のコア数分ワーカースレッドが用意され、非同期タスクをいい感じにロードバランシングして実行してくれます。3

Rust では、非同期ランタイムが非同期タスクをポーリングすることで計算が進みます。これを図示するとこのようになります:

非同期タスクの実行

Future はステートマシンに変換され、実行されます。async なコードの内部で別の Future.await することで、複雑な Future を構築することができます4

simple-async が実行される様子を考えてみましょう。main 非同期関数の内部で hello 非同期関数と world 非同期ブロックを .await することで、main 関数全体がひとつの巨大な Future となります。これがステートマシンに変換されて実行されます:

simple-async の実行

この場合、helloworld は直列に実行されます。

let greeting: String = hello().await;
println!("{}", greeting);

というコードでは、hello().await の行は Future が解決するまで待機します。そのため、

async-await
use tokio::time::delay_for;
use std::time::Duration;

async fn hello() {
    delay_for(Duration::from_millis(1000)).await;
    println!("1 sec elapsed");
}

#[tokio::main]
async fn main() {
    // async 関数を実行し、Future の実行が完了するまで待機します
    hello().await;

    // hello() が完了したあとに実行されます
    println!("hello, world");
}

Playgroundで実行する

というコードでは、main 関数の実行が始まってから 1 秒後に

1 sec elapsed
hello, async fn

と表示されます。

println!("{}", greeting);

の行が実行されるのは、hello が完了したあとです。
単に Futureawait すると、並列処理を行うときに不都合が生じることがあります。

非同期タスクを生成する

先程のコード simple-async では、Future を直列に実行していました。Futurespawn すると、非同期タスクをバックグラウンドで並列 (parallel) に実行されます。非同期タスクは tokio によっていずれかのワーカースレッドに割り当てられ、いい感じに実行されます。

spawn
use tokio::time::delay_for;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // async ブロックを実行します
    tokio::spawn(async {
        delay_for(Duration::from_millis(1000)).await;
        println!("1 sec elapsed");
    });

    // 即座に実行されます
    println!("hello, world");


    // hack: async ブロックの実行完了まで待機します
    delay_for(Duration::from_millis(2000)).await;
}

Playgroundで実行する

というコードでは、main 関数の実行が始まってからすぐに

hello, world

と表示され、さらに 1 秒経過してから

1 sec elapsed

と表示されます5

// hack: async ブロックの実行完了まで待機します
delay_for(Duration::from_millis(2000)).await;

という行は、async ブロックの実行完了まで待機するために入れました。spawn されたタスクは完了状態まで実行されるとは限りません。この行を入れない場合、タスクが完了するよりも先にランタイムが終了し main 関数から抜けてしまうため、

1 sec elapsed

が表示されません。

tokio::spawnJoinHandle を返します。これを使うことで、タスクの完了を待つことができます:

spawn-join
use tokio::time::{delay_for, Duration};

#[tokio::main]
async fn main() {
    // async ブロックを実行します
    let handle = tokio::spawn(async {
        delay_for(Duration::from_millis(1000)).await;
        println!("1 sec elapsed");
    });

    // 即座に実行されます
    println!("hello, world");


    // async ブロックの実行完了まで待機します
    let _ = handle.await.unwrap();
}

Playgroundで実行する

spawn した場合に実行結果を取得する

実行に時間がかかる処理(ネットワークにアクセスする、ファイルを読み込むなど)をバックグラウンドで実行しておいて、あとで結果を取得するという使い方がよくされます。非同期タスクの実行結果はどのように取得するのでしょうか。

1 つめの方法として、JoinHandle が利用できます。先程の例 spawn-join では handle.await の値は利用しませんでしたが、handle の実行結果は spawn した非同期タスクの実行結果を T とすると、Result<T, JoinError> となります。以下の例では単に unwrap しています:

spawn-result-joinhandle
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        "hello world".to_string()
    });

    // handle の実行結果は async ブロックの値になります
    let result: String = handle.await.unwrap();
    println!("{}", result);
}

Playgroundで実行する

他のやり方として、チャネルを利用する方法があります。チャネルとは、非同期タスク間でデータを受け渡す仕組みのことです。非同期タスクは異なるスレッドで実行されることがありますが、チャネルを使うことで安全にデータを受け渡すことができます。ただひとつの値を受け渡すチャネルとして、oneshotチャネル が利用できます:

spawn-result-channel
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async {
        // チャネルを通してなにか値を渡します
        tx.send("hello world".to_string()).unwrap();
    });

    let result = rx.await.unwrap();
    println!("{}", result);
}

Playgroundで実行する

mpscチャネル という、複数の値を受け渡すチャネルもあります。
実行結果が 1 つの値になるときはどちらの方法も使えますが、基本的にはチャネルのほうが取り回しがしやすいでしょう。

複数の Future の結果を待ち合わせる

同時に複数の Future を実行したい場合はどのようにすればよいでしょうか。
2 つ以上の Future を全て完了まで待ち合わせたい場合は、 tokio::join が使えます。

join
#[tokio::main]
async fn main() {
    let fut1 = async {
        // ここでいろいろ .await する
        1
    };
    let fut2 = async {
        // ここでいろいろ .await する
        "hello".to_string()
    };

    // fut1, fut2 を組み合わせた Future をつくって .await します
    let (res1, res2): (i32, String) = tokio::join!(fut1, fut2);

    println!("{} {}", res1, res2);
}

Playgroundで実行する

tokio::join は複数の Future を 1 つの大きな Future にまとめ、.await するものです。fut1fut2 は並行 (concurrent) に実行されますが、並列 (parallel) に実行されるわけでないことに注意しましょう。fut1fut2 は代わりばんこにポーリングされます。実行の様子を図示するとこのようになります:

joinによる実行 - concurrent

並列に実行したい場合は fut1fut2spawn してから、JoinHandlejoin します:

join-spawn
#[tokio::main]
async fn main() {
    let fut1 = async {
        // ここでいろいろ .await する
        1
    };
    let fut2 = async {
        // ここでいろいろ .await する
        "hello".to_string()
    };

    let handle1 = tokio::spawn(fut1);
    let handle2 = tokio::spawn(fut2);

    // handle1, handle2 を組み合わせた Future をつくって .await します
    if let (Ok(res1), Ok(res2))  = tokio::join!(handle1, handle2) {
        println!("{} {}", res1, res2);
    }
}

Playgroundで実行する

実行の様子はこのようになります:

joinによる実行 - parallel

2 つ以上の Future を同時に実行し、いちばん早く完了したものの結果を取得したい場合は tokio::select が使えます。タイムアウトの処理を実装したいときに便利です:

select
use tokio::time::{delay_for, Duration};

#[tokio::main]
async fn main() {
    // fut のほうが早く完了する
    let delay = delay_for(Duration::from_millis(50));
    let fut = async {
        // 実行に 20ms かかります
        delay_for(Duration::from_millis(20)).await;
        1
    };

    tokio::select! {
        _ = delay => {
            println!("timeout");
        }
        v = fut => {
            println!("result: {}", v);
        }
    }

    // delay のほうが早く完了する
    let delay = delay_for(Duration::from_millis(50));
    let fut = async {
        // 実行に 100ms かかります
        delay_for(Duration::from_millis(100)).await;
        1
    };

    tokio::select! {
        _ = delay => {
            println!("timeout");
        }
        v = fut => {
            println!("result: {}", v);
        }
    }
}

Playgroundで実行する

tokio::select! {
    v = fut => {
        println!("result: {}", v);
    }
}

の記述ですが、await したい Futurefut の部分に記述します。fut.await の結果は v で束縛できます。v の値は => のあとのブロックで利用することができます。

例: ネットワーク上の API にアクセスする

これまでの説明では簡単のためにあまりおもしろくない async なコードを実行してきました。このセクションでは、非同期処理を使って GitHub API にアクセスして、リポジトリのスター数を調べてみます。コードと実行結果を示します。コードは GitHub/Kumassy/easy-going-async にあります。

コード:

github-api
use anyhow::{Context, Result};
use futures::future::try_join_all;
use reqwest::header;
use serde_json::Value;
use std::sync::Arc;
use tokio::task::JoinHandle;

/// リポジトリの API にアクセスし、スター数を返します
async fn get_star_count(client: &reqwest::Client, repo: String) -> Result<u64> {
    let resp: Value = client
        .get(&format!("https://api.github.com/repos/{}", repo))
        .send()
        .await? // API にアクセスします
        .json()
        .await?; // JSON をパースします
    let count = resp
        .get("stargazers_count")
        .context("GitHub API error: stargazers_count is not found")?
        .as_u64()
        .context("GitHub API error: stargazers_count is not an integer")?;
    Ok(count)
}

#[tokio::main]
async fn main() -> Result<()> {
    // HTTP request header と user agent を設定します
    let mut headers = header::HeaderMap::new();
    headers.insert(
        header::ACCEPT,
        header::HeaderValue::from_static("application/vnd.github.v3+json"),
    );

    let client = reqwest::Client::builder()
        .user_agent("rust reqwest")
        .default_headers(headers)
        .build()?;
    // 複数の非同期タスクで共有するので、Arc で包みます
    let client = Arc::new(client);

    // 調べたいリポジトリのリストです
    let repos = vec![
        "rust-lang-nursery/failure".to_string(),
        "rust-lang-nursery/lazy-static.rs".to_string(),
        "rust-lang/libc".to_string(),
        "bitflags/bitflags".to_string(),
        "rust-lang/log".to_string(),
    ];

    // spawn した非同期タスクのハンドラです
    let handles: Vec<JoinHandle<Result<u64>>> = repos
        .iter()
        .map(|repo| {
            // tokio::spawn は 'static を要求するため、clone しておきます
            let client = client.clone();
            let repo = repo.clone();

            // client と repo を move します
            tokio::spawn(async move {
                // 非同期タスクの実行結果は Result<u64> です
                get_star_count(&client, repo).await
            })
        })
        .collect::<Vec<_>>();

    // repos に対応するスター数です
    let stars: Vec<u64> = try_join_all(handles) // Vec<Result<T>> を Result<Vec<T>> に変換してくれます
        .await?
        .into_iter()
        .collect::<Result<Vec<u64>>>()?; // Vec<Result<T>> を Result<Vec<T>> に変換します

    for (repo, star) in repos.iter().zip(stars) {
        println!("{} has {} stars", repo, star);
    }

    Ok(())
}

実行結果:

rust-lang-nursery/failure has 1432 stars
rust-lang-nursery/lazy-static.rs has 989 stars
rust-lang/libc has 862 stars
bitflags/bitflags has 326 stars
rust-lang/log has 891 stars

get_star_count は特定のリポジトリのスター数を返す非同期関数です。tokio ベースの HTTP クライアントでは、reqwest が人気なのでこちらを使いました。エラー処理を簡素化するため、anyhow を使っています。anyhow::Result<T>Result<T, anyhow::Error> と同じです。

    let resp: Value = client
        .get(&format!("https://api.github.com/repos/{}", repo))
        .send()
        .await? // API にアクセスします
        .json()
        .await?; // JSON をパースします

というコードでは、GitHub API にアクセスして、JSON 形式のレスポンスをパースしています。send()json()Future を返すため、.await が必要です。次に、resp から目的のスター数を取り出します:

    let count = resp
        .get("stargazers_count")
        .context("GitHub API error: stargazers_count is not found")?
        .as_u64()
        .context("GitHub API error: stargazers_count does not integer")?;

ここでは serde_json::value::Value から目的のデータを頑張って取得していますが、ほしいデータが散らばっていたりネストしたりしている場合は、serde-query が便利そうです。
context はエラー処理のためのものです。get()Option を返すのですが、context を書くことで std::option::NoneError を anyhow でうまく扱えるようになります。

    // HTTP request header と user agent を設定します
    let mut headers = header::HeaderMap::new();
    headers.insert(
        header::ACCEPT,
        header::HeaderValue::from_static("application/vnd.github.v3+json"),
    );

    let client = reqwest::Client::builder()
        .user_agent("rust reqwest")
        .default_headers(headers)
        .build()?;
    // 複数の非同期タスクで共有するので、Arc で包みます
    let client = Arc::new(client);

というコードでは、HTTP クライアントの設定をしています。client は複数の非同期タスクで共有するため、std::sync::Arc で包んでいます。


    // spawn した非同期タスクのハンドラです
    let handles: Vec<JoinHandle<Result<u64>>> = repos
        .iter()
        .map(|repo| {
            // tokio::spawn は 'static を要求するため、clone しておきます
            let client = client.clone();
            let repo = repo.clone();

            // client と repo を move します
            tokio::spawn(async move {
                // 非同期タスクの実行結果は Result<u64> です
                get_star_count(&client, repo).await
            })
        })
        .collect::<Vec<_>>();

というコードでは、非同期タスクを spawn しています。tokio::spawn'static を要求するため、clientrepoclone() し、move しています。move が必要ない場合もありますが、実践的なコードでは move が必要になることが多いでしょう。
非同期タスクの実行結果は Result<u64> ですが、非同期タスクのハンドラの実行結果は JoinError になる可能性があります。したがって、handlesawait すると、

Vec<Result<Result<u64, anyhow::Error>, JoinError>>

になる点に注意してください。

try_join_all は実行結果が Result となる Future のリストを実行して、実行結果をひとつの Result にまとめてくれるものです。? を使うことで、この Result を外しています。ドキュメントのコード例 がわかりやすいです。

    let stars = try_join_all(handles) // Vec<Result<T>> を Result<Vec<T>> に変換してくれます
        .await?

までのコードでは、

Vec<Result<u64, anyhow::Error>>

という型になるでしょう。Result のリストを collect() を使うことで ひとつの Result にまとめ、? を使ってこの Result を外します:

    let stars: Vec<u64> = try_join_all(handles) // Vec<Result<T>> を Result<Vec<T>> に変換してくれます
        .await?
        .into_iter()
        .collect::<Result<Vec<u64>>>()?; // Vec<Result<T>> を Result<Vec<T>> に変換します

最後は stars を整形して標準出力に出力して終わりです:

    for (repo, star) in repos.iter().zip(stars) {
        println!("{} has {} stars", repo, star);
    }

まとめ

async なコードの書き方や実行の仕方を紹介しました。async/await キーワードが安定化されてから、非同期処理を簡単に書くことができるようになりました。async/await を使って、Rust で非同期処理をどんどん書いていきましょう!

さらに詳しく

公式チュートリアルです。async/await に対応した tokio 0.2 向けに内容が更新され、オシャレになりました
https://tokio.rs/tokio/tutorial

非同期 Rust に関する詳しい解説記事です。日本語です。
https://tech-blog.optim.co.jp/entry/2019/11/08/163000

async/await を使ったコードはどのようにステートマシンに変換され、実行されるのでしょうか。また、非同期ランタイムはどのように poll するタイミングを決めているのでしょうか。背景にある Generator や、なぜ Pin が必要になるのか、async な API は poll されるときに何をしているのかが詳しく解説されています。
https://cfsamson.github.io/books-futures-explained/introduction.html

block_onspawn は裏で何をしているのでしょうか。Safe Rust のみを使い、簡単な非同期ランタイムを作って理解を深めてみましょう。
https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
https://stjepang.github.io/2020/01/31/build-your-own-executor.html

謝辞

この記事の一部は @__pandaman64__ 氏に助言をいただき執筆しました :pray:


  1. block_on は非同期タスクが完了するまでスレッドをブロックしてしまうのであまり出番がありませんが、後の説明のために紹介しています 

  2. 非同期ライブラリを使う際は tokio ベースなのか async-std ベースなのかを確認して使い分けてください: https://qiita.com/legokichi/items/53536fcf247143a4721c 

  3. カスタマイズしたい場合は、tokio::runtime::Builder が利用できます。 

  4. 以前は and_thenmap_err などを使って手動で巨大な Future を練り上げていました。苦行でした。 

  5. Rust Playground では残念ながら同時に表示されます。ローカルの環境で実行してみてください。コードは GitHub/Kumassy/easy-going-async にあります 

176
123
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
176
123