非同期 Rust パターン
Rust で非同期並列処理を書く時に出てくるパターンについて解説します
tokio 1.0 と futures 0.3 環境です
async function 編
— legokichi (@duxca) August 28, 2021
async fn hoge() -> ()
まずは基本
async fn hoge() -> () {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
エラーを返さない非同期関数の代表格として tokio::time::sleep を使った。
fn hoge() -> impl Future<Output=()>
async fn
を脱糖したももの。
fn hoge() -> impl std::future::Future<Output=()> {
async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
futures::future::Future
や futures::prelude::Future
、 futures::Future
は std::future::Future
を re-export しているだけなのでどれを使っても良い。
ところで Send + 'static
でない Future
は tokio::spawn
で tokio のマルチスレッドスケジューラで非同期 並列 処理ができない。
async fn
や impl Future
は Send + 'static
を async 文の body から推論してくれるので素直な async 文は spawn できる
fn hoge() -> impl std::future::Future<Output=()> {
async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
let fut = tokio::spawn(hoge());
しかし裏を返せば関数の戻り値の型が関数の実装依存ということになるので、ちょっと変更すると途端にコンパイルエラーになる
例えば、 async の内部で !Send
な Rc
を使うと……
fn hoge() -> impl std::future::Future<Output=()> {
async {
let a = std::rc::Rc::new(());
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
let fut = tokio::spawn(hoge());
ある日突然コンパイルエラーがやってくる
error: future cannot be sent between threads safely
--> src/main.rs:11:11
|
11 | let fut = tokio::spawn(hoge());
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::spawn`
|
= help: within `impl Future`, the trait `Send` is not implemented for `Rc<()>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:7:7
|
6 | let a = std::rc::Rc::new(());
| - has type `Rc<()>` which is not `Send`
7 | tokio::time::sleep(std::time::Duration::from_millis(100)).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `a` maybe used later
8 |
9 | }
| - `a` is later dropped here
fn hoge() -> Pin<Box<dyn Future<Output=()> + Send + 'static>>
async fn
や impl Future
は自己再帰や相互再帰で戻り値の型が推論できないので dyn Trait
でトレイトオブジェクトを使う必要がある
この型は脳死でタイピングできるようにしておくこと。
fn hoge() -> std::pin::Pin<Box<dyn std::future::Future<Output=()> + Send + 'static>> {
Box::pin(async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
})
}
std::pin::Pin
が std::prelude
に入ってないのがつらいね。
|| -> Pin<Box<dyn Future<Output=()> + Send + 'static>>
pin box は closure でも使える
let closure = move || -> std::pin::Pin<Box<dyn futures::Future<Output=() + Send + 'static>> {
Box::pin(async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
};
fn hoge() -> BoxFuture<'static, ()>
Pin<Box<dyn ...
が流れるようにタイピングできるようになったころに存在に気がつく便利な型エイリアス。
ちなみに type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a, Global>>;
です
fn hoge() -> futures::future::BoxFuture<'static, ()> {
use futures::FutureExt;
async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}.boxed()
}
futures::FutureExt
の boxed
もカッコのネストが減って便利。
async fn hoge() -> Result<(), anyhow::Error>
ここから Result 編です。非同期はだいたい IO。IO には失敗がつきものです。
失敗する関数として tokio::signal::ctrl_c()
を用意しました。
異常なシグナル を登録しないかぎり エラーになることはまずない ですが。
async fn hoge() -> Result<(), anyhow::Error> {
tokio::signal::ctrl_c().await?;
Ok(())
}
?
で early return ができて便利ですね。
fn hoge() -> impl Future<Output=Result<(), anyhow::Error>>
説明不要ですね
fn hoge() -> impl Future<Output=Result<(), anyhow::Error>> {
async{
tokio::signal::ctrl_c().await?;
Ok(())
}
}
fn hoge() -> impl TryFuture<Item=Ok=(), Error=anyhow::Error>
存在意義がよくわからない trait。
Send がつかなくて tokio::spawn
できないなどの害もある。謎。
fn hoge() -> impl futures::TryFuture<Item=(), Error=anyhow::Error> {
async{
tokio::signal::ctrl_c().await?;
Ok(())
}
}
fn hoge() -> Pin<Box<Future<Output=Result<(), anyhow::Error> + Send + 'static>>
説明不要ですね
fn hoge() -> std::pin::Pin<Box<dyn std::future::Future<Output=Result<(), anyhow::Error>> + Send + 'static>> {
Box::pin(async {
tokio::signal::ctrl_c().await?;
Ok(())
})
}
|| -> Pin<Box<dyn Future<Output=Result<(), anyhow::Error>> + Send + 'static>>
特に async な closure で Result
を ?
で early return したいときに頻出
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
tokio::spawn((|| -> std::pin::Pin<Box<dyn futures::Future<Output=Result<(), anyhow::Error>> + Send + 'static>> {
Box::pin(async {
tokio::signal::ctrl_c().await?;
Ok(())
})
})()).await?;
Ok(())
}
|| -> BoxFuture<'static, Result<(), anyhow::Error>>
少し短くなりました
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
tokio::spawn((|| -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
use futures::FutureExt;
async {
tokio::signal::ctrl_c().await?;
Ok(())
}.boxed()
})()).await?;
Ok(())
}
fn hoge() -> Box<dyn Future<Item=(), Error=failure::Error> + Send + 'static>
Pin
と async
文が導入される前の futures 0.1 の頃 はこちらの型が使われていた。
future はモナドなので combinator を使えば moandic に書くことができた。
fn hoge() -> Box<dyn futures_01::Future<Item=(), Error=failure::Error> + Send + 'static> {
use futures_01::Future;
let fut = futures_01::future::ok(())
.and_then(|()|{
let a = 0;
futures_01::future::ok(()) })
.and_then(|()|{
println!("{}", a);
futures_01::future::ok(()) }) });
Box::new(fut)
}
monadic なスタイルでは and_then
(>>=
) をまたいで参照を持てないため、 でかい構造体は Box
や Arc
に包む必要があったり、
非同期リトライなどの loop が素直に書けないなど問題が 山ほど あったため、喧々諤々の議論の末、 async
文 や .await
キーワード、 Pin
, Unpin
などが導入された。
なお async
の脱糖構文にあたる generator は (await
と yield
が対応) 1.0 以前から議論されていて 、
未だに仕様が定まっていない 。 最近も仕様が提案される など unstable rust の筆頭機能になっている(個人の感想です)
現在の std::future::Future
や async
文と混ぜて書くには futures-compat が必要。
tokio runtime のバージョンが違う場合は tokio_compat や tokio-compat-02 なども噛ませる必要がある。
anyhow::Error
ではなく failure::Error
なあたりに時代を感じさせられる。
fn hoge() -> BoxFuture<'static, Result<(), anyhow::Error>>
TryBoxFuture はないです
fn hoge() -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
use futures::FutureExt;
async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(())
}.boxed()
}
|| -> BoxFuture<'static, Result<(), anyhow::Error>>
型が短くなりました。
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
tokio::spawn((move || -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
use futures::FutureExt;
async move {
fuga()?;
Ok(())
}.boxed()
})()).await?;
Ok(())
}
async fn hoge(foo: &str) -> ()
ここからは参照のライフタイムがある async 関数編です。
まずは基本。main 関数のスタックに載ったヒープメモリへの参照を async fn で使うパターン。
async fn hoge(foo: &str) -> () {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo);
}
#[tokio::main]
async fn main() {
let foo = "hello world!".to_string(); // heap にある String を参照
hoge(&foo).await;
}
これは &'static
でないので tokio::spawn
はできない。
error[E0597]: `foo` does not live long enough
--> src/main.rs:5:21
|
5 | tokio::spawn(hoge(&foo)).await;
| -----^^^^-
| | |
| | borrowed value does not live long enough
| argument requires that `foo` is borrowed for `'static`
6 | }
| - `foo` dropped here while still borrowed
こういうときは素直に Arc
を使おう
async fn hoge(foo: Arc<String>) -> ()
少々野暮ったいですが、こう。
async fn hoge(foo: std::sync::Arc<String>) -> () {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo);
}
#[tokio::main]
async fn main() {
let foo = std::sync::Arc::new("hello world!".to_string());
hoge(std::sync::Arc::clone(&foo)).await;
}
Arc
でしか使えない関数になってしまった!
そんなときは……
async fn hoge(foo: Deref<Target=String>) -> ()
impl Deref<Target=...>
で書くと Arc
, Rc
, Box
に全対応した Generic な関数が書ける。
ただし引数が Send
かどうかで tokio::spawn
できなくなるので注意。
async fn hoge(foo: impl std::ops::Deref<Target=String>) -> () {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo.deref());
}
#[tokio::main]
async fn main() {
let foo = std::sync::Arc::new("hello world!".to_string());
tokio::spawn(hoge(std::sync::Arc::clone(&foo))).await;
let foo = std::rc::Rc::new("hello world!".to_string());
hoge(std::rc::Rc::clone(&foo)).await;
let foo = Box::new("hello world!".to_string());
tokio::spawn(hoge(foo)).await;
// String 自身も Deref<Target=String> である
let foo = "hello world!".to_string();
hoge(&foo).await;
}
Arc::clone
や Rc::clone
を直接読んでいるのは Arc<T>
内の T が Clone
だったときにメソッド名が衝突して紛らわしいからです 。
fn hoge(foo: &str) -> impl Future<Output=()> + '_
引数のライフタイムを推論させたもの。
fn hoge(foo: &str) -> impl futures::Future<Output=()> + '_ {
async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo);
}
}
#[tokio::main]
async fn main() {
let foo = "hello world!".to_string();
hoge(&foo).await;
}
async move
しないと async
文が生成する無名の impl Future
構造体内にキャプチャする参照が hoge
関数のスタックへの参照になって borrow checker に引っかかる
error[E0373]: async block may outlive the current function, but it borrows `foo`, which is owned by the current function
--> src/main.rs:2:11
|
2 | async {
| ___________^
3 | | tokio::time::sleep(std::time::Duration::from_millis(100)).await;
4 | | println!("{}", foo);
| | --- `foo` is borrowed here
5 | | }
| |_____^ may outlive borrowed value `foo`
|
note: async block is returned here
--> src/main.rs:2:5
|
2 | / async {
3 | | tokio::time::sleep(std::time::Duration::from_millis(100)).await;
4 | | println!("{}", foo);
5 | | }
| |_____^
help: to force the async block to take ownership of `foo` (and any other referenced variables), use the `move` keyword
|
2 | async move {
3 | tokio::time::sleep(std::time::Duration::from_millis(100)).await;
4 | println!("{}", foo);
5 | }
|
このへんは closure と考え方が一緒。
fn hoge<'a>(foo: &'a str) -> impl Fn() -> () + 'a {
move || {
println!("{}", foo);
}
}
#[tokio::main]
async fn main() {
let foo = "hello world!".to_string();
(hoge(&foo))();
}
move を付けなかった場合はエラーになる
error[E0373]: closure may outlive the current function, but it borrows `foo`, which is owned by the current function
--> src/main.rs:2:5
|
2 | || {
| ^^ may outlive borrowed value `foo`
3 | println!("{}", foo);
| --- `foo` is borrowed here
|
note: closure is returned here
--> src/main.rs:2:5
|
2 | / || {
3 | | println!("{}", foo);
4 | | }
| |_____^
help: to force the closure to take ownership of `foo` (and any other referenced variables), use the `move` keyword
|
2 | move || {
| ^^^^^^^
fn hoge<'a>(foo: &'a str) -> impl futures::Future<Output=()> + 'a
↑の参照のライフタイムを明示したもの。
fn hoge<'a>(foo: &'a str) -> impl futures::Future<Output=()> + 'a {
async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo);
}
}
#[tokio::main]
async fn main() {
let foo = "hello world!".to_string();
hoge(&foo).await;
}
ただし clippy にはそんな自明な型は省略しろと怒られる
warning: explicit lifetimes given in parameter types where they could be elided (or replaced with `'_` if needed by type declaration)
--> src/main.rs:1:1
|
1 | fn hoge<'a>(foo: &'a str) -> impl futures::Future<Output=()> + 'a {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(clippy::needless_lifetimes)]` on by default
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes
世知辛いね。
fn hoge<'a>(foo: &'a str) -> Pin<Box<dyn Future<Output=()> + Send + 'a>>
この型は一番汎用的なのでよく使うと思う。
fn hoge<'a>(foo: &'a str) -> std::pin::Pin<Box<dyn std::future::Future<Output=()> + Send + 'a>> {
use futures::FutureExt;
async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo);
}.boxed()
}
#[tokio::main]
async fn main() {
let foo = "hello world!".to_string();
hoge(&foo).await;
}
fn hoge<'a>(foo: &'a str) -> Pin<Box<dyn Future<Output=()> + 'a>>
Send がついていないバージョン。
fn hoge<'a>(foo: &'a str) -> std::pin::Pin<Box<dyn std::future::Future<Output=()> + Send + 'a>> {
use futures::FutureExt;
async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo);
}.boxed_local()
}
#[tokio::main]
async fn main() {
let foo = "hello world!".to_string();
hoge(&foo).await;
}
.boxed
は Send がついてしまって型が合わないので、この場合は .boxed_local
が必要。
fn hoge<'a>(foo: &'a str) -> BoxFuture<'a, ()>
説明不要ですね。
fn hoge<'a>(foo: &'a str) -> futures::future::BoxFuture<'a, ()> {
use futures::FutureExt;
async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", foo);
}.boxed()
}
#[tokio::main]
async fn main() {
let foo = "hello world!".to_string();
hoge(&foo).await;
}
for<'a> Fn(&'a str) -> BoxFuture<'a, Result<(), anyhow::Error>>
この型は join_all
で並列処理するときに頻出する。
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
let a = vec![
"a".to_string(),
"b".to_string(),
"c".to_string()
];
let futs = a.iter().map(|a|-> futures::future::BoxFuture<'_, Result<(), anyhow::Error>> {
use futures::FutureExt;
async move {
tokio::signal::ctrl_c().await?;
println!("{}", a);
Ok(())
}.boxed()
});
let rets = futures::future::join_all(futs).await;
rets.into_iter().collect::<Result<Vec<_>, _>>()?;
Ok(())
}
map
の引数の a
を async
文が生成する無名の impl Future
構造体の中に入れるために async move
しています。分かりますか?
ただし 'static
ではないので tokio::spawn
による非同期並列処理ができません。
for<'a> Fn(&'a str) -> BoxFuture<'static, Result<(), anyhow::Error>>
並列数が増えてきたらマルチスレッドで動かせるように Arc
などに包もう。
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
let a = vec![
Arc::new("a".to_string())
];
let futs = a.iter().map(|a|-> futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
let a = Arc::clone(a);
use futures::FutureExt;
async move {
tokio::signal::ctrl_c().await?;
println!("{}", a);
Ok(())
}.boxed()
});
let rets = tokio::spawn(futures::future::join_all(futs)).await?;
rets.into_iter().collect::<Result<Vec<_>, _>>()?;
Ok(())
}
|..| { async { .. } }
ここからは async closure 編です
move
closure と async move
の組み合わせを見ていきましょう。
まずは変数キャプチャなしの場合。
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
(0..10_u8).map(|_| async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}).collect::<Vec<_>>();
Ok(())
}
closure の外の変数も async の外の変数も async の中で使っていないので move する必要はありません。
|..| { async move { .. } }
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
(0..10_u8).map(|i| async move{
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("{}", i);
}).collect::<Vec<_>>();
Ok(())
}
closure の引数 i を async
文の生成する無名の impl Future
構造体の中に取り込むために move が必要です
付けないと怒られます。
error[E0373]: async block may outlive the current function, but it borrows `i`, which is owned by the current function
--> src/main.rs:3:30
|
3 | (0..10_u8).map(|i| async {
| ______________________________^
4 | | tokio::time::sleep(std::time::Duration::from_millis(100)).await;
5 | | println!("{}", i);
| | - `i` is borrowed here
6 | | }).collect::<Vec<_>>();
| |_____^ may outlive borrowed value `i`
|
note: async block is returned here
--> src/main.rs:3:24
|
3 | (0..10_u8).map(|i| async {
| ________________________^
4 | | tokio::time::sleep(std::time::Duration::from_millis(100)).await;
5 | | println!("{}", i);
6 | | }).collect::<Vec<_>>();
| |_____^
help: to force the async block to take ownership of `i` (and any other referenced variables), use the `move` keyword
|
3 | (0..10_u8).map(|i| async move {
4 | tokio::time::sleep(std::time::Duration::from_millis(100)).await;
5 | println!("{}", i);
6 | }).collect::<Vec<_>>();
|
概念的には move のない async 文は以下のような構造体を生成しています。
struct AsyncStruct<'a> {
a: &'a u8,
...
}
impl<'s> Future for AsyncStruct<'s> {
...
}
move
をつけるとこう。
struct AsyncStruct<'a> {
a: u8,
...
}
impl<'s> Future for AsyncStruct<'s> {
...
}
例えば 10 並列で http request を投げる時に、 http クライアントを使い回す場合なんかにも使います。
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
let client = reqwest::Client::new();
let futs = (0..10_u8).map(|_|{
let client = client.clone();
async move {
let body = client.get("https://www.rust-lang.org")
.send().await.unwrap()
.text().await.unwrap();
println!("{}", body);
}
});
futures::future::join_all(futs).await;
Ok(())
}
move |..| { async { .. } }
あんまり場面がない気がする。
move |..| { async move { .. } }
あんまり場面がない気がする。
強いて挙げるならカリー化するとき?
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
let c = std::sync::Arc::new(1);
let curried = { let c = std::sync::Arc::clone(&c); move |a| move |b| async move {
println!("a + b + c = {}", a + b + c.as_ref());
}};
tokio::spawn((curried)(1)(1)).await?;
Ok(())
}
async stream 編
ここからは若干 unstable な話になってきます。
非同期ストリームの実装は
の 3 つあり、そのコンビネータ体系は
4 つ存在していて、全部違います。
さらに async iterator の実装には
- マクロを使って構造体定義を省略した async_stream
- nihgtly の generator を使った futures_async_stream
があり、混沌としています。どうしてこうなった。
fn fuga() -> impl futures::Stream<Item = i64>> + Send
まずは基本です
fn fuga() -> impl futures::Stream<Item = i64>> + Send {
async_stream::stream! {
let mut i = 0;
loop{
yield i;
i+=1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
#[tokio::main]
async fn main() {
let src = fuga();
// stream を await するのに必要 - https://tokio.rs/tokio/tutorial/streams
futures::pin_mut!(src);
while let Some(value) = src.next().await {
println!("got {}", value);
}
}
fn fuga() -> impl futures::Stream<Item = Result<i64, anyhow::Error>> + Send
fn fuga() -> impl futures::Stream<Item = Result<i64, anyhow::Error>> + Send {
async_stream::try_stream! {
let mut i = 0;
loop{
tokio::signal::ctrl_c().await?;
yield i;
i+=1;
}
}
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
let src = fuga();
futures::pin_mut!(src);
while let Some(value) = src.next().await.transpose()? {
println!("got {}", value);
}
Ok(())
}
async spmc stream
single-producer multi-consumer パターンです。
ページネーションのある web-api の各項目を非同期処理する場合などにスループットを上げることができます。
非同期対応した固定長 mpmc キューには flume::bounded が便利です。
固定長なので、バッファしていっぱいになったら producer 側は .await
で consumer 側の消費を待てて、
consumer 側も mc の特権である Stream + Clone
から .await
した順に処理対象が降ってるので。
- try_for_each_concurrent が便利 -
const CONSUMERS: usize = 10;
const BUFFERS: usize = 100;
#[tokio::main]
async fn main() {
let src = stream! {
let mut i = 0_u64;
loop {
yield i;
i += 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
};
let (src, fut) = into_spmc(src);
use futures::TryStreamExt;
let fut2 = src.try_for_each_concurrent(CONSUMERS, |i| async move{
println!("{:?}:{}", std::thread::current().id(), i);
Ok(())
});
let (a, _) = tokio::spawn(futures::future::join(fut2, fut)).await?;
a?;
Ok(())
}
pub fn into_spmc<'a, T>(
size: usize,
src: impl futures::Stream<Item = T> + Send + 'a,
) -> (
impl futures::Stream<Item = T> + Clone + Send + Sync + 'a,
impl futures::Future<Output = ()> + Send + 'a,
)
where
T: Send + 'static,
{
use futures::FutureExt;
use futures::StreamExt;
let (tx, rx) = flume::bounded(size);
let fut = src.map(Ok).forward(tx.into_sink());
let fut2 = fut.then(|_: Result<_, _>| futures::future::ready(()));
(rx.into_stream(), fut2)
}
async trait method
ここまで読んだあなたは trait の method も
trait Hoge {
type Error;
async fn hoge(&str) -> Result<(), Self::Error>;
}
みたいに書きたくなると思いますが、stable では使えません。
https://github.com/dtolnay/async-trait のマクロを使えば見た目上は使えます。 PinBox にラップされますが。
tokio の current_thread と multi_thread の違い
— legokichi (@duxca) August 30, 2021
普段脳死で #[tokio::main]
と書いていると気が付きませんが、 tokio のランタイムには以下の設定項目があります 。
- 非同期ランタイムが new_multi_thread か current_thread か
-
spawn
で並列処理するときの非同期ランタイムの worker_threads はいくつか(new_multi_thread の場合) -
spawn_blocking
または -block_in_place
で同期 IO 待つための blocking_threads はいくつか
#[tokio::main]
と書いたときのデフォルトの挙動は次のとおりです
let rt = tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(512)
.thread_keep_alive(std::time::Duration::from_secs(10))
.thread_stack_size(2*1024*1024*1024) // 2MB
.worker_threads(4)
.enable_all()
.build()
.unwrap();
- enable_all - io と timer の runtime を有効化する
- worker_threads -
tokio::spawn
したタスクがマルチスレッド処理されるときのワーカスレッド数。デフォは CPU 個数。 - max_blocking_threads - spawn_blocking や block_in_place で作られたブロッキングスレッドのスレッドープールの大数
- thread_keep_alive - ブロッキングスレッドがスレッドプールに生成されてから再利用されずにスレッドプールから消えるまでの時間
- 以前の記事 でも確認したように、 tokio 0.1 の頃の blocking では ブロッキングIO用のスレッドプール は終了も再利用もされずに増えていくバグがありましたが、このパラメータはブロッキングIO用のスレッドプールのスレッド数を制御するものですね
- thread_stack_size - 同期・非同期それぞれのスレッドプールで生成されるスレッドの最大スタックサイズです。他のパラメータと合わせてメモリ使用量などを制御できそう。現在のデフォは 2MB
見ていきましょう
#[tokio::main(flavor = "current_thread")]
10 個の async ブロックを join_all で並行実行したものを tokio::spawn
する例です
tokio::spawn(fut).await
で spawn した結果を await で待っていますが、spawn したタスクは tokio のメインスレッドの終了時に待たずにそのまま drop されてしまうからです。
spawn して待たない場合、いつ殺されてもいいようにタスクを作るか graceful shutdown を実装しましょう。
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), anyhow::Error>{
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..=3).map(|i| async move{
println!("{}:a:{:?}",i, std::thread::current().id());
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
println!("{}:b:{:?}",i, std::thread::current().id());
}));
tokio::spawn(fut).await;
println!("{:?}", std::thread::current().id());
Ok(())
}
結果がこちら
ThreadId(1)
0:a:ThreadId(1)
1:a:ThreadId(1)
2:a:ThreadId(1)
3:a:ThreadId(1)
4:a:ThreadId(1)
5:a:ThreadId(1)
6:a:ThreadId(1)
7:a:ThreadId(1)
8:a:ThreadId(1)
9:a:ThreadId(1)
0:b:ThreadId(1)
1:b:ThreadId(1)
2:b:ThreadId(1)
3:b:ThreadId(1)
4:b:ThreadId(1)
5:b:ThreadId(1)
6:b:ThreadId(1)
7:b:ThreadId(1)
8:b:ThreadId(1)
9:b:ThreadId(1)
ThreadId(1)
この結果から以下のことがわかります。
- すべてのタスクが同じスレッドで動作する
-
tokio::spawn
はできるが 型上はSend + 'static
を要求される
このような性質は JavaScript のランタイムに近いですね。
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<(), anyhow::Error>{
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..10).map(|i| async move{
println!("{}:a:{:?}",i, std::thread::current().id());
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
println!("{}:b:{:?}",i, std::thread::current().id());
}));
tokio::spawn(fut).await?;
println!("{:?}", std::thread::current().id());
Ok(())
}
結果がこちら
ThreadId(1)
0:a:ThreadId(3)
1:a:ThreadId(3)
2:a:ThreadId(3)
3:a:ThreadId(3)
4:a:ThreadId(3)
5:a:ThreadId(3)
6:a:ThreadId(3)
7:a:ThreadId(3)
8:a:ThreadId(3)
9:a:ThreadId(3)
0:b:ThreadId(2)
1:b:ThreadId(2)
2:b:ThreadId(2)
3:b:ThreadId(2)
4:b:ThreadId(2)
5:b:ThreadId(2)
6:b:ThreadId(2)
7:b:ThreadId(2)
8:b:ThreadId(2)
9:b:ThreadId(2)
ThreadId(1)
この結果から以下のことがわかります。
-
tokio::spawn
したタスクは.await
をまたいでスレッドが変わる! (Send + 'static
の面目躍如!) - main 関数内は
.await
をまたいでもスレッドが変わらない (Send + 'static
は要求されていないので)
このような性質は Go のランタイムに近いですね。
実際 tokio のマルチスレッドスケジューラは go と同じ work-stealing を採用しています。
ちなみに tokio::time::sleep
の代わりに tokio::task::yield_now
を使うと……
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), anyhow::Error>{
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..=3).map(|i| async move{
println!("{}:a:{:?}",i, std::thread::current().id());
tokio::task::yield_now().await;
println!("{}:b:{:?}",i, std::thread::current().id());
}));
tokio::spawn(fut).await?;
println!("{:?}", std::thread::current().id());
Ok(())
}
ThreadId(1)
0:a:ThreadId(3)
1:a:ThreadId(3)
2:a:ThreadId(3)
3:a:ThreadId(3)
0:b:ThreadId(3)
1:b:ThreadId(3)
2:b:ThreadId(3)
3:b:ThreadId(3)
ThreadId(1)
同一ワーカスレッドで処理されました。このような性質は coroutine や generator に近いですね。
実際このような性質を利用して await で generator を再現しようという genawaiter というクレートもあります (こちらは tokio
ランタイムに依存しないようにできています)。
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() -> Result<(), anyhow::Error>{
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..10).map(|i| async move{
println!("{}:a:{:?}",i, std::thread::current().id());
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
println!("{}:b:{:?}",i, std::thread::current().id());
}));
tokio::spawn(fut).await?;
println!("{:?}", std::thread::current().id());
Ok(())
}
結果がこちら。
ThreadId(1)
0:a:ThreadId(2)
1:a:ThreadId(2)
2:a:ThreadId(2)
3:a:ThreadId(2)
4:a:ThreadId(2)
5:a:ThreadId(2)
6:a:ThreadId(2)
7:a:ThreadId(2)
8:a:ThreadId(2)
9:a:ThreadId(2)
0:b:ThreadId(2)
1:b:ThreadId(2)
2:b:ThreadId(2)
3:b:ThreadId(2)
4:b:ThreadId(2)
5:b:ThreadId(2)
6:b:ThreadId(2)
7:b:ThreadId(2)
8:b:ThreadId(2)
9:b:ThreadId(2)
ThreadId(1)
この結果から以下のことがわかります。
-
.await
をまたいでスレッドは変わらない -
tokio::spawn
するとメインスレッドとは別スレッドで動く
multi_thread, worker_threads=4, max_blocking_threads=1, thread_keep_alive=1ms
ここからは spawn_blocking
編です
max_blocking_threads
と thread_keep_alive
は tokio::main
マクロからは設定できないので自分でランタイムを生成する必要があります。
fn main() -> Result<(), anyhow::Error> {
let rt = tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(1)
.thread_keep_alive(std::time::Duration::from_millis(1))
.worker_threads(4)
.enable_all()
.build()
.unwrap();
rt.block_on(_main())?;
Ok(())
}
ちょっと極端な設定ですが blocking_threads=1, thread_keep_alive=1ms で試してみましょう
spawn_blocking
を並列で走らせるとどうなるでしょうか?
async fn _main() -> Result<(), anyhow::Error> {
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..10).map(|i| async move{
tokio::task::spawn_blocking(move||{
println!("{}:a':{:?}",i, std::thread::current().id());
std::thread::sleep(std::time::Duration::from_millis(100));
println!("{}:b':{:?}",i, std::thread::current().id());
}).await
}));
tokio::spawn(fut).await?;
println!("{:?}", std::thread::current().id());
Ok(())
}
得られた結果は……
ThreadId(1)
0:a':ThreadId(2)
0:b':ThreadId(2)
1:a':ThreadId(2)
1:b':ThreadId(2)
2:a':ThreadId(2)
2:b':ThreadId(2)
3:a':ThreadId(2)
3:b':ThreadId(2)
4:a':ThreadId(2)
4:b':ThreadId(2)
5:a':ThreadId(2)
5:b':ThreadId(2)
6:a':ThreadId(2)
6:b':ThreadId(2)
7:a':ThreadId(2)
7:b':ThreadId(2)
8:a':ThreadId(2)
8:b':ThreadId(2)
9:a':ThreadId(2)
9:b':ThreadId(2)
ThreadId(1)
見事に blocking_threads=1
といった感じです。
1つのブロッキングスレッドにブロッキングタスク10個キューイングされたみたい。
multi_thread, worker_threads=4, max_blocking_threads=2, thread_keep_alive=1ms
max_blocking_threads=2 にしてみましょう
fn main() -> Result<(), anyhow::Error> {
let rt = tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(2)
.thread_keep_alive(std::time::Duration::from_millis(1))
.worker_threads(4)
.enable_all()
.build()
.unwrap();
rt.block_on(_main())?;
Ok(())
}
async fn _main() -> Result<(), anyhow::Error> {
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..10).map(|i| async move{
tokio::task::spawn_blocking(move||{
println!("{}:a':{:?}",i, std::thread::current().id());
std::thread::sleep(std::time::Duration::from_millis(100));
println!("{}:b':{:?}",i, std::thread::current().id());
}).await
}));
tokio::spawn(fut).await?;
println!("{:?}", std::thread::current().id());
Ok(())
}
ThreadId(1)
0:a':ThreadId(7)
1:a':ThreadId(6)
0:b':ThreadId(7)
1:b':ThreadId(6)
3:a':ThreadId(6)
2:a':ThreadId(7)
3:b':ThreadId(6)
4:a':ThreadId(6)
2:b':ThreadId(7)
5:a':ThreadId(7)
5:b':ThreadId(7)
6:a':ThreadId(7)
4:b':ThreadId(6)
7:a':ThreadId(6)
6:b':ThreadId(7)
8:a':ThreadId(7)
7:b':ThreadId(6)
9:a':ThreadId(6)
8:b':ThreadId(7)
9:b':ThreadId(6)
ThreadId(1)
これもブロッキングスレッドが 2 になって順番に処理しているという感じ。
current_thread, max_blocking_threads=1, thread_keep_alive=1ms
current_thread, max_blocking_threads=1 で試してみましょう。
fn main() -> Result<(), anyhow::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.max_blocking_threads(1)
.thread_keep_alive(std::time::Duration::from_millis(1))
.worker_threads(4)
.enable_all()
.build()
.unwrap();
rt.block_on(_main())?;
Ok(())
}
async fn _main() -> Result<(), anyhow::Error> {
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..10).map(|i| async move{
tokio::task::spawn_blocking(move||{
println!("{}:a':{:?}",i, std::thread::current().id());
std::thread::sleep(std::time::Duration::from_millis(100));
println!("{}:b':{:?}",i, std::thread::current().id());
}).await
}));
tokio::spawn(fut).await?;
println!("{:?}", std::thread::current().id());
Ok(())
}
結果は…
ThreadId(1)
0:a':ThreadId(2)
0:b':ThreadId(2)
1:a':ThreadId(2)
1:b':ThreadId(2)
2:a':ThreadId(2)
2:b':ThreadId(2)
3:a':ThreadId(2)
3:b':ThreadId(2)
4:a':ThreadId(2)
4:b':ThreadId(2)
5:a':ThreadId(2)
5:b':ThreadId(2)
6:a':ThreadId(2)
6:b':ThreadId(2)
7:a':ThreadId(2)
7:b':ThreadId(2)
8:a':ThreadId(2)
8:b':ThreadId(2)
9:a':ThreadId(2)
9:b':ThreadId(2)
ThreadId(1)
見事にブロッキングスレッド=1といった感じの結果。
multi_thread と同じ結果になりました。
current_thread, max_blocking_threads=2, thread_keep_alive=1ms
では同じ流れで current_thread, max_blocking_threads=2 で試しみましょう
fn main() -> Result<(), anyhow::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.max_blocking_threads(2)
.thread_keep_alive(std::time::Duration::from_millis(1))
.worker_threads(4)
.enable_all()
.build()
.unwrap();
rt.block_on(_main())?;
Ok(())
}
async fn _main() -> Result<(), anyhow::Error> {
println!("{:?}", std::thread::current().id());
let fut = futures::future::join_all((0..10).map(|i| async move{
tokio::task::spawn_blocking(move||{
println!("{}:a':{:?}",i, std::thread::current().id());
std::thread::sleep(std::time::Duration::from_millis(100));
println!("{}:b':{:?}",i, std::thread::current().id());
}).await
}));
tokio::spawn(fut).await?;
println!("{:?}", std::thread::current().id());
Ok(())
}
結果は
ThreadId(1)
0:a':ThreadId(2)
1:a':ThreadId(3)
0:b':ThreadId(2)
2:a':ThreadId(2)
1:b':ThreadId(3)
3:a':ThreadId(3)
2:b':ThreadId(2)
4:a':ThreadId(2)
3:b':ThreadId(3)
5:a':ThreadId(3)
4:b':ThreadId(2)
6:a':ThreadId(2)
5:b':ThreadId(3)
7:a':ThreadId(3)
6:b':ThreadId(2)
8:a':ThreadId(2)
7:b':ThreadId(3)
9:a':ThreadId(3)
9:b':ThreadId(3)
8:b':ThreadId(2)
ThreadId(1)
これも multi_thread とおなじような結果になりました。
multi_thread, worker_threads=1, max_blocking_threads=3, thread_keep_alive=10ms
ここからは block_in_place
編です
この関数を呼ぶと現在のワーカスレッドがブロッキングスレッドへ移行します。 spawn_blocking
と違って引数の FnOnce
に Send + 'static
が要求されないのが ウリです。
現在の async が動いているスレッドがブオロックされてしまうので イテレータの要素ごとに tokio::spawn
するように変更されています。
fn main() -> Result<(), anyhow::Error> {
let rt = tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(3)
.thread_keep_alive(std::time::Duration::from_millis(10))
.worker_threads(1)
.enable_all()
.build()
.unwrap();
rt.block_on(_main())?;
Ok(())
}
async fn _main() -> Result<(), anyhow::Error> {
println!("{:?}", std::thread::current().id());
futures::future::join_all((0..10).map(|i| tokio::spawn(async move{
tokio::task::block_in_place(move||{
for j in 0..5 {
println!("{}:{}:a':{:?}",i,j, std::thread::current().id());
std::thread::sleep(std::time::Duration::from_millis(50));
}
})
}))).await;
tokio::task::yield_now().await;
println!("{:?}", std::thread::current().id());
Ok(())
}
結果は……
ThreadId(1)
0:0:a':ThreadId(2)
1:0:a':ThreadId(3)
2:0:a':ThreadId(4)
3:0:a':ThreadId(5)
0:1:a':ThreadId(2)
1:1:a':ThreadId(3)
2:1:a':ThreadId(4)
3:1:a':ThreadId(5)
0:2:a':ThreadId(2)
1:2:a':ThreadId(3)
2:2:a':ThreadId(4)
3:2:a':ThreadId(5)
0:3:a':ThreadId(2)
1:3:a':ThreadId(3)
2:3:a':ThreadId(4)
3:3:a':ThreadId(5)
0:4:a':ThreadId(2)
1:4:a':ThreadId(3)
2:4:a':ThreadId(4)
3:4:a':ThreadId(5)
4:0:a':ThreadId(2)
5:0:a':ThreadId(3)
6:0:a':ThreadId(4)
7:0:a':ThreadId(5)
4:1:a':ThreadId(2)
5:1:a':ThreadId(3)
6:1:a':ThreadId(4)
7:1:a':ThreadId(5)
4:2:a':ThreadId(2)
6:2:a':ThreadId(4)
5:2:a':ThreadId(3)
7:2:a':ThreadId(5)
4:3:a':ThreadId(2)
5:3:a':ThreadId(3)
6:3:a':ThreadId(4)
7:3:a':ThreadId(5)
4:4:a':ThreadId(2)
6:4:a':ThreadId(4)
5:4:a':ThreadId(3)
7:4:a':ThreadId(5)
8:0:a':ThreadId(2)
9:0:a':ThreadId(4)
8:1:a':ThreadId(2)
9:1:a':ThreadId(4)
8:2:a':ThreadId(2)
9:2:a':ThreadId(4)
8:3:a':ThreadId(2)
9:3:a':ThreadId(4)
8:4:a':ThreadId(2)
9:4:a':ThreadId(4)
ThreadId(1)
この結果から
- main thread が ThreadId(1), worker thread が ThreadId(2), blocking thread が ThreadId(3) ~ ThreadId(5) だったものが、途中から worker thread が blocking thread になり、
その代わりに blocking thread が worker thread に入れ替わっているかの挙動をして
いる。 - block_in_place も max_blocking_threads の影響を受ける
ということが分かります
非同期制御
join
- https://docs.rs/futures/0.3.16/futures/future/fn.join.html
- https://docs.rs/futures/0.3.16/futures/prelude/future/fn.join_all.html
- https://docs.rs/futures/0.3.16/futures/macro.join.html
- https://docs.rs/futures/0.3.16/futures/future/fn.try_join.html
- https://docs.rs/futures/0.3.16/futures/macro.try_join.html
join_all
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
use futures::future::BoxFuture;
use futures::FutureExt;
let fut = futures::future::join_all((0..10).map(|_| -> BoxFuture<'_, Result<(), anyhow::Error>>{async{
// 並列 request とか
Ok(())
}.boxed()}));
let rets = tokio::spawn(fut).await?;
rets.into_iter().collect::<Result<Vec<_>, _>>()?;
Ok(())
}
SPMC で非同期タスクを並列処理するパターン
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::SinkExt;
let (tx, rx) = flume::bounded(10);
let tx_task = (move|| -> BoxFuture<'_, Result<(), anyhow::Error>>{
async move {
// なんか非同期でリストを取得し続ける
let urls = vec!["https://example.com"];
for url in urls {
// キューが溢れそうになったら await で待つ
tx.send_async(url).await?;
}
}
})();
let rx_task = futures::future::join_all((0..10).map(|_| -> BoxFuture<'_, Result<(), anyhow::Error>>{
let rx = rx.clone();
async move {
while let Some(url) = rx.next().await {
// reqwest::get とか
}
Ok(())
}.boxed()
}));
let (ret, rets) = tokio::spawn(futures::future::join(tx_task, rx_task)).await?;
ret?;
rets.into_iter().collect::<Result<Vec<_>, _>>()?;
Ok(())
}
select
- https://tokio.rs/tokio/tutorial/select
- https://docs.rs/futures/0.3.16/futures/future/fn.select.html
- https://docs.rs/tokio/1.10.1/tokio/macro.select.html
- https://docs.rs/futures/0.3.16/futures/macro.select.html
キャンセル
- https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8
- https://docs.rs/tokio-util/0.6.7/tokio_util/sync/struct.CancellationToken.html
リトライ
ロギング
mutex
- https://docs.rs/futures/0.3.16/futures/lock/struct.Mutex.html
- https://docs.rs/tokio/1.10.1/tokio/sync/struct.Mutex.html
channel
- spsc - https://docs.rs/tokio/1.10.1/tokio/sync/oneshot/index.html
- mpsc - https://docs.rs/tokio/1.10.1/tokio/sync/mpsc/index.html
- mpmc - https://docs.rs/tokio/1.10.1/tokio/sync/broadcast/index.html
- spmc - https://docs.rs/tokio/1.10.1/tokio/sync/watch/index.html
- Semaphore - https://docs.rs/tokio/1.10.1/tokio/sync/struct.Semaphore.html
Graceful Shutdown
pin
- https://github.com/rust-lang/pin-utils
- https://github.com/taiki-e/pin-project
- https://docs.rs/futures/0.3.16/futures/macro.pin_mut.html
- https://docs.rs/tokio/1.10.1/tokio/macro.pin.html
参照
- https://tokio.rs/tokio/tutorial/async
- https://docs.rs/tokio/1.10.1/tokio/index.html#cpu-bound-tasks-and-blocking-code