133
113

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

非同期 Rust パターン

Last updated at Posted at 2021-08-28

非同期 Rust パターン

Rust で非同期並列処理を書く時に出てくるパターンについて解説します

tokio 1.0 と futures 0.3 環境です

async function 編

async fn hoge() -> ()

まずは基本

async fn hoge() -> () {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

エラーを返さない非同期関数の代表格として tokio::time::sleep を使った。

fn hoge() -> impl Future<Output=()>

async fn を脱糖したももの。

fn hoge() -> impl std::future::Future<Output=()> {
  async {
      tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  }
}

futures::future::Futurefutures::prelude::Futurefutures::Futurestd::future::Future を re-export しているだけなのでどれを使っても良い。

ところで Send + 'static でない Futuretokio::spawn で tokio のマルチスレッドスケジューラで非同期 並列 処理ができない。
async fnimpl FutureSend + 'static を async 文の body から推論してくれるので素直な async 文は spawn できる

fn hoge() -> impl std::future::Future<Output=()> {
  async {
      tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  }
}
let fut = tokio::spawn(hoge());

しかし裏を返せば関数の戻り値の型が関数の実装依存ということになるので、ちょっと変更すると途端にコンパイルエラーになる
例えば、 async の内部で !SendRc を使うと……

fn hoge() -> impl std::future::Future<Output=()> {
  async {
      let a = std::rc::Rc::new(());
      tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  }
}
let fut = tokio::spawn(hoge());

ある日突然コンパイルエラーがやってくる

error: future cannot be sent between threads safely
   --> src/main.rs:11:11
    |
11  | let fut = tokio::spawn(hoge());
    |           ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl Future`, the trait `Send` is not implemented for `Rc<()>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:7:7
    |
6   |       let a = std::rc::Rc::new(());
    |           - has type `Rc<()>` which is not `Send`
7   |       tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `a` maybe used later
8   |       
9   |   }
    |   - `a` is later dropped here

fn hoge() -> Pin<Box<dyn Future<Output=()> + Send + 'static>>

async fnimpl Future は自己再帰や相互再帰で戻り値の型が推論できないので dyn Trait でトレイトオブジェクトを使う必要がある
この型は脳死でタイピングできるようにしておくこと。

fn hoge() -> std::pin::Pin<Box<dyn std::future::Future<Output=()> + Send + 'static>> {
  Box::pin(async {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  })
}

std::pin::Pinstd::prelude に入ってないのがつらいね。

|| -> Pin<Box<dyn Future<Output=()> + Send + 'static>>

pin box は closure でも使える

let closure = move || -> std::pin::Pin<Box<dyn futures::Future<Output=() + Send + 'static>> {
    Box::pin(async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
};

fn hoge() -> BoxFuture<'static, ()>

Pin<Box<dyn ... が流れるようにタイピングできるようになったころに存在に気がつく便利な型エイリアス。
ちなみに type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a, Global>>; です

fn hoge() -> futures::future::BoxFuture<'static, ()> {
    use futures::FutureExt;
    async {
      tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }.boxed()
}

futures::FutureExtboxed もカッコのネストが減って便利。

async fn hoge() -> Result<(), anyhow::Error>

ここから Result 編です。非同期はだいたい IO。IO には失敗がつきものです。

失敗する関数として tokio::signal::ctrl_c() を用意しました。
異常なシグナル を登録しないかぎり エラーになることはまずない ですが。

async fn hoge() -> Result<(), anyhow::Error> {
    tokio::signal::ctrl_c().await?;
    Ok(())
}

? で early return ができて便利ですね。

fn hoge() -> impl Future<Output=Result<(), anyhow::Error>>

説明不要ですね

fn hoge() -> impl Future<Output=Result<(), anyhow::Error>> {
    async{
        tokio::signal::ctrl_c().await?;
        Ok(())
    }
}

fn hoge() -> impl TryFuture<Item=Ok=(), Error=anyhow::Error>

存在意義がよくわからない trait。
Send がつかなくて tokio::spawn できないなどの害もある。謎。

fn hoge() -> impl futures::TryFuture<Item=(), Error=anyhow::Error> {
    async{
        tokio::signal::ctrl_c().await?;
        Ok(())
    }
}

fn hoge() -> Pin<Box<Future<Output=Result<(), anyhow::Error> + Send + 'static>>

説明不要ですね

fn hoge() -> std::pin::Pin<Box<dyn std::future::Future<Output=Result<(), anyhow::Error>> + Send + 'static>> {
  Box::pin(async {
    tokio::signal::ctrl_c().await?;
    Ok(())
  })
}

|| -> Pin<Box<dyn Future<Output=Result<(), anyhow::Error>> + Send + 'static>>

特に async な closure で Result? で early return したいときに頻出

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    tokio::spawn((|| -> std::pin::Pin<Box<dyn futures::Future<Output=Result<(), anyhow::Error>> + Send + 'static>> {
        Box::pin(async {
            tokio::signal::ctrl_c().await?;
            Ok(())
        })
    })()).await?;
    Ok(())
}

|| -> BoxFuture<'static, Result<(), anyhow::Error>>

少し短くなりました

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    tokio::spawn((|| ->  futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
    use futures::FutureExt;
        async {
            tokio::signal::ctrl_c().await?;
            Ok(())
        }.boxed()
    })()).await?;
    Ok(())
}

fn hoge() -> Box<dyn Future<Item=(), Error=failure::Error> + Send + 'static>

Pinasync 文が導入される前の futures 0.1 の頃 はこちらの型が使われていた。
future はモナドなので combinator を使えば moandic に書くことができた。

fn hoge() -> Box<dyn futures_01::Future<Item=(), Error=failure::Error> + Send + 'static> {
  use futures_01::Future;
  let fut = futures_01::future::ok(())
      .and_then(|()|{
          let a = 0;
      futures_01::future::ok(()) })
             .and_then(|()|{
                 println!("{}", a);
                 futures_01::future::ok(()) }) });
  Box::new(fut)
}

monadic なスタイルでは and_then (>>=) をまたいで参照を持てないため、 でかい構造体は BoxArc に包む必要があったり、
非同期リトライなどの loop が素直に書けないなど問題が 山ほど あったため、喧々諤々の議論の末、 async 文 や .await キーワード、 Pin, Unpin などが導入された。
なお async の脱糖構文にあたる generator は (awaityield が対応) 1.0 以前から議論されていて
未だに仕様が定まっていない最近も仕様が提案される など unstable rust の筆頭機能になっている(個人の感想です)

現在の std::future::Futureasync 文と混ぜて書くには futures-compat が必要。

tokio runtime のバージョンが違う場合は tokio_compattokio-compat-02 なども噛ませる必要がある。
anyhow::Error ではなく failure::Error なあたりに時代を感じさせられる。

fn hoge() -> BoxFuture<'static, Result<(), anyhow::Error>>

TryBoxFuture はないです

fn hoge() -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
    use futures::FutureExt;
    async {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok(())
    }.boxed()
}

|| -> BoxFuture<'static, Result<(), anyhow::Error>>

型が短くなりました。

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    tokio::spawn((move || -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
    use futures::FutureExt;
        async move {
            fuga()?;
            Ok(())
        }.boxed()
    })()).await?;
    Ok(())
}

async fn hoge(foo: &str) -> ()

ここからは参照のライフタイムがある async 関数編です。
まずは基本。main 関数のスタックに載ったヒープメモリへの参照を async fn で使うパターン。

async fn hoge(foo: &str) -> () {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    println!("{}", foo);
}

#[tokio::main]
async fn main() {
    let foo = "hello world!".to_string(); // heap にある String を参照
    hoge(&foo).await;
}

これは &'static でないので tokio::spawn はできない。

error[E0597]: `foo` does not live long enough
 --> src/main.rs:5:21
  |
5 |   tokio::spawn(hoge(&foo)).await;
  |                -----^^^^-
  |                |    |
  |                |    borrowed value does not live long enough
  |                argument requires that `foo` is borrowed for `'static`
6 | }
  | - `foo` dropped here while still borrowed

こういうときは素直に Arc を使おう

async fn hoge(foo: Arc<String>) -> ()

少々野暮ったいですが、こう。

async fn hoge(foo: std::sync::Arc<String>) -> () {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    println!("{}", foo);
}

#[tokio::main]
async fn main() {
    let foo = std::sync::Arc::new("hello world!".to_string());
    hoge(std::sync::Arc::clone(&foo)).await;
}

Arc でしか使えない関数になってしまった!
そんなときは……

async fn hoge(foo: Deref<Target=String>) -> ()

impl Deref<Target=...> で書くと Arc, Rc, Box に全対応した Generic な関数が書ける。
ただし引数が Send かどうかで tokio::spawn できなくなるので注意。

async fn hoge(foo: impl std::ops::Deref<Target=String>) -> () {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    println!("{}", foo.deref());
}

#[tokio::main]
async fn main() {
  let foo = std::sync::Arc::new("hello world!".to_string());
  tokio::spawn(hoge(std::sync::Arc::clone(&foo))).await;
  
  let foo = std::rc::Rc::new("hello world!".to_string());
  hoge(std::rc::Rc::clone(&foo)).await;
 
  let foo = Box::new("hello world!".to_string());
  tokio::spawn(hoge(foo)).await;
  
  // String 自身も Deref<Target=String> である
  let foo = "hello world!".to_string();
  hoge(&foo).await;
}

Arc::cloneRc::clone を直接読んでいるのは Arc<T> 内の T が Clone だったときにメソッド名が衝突して紛らわしいからです

fn hoge(foo: &str) -> impl Future<Output=()> + '_

引数のライフタイムを推論させたもの。

fn hoge(foo: &str) -> impl futures::Future<Output=()> + '_ {
    async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        println!("{}", foo);
    }
}

#[tokio::main]
async fn main() {
  let foo = "hello world!".to_string();
  hoge(&foo).await;
}

async move しないと async 文が生成する無名の impl Future 構造体内にキャプチャする参照が hoge 関数のスタックへの参照になって borrow checker に引っかかる

error[E0373]: async block may outlive the current function, but it borrows `foo`, which is owned by the current function
 --> src/main.rs:2:11
  |
2 |       async {
  |  ___________^
3 | |         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
4 | |         println!("{}", foo);
  | |                        --- `foo` is borrowed here
5 | |     }
  | |_____^ may outlive borrowed value `foo`
  |
note: async block is returned here
 --> src/main.rs:2:5
  |
2 | /     async {
3 | |         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
4 | |         println!("{}", foo);
5 | |     }
  | |_____^
help: to force the async block to take ownership of `foo` (and any other referenced variables), use the `move` keyword
  |
2 |     async move {
3 |         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
4 |         println!("{}", foo);
5 |     }
  |

このへんは closure と考え方が一緒。

fn hoge<'a>(foo: &'a str) -> impl Fn() -> () + 'a {
    move || {
        println!("{}", foo);
    }
}

#[tokio::main]
async fn main() {
  let foo = "hello world!".to_string();
  (hoge(&foo))();
}

move を付けなかった場合はエラーになる

error[E0373]: closure may outlive the current function, but it borrows `foo`, which is owned by the current function
 --> src/main.rs:2:5
  |
2 |     || {
  |     ^^ may outlive borrowed value `foo`
3 |         println!("{}", foo);
  |                        --- `foo` is borrowed here
  |
note: closure is returned here
 --> src/main.rs:2:5
  |
2 | /     || {
3 | |         println!("{}", foo);
4 | |     }
  | |_____^
help: to force the closure to take ownership of `foo` (and any other referenced variables), use the `move` keyword
  |
2 |     move || {
  |     ^^^^^^^

fn hoge<'a>(foo: &'a str) -> impl futures::Future<Output=()> + 'a

↑の参照のライフタイムを明示したもの。

fn hoge<'a>(foo: &'a str) -> impl futures::Future<Output=()> + 'a {
    async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        println!("{}", foo);
    }
}

#[tokio::main]
async fn main() {
  let foo = "hello world!".to_string();
  hoge(&foo).await;
}

ただし clippy にはそんな自明な型は省略しろと怒られる

warning: explicit lifetimes given in parameter types where they could be elided (or replaced with `'_` if needed by type declaration)
 --> src/main.rs:1:1
  |
1 | fn hoge<'a>(foo: &'a str) -> impl futures::Future<Output=()> + 'a {
  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(clippy::needless_lifetimes)]` on by default
  = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes

世知辛いね。

fn hoge<'a>(foo: &'a str) -> Pin<Box<dyn Future<Output=()> + Send + 'a>>

この型は一番汎用的なのでよく使うと思う。

fn hoge<'a>(foo: &'a str) -> std::pin::Pin<Box<dyn std::future::Future<Output=()> + Send + 'a>> {
    use futures::FutureExt;
    async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        println!("{}", foo);
    }.boxed()
}

#[tokio::main]
async fn main() {
  let foo = "hello world!".to_string();
  hoge(&foo).await;
}

fn hoge<'a>(foo: &'a str) -> Pin<Box<dyn Future<Output=()> + 'a>>

Send がついていないバージョン。

fn hoge<'a>(foo: &'a str) -> std::pin::Pin<Box<dyn std::future::Future<Output=()> + Send + 'a>> {
    use futures::FutureExt;
    async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        println!("{}", foo);
    }.boxed_local()
}

#[tokio::main]
async fn main() {
  let foo = "hello world!".to_string();
  hoge(&foo).await;
}

.boxed は Send がついてしまって型が合わないので、この場合は .boxed_local が必要。

fn hoge<'a>(foo: &'a str) -> BoxFuture<'a, ()>

説明不要ですね。

fn hoge<'a>(foo: &'a str) -> futures::future::BoxFuture<'a, ()> {
    use futures::FutureExt;
    async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        println!("{}", foo);
    }.boxed()
}

#[tokio::main]
async fn main() {
  let foo = "hello world!".to_string();
  hoge(&foo).await;
}

for<'a> Fn(&'a str) -> BoxFuture<'a, Result<(), anyhow::Error>>

この型は join_all で並列処理するときに頻出する。

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    let a = vec![
        "a".to_string(),
        "b".to_string(),
        "c".to_string()
    ];

    let futs = a.iter().map(|a|-> futures::future::BoxFuture<'_, Result<(), anyhow::Error>> {
        use futures::FutureExt;
        async move {
            tokio::signal::ctrl_c().await?;
            println!("{}", a);
            Ok(())
        }.boxed()
    });

    let rets = futures::future::join_all(futs).await;
    rets.into_iter().collect::<Result<Vec<_>, _>>()?;

    Ok(())
}

map の引数の aasync 文が生成する無名の impl Future 構造体の中に入れるために async move しています。分かりますか?

ただし 'static ではないので tokio::spawn による非同期並列処理ができません。

for<'a> Fn(&'a str) -> BoxFuture<'static, Result<(), anyhow::Error>>

並列数が増えてきたらマルチスレッドで動かせるように Arc などに包もう。

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    let a = vec![
        Arc::new("a".to_string())
    ];
    let futs = a.iter().map(|a|-> futures::future::BoxFuture<'static, Result<(), anyhow::Error>> {
        let a = Arc::clone(a);
        use futures::FutureExt;
        async move {
            tokio::signal::ctrl_c().await?;
            println!("{}", a);
            Ok(())
        }.boxed()
    });
    let rets = tokio::spawn(futures::future::join_all(futs)).await?;
    rets.into_iter().collect::<Result<Vec<_>, _>>()?;
    Ok(())
}

|..| { async { .. } }

ここからは async closure 編です
move closure と async move の組み合わせを見ていきましょう。
まずは変数キャプチャなしの場合。

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    (0..10_u8).map(|_| async {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }).collect::<Vec<_>>();
    Ok(())
}

closure の外の変数も async の外の変数も async の中で使っていないので move する必要はありません。

|..| { async move { .. } }

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    (0..10_u8).map(|i| async move{
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        println!("{}", i);
    }).collect::<Vec<_>>();
    Ok(())
}

closure の引数 i を async 文の生成する無名の impl Future 構造体の中に取り込むために move が必要です
付けないと怒られます。

error[E0373]: async block may outlive the current function, but it borrows `i`, which is owned by the current function
 --> src/main.rs:3:30
  |
3 |       (0..10_u8).map(|i| async {
  |  ______________________________^
4 | |         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
5 | |         println!("{}", i);
  | |                        - `i` is borrowed here
6 | |     }).collect::<Vec<_>>();
  | |_____^ may outlive borrowed value `i`
  |
note: async block is returned here
 --> src/main.rs:3:24
  |
3 |       (0..10_u8).map(|i| async {
  |  ________________________^
4 | |         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
5 | |         println!("{}", i);
6 | |     }).collect::<Vec<_>>();
  | |_____^
help: to force the async block to take ownership of `i` (and any other referenced variables), use the `move` keyword
  |
3 |     (0..10_u8).map(|i| async move {
4 |         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
5 |         println!("{}", i);
6 |     }).collect::<Vec<_>>();
  |

概念的には move のない async 文は以下のような構造体を生成しています。

struct AsyncStruct<'a> {
    a: &'a u8,
    ...
}
impl<'s> Future for AsyncStruct<'s> {
    ...
}

move をつけるとこう。

struct AsyncStruct<'a> {
    a: u8,
    ...
}
impl<'s> Future for AsyncStruct<'s> {
    ...
}

例えば 10 並列で http request を投げる時に、 http クライアントを使い回す場合なんかにも使います。

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    let client = reqwest::Client::new();
    let futs = (0..10_u8).map(|_|{
        let client = client.clone();
        async move {
            let body = client.get("https://www.rust-lang.org")
                .send().await.unwrap()
                .text().await.unwrap();
            println!("{}", body);
        }
    });
    futures::future::join_all(futs).await;
    Ok(())
}

move |..| { async { .. } }

あんまり場面がない気がする。

move |..| { async move { .. } }

あんまり場面がない気がする。
強いて挙げるならカリー化するとき?

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    let c = std::sync::Arc::new(1);
    let curried = { let c = std::sync::Arc::clone(&c); move |a| move |b| async move {
        println!("a + b + c = {}", a + b + c.as_ref());
    }};
    tokio::spawn((curried)(1)(1)).await?;
    Ok(())
}

async stream 編

ここからは若干 unstable な話になってきます。

非同期ストリームの実装は

の 3 つあり、そのコンビネータ体系は

4 つ存在していて、全部違います。

さらに async iterator の実装には

があり、混沌としています。どうしてこうなった。

fn fuga() -> impl futures::Stream<Item = i64>> + Send

まずは基本です

fn fuga() -> impl futures::Stream<Item = i64>> + Send {
    async_stream::stream! {
        let mut i = 0;
        loop{
            yield i;
            i+=1;
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let src = fuga();
    // stream を await するのに必要 - https://tokio.rs/tokio/tutorial/streams
    futures::pin_mut!(src);
    while let Some(value) = src.next().await {
        println!("got {}", value);
    }
}

fn fuga() -> impl futures::Stream<Item = Result<i64, anyhow::Error>> + Send

fn fuga() -> impl futures::Stream<Item = Result<i64, anyhow::Error>> + Send {
    async_stream::try_stream! {
        let mut i = 0;
        loop{
            tokio::signal::ctrl_c().await?;
            yield i;
            i+=1;
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error>{
    let src = fuga();
    futures::pin_mut!(src);
    while let Some(value) = src.next().await.transpose()? {
        println!("got {}", value);
    }
    Ok(())
}

async spmc stream

single-producer multi-consumer パターンです。

ページネーションのある web-api の各項目を非同期処理する場合などにスループットを上げることができます。

非同期対応した固定長 mpmc キューには flume::bounded が便利です。
固定長なので、バッファしていっぱいになったら producer 側は .await で consumer 側の消費を待てて、
consumer 側も mc の特権である Stream + Clone から .await した順に処理対象が降ってるので。

const CONSUMERS: usize = 10;
const BUFFERS: usize = 100;

#[tokio::main]
async fn main() {
    let src = stream! {
        let mut i = 0_u64;
        loop {
            yield i;
            i += 1;
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    };
    let (src, fut) = into_spmc(src);
    use futures::TryStreamExt;
    let fut2 = src.try_for_each_concurrent(CONSUMERS, |i| async move{
       println!("{:?}:{}", std::thread::current().id(), i);
       Ok(())
    });
    let (a, _) = tokio::spawn(futures::future::join(fut2, fut)).await?;
    a?;
    Ok(())
}

pub fn into_spmc<'a, T>(
    size: usize,
    src: impl futures::Stream<Item = T> + Send + 'a,
) -> (
    impl futures::Stream<Item = T> + Clone + Send + Sync + 'a,
    impl futures::Future<Output = ()> + Send + 'a,
)
where
    T: Send + 'static,
{
    use futures::FutureExt;
    use futures::StreamExt;
    let (tx, rx) = flume::bounded(size);
    let fut = src.map(Ok).forward(tx.into_sink());
    let fut2 = fut.then(|_: Result<_, _>| futures::future::ready(()));
    (rx.into_stream(), fut2)
}

async trait method

ここまで読んだあなたは trait の method も

trait Hoge {
    type Error;
    async fn hoge(&str) -> Result<(), Self::Error>;
}

みたいに書きたくなると思いますが、stable では使えません。
https://github.com/dtolnay/async-trait のマクロを使えば見た目上は使えます。 PinBox にラップされますが。

tokio の current_thread と multi_thread の違い

普段脳死で #[tokio::main] と書いていると気が付きませんが、 tokio のランタイムには以下の設定項目があります

  • 非同期ランタイムが new_multi_thread か current_thread か
  • spawn で並列処理するときの非同期ランタイムの worker_threads はいくつか(new_multi_thread の場合)
  • spawn_blocking または - block_in_place で同期 IO 待つための blocking_threads はいくつか

#[tokio::main] と書いたときのデフォルトの挙動は次のとおりです

let rt = tokio::runtime::Builder::new_multi_thread()
    .max_blocking_threads(512)
    .thread_keep_alive(std::time::Duration::from_secs(10))
    .thread_stack_size(2*1024*1024*1024) // 2MB
    .worker_threads(4)
    .enable_all() 
    .build()
    .unwrap();
  • enable_all - io と timer の runtime を有効化する
  • worker_threads - tokio::spawn したタスクがマルチスレッド処理されるときのワーカスレッド数。デフォは CPU 個数。
  • max_blocking_threads - spawn_blocking や block_in_place で作られたブロッキングスレッドのスレッドープールの大数
  • thread_keep_alive - ブロッキングスレッドがスレッドプールに生成されてから再利用されずにスレッドプールから消えるまでの時間
    • 以前の記事 でも確認したように、 tokio 0.1 の頃の blocking では ブロッキングIO用のスレッドプール は終了も再利用もされずに増えていくバグがありましたが、このパラメータはブロッキングIO用のスレッドプールのスレッド数を制御するものですね
  • thread_stack_size - 同期・非同期それぞれのスレッドプールで生成されるスレッドの最大スタックサイズです。他のパラメータと合わせてメモリ使用量などを制御できそう。現在のデフォは 2MB

見ていきましょう

#[tokio::main(flavor = "current_thread")]

10 個の async ブロックを join_all で並行実行したものを tokio::spawn する例です
tokio::spawn(fut).await で spawn した結果を await で待っていますが、spawn したタスクは tokio のメインスレッドの終了時に待たずにそのまま drop されてしまうからです。
spawn して待たない場合、いつ殺されてもいいようにタスクを作るか graceful shutdown を実装しましょう

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), anyhow::Error>{
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..=3).map(|i| async move{
        println!("{}:a:{:?}",i, std::thread::current().id());
        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
        println!("{}:b:{:?}",i, std::thread::current().id());
    }));
    tokio::spawn(fut).await;
    println!("{:?}", std::thread::current().id());
    Ok(())
}

結果がこちら

ThreadId(1)
0:a:ThreadId(1)
1:a:ThreadId(1)
2:a:ThreadId(1)
3:a:ThreadId(1)
4:a:ThreadId(1)
5:a:ThreadId(1)
6:a:ThreadId(1)
7:a:ThreadId(1)
8:a:ThreadId(1)
9:a:ThreadId(1)
0:b:ThreadId(1)
1:b:ThreadId(1)
2:b:ThreadId(1)
3:b:ThreadId(1)
4:b:ThreadId(1)
5:b:ThreadId(1)
6:b:ThreadId(1)
7:b:ThreadId(1)
8:b:ThreadId(1)
9:b:ThreadId(1)
ThreadId(1)

この結果から以下のことがわかります。

  • すべてのタスクが同じスレッドで動作する
  • tokio::spawn はできるが 型上は Send + 'static を要求される

このような性質は JavaScript のランタイムに近いですね。

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<(), anyhow::Error>{
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..10).map(|i| async move{
        println!("{}:a:{:?}",i, std::thread::current().id());
        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
        println!("{}:b:{:?}",i, std::thread::current().id());
    }));
    tokio::spawn(fut).await?;
    println!("{:?}", std::thread::current().id());
    Ok(())
}

結果がこちら

ThreadId(1)
0:a:ThreadId(3)
1:a:ThreadId(3)
2:a:ThreadId(3)
3:a:ThreadId(3)
4:a:ThreadId(3)
5:a:ThreadId(3)
6:a:ThreadId(3)
7:a:ThreadId(3)
8:a:ThreadId(3)
9:a:ThreadId(3)
0:b:ThreadId(2)
1:b:ThreadId(2)
2:b:ThreadId(2)
3:b:ThreadId(2)
4:b:ThreadId(2)
5:b:ThreadId(2)
6:b:ThreadId(2)
7:b:ThreadId(2)
8:b:ThreadId(2)
9:b:ThreadId(2)
ThreadId(1)

この結果から以下のことがわかります。

  • tokio::spawn したタスクは .await をまたいでスレッドが変わる! (Send + 'static の面目躍如!)
  • main 関数内は .await をまたいでもスレッドが変わらない (Send + 'static は要求されていないので)

このような性質は Go のランタイムに近いですね。
実際 tokio のマルチスレッドスケジューラは go と同じ work-stealing を採用しています。

ちなみに tokio::time::sleep の代わりに tokio::task::yield_now を使うと……

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), anyhow::Error>{
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..=3).map(|i| async move{
        println!("{}:a:{:?}",i, std::thread::current().id());
        tokio::task::yield_now().await;
        println!("{}:b:{:?}",i, std::thread::current().id());
    }));
    tokio::spawn(fut).await?;
    println!("{:?}", std::thread::current().id());
    Ok(())
}
ThreadId(1)
0:a:ThreadId(3)
1:a:ThreadId(3)
2:a:ThreadId(3)
3:a:ThreadId(3)
0:b:ThreadId(3)
1:b:ThreadId(3)
2:b:ThreadId(3)
3:b:ThreadId(3)
ThreadId(1)

同一ワーカスレッドで処理されました。このような性質は coroutine や generator に近いですね。
実際このような性質を利用して await で generator を再現しようという genawaiter というクレートもあります (こちらは tokio ランタイムに依存しないようにできています)。

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() -> Result<(), anyhow::Error>{
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..10).map(|i| async move{
        println!("{}:a:{:?}",i, std::thread::current().id());
        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
        println!("{}:b:{:?}",i, std::thread::current().id());
    }));
    tokio::spawn(fut).await?;
    println!("{:?}", std::thread::current().id());
    Ok(())
}

結果がこちら。

ThreadId(1)
0:a:ThreadId(2)
1:a:ThreadId(2)
2:a:ThreadId(2)
3:a:ThreadId(2)
4:a:ThreadId(2)
5:a:ThreadId(2)
6:a:ThreadId(2)
7:a:ThreadId(2)
8:a:ThreadId(2)
9:a:ThreadId(2)
0:b:ThreadId(2)
1:b:ThreadId(2)
2:b:ThreadId(2)
3:b:ThreadId(2)
4:b:ThreadId(2)
5:b:ThreadId(2)
6:b:ThreadId(2)
7:b:ThreadId(2)
8:b:ThreadId(2)
9:b:ThreadId(2)
ThreadId(1)

この結果から以下のことがわかります。

  • .await をまたいでスレッドは変わらない
  • tokio::spawn するとメインスレッドとは別スレッドで動く

multi_thread, worker_threads=4, max_blocking_threads=1, thread_keep_alive=1ms

ここからは spawn_blocking 編です

max_blocking_threadsthread_keep_alivetokio::main マクロからは設定できないので自分でランタイムを生成する必要があります。

fn main() -> Result<(), anyhow::Error> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(1)
        .thread_keep_alive(std::time::Duration::from_millis(1))
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();
        rt.block_on(_main())?;
    Ok(())
}

ちょっと極端な設定ですが blocking_threads=1, thread_keep_alive=1ms で試してみましょう

spawn_blocking を並列で走らせるとどうなるでしょうか?

async fn _main() -> Result<(), anyhow::Error> {
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..10).map(|i| async move{
        tokio::task::spawn_blocking(move||{
            println!("{}:a':{:?}",i, std::thread::current().id());
            std::thread::sleep(std::time::Duration::from_millis(100));
            println!("{}:b':{:?}",i, std::thread::current().id());
        }).await
    }));
    tokio::spawn(fut).await?;
    println!("{:?}", std::thread::current().id());
    Ok(())
}

得られた結果は……

ThreadId(1)
0:a':ThreadId(2)
0:b':ThreadId(2)
1:a':ThreadId(2)
1:b':ThreadId(2)
2:a':ThreadId(2)
2:b':ThreadId(2)
3:a':ThreadId(2)
3:b':ThreadId(2)
4:a':ThreadId(2)
4:b':ThreadId(2)
5:a':ThreadId(2)
5:b':ThreadId(2)
6:a':ThreadId(2)
6:b':ThreadId(2)
7:a':ThreadId(2)
7:b':ThreadId(2)
8:a':ThreadId(2)
8:b':ThreadId(2)
9:a':ThreadId(2)
9:b':ThreadId(2)
ThreadId(1)

見事に blocking_threads=1 といった感じです。
1つのブロッキングスレッドにブロッキングタスク10個キューイングされたみたい。

multi_thread, worker_threads=4, max_blocking_threads=2, thread_keep_alive=1ms

max_blocking_threads=2 にしてみましょう

fn main() -> Result<(), anyhow::Error> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(2)
        .thread_keep_alive(std::time::Duration::from_millis(1))
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();
        rt.block_on(_main())?;
    Ok(())
}

async fn _main() -> Result<(), anyhow::Error> {
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..10).map(|i| async move{
        tokio::task::spawn_blocking(move||{
            println!("{}:a':{:?}",i, std::thread::current().id());
            std::thread::sleep(std::time::Duration::from_millis(100));
            println!("{}:b':{:?}",i, std::thread::current().id());
        }).await
    }));
    tokio::spawn(fut).await?;
    println!("{:?}", std::thread::current().id());
    Ok(())
}
ThreadId(1)
0:a':ThreadId(7)
1:a':ThreadId(6)
0:b':ThreadId(7)
1:b':ThreadId(6)
3:a':ThreadId(6)
2:a':ThreadId(7)
3:b':ThreadId(6)
4:a':ThreadId(6)
2:b':ThreadId(7)
5:a':ThreadId(7)
5:b':ThreadId(7)
6:a':ThreadId(7)
4:b':ThreadId(6)
7:a':ThreadId(6)
6:b':ThreadId(7)
8:a':ThreadId(7)
7:b':ThreadId(6)
9:a':ThreadId(6)
8:b':ThreadId(7)
9:b':ThreadId(6)
ThreadId(1)

これもブロッキングスレッドが 2 になって順番に処理しているという感じ。

current_thread, max_blocking_threads=1, thread_keep_alive=1ms

current_thread, max_blocking_threads=1 で試してみましょう。

fn main() -> Result<(), anyhow::Error> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .thread_keep_alive(std::time::Duration::from_millis(1))
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();
        rt.block_on(_main())?;
    Ok(())
}

async fn _main() -> Result<(), anyhow::Error> {
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..10).map(|i| async move{
        tokio::task::spawn_blocking(move||{
            println!("{}:a':{:?}",i, std::thread::current().id());
            std::thread::sleep(std::time::Duration::from_millis(100));
            println!("{}:b':{:?}",i, std::thread::current().id());
        }).await
    }));
    tokio::spawn(fut).await?;
    println!("{:?}", std::thread::current().id());
    Ok(())
}

結果は…

ThreadId(1)
0:a':ThreadId(2)
0:b':ThreadId(2)
1:a':ThreadId(2)
1:b':ThreadId(2)
2:a':ThreadId(2)
2:b':ThreadId(2)
3:a':ThreadId(2)
3:b':ThreadId(2)
4:a':ThreadId(2)
4:b':ThreadId(2)
5:a':ThreadId(2)
5:b':ThreadId(2)
6:a':ThreadId(2)
6:b':ThreadId(2)
7:a':ThreadId(2)
7:b':ThreadId(2)
8:a':ThreadId(2)
8:b':ThreadId(2)
9:a':ThreadId(2)
9:b':ThreadId(2)
ThreadId(1)

見事にブロッキングスレッド=1といった感じの結果。
multi_thread と同じ結果になりました。

current_thread, max_blocking_threads=2, thread_keep_alive=1ms

では同じ流れで current_thread, max_blocking_threads=2 で試しみましょう

fn main() -> Result<(), anyhow::Error> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(2)
        .thread_keep_alive(std::time::Duration::from_millis(1))
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();
        rt.block_on(_main())?;
    Ok(())
}

async fn _main() -> Result<(), anyhow::Error> {
    println!("{:?}", std::thread::current().id());
    let fut = futures::future::join_all((0..10).map(|i| async move{
        tokio::task::spawn_blocking(move||{
            println!("{}:a':{:?}",i, std::thread::current().id());
            std::thread::sleep(std::time::Duration::from_millis(100));
            println!("{}:b':{:?}",i, std::thread::current().id());
        }).await
    }));
    tokio::spawn(fut).await?;
    println!("{:?}", std::thread::current().id());
    Ok(())
}

結果は

ThreadId(1)
0:a':ThreadId(2)
1:a':ThreadId(3)
0:b':ThreadId(2)
2:a':ThreadId(2)
1:b':ThreadId(3)
3:a':ThreadId(3)
2:b':ThreadId(2)
4:a':ThreadId(2)
3:b':ThreadId(3)
5:a':ThreadId(3)
4:b':ThreadId(2)
6:a':ThreadId(2)
5:b':ThreadId(3)
7:a':ThreadId(3)
6:b':ThreadId(2)
8:a':ThreadId(2)
7:b':ThreadId(3)
9:a':ThreadId(3)
9:b':ThreadId(3)
8:b':ThreadId(2)
ThreadId(1)

これも multi_thread とおなじような結果になりました。

multi_thread, worker_threads=1, max_blocking_threads=3, thread_keep_alive=10ms

ここからは block_in_place 編です
この関数を呼ぶと現在のワーカスレッドがブロッキングスレッドへ移行します。 spawn_blocking と違って引数の FnOnceSend + 'static が要求されないのが ウリです。
現在の async が動いているスレッドがブオロックされてしまうので イテレータの要素ごとに tokio::spawn するように変更されています。

fn main() -> Result<(), anyhow::Error> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(3)
        .thread_keep_alive(std::time::Duration::from_millis(10))
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();
        rt.block_on(_main())?;
    Ok(())
}

async fn _main() -> Result<(), anyhow::Error> {
    println!("{:?}", std::thread::current().id());
    futures::future::join_all((0..10).map(|i| tokio::spawn(async move{
        tokio::task::block_in_place(move||{
            for j in 0..5 {
                println!("{}:{}:a':{:?}",i,j, std::thread::current().id());
                std::thread::sleep(std::time::Duration::from_millis(50));
            }
        })
    }))).await;
    tokio::task::yield_now().await;
    println!("{:?}", std::thread::current().id());
    Ok(())
}

結果は……

ThreadId(1)
0:0:a':ThreadId(2)
1:0:a':ThreadId(3)
2:0:a':ThreadId(4)
3:0:a':ThreadId(5)
0:1:a':ThreadId(2)
1:1:a':ThreadId(3)
2:1:a':ThreadId(4)
3:1:a':ThreadId(5)
0:2:a':ThreadId(2)
1:2:a':ThreadId(3)
2:2:a':ThreadId(4)
3:2:a':ThreadId(5)
0:3:a':ThreadId(2)
1:3:a':ThreadId(3)
2:3:a':ThreadId(4)
3:3:a':ThreadId(5)
0:4:a':ThreadId(2)
1:4:a':ThreadId(3)
2:4:a':ThreadId(4)
3:4:a':ThreadId(5)
4:0:a':ThreadId(2)
5:0:a':ThreadId(3)
6:0:a':ThreadId(4)
7:0:a':ThreadId(5)
4:1:a':ThreadId(2)
5:1:a':ThreadId(3)
6:1:a':ThreadId(4)
7:1:a':ThreadId(5)
4:2:a':ThreadId(2)
6:2:a':ThreadId(4)
5:2:a':ThreadId(3)
7:2:a':ThreadId(5)
4:3:a':ThreadId(2)
5:3:a':ThreadId(3)
6:3:a':ThreadId(4)
7:3:a':ThreadId(5)
4:4:a':ThreadId(2)
6:4:a':ThreadId(4)
5:4:a':ThreadId(3)
7:4:a':ThreadId(5)
8:0:a':ThreadId(2)
9:0:a':ThreadId(4)
8:1:a':ThreadId(2)
9:1:a':ThreadId(4)
8:2:a':ThreadId(2)
9:2:a':ThreadId(4)
8:3:a':ThreadId(2)
9:3:a':ThreadId(4)
8:4:a':ThreadId(2)
9:4:a':ThreadId(4)
ThreadId(1)

この結果から

  • main thread が ThreadId(1), worker thread が ThreadId(2), blocking thread が ThreadId(3) ~ ThreadId(5) だったものが、途中から worker thread が blocking thread になり、
    その代わりに blocking thread が worker thread に入れ替わっているかの挙動をして
    いる。
  • block_in_place も max_blocking_threads の影響を受ける

ということが分かります

非同期制御

join

join_all

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    use futures::future::BoxFuture;
    use futures::FutureExt;
    let fut = futures::future::join_all((0..10).map(|_| -> BoxFuture<'_, Result<(), anyhow::Error>>{async{
        // 並列 request とか
        Ok(())
    }.boxed()}));
    let rets = tokio::spawn(fut).await?;
    rets.into_iter().collect::<Result<Vec<_>, _>>()?;
    Ok(())
}

SPMC で非同期タスクを並列処理するパターン

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use futures::SinkExt;
    let (tx, rx) = flume::bounded(10);
    let tx_task = (move|| -> BoxFuture<'_, Result<(), anyhow::Error>>{
        async move {
            // なんか非同期でリストを取得し続ける
            let urls = vec!["https://example.com"];
            for url in urls {
                // キューが溢れそうになったら await で待つ
                tx.send_async(url).await?;
            }
        }
    })();
    let rx_task = futures::future::join_all((0..10).map(|_| -> BoxFuture<'_, Result<(), anyhow::Error>>{
        let rx = rx.clone();
        async move {
            while let Some(url) = rx.next().await {
                // reqwest::get とか
            }
            Ok(())
        }.boxed()
    }));
    let (ret, rets) = tokio::spawn(futures::future::join(tx_task, rx_task)).await?;
    ret?;
    rets.into_iter().collect::<Result<Vec<_>, _>>()?;
    Ok(())
}

select

キャンセル

リトライ

ロギング

mutex

channel

Graceful Shutdown

pin

参照

動作確認環境

133
113
8

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
133
113

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?