まとめ
-
tokio::select!
は、 コンパイル時に既知な非同期処理にしか利用できない。 - 動的に変化する非同期処理を待つには、tokio_stream::StreamMap を使う。
- 合わせて、 tokio_stream::wrappers::ReceiverStream を使うと簡潔に書ける。
tokio::select! の 使用例
Rustでは、tokio::select!
を使うことで、複数の非同期処理のうちから、最初の完了を待つことができる。 参考
例えば、次のようなコードである。
select.rs
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel::<usize>(10);
let (tx2, mut rx2) = mpsc::channel::<usize>(10);
tokio::spawn(async move {
tx1.send(1).await.unwrap();
tx1.send(2).await.unwrap();
});
tokio::spawn(async move {
tx2.send(3).await.unwrap();
tx2.send(4).await.unwrap();
});
loop {
// tokio::select!で、rx1.recv() と rx2.recv() の2つの非同期処理を待つ。
// 手抜きのためbreak処理は省略。
tokio::select! {
val = rx1.recv() => {
println!("recv 1, {:?}", val);
},
val = rx2.recv() => {
println!("recv 2, {:?}", val);
}
}
}
}
しかしながら、tokio::select!
は、コンパイル時に、同時に待ちたい非同期処理(上記例では rx1,recv()
と rx2.recv()
)がわかっている必要がある。
言い換えると、 ある条件時のみ (rx2.recv()
) を待ちたいといった場合には tokio::select!
は利用できない。
これを解決するには、 tokio_stream::StreamMap か、 futures::stream::SelectAll を使う
両者の違いは、 tokio_stream::StreamMap
では、どの非同期処理が完了したかを知ることができる。
tokio_stream::StreamMap の 使用例
tokio_stream::StreamMap にサンプルがあるが、tokio_stream::wrappers::ReceiverStream を使うと、より簡潔に記述できる。
コード例を以下に示す。
mpsc2.rs
use tokio::sync::mpsc;
use tokio_stream::{StreamExt, StreamMap};
#[tokio::main]
async fn main() {
let (tx1, rx1) = mpsc::channel::<usize>(10);
let (tx2, rx2) = mpsc::channel::<usize>(10);
// ReceiverをReceiverStreamに変換する。
let rx1 = tokio_stream::wrappers::ReceiverStream::new(rx1);
let rx2 = tokio_stream::wrappers::ReceiverStream::new(rx2);
tokio::spawn(async move {
tx1.send(1).await.unwrap();
tx1.send(2).await.unwrap();
});
tokio::spawn(async move {
tx2.send(3).await.unwrap();
tx2.send(4).await.unwrap();
});
// StreamMapを作成して、追加する。
let mut map = StreamMap::new();
// Insert both streams
map.insert("one", rx1); // insert, remove で 動的に待つ非同期処理を増減させることできる。
map.insert("two", rx2);
loop {
let result = map.next().await;
let (key, val) = match result {
Some(result) => result,
None => break,
};
println!("recv, key:{:?}, val:{:?}", key, val);
}
}
サンプルコードは ここ にあります。