動機
こんな感じで計算機シミュレーションを行って結果を求めたい場合がある.
fn func() -> i32 {
// ... (some heavy simulation) ...
}
fn simulation() {
let mut sum = 0;
let trial = 10;
for _ in 0..trial {
sum += func();
}
return sum / trial;
}
C/C++
ならOpenMPI
,Python
ならmultiprocessing
とかjoblib
とかを使う.
Rust
ではrayon
やcrossbeam
を使うらしい.
本稿では,The Rust Programming Languageを参考にマルチスレッド(スレッドプール)を実装してみる1.
実装
まずcargo new --bin pool
で新しいプロジェクトを始める.
最終的にはこんな感じの構成になる.
.
├── Cargo.lock
├── Cargo.toml
└── src
├── bin
│ └── main.rs
└── lib.rs
ThreadPool
の実装は,おおよそ ここ に記載されている通りである.
上記のリンク先では,詳細に解説がなされているので参照してほしい.
また,下記は @tric さんにいただいたコメントも反映した形となってる.
use std::thread;
use std::sync::{mpsc, Arc, Mutex};
enum Message {
NewJob(Job),
Terminate,
}
pub struct ThreadPool {
workers: Vec<Worker>,
tx: mpsc::Sender<Message>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (tx, rx) = mpsc::channel();
let rx = Arc::new(Mutex::new(rx));
let workers = (0..size).map(|_| Worker::new(Arc::clone(&rx))).collect();
ThreadPool {
workers,
tx,
}
}
pub fn execute<F> (&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.tx.send(Message::NewJob(job)).unwrap();
}
}
struct Worker {
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(rx: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let msg = rx.lock().unwrap().recv().unwrap();
match msg {
Message::NewJob(job) => {
job();
},
Message::Terminate => {
break;
},
}
}
});
Worker {
thread: Some(thread),
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for _ in &mut self.workers {
self.tx.send(Message::Terminate).unwrap();
}
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
本稿で実行するジョブは下記のコードに示す通り一瞬で終わってしまうため,並列化の恩恵がわかりづらい.
よって,個々のジョブでthread::sleep(Duration::from_secs(1));
を挟んで時間を稼ぐ.
全部で10個のジョブがあるので,通常に処理するならば約10秒が必要なはずだ.
extern crate pool;
extern crate num_cpus;
use pool::ThreadPool;
use std::sync::{Mutex, Arc};
// For evaluation purpose:
use std::time::Duration;
use std::thread;
use std::time::{Instant};
fn main() {
let start = Instant::now();
let num_pool = num_cpus::get();
let m = Arc::new(Mutex::new(0));
{
// pool内の全スレッドがDropされるまで待つためのスコープ
// これがないと完了前に次の処理に移ってしまう
let pool = ThreadPool::new(num_pool);
for _ in 0..10 {
let m = Arc::clone(&m);
pool.execute(move || {
thread::sleep(Duration::from_secs(1));
let mut num = m.lock().unwrap();
*num += 1;
});
}
}
let result = *m.lock().unwrap();
println!("\nw/ Thread:\nResult: {}, elapsed:{:?}", result, start.elapsed());
let start = Instant::now();
let mut num = 0;
{
for _ in 0..10 {
thread::sleep(Duration::from_secs(1));
num += 1;
}
}
println!("\nw/o Thread:\nResult: {}, elapsed:{:?}", num, start.elapsed());
}
また,実行しているマシンで利用可能な論理コア数を取得するために外部crateのnum_cpusを利用している.
そのため,Cargo.toml
の[dependencies]
に追記する.
[package]
...
[dependencies]
num_cpus = "1.0"
実行結果
$ cargo run --release
Compiling pool v0.1.0 (/Users/Scstechr/Works/projects/pool)
Finished release [optimized] target(s) in 2.62s
Running `target/release/main`
w/ Thread:
Result: 10, elapsed:3.010929262s
w/o Thread:
Result: 10, elapsed:10.022431213s
ThreadPool
を利用した場合に,(試した自分の環境では)実行速度が約3分の1になっていることがわかる.