LoginSignup
98
57

More than 1 year has passed since last update.

Rustの非同期プログラミングの個人的まとめ 2022年版

Last updated at Posted at 2022-12-15

この記事はWano Group Advent Calendar2022 16日目の記事です。

今回はRustの非同期プログラミング(スレッドやasync)周りで私が知っている事を2022年版としてまとめようと思います。

Rustのバージョンは記事執筆時点で最新の1.65.0を想定しています。

普通のスレッドで非同期処理をする時のあれこれ

まずはOSにお願いして作ってもらう普通のスレッドについてです。

スレッドの作り方

Rustは標準ライブラリ(std)にスレッドを取り扱う為のAPI(std::thread)があります。

fn main() {
    let handle = std::thread::spawn(|| {
        println!("Hello Thread!");
        "🍣".to_string() + "🍺"
    });
    let s = handle.join().unwrap();
    println!("{s}");
}

Playground

std::thread::spawnにクロージャを渡すとスレッドが起動します。
std::thread::spawnJoinHandleを返すので、これのjoinメソッドを使うことでスレッドの終了待ちをする事が出来ます。
上記の例の様にスレッドの終了よりも先にプログラムが終了してしまうようなケースではjoinを使って終了待ちをしないとmain関数の終了と同時にスレッドも終了されてしまいます。

スレッドとして実行するクロージャは戻り値を返す事ができ、この戻り値はhandleのjoinメソッドの戻り値として取得する事が出来ます。

スレッドの停止・再開

適当なロックやoneshotやchannel等を使って停止・再開を制御してもいいですが、スレッドの停止・再開用の機能が標準で提供されています。

fn main() {
    let handle = std::thread::spawn(move || {
        println!("🍣");
        std::thread::park(); // スレッドの停止
        println!("🍺");
    });
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("🐣");
    handle.thread().unpark(); // スレッドの再開
    handle.join().unwrap();
}

Playground

parkで止めてunparkで再開します。
正しい方が呼び出し易くなるようにそれぞれ定義されている場所が異なっている点に気をつけて下さい。

タイムアウト有りの停止をする為のpark_timeoutもあります。

内部的にはfutexを使ってるみたいです。

他のスレッドの進捗待ちをする

  • std::sync::Condvar
    • 典型的にはMutexで保護されたリソースを他のスレッドが変更するまで待つのに使う。
  • std::sync::Barrier
    • 典型的にはスレッド間で処理の進捗を合わせるのに使う。
  • crossbeam_utils::sync::WaitGroup
    • Barrierと概ね同じですが以下の点で異なります。
      • 事前にスレッド数が分かってなくても良い。後からスレッドが追加される可能性があるケースで便利?
      • Barrierは利用するスレッド全てが一時停止をするが WaitGroupでは停止の必要が無いスレッドは止まらずに処理を続けられる。

CondvarBarrierparking_lotというcrateで高性能版が提供されています。
使える場合はこちらの方が良いでしょう。

スレッド間でのデータのやり取り

普通の参照を共有して読み書き

std::thread::scope を使うと素の参照を何も考えずにそのまま別スレッドに渡せるので楽です。
参照の取り扱いは通常の借用と同じです(なので複数スレッドに&mut Tな可変参照を渡す事はできません)。

fn main() {
    let x = "🍣🍺".to_string();
    let mut y = "🐣".to_string();
    let rev_x = std::thread::scope(|scope| {
        let handle1 = scope.spawn(|| {
            x.chars().rev().collect::<String>()
        });
        let handle2 = scope.spawn(|| {
            y += &x;
            y += "🐔";
        });
        let rev_x = handle1.join().unwrap();
        handle2.join().unwrap();
        rev_x
    });
    println!("{rev_x} & {y}");
}

Playground

Arcで共有して読み書き

std::thread::scopeが合わないケースではstd::sync::Arc(参照カウンタ方式のスレッドを跨げるスマートポインタ)を使ってスレッド間でデータの共有を行います。
この方法だとArcMutex等のロック機構を組み合わせる事で複数スレッドから同じデータを安全に変更する事が出来ます。

データへのアクセス時にどの様なロックが必要かに応じて更に内部の値をラップして使用します。

説明
Arc<T> ただのリファレンスカウントで寿命管理された参照です。Read Onlyな用途に使う事をオススメします。
Arc<Mutex<T>> Mutexから取得したロックを通して内部の値にアクセスすることが出来ます。
Arc<RwLock<T>> 共有ロックと排他ロックを使い分けたい時向けです。こちらも取得したロックを通して内部の値にアクセスできます。
Arc<Mutex<RefCell<T>>> 並行処理で &mut T が必要な時に使用します。
Arc<RwLock<RefCell<T>>> 同上。ロックの種類が異なるだけです。

MutexRwLockparking_lotというcrateで高性能版が提供されています。
使える場合はこちらの方が良いでしょう。

またどの様な型を選択すべきかはこちらのRust Memory Container Cheat-sheetも参考になります。

channelを通して値を別スレッドに送る

標準ライブラリからstd::sync::mpscでmulti-producer single-consumerなchannelが提供されています。

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    {
        let tx = tx.clone();
        std::thread::spawn(move || {
            "🍣🍺🍖".chars().for_each(|c| {
                tx.send(c).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(17));
            });
        });
    };
    {
        let tx = tx.clone();
        std::thread::spawn(move || {
            "🐶😺🐣".chars().for_each(|c| {
                tx.send(c).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(17));
            });
        });
    }
    drop(tx);
    while let Ok(c) = rx.recv() {
        print!("{c}");
    }
}

Playground

また、crossbeamからはmulti-producer multi-consumerなchannelが提供されています。

fn main() {
    let (tx, rx) = crossbeam::channel::unbounded(); // ここがcrossbeam
    {
        let tx = tx.clone();
        std::thread::spawn(move || {
            "🍣🍺🍖".chars().for_each(|c| {
                tx.send(c).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(17));
            });
        });
    };
    {
        let tx = tx.clone();
        std::thread::spawn(move || {
            "🐶😺🐣".chars().for_each(|c| {
                tx.send(c).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(17));
            });
        });
    }
    drop(tx);
    while let Ok(c) = rx.recv() {
        print!("{c}");
    }
}

Playground

Iteratorをスレッドで並列化する

rayonを使います。

use rayon::prelude::*;

fn fib(i: u64) -> u64 {
    if i <= 2 {
        1
    } else {
        fib(i - 2) + fib(i - 1)
    }
}

fn main() {
    let xs: Vec<u64> = (1..=100000).collect();
    let total: u64 = xs
        .into_par_iter() // ← この辺がrayon
        .map(|x| fib(x % 5 + 20))
        .sum();
    println!("total: {total}");
}

Playground

rayonの性能を体感したい場合はPlaygroundだと分かりにくいのでローカルで試すのをオススメします。

Futureによるasync/.awaitで非同期処理をする時のあれこれ

Future traitを中心としたユーザ空間で動く非同期処理についてです。

公式にはRust async bookで基礎的な説明がされています。

基本的な構成要素

Future trait

Future traitはRustで非同期処理を作る上での一番基礎になるビルディングブロックです。

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

RustのFutureは基本的にはpollメソッドで状態を確認して Poll::Ready であれば実行完了と見なすという使われ方をします。

FutureはselfPin<&mut Self>とする事でselfがどこかしらのアドレスに固定されている事を要求しています(Pinが何なのかは後述)。
また、非同期ランタイムが必要以上にpollメソッドを叩かずに済むように、Contextにはpollを叩いて何かしらの進捗が得られる状態になった事をFuture側からランタイムに通知する為のコールバック関数が含まれています。

Futureを作るだけであればContextは非同期ランタイムが作って渡してくるので基本的にはバケツリレーしていれば良いです。

非同期ランタイムを作る場合はWake traitを実装した型を作る必要があるでしょう(もしくはfutures crateのArcWake)。

tokio等の非同期ランタイムはこのFuture traitで抽象化された並行実行可能な処理の実行を担当します。

async/.await

Rust 1.39.0から安定化されたFutureベースの非同期処理の為の構文です。

次の様に使います。

async fn request_example_json() -> anyhow::Result<Example> {
    reqwest::get("https://example.com/example.json")
        .await?
        .json()
        .await
        .map_err(From::from)
}

普通の関数であればfnとする所をasync fnと書く事でこの関数がFutureを返す事を表します。
FutureはRustコンパイラがこの関数用に自動で実装してくれます。
戻り値にはFuture::Outputに入る型だけ書けば良いです。
実質的な戻り値の型は impl Future<Output=T> みたいになります。

.awaitは他の多くの言語と異なりFutureのプロパティであるかの様に書きます。
これにより上記の例の様にメソッドチェーンで最後まで繋げて書くことが出来ます。

注意事項

traitのメソッドにasyncを付けることは出来ません。
traitのメソッドにasyncを付けたい場合はasync-trait crateを利用しましょう。
もしくはPin<Box<dyn Future<Output=T>>>を返すようにします。

async関数はBox化したFutureを返すようにしないと再帰が出来ません。
async-recursion crateを使うとこの辺を自動でやってくれてお手軽に再帰可能に出来ます。

async ブロック

async関数/メソッドにするほどでもない程度の処理はasyncブロックとして書く事が出来ます。

async fn request_example_json_simultaneously() -> anyhow::Result<(Example, Example)> {
    let future1 = async {
        let res = reqwest::get("https://example.com/example.json")
            .await?
            .json()
            .await?;
        anyhow::Ok(res)
    };

    let future2 = async {
        let res = reqwest::get("https://example.com/example.json")
            .await?
            .json()
            .await?;
        anyhow::Ok(res)
    };

    Ok(futures::try_join!(future1, future2)?)
}

Pin

ここまででPinという型がちょこちょこと出てきています。
これは何なのでしょう?

Pinasync/.await構文を成り立たせる為に必須の要素です。

async構文によって自動生成されるFutureはクロージャと同様に環境をキャプチャした構造体です。
クロージャと異なるのは処理中に発生する他のFutureを保持する為のフィールドもあることです。

この構造体が抱えているFutureが同じく構造体が抱えている変数への参照を持っている場合、少し状況を簡単になるように言い換えると構造体のあるフィールドが別のフィールドを参照している状態ではある種の問題が発生します。

それは構造体がmoveした後にフィールドが持っている参照が以前あったアドレスを指し続けてしまう問題です。

これを解決する為に参照先のオブジェクトがmoveしない事を保証する為の仕組みが導入されました。

それがPinUnpinです。
オブジェクトを特定のアドレスに固定することをピン刺しに例えている命名になっています。

Unpinはコンパイラによって自動実装されるtraitでmoveしても安全な型である事を表します。
なので普通の型は自己参照する要素を持っていないので全てUnpinが実装されることになります。

async構文やasyncブロックによって自動生成されたFutureはそれとは反対に!Unpin、つまりUnpinを実装していない型になります。
これによりmoveするのが安全ではない事が表されます。

しかし!UnpinだけではRustにはmoveを防ぐ機能は無いのでまだmove出来てしまいます。

moveしない事を確実に保証する為にPin構造体で目的のオブジェクトの参照をラップします。
参照をラップする事で元のオブジェクトがmoveしようとすればコンパイルエラーになります。
そしてPin構造体はUnpinの実装有無で可変な操作が出来るかどうかを制限することで抜け道も塞いでいます。
非同期ランタイムは何らかのアドレスの変わらない場所にFutureを固定してからPinを通してFutureを取り扱います。

ちなみに!UnpinだけどPinを通した可変な操作をしたい場合はpin-project crateを使うことで出来るようになります。
これはフィールド毎の可変参照を取れるようにする事で元のオブジェクトのmoveは出来ないという形の安全性は保てるという仕組みになっています。

ここはちょっと分かりにくい話なので、よく分からない場合は以下の資料を読むと分かるかもしれません。

futures crate

RustではFuture関係の基本的な機能はstd::futureに入っているのですが、ここに入っているものは最低限のものだけになっています。

一般的に期待されるようなAPIはサードパーティライブラリとして実装されており、その中でも中心的なものがfutures crateです。
futures crateは以下のような機能を提供します。

非同期ランタイム

色々あるが特に理由が無ければ一番人気のtokioを使えば良いです。

tokio系の話

tracingと関連crates

asyncを使うと実際の処理の構造とスタックとレースが一致しなくなります。
そんな状態でも処理の構造を把握できるような形でログを吐く仕組みを提供しているのがtracing crateです。
asyncだけでなく同期的な処理にも使えるので広く公開するライブラリでもなければtracingを利用するのが無難だと思います。

  • tracing
    • 本体。概ねlogの上位互換です。
    • tracing-subscriberを使ってログの書式設定を行います。
  • tracing-log
    • log互換レイヤー。
    • とりあえずtracing_log::LogTracer::init()?しておけばOK。
  • tracing-futures
    • features = ["futures-03"]を付けるとStreamSinkにも.instrument()できるようになります。
  • tracing-opentelemetry
    • tracingとopentelemetryの橋渡し的なやつです。
  • sentry-tracing
    • tracingのeventをsentryに送るやつです。
  • test-log
    • #[test]をこのcrateのものに変えるだけでテスト毎に自動でtracingのsubscriberをセットアップしてくれます。
  • tokio-console
    • tracingの情報を使ってどんなfutureが実行中なのか等をコンソールに表示できるようにするものです。

tracing, tracing-subscriber, tracing-logを使うサンプルコードです。
tracing-subscriberを使ってファイル名・行番号の追加とtimestampのタイムゾーンをデフォルトのUTCからJSTに変更しています。

use time::{format_description::FormatItem, macros::format_description, UtcOffset};
use tracing::Instrument;
use tracing_subscriber::fmt::time::OffsetTime;

const LOG_TIMESTAMP_FORMAT: &[FormatItem] = format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:6][offset_hour sign:mandatory][offset_minute]");

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let subscriber = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .with_file(true)
        .with_line_number(true)
        .with_timer(OffsetTime::new(
            UtcOffset::from_hms(9, 0, 0)?,
            LOG_TIMESTAMP_FORMAT,
        ))
        // .json() // この行を有効にするとJSONで出力されるようになります。
        .finish();
    tracing::subscriber::set_global_default(subscriber).unwrap();
    tracing_log::LogTracer::init()?;

    tracing::trace!("Start fibonacci calculation");
    println!(
        "{:#?}",
        fibonacci_number(&[128, 175, 156])
            .instrument(tracing::trace_span!(
                "main call fibonacci_number",
                process_id = "deadbeef"
            ))
            .await
    );
    tracing::trace!("End fibonacci calculation");

    Ok(())
}

async fn fibonacci_number(indices: &[usize]) -> Vec<u128> {
    #[async_recursion::async_recursion]
    async fn go(memo: &mut Vec<u128>, i: usize) -> u128 {
        if i >= memo.len() {
            panic!("Index {i} is out of bounds of memo table");
        }

        if memo[i] > 0 {
            tracing::trace!(i = i, "Return memoized value");
            memo[i]
        } else if i <= 1 {
            tracing::trace!(i = i, "Return 1 since i <= 1");
            memo[i] = 1;
            memo[i]
        } else {
            tracing::trace!(i = i, "Return calculated value");
            tokio::task::yield_now().await;
            memo[i] = go(memo, i - 1).await + go(memo, i - 2).await;
            tokio::task::yield_now().await;
            memo[i]
        }
    }

    futures::future::join_all(indices.iter().map(|&i| {
        async move {
            let mut memo = vec![0; i];
            go(&mut memo, i - 1).await
        }
        .instrument(tracing::trace_span!("fibonacci", i = i))
    }))
    .await
}

Cargo.tomlのdependenciesは下記の様になります。

[dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-recursion = "1.0"
futures = "0.3"
log = "0.4"
time = { version = "*", features = ["macros"] }
tokio = { version = "1.22.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "local-time"] }
tracing-log = "0.1.3"

実行結果(途中省略)

2022-12-05T01:03:15.695646+0900 TRACE async_samples: src/main.rs:22: Start fibonacci calculation
2022-12-05T01:03:15.695780+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=128}: async_samples: src/main.rs:52: Return calculated value i=127
2022-12-05T01:03:15.695821+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:52: Return calculated value i=174
2022-12-05T01:03:15.695852+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=156}: async_samples: src/main.rs:52: Return calculated value i=155
2022-12-05T01:03:15.695883+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=128}: async_samples: src/main.rs:52: Return calculated value i=126
...(省略)...
2022-12-05T01:03:16.374943+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:45: Return memoized value i=170
2022-12-05T01:03:16.374980+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:45: Return memoized value i=171
2022-12-05T01:03:16.375018+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:45: Return memoized value i=172
[
    251728825683549488150424261,
    1672445759041379840132227567949787325,
    178890334785183168257455287891792,
]
2022-12-05T01:03:16.375086+0900 TRACE async_samples: src/main.rs:32: End fibonacci calculation

tokio-uring

https://crates.io/crates/tokio-uring
tokioでio-uringを使うようにできます。

asyncでのコードのサンプル

ランタイムにはtokioを使用する前提でコードの例を掲載します。

tokioをランタイムとして使用

cargo add tokio@1.22 --features fullでCargo.tomlのdependenciesにtokioを追加します。

あとはmain関数にasyncを付けてtokio::mainマクロをmain関数のattributeとして足します。

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // ここにコード
    Ok(())
}

もしくは自分でtokioランタイムの起動タイミングを制御したい場合は次のようにします。

use tokio::runtime::Runtime;

fn main() -> anyhow::Result<()> {
    let rt = Runtime::new()?;
    let ret_val = rt.block_on(async {
        // ここにコード
        anyhow::Ok(())
    });
    Ok(())
}

標準出力に非同期で書く

use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tokio::io::stdout().write_all("Hello Tokio!".as_bytes()).await?;
    Ok(())
}

write_allはロックを取ったりする訳ではないので、長文を出力する場合は他のFutureからの出力と混ざって変になる可能性があります。
きちんとやる場合はロックを取るなり書き出すFutureを1つに絞るなりした方が良いと思います。

標準入力から非同期で読み込み

読み込んだ内容はそのまま標準出力に出す事にします。

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut r = BufReader::new(tokio::io::stdin());
    let mut buf = String::with_capacity(1024);
    let mut w = tokio::io::stdout();
    while r.read_line(&mut buf).await? > 0 {
        w.write_all(buf.as_bytes()).await?;
    }
    Ok(())
}

HTTPリクエストを投げる

GETリクエストを投げてその内容を標準出力に書き出すサンプルです。

HTTPリクエストの送信にはreqwest crateを使います。

use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let content = reqwest::get("https://example.com/example.json")
        .await?
        .text()
        .await?;

    tokio::io::stdout().write_all(content.as_bytes()).await?;

    Ok(())
}

複数のHTTPリクエストを並行に投げて両方の終了を待つ

futures::join!マクロを使います。

use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let fut1 = async {
        let content = reqwest::get("https://example.com/example.json")
            .await?
            .text()
            .await?;
        anyhow::Ok(content)
    };

    let fut2 = async {
        let content = reqwest::get("https://example.com/other.json")
            .await?
            .text()
            .await?;
        anyhow::Ok(content)
    };

    let (result1, result2) = futures::join!(fut1, fut2);

    for content in [result1?, result2?] {
        tokio::io::stdout().write_all(content.as_bytes()).await?;
    }

    Ok(())
}

select! macroで複数のfutureのうち最初に終わるものを待つ

use futures::future::FutureExt;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let f1 = async {
        tokio::time::sleep(Duration::from_secs(3)).await;
        "future-1"
    };
    
    let f2 = async {
        tokio::time::sleep(Duration::from_secs(2)).await;
        "future-2"
    };
    
    let res = futures::select! {
        res1 = f1.fuse() => res1,
        res2 = f2.fuse() => res2,
    };
    
    println!("{res}");
}

Playground

基本的には見たままです。
詳しくはこちらのドキュメントをご覧下さい

Futureのキャンセル

基本的にはdropすればキャンセルされますが tokio::spawn したものは返ってきたhandleをdropしても止まらないので自前でキャンセルする必要があります。
これはtokio_util::sync::CancellationTokenfutures::select!を使うと比較的簡単に書けます。

use futures::future::FutureExt;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cancel_token = CancellationToken::new();
    
    let f1 = {
        let cancel_token = cancel_token.clone();
        tokio::spawn(async move {
            println!("Begin future-1");
            futures::select! {
                _ = tokio::time::sleep(Duration::from_secs(6)).fuse() => println!("End future-1"),
                _ = cancel_token.cancelled().fuse() => println!("Cancelled future-1"),
            };
            "future-1"
        }).fuse()
    };
    
    let f2 = {
        let cancel_token = cancel_token.clone();
        tokio::spawn(async move {
            println!("Begin future-2");
            futures::select! {
                _ = tokio::time::sleep(Duration::from_secs(3)).fuse() => println!("End future-2"),
                _ = cancel_token.cancelled().fuse() => println!("Cancelled future-2"),
            };
            "future-2"
        }).fuse()
    };
    
    futures::pin_mut!(f1,f2);
    
    let res = futures::select! {
        res1 = f1 => res1?,
        res2 = f2 => res2?,
    };
    cancel_token.cancel();
    
    println!("First finished: {res}");
    
    tokio::time::sleep(Duration::from_secs(10)).await;
    
    Ok(())
}

Playground

CancellationToken::new()でオブジェクトを作り、適当にコピーを各futureに渡し、キャンセルしたいタイミングで cancel_token.cancel() とcancelメソッドを呼び出してキャンセルします。
dropでもキャンセルが走るのでメソッドを呼び忘れても安心です。

async対応crateを選ぶ時の注意事項

asyncに対応しているcrateを選ぶ時はそのcrateがどのランタイムに対応しているか確認しましょう。

基本的にはcrateのdependenciesにどのランタイムが入っているかで確認します。
dependenciesにランタイムが無い場合は多分ランタイム非依存です。

あとは動かしてみてエラーが出ずにちゃんと動けば大丈夫です。

その他のasync対応ライブラリ

※ここは後から書き足す予定です。

最後に

漏れとかあると思いますが、この記事を書くにあたって思い出せた・調べられた事は全て書きました。

間違っている所や不足がありましたら優しく教えて頂けると幸いです。

最後まで読んで頂きありがとうございました。

98
57
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
98
57