0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Rustにおける並行処理パターンについての考察

Last updated at Posted at 2025-05-11

Rust でカメラ映像やセンサーデータなど、常に最新の情報だけを処理したい場合、どのようなチャネル設計が適切でしょう?

例えば、30ms ごとに新しいフレームが生成されるが、処理には 300ms かかるようなケースを考えてみます。処理スレッドが追いつかない場合、「古いデータは捨てて、常に最新のデータだけを処理したい」というのは自然な要求です。

Rust における「最新データ優先」の並行処理パターンについて、複数の実装アプローチを比較します。「古いデータを上書きするチャネル」の実装方法に焦点を当てます。

想定する条件

  1. カメラキャプチャスレッドが約30msごとに画像を取得している
  2. 処理スレッドは1フレームの処理に約300msかかる
  3. 複数の処理スレッドを使うが、全フレームを処理できるほど多くのスレッドは用意できない
  4. フレームをスキップすることは許容できるが、処理スレッドは常に「最新の」フレームを処理したい

チャネルが満杯になったとき、送信側がブロックされては要求を満たせません。古いデータを上書きするようなチャネルが必要となります。

実装アプローチ

1. tokio::sync::watch - 単一値の監視

Tokio の watch チャネルは、まさにこの「最新値のみを保持する」パターンのために設計されています。

use tokio::sync::watch;

fn main() {
    // 送信者と受信者を作成
    let (tx, rx) = watch::channel(Frame::default());
    
    // 送信者側(カメラキャプチャスレッド)
    std::thread::spawn(move || {
        loop {
            let new_frame = capture_frame();
            // 常に最新の値で上書き
            let _ = tx.send(new_frame);
            std::thread::sleep(std::time::Duration::from_millis(30));
        }
    });
    
    // 受信者側(処理スレッド)
    std::thread::spawn(move || {
        let mut receiver = rx.clone();
        loop {
            // 変更を待ち、最新の値を取得
            if receiver.changed().await.is_ok() {
                let frame = receiver.borrow_and_update().clone();
                process_frame(frame); // 約300ms
            }
        }
    });
}

特徴:

  • シンプルな実装
  • 単一の値のみ保持する(最新のフレームだけを保持)
  • 複数の受信者が同じ最新値を受け取れる(MPMC)
  • 送信操作は非ブロッキング
  • 古い値は新しい値によって自動的に上書きされる

ただし、watch チャネルは単一の値しか保持できないため、複数の最新フレームを保持したい場合は別のアプローチが必要です。

2. tokio::sync::broadcast - 設定したバッファサイズまでの値を保持

より柔軟なアプローチとしては、Tokio の broadcast チャネルがあります。

use tokio::sync::broadcast;

fn main() {
    // キャパシティを持つチャネルを作成
    let (tx, _) = broadcast::channel(10); // 10フレームまでバッファリング
    
    // 送信者側
    let tx_clone = tx.clone();
    std::thread::spawn(move || {
        loop {
            let new_frame = capture_frame();
            // バッファがいっぱいの場合、送信は失敗せず古いメッセージが削除される
            let _ = tx_clone.send(new_frame);
            std::thread::sleep(std::time::Duration::from_millis(30));
        }
    });
    
    // 受信者側
    let mut rx = tx.subscribe();
    std::thread::spawn(move || {
        loop {
            match rx.recv().await {
                Ok(frame) => {
                    process_frame(frame); // 処理(約300ms)
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    println!("{}フレームスキップしました", n);
                }
                Err(_) => break,
            }
        }
    });
}

特徴:

  • 指定したキャパシティまでメッセージをバッファリング
  • バッファがいっぱいになると、古いメッセージが自動的に削除される
  • 複数の受信者がそれぞれ独立してメッセージを消費できる
  • 受信者が処理に追いつけないと Lagged エラーを受け取る
  • どのメッセージが処理されるかをより細かく制御できる

3. crossbeam + semaphore

crossbeam チャネルと semaphore を組み合わせた実装も可能です。

use crossbeam_channel::{bounded, select};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};

struct Semaphore {
    count: AtomicUsize,
}

impl Semaphore {
    fn new(count: usize) -> Self {
        Self { count: AtomicUsize::new(count) }
    }
    
    fn try_acquire(&self) -> bool {
        let mut current = self.count.load(Ordering::SeqCst);
        while current > 0 {
            match self.count.compare_exchange(
                current, current - 1, Ordering::SeqCst, Ordering::SeqCst
            ) {
                Ok(_) => return true,
                Err(val) => current = val,
            }
        }
        false
    }
    
    fn release(&self) {
        self.count.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    // 処理スレッド数に基づいたセマフォを作成
    let semaphore = Arc::new(Semaphore::new(3)); // 同時に3つのフレームまで処理可能
    
    // チャネルを作成
    let (tx, rx) = bounded(10); // バッファサイズは適宜調整
    let rx = Arc::new(Mutex::new(rx));
    
    // 送信者側
    std::thread::spawn(move || {
        loop {
            let new_frame = capture_frame();
            // セマフォで処理スロットが空いているときだけ送信を試みる
            if semaphore.try_acquire() {
                if tx.send(new_frame).is_err() {
                    // 送信エラー時はセマフォを解放
                    semaphore.release();
                    break;
                }
            } else {
                // 処理スロットがない場合は古いフレームをスキップ
                println!("処理スレッドがビジー状態: フレームをスキップします");
            }
            std::thread::sleep(std::time::Duration::from_millis(30));
        }
    });
    
    // 複数の処理スレッド
    for _ in 0..3 {
        let rx_clone = Arc::clone(&rx);
        let semaphore_clone = Arc::clone(&semaphore);
        
        std::thread::spawn(move || {
            loop {
                let frame = {
                    let rx_guard = rx_clone.lock().unwrap();
                    match rx_guard.recv() {
                        Ok(frame) => frame,
                        Err(_) => break,
                    }
                };
                
                // フレーム処理
                process_frame(frame);
                
                // 処理が終わったらセマフォを解放
                semaphore_clone.release();
            }
        });
    }
}

特徴:

  • 明示的に古い値を捨てる処理が必要
  • 他のアプローチに比べて制御が複雑になる可能性がある

4. ringbuf - リングバッファを使用

ringbuf クレートを使用したリングバッファベースの実装も考えられます。

use ringbuf::{HeapRb, Rb, Consumer, Producer};

fn main() {
    // リングバッファを作成
    let rb = HeapRb::<Frame>::new(1); // バッファサイズ1
    let (mut prod, mut cons) = rb.split();
    
    // 送信者側
    std::thread::spawn(move || {
        loop {
            let new_frame = capture_frame();
            // 古い要素を上書き
            let _ = prod.push_overwrite(new_frame);
            std::thread::sleep(std::time::Duration::from_millis(30));
        }
    });
    
    // 受信者側
    std::thread::spawn(move || {
        loop {
            if let Some(frame) = cons.pop() {
                process_frame(frame); // 処理(約300ms)
            } else {
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
        }
    });
}

特徴:

  • push_overwrite で古い要素を簡単に上書き可能
  • リングバッファの操作にはロックが必要
  • 明示的なバッファサイズ管理が可能
  • シンプルな API

ただし、push_overwrite にはロックが必要なため、高速な処理が求められる場合には注意が必要です。

5. リングバッファ + Mutex + CondVar

リングバッファと同期プリミティブを組み合わせる方法です。

use std::sync::{Arc, Mutex, Condvar};
use std::collections::VecDeque;

struct OverwritingChannel<T> {
    buffer: Mutex<VecDeque<T>>,
    capacity: usize,
    not_empty: Condvar,
}

impl<T> OverwritingChannel<T> {
    fn new(capacity: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            not_empty: Condvar::new(),
        }
    }
    
    fn send(&self, item: T) {
        let mut buffer = self.buffer.lock().unwrap();
        if buffer.len() == self.capacity {
            buffer.pop_front(); // 最も古い要素を削除
        }
        buffer.push_back(item);
        self.not_empty.notify_one();
    }
    
    fn recv(&self) -> T {
        let mut buffer = self.buffer.lock().unwrap();
        while buffer.is_empty() {
            buffer = self.not_empty.wait(buffer).unwrap();
        }
        buffer.pop_front().unwrap()
    }
}

特徴:

  • 要件に完全に合わせたカスタム実装が可能
  • 細かい挙動を制御できる
  • 実装が複雑
  • バグを作り込みそう
  • パフォーマンスの最適化が難しそう

まとめ

Rust で「古いデータを上書きするチャネル」を実装する以下のアプローチを紹介しました:

  1. tokio::sync::watch - 最も簡単で単一値のみを扱う場合
  2. tokio::sync::broadcast - 複数のメッセージをバッファリングする場合
  3. crossbeam + semaphore - 非同期コードを避けたい場合
  4. ringbuf - リングバッファベースのシンプルな実装
  5. 自作実装 - 特殊な要件に対応する必要がある場合

要件を見直して、実装はシンプルにするのがいいかなとおもいました

参考リンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?