やっはろー。Sleep sort の実装を通して Rust における並列な処理を雑に理解しよう、という話をします。
Sleep sort とはソートアルゴリズムの一つで、値の大きさに応じてスリープさせて明けた順に出力すると値がソートされている、というものです。
#!/bin/bash
function f() {
sleep "$1"
echo "$1"
}
while [ -n "$1" ]
do
f "$1" &
shift
done
wait
常識を覆すソートアルゴリズム!その名も"sleep sort"! - Islands in the byte stream
スレッドをスリープする
基本のパターンをやります。thread::spawn
でスレッドをつくり、thread::sleep_ms
でスリープです。
use std::{env, thread};
fn main() {
let threads = env::args().skip(1).map(|x| {
thread::spawn(move || {
let n = x.parse::<u32>().unwrap_or(0);
thread::sleep_ms(n * 1000);
println!("{}", n);
})
})
.collect::<Vec<_>>();
for t in threads {
let _ = t.join();
}
}
$ cargo run -- 5 3 6 3 6 3 1 4 7
Running `target/debug/sandbox`
1
3
3
3
4
5
6
6
7
上では捨てていますが、spawn
の返す JoinHandle
を join
すると、スレッドの処理が終わるまで待ってくれる他、スレッドの返す値を受け取ることができます。
let t = thread::spawn(|| "きん!");
let r = t.join();
println!("{:?}", r);
// Ok("\u{304d}\u{3093}\u{ff01}")
println!("{}", r.unwrap());
// きん!
let t = thread::spawn(|| panic!("ぱつ!"));
let r = t.join();
println!("{:?}", r);
// Err(Any)
println!("{}", r.unwrap_err().downcast_ref::<&str>().unwrap());
// ぱつ!
値を共有する
変数に結果を書き込むパターンをやります。Arc
で所有権の共有、Mutex
で排他の制御をすることで、複数スレッドからの安全なアクセスが保証されるっぽいです。
use std::{env, thread};
use std::sync::{Arc, Mutex};
fn main() {
let result = Arc::new(Mutex::new(Vec::new()));
let threads = env::args().skip(1).map(|x| {
let result = result.clone();
thread::spawn(move || {
let n = x.parse::<u32>().unwrap_or(0);
thread::sleep_ms(n * 1000);
let mut result = result.lock().unwrap();
result.push(n);
})
})
.collect::<Vec<_>>();
for t in threads {
let _ = t.join();
}
for x in result.lock().unwrap().iter() {
println!("{}", x);
}
}
Arc
な値の参照カウンタを増やすための clone
と、Mutex
な値を参照するときの lock
が重要っぽいです。
チャンネルで通信する
チャンネルを通して結果を受け取るパターンをやります。Clojure や Go のそれと似ていますが、Sender
と Receiver
とで分かれているところが違います。
use std::{env, sync, thread};
fn main() {
let (tx, rx) = sync::mpsc::channel();
let threads = env::args().skip(1).map(|x| {
let tx = tx.clone();
thread::spawn(move || {
let n = x.parse::<u32>().unwrap_or(0);
thread::sleep_ms(n * 1000);
tx.send(n).unwrap();
})
})
.collect::<Vec<_>>();
for x in rx.iter().take(threads.len()) {
println!("{}", x);
}
}
チャンネルのイテレータを取るところが個人的にポイント高いんですが、普通にやるなら recv
があります。
let (tx, rx) = sync::mpsc::channel();
let _ = thread::spawn(move || {
thread::sleep_ms(3000);
tx.send("きん!").unwrap();
});
println!("ぱつ!");
println!("{}", rx.recv().unwrap());
終わりに
つまりは Book を読みなさいということなんだと思います。