Rust でカメラ映像やセンサーデータなど、常に最新の情報だけを処理したい場合、どのようなチャネル設計が適切でしょう?
例えば、30ms ごとに新しいフレームが生成されるが、処理には 300ms かかるようなケースを考えてみます。処理スレッドが追いつかない場合、「古いデータは捨てて、常に最新のデータだけを処理したい」というのは自然な要求です。
Rust における「最新データ優先」の並行処理パターンについて、複数の実装アプローチを比較します。「古いデータを上書きするチャネル」の実装方法に焦点を当てます。
想定する条件
- カメラキャプチャスレッドが約30msごとに画像を取得している
- 処理スレッドは1フレームの処理に約300msかかる
- 複数の処理スレッドを使うが、全フレームを処理できるほど多くのスレッドは用意できない
- フレームをスキップすることは許容できるが、処理スレッドは常に「最新の」フレームを処理したい
チャネルが満杯になったとき、送信側がブロックされては要求を満たせません。古いデータを上書きするようなチャネルが必要となります。
実装アプローチ
1. tokio::sync::watch - 単一値の監視
Tokio の watch
チャネルは、まさにこの「最新値のみを保持する」パターンのために設計されています。
use tokio::sync::watch;
fn main() {
// 送信者と受信者を作成
let (tx, rx) = watch::channel(Frame::default());
// 送信者側(カメラキャプチャスレッド)
std::thread::spawn(move || {
loop {
let new_frame = capture_frame();
// 常に最新の値で上書き
let _ = tx.send(new_frame);
std::thread::sleep(std::time::Duration::from_millis(30));
}
});
// 受信者側(処理スレッド)
std::thread::spawn(move || {
let mut receiver = rx.clone();
loop {
// 変更を待ち、最新の値を取得
if receiver.changed().await.is_ok() {
let frame = receiver.borrow_and_update().clone();
process_frame(frame); // 約300ms
}
}
});
}
特徴:
- シンプルな実装
- 単一の値のみ保持する(最新のフレームだけを保持)
- 複数の受信者が同じ最新値を受け取れる(MPMC)
- 送信操作は非ブロッキング
- 古い値は新しい値によって自動的に上書きされる
ただし、watch
チャネルは単一の値しか保持できないため、複数の最新フレームを保持したい場合は別のアプローチが必要です。
2. tokio::sync::broadcast - 設定したバッファサイズまでの値を保持
より柔軟なアプローチとしては、Tokio の broadcast
チャネルがあります。
use tokio::sync::broadcast;
fn main() {
// キャパシティを持つチャネルを作成
let (tx, _) = broadcast::channel(10); // 10フレームまでバッファリング
// 送信者側
let tx_clone = tx.clone();
std::thread::spawn(move || {
loop {
let new_frame = capture_frame();
// バッファがいっぱいの場合、送信は失敗せず古いメッセージが削除される
let _ = tx_clone.send(new_frame);
std::thread::sleep(std::time::Duration::from_millis(30));
}
});
// 受信者側
let mut rx = tx.subscribe();
std::thread::spawn(move || {
loop {
match rx.recv().await {
Ok(frame) => {
process_frame(frame); // 処理(約300ms)
}
Err(broadcast::error::RecvError::Lagged(n)) => {
println!("{}フレームスキップしました", n);
}
Err(_) => break,
}
}
});
}
特徴:
- 指定したキャパシティまでメッセージをバッファリング
- バッファがいっぱいになると、古いメッセージが自動的に削除される
- 複数の受信者がそれぞれ独立してメッセージを消費できる
- 受信者が処理に追いつけないと
Lagged
エラーを受け取る - どのメッセージが処理されるかをより細かく制御できる
3. crossbeam + semaphore
crossbeam
チャネルと semaphore
を組み合わせた実装も可能です。
use crossbeam_channel::{bounded, select};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
struct Semaphore {
count: AtomicUsize,
}
impl Semaphore {
fn new(count: usize) -> Self {
Self { count: AtomicUsize::new(count) }
}
fn try_acquire(&self) -> bool {
let mut current = self.count.load(Ordering::SeqCst);
while current > 0 {
match self.count.compare_exchange(
current, current - 1, Ordering::SeqCst, Ordering::SeqCst
) {
Ok(_) => return true,
Err(val) => current = val,
}
}
false
}
fn release(&self) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
fn main() {
// 処理スレッド数に基づいたセマフォを作成
let semaphore = Arc::new(Semaphore::new(3)); // 同時に3つのフレームまで処理可能
// チャネルを作成
let (tx, rx) = bounded(10); // バッファサイズは適宜調整
let rx = Arc::new(Mutex::new(rx));
// 送信者側
std::thread::spawn(move || {
loop {
let new_frame = capture_frame();
// セマフォで処理スロットが空いているときだけ送信を試みる
if semaphore.try_acquire() {
if tx.send(new_frame).is_err() {
// 送信エラー時はセマフォを解放
semaphore.release();
break;
}
} else {
// 処理スロットがない場合は古いフレームをスキップ
println!("処理スレッドがビジー状態: フレームをスキップします");
}
std::thread::sleep(std::time::Duration::from_millis(30));
}
});
// 複数の処理スレッド
for _ in 0..3 {
let rx_clone = Arc::clone(&rx);
let semaphore_clone = Arc::clone(&semaphore);
std::thread::spawn(move || {
loop {
let frame = {
let rx_guard = rx_clone.lock().unwrap();
match rx_guard.recv() {
Ok(frame) => frame,
Err(_) => break,
}
};
// フレーム処理
process_frame(frame);
// 処理が終わったらセマフォを解放
semaphore_clone.release();
}
});
}
}
特徴:
- 明示的に古い値を捨てる処理が必要
- 他のアプローチに比べて制御が複雑になる可能性がある
4. ringbuf - リングバッファを使用
ringbuf
クレートを使用したリングバッファベースの実装も考えられます。
use ringbuf::{HeapRb, Rb, Consumer, Producer};
fn main() {
// リングバッファを作成
let rb = HeapRb::<Frame>::new(1); // バッファサイズ1
let (mut prod, mut cons) = rb.split();
// 送信者側
std::thread::spawn(move || {
loop {
let new_frame = capture_frame();
// 古い要素を上書き
let _ = prod.push_overwrite(new_frame);
std::thread::sleep(std::time::Duration::from_millis(30));
}
});
// 受信者側
std::thread::spawn(move || {
loop {
if let Some(frame) = cons.pop() {
process_frame(frame); // 処理(約300ms)
} else {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
});
}
特徴:
-
push_overwrite
で古い要素を簡単に上書き可能 - リングバッファの操作にはロックが必要
- 明示的なバッファサイズ管理が可能
- シンプルな API
ただし、push_overwrite
にはロックが必要なため、高速な処理が求められる場合には注意が必要です。
5. リングバッファ + Mutex + CondVar
リングバッファと同期プリミティブを組み合わせる方法です。
use std::sync::{Arc, Mutex, Condvar};
use std::collections::VecDeque;
struct OverwritingChannel<T> {
buffer: Mutex<VecDeque<T>>,
capacity: usize,
not_empty: Condvar,
}
impl<T> OverwritingChannel<T> {
fn new(capacity: usize) -> Self {
Self {
buffer: Mutex::new(VecDeque::with_capacity(capacity)),
capacity,
not_empty: Condvar::new(),
}
}
fn send(&self, item: T) {
let mut buffer = self.buffer.lock().unwrap();
if buffer.len() == self.capacity {
buffer.pop_front(); // 最も古い要素を削除
}
buffer.push_back(item);
self.not_empty.notify_one();
}
fn recv(&self) -> T {
let mut buffer = self.buffer.lock().unwrap();
while buffer.is_empty() {
buffer = self.not_empty.wait(buffer).unwrap();
}
buffer.pop_front().unwrap()
}
}
特徴:
- 要件に完全に合わせたカスタム実装が可能
- 細かい挙動を制御できる
- 実装が複雑
- バグを作り込みそう
- パフォーマンスの最適化が難しそう
まとめ
Rust で「古いデータを上書きするチャネル」を実装する以下のアプローチを紹介しました:
- tokio::sync::watch - 最も簡単で単一値のみを扱う場合
- tokio::sync::broadcast - 複数のメッセージをバッファリングする場合
- crossbeam + semaphore - 非同期コードを避けたい場合
- ringbuf - リングバッファベースのシンプルな実装
- 自作実装 - 特殊な要件に対応する必要がある場合
要件を見直して、実装はシンプルにするのがいいかなとおもいました