ひとつの Vec<T>
を複数のスレッドから同時に操作したいことがあります. 例えば Vec<i32>
の全成分に 1 を足す計算は, シリアルなら
const N: usize = 1024;
fn main() {
let mut vec: Vec<i32> = (0..N as i32).collect();
for x in vec.iter_mut() {
*x += 1;
}
println!("{:?}", &vec);
}
と書けますが, この手の処理は並列化したくなります1. これは各成分を Mutex
にする (つまり Vec<Mutex<T>>
を使う) ことで実現できます. この際に Vec<Mutex<T>>
そのものをスレッド間で共有したいので Arc
で包む必要があります.
use std::thread;
use std::sync::{Arc, Mutex};
const N: usize = 1024;
const THREAD: usize = 4; // 子スレッドの数
fn main() {
let vec: Vec<Mutex<i32>> = (0..N as i32).map(|n| Mutex::new(n)).collect();
let vec = Arc::new(vec);
let handles = (0..THREAD).map(|c| {
let vec = Arc::clone(&vec);
thread::spawn(move || {
// c 番目のスレッドは最初の c 個を飛ばして `THREAD` 個置きに要素を取得する.
// 全スレッド (c=0..`THREAD`) の処理が終われば, これで `vec` の全要素に一度ずつ触ったことになる.
for x in vec.iter().skip(c).step_by(THREAD) {
let mut x = x.lock().unwrap();
*x += 1;
}
})
}).collect::<Vec<_>>(); // collect するとイテレータの各要素が評価されて子スレッドの処理が始まる.
// 子スレッドの処理の終了待ち
for handle in handles.into_iter() {
let _ = handle.join().unwrap();
}
// Arc から vec を取り出す
let vec = Arc::try_unwrap(vec).unwrap();
// 結果の表示
for x in vec.iter() {
println!("{}", x.lock().unwrap());
}
}
Mutex を lock しているので, 複数のスレッドが同一の要素 (例えば vec[0]
) にアクセスすることも可能です. 後からアクセスしようとしたスレッドは, その要素が解放されるまで待機します.
なお, 上のコードでは vec
のどの要素も処理にかかる時間がほぼ同じだと思っていますが, 要素によって時間がまちまちだったりすると, スレッドプール式の実装をした方がよいかもしれません.
2019-11-17 追記 vec
を Arc から取り出すようにコードを修正しました. 上のコードではデータは immutable なインスタンス Arc<Vec<_>>
としてスレッド間共有されているため, その構造を変更するような操作 (例えば要素数を増減すること) はできません. あくまでその要素 Mutex<T>
の中の値を書き換える機能だけを有します (Mutex の内部可変性). 一方, 並列処理が終わった後に Arc から中身を取り出すと単なる Vec<_>
になりますから, メインスレッドは (mut
付きで確保すれば) 要素数を増減するなど任意にその構造を変更できるようになります. 例えば Vec<Mutex<T>>
を Vec<T>
へと変換したければ
let vec = vec.into_iter()
.map(|x| *x.lock().unwrap())
.collect::<Vec<_>>();
です.
-
さすがに 1 を足すだけでは単純すぎて並列化してもオーバーヘッドのためむしろ遅くなりますが... 当然もっと複雑な処理を想定しています. ↩