この記事はWano Group Advent Calendar2022 16日目の記事です。
今回はRustの非同期プログラミング(スレッドやasync)周りで私が知っている事を2022年版としてまとめようと思います。
Rustのバージョンは記事執筆時点で最新の1.65.0を想定しています。
普通のスレッドで非同期処理をする時のあれこれ
まずはOSにお願いして作ってもらう普通のスレッドについてです。
スレッドの作り方
Rustは標準ライブラリ(std)にスレッドを取り扱う為のAPI(std::thread)があります。
fn main() {
let handle = std::thread::spawn(|| {
println!("Hello Thread!");
"🍣".to_string() + "🍺"
});
let s = handle.join().unwrap();
println!("{s}");
}
std::thread::spawnにクロージャを渡すとスレッドが起動します。
std::thread::spawn
はJoinHandleを返すので、これのjoinメソッドを使うことでスレッドの終了待ちをする事が出来ます。
上記の例の様にスレッドの終了よりも先にプログラムが終了してしまうようなケースではjoin
を使って終了待ちをしないとmain関数の終了と同時にスレッドも終了されてしまいます。
スレッドとして実行するクロージャは戻り値を返す事ができ、この戻り値はhandleのjoinメソッドの戻り値として取得する事が出来ます。
スレッドの停止・再開
適当なロックやoneshotやchannel等を使って停止・再開を制御してもいいですが、スレッドの停止・再開用の機能が標準で提供されています。
fn main() {
let handle = std::thread::spawn(move || {
println!("🍣");
std::thread::park(); // スレッドの停止
println!("🍺");
});
std::thread::sleep(std::time::Duration::from_secs(1));
println!("🐣");
handle.thread().unpark(); // スレッドの再開
handle.join().unwrap();
}
parkで止めてunparkで再開します。
正しい方が呼び出し易くなるようにそれぞれ定義されている場所が異なっている点に気をつけて下さい。
タイムアウト有りの停止をする為のpark_timeoutもあります。
内部的にはfutexを使ってるみたいです。
他のスレッドの進捗待ちをする
-
std::sync::Condvar
- 典型的にはMutexで保護されたリソースを他のスレッドが変更するまで待つのに使う。
-
std::sync::Barrier
- 典型的にはスレッド間で処理の進捗を合わせるのに使う。
-
crossbeam_utils::sync::WaitGroup
-
Barrier
と概ね同じですが以下の点で異なります。- 事前にスレッド数が分かってなくても良い。後からスレッドが追加される可能性があるケースで便利?
-
Barrier
は利用するスレッド全てが一時停止をするがWaitGroup
では停止の必要が無いスレッドは止まらずに処理を続けられる。
-
Condvar
とBarrier
はparking_lotというcrateで高性能版が提供されています。
使える場合はこちらの方が良いでしょう。
スレッド間でのデータのやり取り
普通の参照を共有して読み書き
std::thread::scope
を使うと素の参照を何も考えずにそのまま別スレッドに渡せるので楽です。
参照の取り扱いは通常の借用と同じです(なので複数スレッドに&mut T
な可変参照を渡す事はできません)。
fn main() {
let x = "🍣🍺".to_string();
let mut y = "🐣".to_string();
let rev_x = std::thread::scope(|scope| {
let handle1 = scope.spawn(|| {
x.chars().rev().collect::<String>()
});
let handle2 = scope.spawn(|| {
y += &x;
y += "🐔";
});
let rev_x = handle1.join().unwrap();
handle2.join().unwrap();
rev_x
});
println!("{rev_x} & {y}");
}
Arcで共有して読み書き
std::thread::scope
が合わないケースではstd::sync::Arc(参照カウンタ方式のスレッドを跨げるスマートポインタ)を使ってスレッド間でデータの共有を行います。
この方法だとArc
とMutex
等のロック機構を組み合わせる事で複数スレッドから同じデータを安全に変更する事が出来ます。
データへのアクセス時にどの様なロックが必要かに応じて更に内部の値をラップして使用します。
型 | 説明 |
---|---|
Arc<T> |
ただのリファレンスカウントで寿命管理された参照です。Read Onlyな用途に使う事をオススメします。 |
Arc<Mutex<T>> |
Mutexから取得したロックを通して内部の値にアクセスすることが出来ます。 |
Arc<RwLock<T>> |
共有ロックと排他ロックを使い分けたい時向けです。こちらも取得したロックを通して内部の値にアクセスできます。 |
Arc<Mutex<RefCell<T>>> |
並行処理で &mut T が必要な時に使用します。 |
Arc<RwLock<RefCell<T>>> |
同上。ロックの種類が異なるだけです。 |
Mutex
とRwLock
はparking_lotというcrateで高性能版が提供されています。
使える場合はこちらの方が良いでしょう。
またどの様な型を選択すべきかはこちらのRust Memory Container Cheat-sheetも参考になります。
channelを通して値を別スレッドに送る
標準ライブラリからstd::sync::mpscでmulti-producer single-consumerなchannelが提供されています。
fn main() {
let (tx, rx) = std::sync::mpsc::channel();
{
let tx = tx.clone();
std::thread::spawn(move || {
"🍣🍺🍖".chars().for_each(|c| {
tx.send(c).unwrap();
std::thread::sleep(std::time::Duration::from_millis(17));
});
});
};
{
let tx = tx.clone();
std::thread::spawn(move || {
"🐶😺🐣".chars().for_each(|c| {
tx.send(c).unwrap();
std::thread::sleep(std::time::Duration::from_millis(17));
});
});
}
drop(tx);
while let Ok(c) = rx.recv() {
print!("{c}");
}
}
また、crossbeamからはmulti-producer multi-consumerなchannelが提供されています。
fn main() {
let (tx, rx) = crossbeam::channel::unbounded(); // ここがcrossbeam
{
let tx = tx.clone();
std::thread::spawn(move || {
"🍣🍺🍖".chars().for_each(|c| {
tx.send(c).unwrap();
std::thread::sleep(std::time::Duration::from_millis(17));
});
});
};
{
let tx = tx.clone();
std::thread::spawn(move || {
"🐶😺🐣".chars().for_each(|c| {
tx.send(c).unwrap();
std::thread::sleep(std::time::Duration::from_millis(17));
});
});
}
drop(tx);
while let Ok(c) = rx.recv() {
print!("{c}");
}
}
Iteratorをスレッドで並列化する
rayonを使います。
use rayon::prelude::*;
fn fib(i: u64) -> u64 {
if i <= 2 {
1
} else {
fib(i - 2) + fib(i - 1)
}
}
fn main() {
let xs: Vec<u64> = (1..=100000).collect();
let total: u64 = xs
.into_par_iter() // ← この辺がrayon
.map(|x| fib(x % 5 + 20))
.sum();
println!("total: {total}");
}
rayonの性能を体感したい場合はPlaygroundだと分かりにくいのでローカルで試すのをオススメします。
Futureによるasync/.awaitで非同期処理をする時のあれこれ
Future traitを中心としたユーザ空間で動く非同期処理についてです。
公式にはRust async bookで基礎的な説明がされています。
基本的な構成要素
Future trait
Future trait
はRustで非同期処理を作る上での一番基礎になるビルディングブロックです。
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
RustのFutureは基本的にはpollメソッドで状態を確認して Poll::Ready
であれば実行完了と見なすという使われ方をします。
Futureはself
をPin<&mut Self>
とする事でself
がどこかしらのアドレスに固定されている事を要求しています(Pinが何なのかは後述)。
また、非同期ランタイムが必要以上にpoll
メソッドを叩かずに済むように、Context
にはpollを叩いて何かしらの進捗が得られる状態になった事をFuture側からランタイムに通知する為のコールバック関数が含まれています。
Futureを作るだけであればContextは非同期ランタイムが作って渡してくるので基本的にはバケツリレーしていれば良いです。
非同期ランタイムを作る場合はWake traitを実装した型を作る必要があるでしょう(もしくはfutures crateのArcWake)。
tokio等の非同期ランタイムはこのFuture traitで抽象化された並行実行可能な処理の実行を担当します。
async/.await
Rust 1.39.0から安定化されたFutureベースの非同期処理の為の構文です。
次の様に使います。
async fn request_example_json() -> anyhow::Result<Example> {
reqwest::get("https://example.com/example.json")
.await?
.json()
.await
.map_err(From::from)
}
普通の関数であればfn
とする所をasync fn
と書く事でこの関数がFutureを返す事を表します。
FutureはRustコンパイラがこの関数用に自動で実装してくれます。
戻り値にはFuture::Output
に入る型だけ書けば良いです。
実質的な戻り値の型は impl Future<Output=T>
みたいになります。
.await
は他の多くの言語と異なりFutureのプロパティであるかの様に書きます。
これにより上記の例の様にメソッドチェーンで最後まで繋げて書くことが出来ます。
注意事項
traitのメソッドにasyncを付けることは出来ません。
traitのメソッドにasyncを付けたい場合はasync-trait crateを利用しましょう。
もしくはPin<Box<dyn Future<Output=T>>>
を返すようにします。
async関数はBox化したFutureを返すようにしないと再帰が出来ません。
async-recursion crateを使うとこの辺を自動でやってくれてお手軽に再帰可能に出来ます。
async ブロック
async関数/メソッドにするほどでもない程度の処理はasyncブロックとして書く事が出来ます。
async fn request_example_json_simultaneously() -> anyhow::Result<(Example, Example)> {
let future1 = async {
let res = reqwest::get("https://example.com/example.json")
.await?
.json()
.await?;
anyhow::Ok(res)
};
let future2 = async {
let res = reqwest::get("https://example.com/example.json")
.await?
.json()
.await?;
anyhow::Ok(res)
};
Ok(futures::try_join!(future1, future2)?)
}
Pin
ここまででPinという型がちょこちょこと出てきています。
これは何なのでしょう?
Pin
はasync/.await
構文を成り立たせる為に必須の要素です。
async構文によって自動生成されるFutureはクロージャと同様に環境をキャプチャした構造体です。
クロージャと異なるのは処理中に発生する他のFutureを保持する為のフィールドもあることです。
この構造体が抱えているFutureが同じく構造体が抱えている変数への参照を持っている場合、少し状況を簡単になるように言い換えると構造体のあるフィールドが別のフィールドを参照している状態ではある種の問題が発生します。
それは構造体がmoveした後にフィールドが持っている参照が以前あったアドレスを指し続けてしまう問題です。
これを解決する為に参照先のオブジェクトがmoveしない事を保証する為の仕組みが導入されました。
それがPin
とUnpin
です。
オブジェクトを特定のアドレスに固定することをピン刺しに例えている命名になっています。
Unpin
はコンパイラによって自動実装されるtraitでmoveしても安全な型である事を表します。
なので普通の型は自己参照する要素を持っていないので全てUnpin
が実装されることになります。
async構文やasyncブロックによって自動生成されたFutureはそれとは反対に!Unpin
、つまりUnpinを実装していない型になります。
これによりmoveするのが安全ではない事が表されます。
しかし!Unpin
だけではRustにはmoveを防ぐ機能は無いのでまだmove出来てしまいます。
moveしない事を確実に保証する為にPin
構造体で目的のオブジェクトの参照をラップします。
参照をラップする事で元のオブジェクトがmoveしようとすればコンパイルエラーになります。
そしてPin構造体はUnpin
の実装有無で可変な操作が出来るかどうかを制限することで抜け道も塞いでいます。
非同期ランタイムは何らかのアドレスの変わらない場所にFutureを固定してからPin
を通してFutureを取り扱います。
ちなみに!Unpin
だけどPinを通した可変な操作をしたい場合はpin-project crateを使うことで出来るようになります。
これはフィールド毎の可変参照を取れるようにする事で元のオブジェクトのmoveは出来ないという形の安全性は保てるという仕組みになっています。
ここはちょっと分かりにくい話なので、よく分からない場合は以下の資料を読むと分かるかもしれません。
futures crate
RustではFuture関係の基本的な機能はstd::futureに入っているのですが、ここに入っているものは最低限のものだけになっています。
一般的に期待されるようなAPIはサードパーティライブラリとして実装されており、その中でも中心的なものがfutures crateです。
futures crateは以下のような機能を提供します。
- FutureExt等の拡張trait
- 非同期ストリーム
- join!、select!等の複数Futureを同時に動かすマクロ
-
async対応版のchannel
- mpmcが欲しい場合はtokio::sync::broadcastかasync-channelにあります。
-
IOの抽象化
- AsyncRead, AsyncReadExt
- AsyncWrite, AsyncWriteExt
- copy関数
- async対応版のMutex, BiLock
- RwLockはこれから https://github.com/rust-lang/futures-rs/pull/2082
- 今欲しい場合は以下のどれかを使う
- RwLockはこれから https://github.com/rust-lang/futures-rs/pull/2082
非同期ランタイム
-
Tokio
- Rustのデファクトスタンダードな非同期処理ランタイムです。迷ったらこれ。
- async-std
- futures::executor
- futures-lite
- smol
- ※あとから多分増えます
色々あるが特に理由が無ければ一番人気のtokioを使えば良いです。
tokio系の話
tracingと関連crates
asyncを使うと実際の処理の構造とスタックとレースが一致しなくなります。
そんな状態でも処理の構造を把握できるような形でログを吐く仕組みを提供しているのがtracing crateです。
asyncだけでなく同期的な処理にも使えるので広く公開するライブラリでもなければtracingを利用するのが無難だと思います。
-
tracing
- 本体。概ねlogの上位互換です。
- tracing-subscriberを使ってログの書式設定を行います。
-
tracing-log
- log互換レイヤー。
- とりあえず
tracing_log::LogTracer::init()?
しておけばOK。
-
tracing-futures
-
features = ["futures-03"]
を付けるとStream
やSink
にも.instrument()
できるようになります。
-
-
tracing-opentelemetry
- tracingとopentelemetryの橋渡し的なやつです。
-
sentry-tracing
- tracingのeventをsentryに送るやつです。
-
test-log
-
#[test]
をこのcrateのものに変えるだけでテスト毎に自動でtracingのsubscriberをセットアップしてくれます。
-
-
tokio-console
- tracingの情報を使ってどんなfutureが実行中なのか等をコンソールに表示できるようにするものです。
tracing, tracing-subscriber, tracing-logを使うサンプルコードです。
tracing-subscriberを使ってファイル名・行番号の追加とtimestamp
のタイムゾーンをデフォルトのUTCからJSTに変更しています。
use time::{format_description::FormatItem, macros::format_description, UtcOffset};
use tracing::Instrument;
use tracing_subscriber::fmt::time::OffsetTime;
const LOG_TIMESTAMP_FORMAT: &[FormatItem] = format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:6][offset_hour sign:mandatory][offset_minute]");
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_file(true)
.with_line_number(true)
.with_timer(OffsetTime::new(
UtcOffset::from_hms(9, 0, 0)?,
LOG_TIMESTAMP_FORMAT,
))
// .json() // この行を有効にするとJSONで出力されるようになります。
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
tracing_log::LogTracer::init()?;
tracing::trace!("Start fibonacci calculation");
println!(
"{:#?}",
fibonacci_number(&[128, 175, 156])
.instrument(tracing::trace_span!(
"main call fibonacci_number",
process_id = "deadbeef"
))
.await
);
tracing::trace!("End fibonacci calculation");
Ok(())
}
async fn fibonacci_number(indices: &[usize]) -> Vec<u128> {
#[async_recursion::async_recursion]
async fn go(memo: &mut Vec<u128>, i: usize) -> u128 {
if i >= memo.len() {
panic!("Index {i} is out of bounds of memo table");
}
if memo[i] > 0 {
tracing::trace!(i = i, "Return memoized value");
memo[i]
} else if i <= 1 {
tracing::trace!(i = i, "Return 1 since i <= 1");
memo[i] = 1;
memo[i]
} else {
tracing::trace!(i = i, "Return calculated value");
tokio::task::yield_now().await;
memo[i] = go(memo, i - 1).await + go(memo, i - 2).await;
tokio::task::yield_now().await;
memo[i]
}
}
futures::future::join_all(indices.iter().map(|&i| {
async move {
let mut memo = vec![0; i];
go(&mut memo, i - 1).await
}
.instrument(tracing::trace_span!("fibonacci", i = i))
}))
.await
}
Cargo.tomlのdependenciesは下記の様になります。
[dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-recursion = "1.0"
futures = "0.3"
log = "0.4"
time = { version = "*", features = ["macros"] }
tokio = { version = "1.22.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "local-time"] }
tracing-log = "0.1.3"
実行結果(途中省略)
2022-12-05T01:03:15.695646+0900 TRACE async_samples: src/main.rs:22: Start fibonacci calculation
2022-12-05T01:03:15.695780+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=128}: async_samples: src/main.rs:52: Return calculated value i=127
2022-12-05T01:03:15.695821+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:52: Return calculated value i=174
2022-12-05T01:03:15.695852+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=156}: async_samples: src/main.rs:52: Return calculated value i=155
2022-12-05T01:03:15.695883+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=128}: async_samples: src/main.rs:52: Return calculated value i=126
...(省略)...
2022-12-05T01:03:16.374943+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:45: Return memoized value i=170
2022-12-05T01:03:16.374980+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:45: Return memoized value i=171
2022-12-05T01:03:16.375018+0900 TRACE main call fibonacci_number{process_id="deadbeef"}:fibonacci{i=175}: async_samples: src/main.rs:45: Return memoized value i=172
[
251728825683549488150424261,
1672445759041379840132227567949787325,
178890334785183168257455287891792,
]
2022-12-05T01:03:16.375086+0900 TRACE async_samples: src/main.rs:32: End fibonacci calculation
tokio-uring
https://crates.io/crates/tokio-uring
tokioでio-uringを使うようにできます。
asyncでのコードのサンプル
ランタイムにはtokioを使用する前提でコードの例を掲載します。
tokioをランタイムとして使用
cargo add tokio@1.22 --features full
でCargo.tomlのdependenciesにtokioを追加します。
あとはmain関数にasyncを付けてtokio::main
マクロをmain関数のattributeとして足します。
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// ここにコード
Ok(())
}
もしくは自分でtokioランタイムの起動タイミングを制御したい場合は次のようにします。
use tokio::runtime::Runtime;
fn main() -> anyhow::Result<()> {
let rt = Runtime::new()?;
let ret_val = rt.block_on(async {
// ここにコード
anyhow::Ok(())
});
Ok(())
}
標準出力に非同期で書く
use tokio::io::AsyncWriteExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tokio::io::stdout().write_all("Hello Tokio!".as_bytes()).await?;
Ok(())
}
write_all
はロックを取ったりする訳ではないので、長文を出力する場合は他のFutureからの出力と混ざって変になる可能性があります。
きちんとやる場合はロックを取るなり書き出すFutureを1つに絞るなりした方が良いと思います。
標準入力から非同期で読み込み
読み込んだ内容はそのまま標準出力に出す事にします。
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut r = BufReader::new(tokio::io::stdin());
let mut buf = String::with_capacity(1024);
let mut w = tokio::io::stdout();
while r.read_line(&mut buf).await? > 0 {
w.write_all(buf.as_bytes()).await?;
}
Ok(())
}
HTTPリクエストを投げる
GETリクエストを投げてその内容を標準出力に書き出すサンプルです。
HTTPリクエストの送信にはreqwest crateを使います。
use tokio::io::AsyncWriteExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let content = reqwest::get("https://example.com/example.json")
.await?
.text()
.await?;
tokio::io::stdout().write_all(content.as_bytes()).await?;
Ok(())
}
複数のHTTPリクエストを並行に投げて両方の終了を待つ
futures::join!
マクロを使います。
use tokio::io::AsyncWriteExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let fut1 = async {
let content = reqwest::get("https://example.com/example.json")
.await?
.text()
.await?;
anyhow::Ok(content)
};
let fut2 = async {
let content = reqwest::get("https://example.com/other.json")
.await?
.text()
.await?;
anyhow::Ok(content)
};
let (result1, result2) = futures::join!(fut1, fut2);
for content in [result1?, result2?] {
tokio::io::stdout().write_all(content.as_bytes()).await?;
}
Ok(())
}
select!
macroで複数のfutureのうち最初に終わるものを待つ
use futures::future::FutureExt;
use std::time::Duration;
#[tokio::main]
async fn main() {
let f1 = async {
tokio::time::sleep(Duration::from_secs(3)).await;
"future-1"
};
let f2 = async {
tokio::time::sleep(Duration::from_secs(2)).await;
"future-2"
};
let res = futures::select! {
res1 = f1.fuse() => res1,
res2 = f2.fuse() => res2,
};
println!("{res}");
}
基本的には見たままです。
詳しくはこちらのドキュメントをご覧下さい。
Futureのキャンセル
基本的にはdropすればキャンセルされますが tokio::spawn
したものは返ってきたhandleをdropしても止まらないので自前でキャンセルする必要があります。
これはtokio_util::sync::CancellationTokenとfutures::select!
を使うと比較的簡単に書けます。
use futures::future::FutureExt;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cancel_token = CancellationToken::new();
let f1 = {
let cancel_token = cancel_token.clone();
tokio::spawn(async move {
println!("Begin future-1");
futures::select! {
_ = tokio::time::sleep(Duration::from_secs(6)).fuse() => println!("End future-1"),
_ = cancel_token.cancelled().fuse() => println!("Cancelled future-1"),
};
"future-1"
}).fuse()
};
let f2 = {
let cancel_token = cancel_token.clone();
tokio::spawn(async move {
println!("Begin future-2");
futures::select! {
_ = tokio::time::sleep(Duration::from_secs(3)).fuse() => println!("End future-2"),
_ = cancel_token.cancelled().fuse() => println!("Cancelled future-2"),
};
"future-2"
}).fuse()
};
futures::pin_mut!(f1,f2);
let res = futures::select! {
res1 = f1 => res1?,
res2 = f2 => res2?,
};
cancel_token.cancel();
println!("First finished: {res}");
tokio::time::sleep(Duration::from_secs(10)).await;
Ok(())
}
CancellationToken::new()
でオブジェクトを作り、適当にコピーを各futureに渡し、キャンセルしたいタイミングで cancel_token.cancel()
とcancelメソッドを呼び出してキャンセルします。
dropでもキャンセルが走るのでメソッドを呼び忘れても安心です。
async対応crateを選ぶ時の注意事項
asyncに対応しているcrateを選ぶ時はそのcrateがどのランタイムに対応しているか確認しましょう。
基本的にはcrateのdependenciesにどのランタイムが入っているかで確認します。
dependenciesにランタイムが無い場合は多分ランタイム非依存です。
あとは動かしてみてエラーが出ずにちゃんと動けば大丈夫です。
その他のasync対応ライブラリ
※ここは後から書き足す予定です。
- Web Application Framework
- DB
- Test
- wiremock テストサーバを立てられる。
- Wasm
最後に
漏れとかあると思いますが、この記事を書くにあたって思い出せた・調べられた事は全て書きました。
間違っている所や不足がありましたら優しく教えて頂けると幸いです。
最後まで読んで頂きありがとうございました。