Rustで並行プログラミングを行う場合、標準ライブラリの機能だけでは足りないことがあります。より高度な並行処理パターンを実装したい場合や、パフォーマンスを最適化したい場合には、サードパーティのライブラリを利用する必要があります。
Crossbeamは、そのような要望に応えるために開発された、Rustの並行プログラミングのためのツールセットです。標準ライブラリを補完し、より強力な並行処理機能を提供します。
この記事では、Crossbeamの基本的な機能を紹介し、実際のコード例を通してその使い方を解説します。
Crossbeamとは
Crossbeamは、Rustにおける並行プログラミングのためのツールセットです。標準ライブラリのstd::sync
やstd::thread
を強化・拡張する形で、以下のような機能を提供しています:
- アトミック操作
- データ構造
- メモリ管理
- スレッド同期
- その他のユーティリティ
特に、標準ライブラリのstd::sync::mpsc
チャネルよりも高機能で高性能なチャネルの実装や、スレッド間でデータを共有するための安全な仕組みなど、実用的な機能が多数含まれています。
インストール方法
Crossbeamを使用するには、Cargo.toml
に以下の依存関係を追加します:
[dependencies]
crossbeam = "0.8"
特定の機能だけを使いたい場合は、個別のクレートを指定することもできます:
[dependencies]
crossbeam-channel = "0.5" # チャネルだけ使用する場合
主な機能
Crossbeamが提供する主な機能を見ていきましょう。
1. チャネル(Channel)
Crossbeamのチャネルは、標準ライブラリのmpsc
チャネルよりも多機能で高性能です。マルチプロデューサー・マルチコンシューマー(MPMC)モデルをサポートしており、送信者と受信者の両方を複数のスレッド間で共有できます。
use crossbeam::channel;
fn main() {
// 有界チャネル(バッファサイズ10)の作成
let (s, r) = channel::bounded(10);
// 無界チャネル(理論上無制限のバッファサイズ)の作成
let (s_unbounded, r_unbounded) = channel::unbounded();
// 送信
s.send(42).unwrap();
// 受信
let value = r.recv().unwrap();
println!("受信した値: {}", value);
}
select!マクロ
select!
マクロは、複数のチャネル操作を同時に待機できる強力な機能です。これにより、複数のチャネルからの入力を効率的に処理できます。
use crossbeam::channel::{self, select};
use std::time::Duration;
fn main() {
let (s1, r1) = channel::bounded(1);
let (s2, r2) = channel::bounded(1);
// タイムアウト用のチャネル
let timeout = channel::after(Duration::from_secs(1));
// 別スレッドからデータを送信
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(500));
s1.send("メッセージ1").unwrap();
});
// select!で複数のチャネルを待機
select! {
recv(r1) -> msg => println!("チャネル1からメッセージを受信: {:?}", msg),
recv(r2) -> msg => println!("チャネル2からメッセージを受信: {:?}", msg),
recv(timeout) -> _ => println!("タイムアウトしました"),
}
}
2. スコープ付きスレッド(Scoped Threads)
標準ライブラリのスレッドでは、スタック上の変数を別スレッドで借用することは通常できません。Crossbeamのscope
機能を使うと、スタック上の変数を安全に借用しながら新しいスレッドを生成できます。
use crossbeam::scope;
fn main() {
let mut numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// スコープ付きスレッドの作成
scope(|s| {
// スタック上の変数(numbers)への参照を渡す
s.spawn(|_| {
// numbersの先頭半分を処理
for i in 0..numbers.len() / 2 {
numbers[i] *= 2;
}
});
// 別スレッドを生成
s.spawn(|_| {
// numbersの後半を処理
for i in numbers.len() / 2..numbers.len() {
numbers[i] *= 3;
}
});
// スコープを抜けるとき、すべてのスレッドの終了を待機
}).unwrap();
// スレッドが修正した値を確認
println!("処理後の数値: {:?}", numbers);
}
3. アトミックなデータ構造
Crossbeamは様々なアトミックなデータ構造を提供します。
AtomicCell
AtomicCell<T>
は、型T
の値をスレッド間で安全に共有するための汎用コンテナです。標準ライブラリのAtomicUsize
などとは異なり、あらゆる型(Copy
トレイトの実装の有無に関わらず)に対応します。
use crossbeam::atomic::AtomicCell;
fn main() {
// 整数値のAtomicCell
let atomic_num = AtomicCell::new(10);
// 値の取得
let value = atomic_num.load();
println!("現在の値: {}", value);
// 値の更新
atomic_num.store(20);
// フェッチ&加算
let old_value = atomic_num.fetch_add(5);
println!("以前の値: {}, 現在の値: {}", old_value, atomic_num.load());
// 文字列のようなCopyでない型も使用可能
let atomic_string = AtomicCell::new(String::from("Hello"));
// スワップ操作
let old_string = atomic_string.swap(String::from("World"));
println!("以前の文字列: {}, 現在の文字列: {}", old_string, atomic_string.load());
}
4. Work-Stealing Deque
Crossbeamのdeque
モジュールは、タスクスケジューラーの構築に使用できるWork-Stealing Dequeを提供します。これは主に、スレッドプールやタスクベースの並行処理システムを構築する際に役立つ高度な機能です。
以下は、公式ドキュメントに基づく基本的な使用例です:
use crossbeam::deque::{Worker, Stealer};
use std::thread;
fn main() {
// ワーカーとスティーラーを作成
let w = Worker::new_fifo();
let s = w.stealer();
// いくつかのタスクをワーカーのキューに追加
w.push(1);
w.push(2);
w.push(3);
// 別スレッドでスティーラーを使用
let handle = thread::spawn(move || {
// 他のスレッドからタスクを盗む
match s.steal() {
crossbeam::deque::Steal::Success(task) => println!("盗んだタスク: {}", task),
crossbeam::deque::Steal::Empty => println!("キューは空です"),
crossbeam::deque::Steal::Retry => println!("競合が発生、再試行が必要"),
}
});
// 自分のキューからタスクを処理
while let Some(task) = w.pop() {
println!("タスクを処理: {}", task);
}
// 別スレッドの終了を待機
handle.join().unwrap();
}
Work-Stealing Dequeは、高度な並行処理パターンを実装する際に特に役立ちます。例えば、独自のスレッドプールや並行タスク処理システムを構築する場合などです。
5. エポックベースのガベージコレクション
Crossbeamのepoch
モジュールは、「ロックフリーなデータ構造を構築するためのエポックベースのガベージコレクション」を提供します。これはロックフリーデータ構造の構築に役立ちます。
エポックベースのガベージコレクションはかなり高度なトピックですが、公式ドキュメントでは以下のような基本的な使い方が示されています:
use crossbeam::epoch::{self, Atomic, Owned};
use std::sync::atomic::Ordering;
fn main() {
// Atomic<T>は共有参照を格納するためのコンテナ
let a = Atomic::new(1);
// 現在のエポック(ガベージコレクション用)にスレッドを「ピン」
let guard = &epoch::pin();
// アトミック値の読み込み
let p = a.load(Ordering::SeqCst, guard);
// 新しい値で置き換え
a.store(Owned::new(2), Ordering::SeqCst);
// 変更を確認
let updated = a.load(Ordering::SeqCst, guard);
println!("更新後の値: {:?}", unsafe { updated.as_ref() });
// guardがドロップされるとエポックが進み、
// もう参照されていないデータがガベージコレクションされる可能性がある
}
この機能は主に、ロックフリーなデータ構造(リストやマップなど)を自分で実装する場合に使われるものであり、通常の並行プログラミングでは直接使う機会は少ないでしょう。
Crossbeamの基本的な使用例
以下の例では、Crossbeamの基本機能を組み合わせて使う方法を示します。scope
を使ってスタック上のデータを共有しながら並行処理を行う例です。
use crossbeam::scope;
fn main() {
// 共有するデータ
let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut results = vec![0; 8];
// スコープ付きスレッドを使用
scope(|s| {
// 前半を処理するスレッド
s.spawn(|_| {
for i in 0..4 {
results[i] = data[i] * 10;
}
});
// 後半を処理するスレッド
s.spawn(|_| {
for i in 4..8 {
results[i] = data[i] * 10;
}
});
// スコープを抜けるとすべてのスレッドが終了するのを待機
}).unwrap();
// 両方のスレッドが処理を完了した後に結果を表示
println!("結果: {:?}", results);
}
もう少し実用的な例として、チャネルとスコープを組み合わせたシンプルなプロデューサー ー コンシューマーパターンを見てみましょう:
use crossbeam::channel;
use crossbeam::scope;
use std::time::Duration;
fn main() {
// 有界チャネルを作成
let (sender, receiver) = channel::bounded(5);
scope(|s| {
// プロデューサースレッド
s.spawn(|_| {
for i in 0..10 {
// データを送信
if sender.send(i).is_err() {
println!("受信者が切断されました");
break;
}
println!("送信: {}", i);
std::thread::sleep(Duration::from_millis(100));
}
// スコープを抜けるとsenderは自動的にドロップされる
});
// コンシューマースレッド
s.spawn(|_| {
// チャネルからデータを受信
while let Ok(data) = receiver.recv() {
println!("受信: {}", data);
std::thread::sleep(Duration::from_millis(200));
}
println!("チャネルが閉じられました");
});
// スコープを抜けると両方のスレッドの終了を待機
}).unwrap();
}
Crossbeamのその他の機能
Crossbeamには、この記事で紹介した以外にも多くの便利な機能があります:
- ShardedLock: 高速な並行読み取りをサポートするシャード化された読み取り/書き込みロック
- WaitGroup: 計算の開始または終了を同期するためのプリミティブ
- ArrayQueue/SegQueue: 複数生産者・複数消費者(MPMC)向けのキュー実装
- Parker: スレッドパーキングのプリミティブ
- CachePadded: キャッシュライン長にパディングして値を整列させる機能
まとめ
Crossbeamは、Rustの並行プログラミングを強化するための強力なツールセットです。標準ライブラリが提供する基本的な並行処理機能を拡張し、より高性能かつ使いやすいAPIを提供します。
Crossbeamは特に以下のような場合に役立ちます:
- 複数の生産者と複数の消費者を持つ通信パターンが必要な場合(標準ライブラリのmpscよりも機能的で高性能)
- スタック上の変数をスレッド間で共有したい場合(スコープ機能)
- 高性能な並行データ構造が必要な場合(ワークスチーリングキューなど)
- スレッド同期のためのより高度なプリミティブが必要な場合(WaitGroupなど)
高機能すぎて使いこなすのも大変そうですが学んでいきたい