6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Rustで動的に変化する複数の非同期処理を待つ

Posted at

まとめ


tokio::select! の 使用例

Rustでは、tokio::select!を使うことで、複数の非同期処理のうちから、最初の完了を待つことができる。 参考

例えば、次のようなコードである。

select.rs
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel::<usize>(10);
    let (tx2, mut rx2) = mpsc::channel::<usize>(10);

    tokio::spawn(async move {
        tx1.send(1).await.unwrap();
        tx1.send(2).await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send(3).await.unwrap();
        tx2.send(4).await.unwrap();
    });

    loop {
        // tokio::select!で、rx1.recv() と rx2.recv() の2つの非同期処理を待つ。
        // 手抜きのためbreak処理は省略。
        tokio::select! {
            val = rx1.recv() => {
                println!("recv 1, {:?}", val);
            },
            val = rx2.recv() => {
                println!("recv 2, {:?}", val);
            }
        }
    }
}

しかしながら、tokio::select! は、コンパイル時に、同時に待ちたい非同期処理(上記例では rx1,recv()rx2.recv() )がわかっている必要がある。
言い換えると、 ある条件時のみ (rx2.recv()) を待ちたいといった場合には tokio::select! は利用できない。

これを解決するには、 tokio_stream::StreamMap か、 futures::stream::SelectAll を使う

両者の違いは、 tokio_stream::StreamMap では、どの非同期処理が完了したかを知ることができる。

tokio_stream::StreamMap の 使用例

tokio_stream::StreamMap にサンプルがあるが、tokio_stream::wrappers::ReceiverStream を使うと、より簡潔に記述できる。

コード例を以下に示す。

mpsc2.rs
use tokio::sync::mpsc;
use tokio_stream::{StreamExt, StreamMap};

#[tokio::main]
async fn main() {
    let (tx1, rx1) = mpsc::channel::<usize>(10);
    let (tx2, rx2) = mpsc::channel::<usize>(10);

    // ReceiverをReceiverStreamに変換する。
    let rx1 = tokio_stream::wrappers::ReceiverStream::new(rx1);
    let rx2 = tokio_stream::wrappers::ReceiverStream::new(rx2);

    tokio::spawn(async move {
        tx1.send(1).await.unwrap();
        tx1.send(2).await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send(3).await.unwrap();
        tx2.send(4).await.unwrap();
    });

    // StreamMapを作成して、追加する。
    let mut map = StreamMap::new();
    // Insert both streams
    map.insert("one", rx1);  // insert, remove で 動的に待つ非同期処理を増減させることできる。
    map.insert("two", rx2);

    loop {
        let result = map.next().await;
        let (key, val) = match result {
            Some(result) => result,
            None => break,
        };
        println!("recv, key:{:?}, val:{:?}", key, val);
    }
}

サンプルコードは ここ にあります。

6
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?