Rustにはcrossbeamという低レベルの並列処理を実装するためのライブラリ群があります
Crossbeam: support for concurrent programming
https://github.com/crossbeam-rs/crossbeam
tokioやrayonのような高次のライブラリもこれらを使用しています。今回は特にcrossbeam-channelを使用してみたいと思います。
std::sync::mpsc
crossbeam-channelは標準ライブラリにあるstd::sync::mpscを意識して作られたAPIです。Multi-Producer Single-Consumer (MPSC)の名前通り、mpsc
は複数の送信者と一人の受信者の間のチャンネルを構築するためのライブラリです。
use std::thread;
use std::sync::mpsc::channel;
// Create a simple streaming channel
let (tx, rx) = channel();
thread::spawn(move|| {
tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);
(stdのドキュメントより) channel()
関数によってSender<T>
とReceiver<T>
の組を作り、これらを使って送信・受信を行います。送られたデータ tx.send(10)
は無制限でキューに積まれ、受信者が順番に取り出します rx.recv()
。送受信される型T
はチャンネル毎に固定で、この例では型推論からusize
になっています。
crossbeam-channel
これだけでも使いどころによっては十分便利なのですが、受信者が一人しか居ないという状況は大きな制約です。また、Sender<T>
, Receiver<T>
がSync
を持たないのでスレッド間で安全に受け渡すことができません。これを改善し、Multi-Producer Multi-Consumer (MPMC)としたのがcrossbeam-channelです。
単純なサンプルを書いてみました:
extern crate crossbeam_channel;
use crossbeam_channel::unbounded;
use std::thread;
use std::time::Duration;
fn main() {
// 1. create unbounded channel
let (tx, rx) = unbounded();
// 2. start worker (consumer) threads
let th_cs: Vec<_> = (0..3)
.map(|n| {
let rx = rx.clone();
thread::spawn(move || loop {
match rx.recv() {
Ok(i) => println!("Get {} on thread {}", i, n),
Err(_) => break,
};
thread::sleep(Duration::from_secs(1));
})
})
.collect();
// 3. push tasks
for i in 0..10 {
tx.send(i).unwrap();
}
drop(tx); // close sender
// 4. wait workers
for t in th_cs {
t.join().unwrap();
}
}
-
crossbeam-channel::unbounded()
によりMPMCチャンネルを作成- 上限個数を指定する場合は
bounded()
で作成 - 送受信するデータは同じく
usize
- 上限個数を指定する場合は
- ワーカースレッドを起動
- ワーカーは
rx
のコピーを保持して、そこからデータを読み出す - チャンネルが閉じられてたら
rx.recv()
がエラー(RecvError
)を投げるので、それをもってスレッドを終了する
- ワーカーは
- メインスレッドでタスクをチャンネルに投入
-
drop(tx)
で明示的にチャンネルを閉じてる
-
- ワーカースレッドの終了を待機
最後に
channelによる並行処理の記述はメモリを共有して行う処理よりも分かりやすくて良いですね(/・ω・)/