C#のコード
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class ConcurrentQueueExample
{
public static void Main(string[] args)
{
// ConcurrentQueue のインスタンスを生成
var queue = new ConcurrentQueue<string>();
// producer スレッドのタスク
Task producerTask = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
string data = $"Data {i}";
queue.Enqueue(data);
Console.WriteLine($"Producer: Enqueued {data}");
Thread.Sleep(100); // 少し間隔をあける
}
});
// consumer スレッドのタスク
Task consumerTask = Task.Run(() =>
{
while (true)
{
if (queue.TryDequeue(out string data))
{
Console.WriteLine($"Consumer: Dequeued {data}");
// データの処理を行う
Thread.Sleep(200); // 少し処理時間をシミュレート
}
else
{
// キューが空の場合は少し待機
if (producerTask.IsCompleted && queue.IsEmpty)
{
break; // producerが完了し、かつキューが空なら終了
}
Thread.Sleep(50);
}
}
});
// producerとconsumerの完了を待機
Task.WhenAll(producerTask, consumerTask).Wait();
Console.WriteLine("処理完了");
}
}
Rustのコード
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// チャネルの作成
let (tx, rx) = mpsc::channel();
// Producer スレッド
let producer_tx = tx.clone();
let producer_handle = thread::spawn(move || {
for i in 0..10 {
let data = format!("Data {}", i);
producer_tx.send(data.clone()).unwrap(); // ここを修正
println!("Producer: Enqueued {}", data);
thread::sleep(Duration::from_millis(100));
}
});
// Consumer スレッド
let consumer_handle = thread::spawn(move || {
loop {
match rx.recv() {
Ok(data) => {
println!("Consumer: Dequeued {}", data);
// データ処理をシミュレート
thread::sleep(Duration::from_millis(200));
}
Err(_) => {
// チャネルが閉じられたら終了
println!("Consumer: Channel closed, exiting");
break;
}
}
}
});
// スレッドの終了を待機
producer_handle.join().unwrap();
drop(tx); // これがないと consumer のループが終了しない
consumer_handle.join().unwrap();
println!("処理完了");
}
メモ
use std::sync::mpsc
- use: 別のモジュールやクレート(パッケージ)から特定のアイテム(型、関数、モジュールなど)を現在のスコープに導入する
- std: Rustの標準ライブラリを提供するクレート
- sync: 複数のスレッド間での共有データへのアクセスや、スレッド間通信などの同期機能を提供するモジュール
mpsc
複数producer、単一consumerのチャネル。producer側をclone()して増やして使う。1チャンネルだけならcloneしなくても使うこともできる。consumer側はclone()を実装していないのでclone()できない
let (tx, rx) = mpsc::channel();
letの定義のときにタプルを使える。
(let tx, let rx) = mpsc::channel();
このようには書けない
drop
スコープ途中でインスタンスを破棄する。clone()されたconsumerはスレッド内のスコープ抜けでdrop()されている。Copyトレイトを実装した型はdrop()しても何も起きずコンパイル時に警告が出る。
theradの開始、クロージャ (|| { ... })
クロージャはラムダ関数。thread::spawnにラムダ関数を渡してスレッドを起動する
クロージャには3種類あって(Fn, FnMut, FnOnce)、Fn は副作用がないこと、FnMut は副作用がり、FnOnce は一度しか呼び出せない。Fn, FnMut, FnOnce のいずれのトレイトをクロージャが実装するかは、主にクロージャの内部処理(キャプチャした変数をどのように使うか)によって決まる
関数の引数にクロージャを渡す場合などFn, FnMut, FnOnceを使い分ける。どの種類のクロージャになるかはコンパイル時に決まるが、Fn,FnMut,FnOnceなどの指定の型とあっていないとコンパイルエラーとなる
let add_one = |x: i32| x + 1; // 引数あり、 Fn かもしれない
let increment = || count += 1; // 引数なし、 FnMut かもしれない
let greet = move || println!("hi"); // 引数なし、FnOnce かもしれない
クロージャの前にmoveを付けたときがけ、クロージャ内部に所有権が移る。一部だけ所有権を移すとかはできずに、登場する変数全部の所有権がわたってしまう。
所有権を破棄されると困るものはcloneする。Copyトレイルを実装していればコピーになるのでもともとの所有権はそのままのこる
その他
String型はCopyトレイルを実装していない
matchはswitch case相当