LoginSignup
26
15

More than 5 years have passed since last update.

MultiProducer-MultiConsumer (MPMC) channel using crossbeam-channel

Posted at

Rustにはcrossbeamという低レベルの並列処理を実装するためのライブラリ群があります

Crossbeam: support for concurrent programming
https://github.com/crossbeam-rs/crossbeam

tokiorayonのような高次のライブラリもこれらを使用しています。今回は特にcrossbeam-channelを使用してみたいと思います。

std::sync::mpsc

crossbeam-channelは標準ライブラリにあるstd::sync::mpscを意識して作られたAPIです。Multi-Producer Single-Consumer (MPSC)の名前通り、mpscは複数の送信者と一人の受信者の間のチャンネルを構築するためのライブラリです。

use std::thread;
use std::sync::mpsc::channel;

// Create a simple streaming channel
let (tx, rx) = channel();
thread::spawn(move|| {
    tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);

(stdのドキュメントより) channel()関数によってSender<T>Receiver<T>の組を作り、これらを使って送信・受信を行います。送られたデータ tx.send(10) は無制限でキューに積まれ、受信者が順番に取り出します rx.recv()。送受信される型Tはチャンネル毎に固定で、この例では型推論からusizeになっています。

crossbeam-channel

これだけでも使いどころによっては十分便利なのですが、受信者が一人しか居ないという状況は大きな制約です。また、Sender<T>, Receiver<T>Syncを持たないのでスレッド間で安全に受け渡すことができません。これを改善し、Multi-Producer Multi-Consumer (MPMC)としたのがcrossbeam-channelです。

単純なサンプルを書いてみました:

extern crate crossbeam_channel;

use crossbeam_channel::unbounded;

use std::thread;
use std::time::Duration;

fn main() {
    // 1. create unbounded channel
    let (tx, rx) = unbounded();

    // 2. start worker (consumer) threads
    let th_cs: Vec<_> = (0..3)
        .map(|n| {
            let rx = rx.clone();
            thread::spawn(move || loop {
                match rx.recv() {
                    Ok(i) => println!("Get {} on thread {}", i, n),
                    Err(_) => break,
                };
                thread::sleep(Duration::from_secs(1));
            })
        })
        .collect();

    // 3. push tasks
    for i in 0..10 {
        tx.send(i).unwrap();
    }
    drop(tx); // close sender

    // 4. wait workers
    for t in th_cs {
        t.join().unwrap();
    }
}
  1. crossbeam-channel::unbounded() によりMPMCチャンネルを作成
    • 上限個数を指定する場合はbounded()で作成
    • 送受信するデータは同じくusize
  2. ワーカースレッドを起動
    • ワーカーは rxのコピーを保持して、そこからデータを読み出す
    • チャンネルが閉じられてたら rx.recv() がエラー(RecvError)を投げるので、それをもってスレッドを終了する
  3. メインスレッドでタスクをチャンネルに投入
    • drop(tx)で明示的にチャンネルを閉じてる
  4. ワーカースレッドの終了を待機

最後に

channelによる並行処理の記述はメモリを共有して行う処理よりも分かりやすくて良いですね(/・ω・)/

26
15
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
26
15