8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Rustで並行/並列処理

Last updated at Posted at 2019-10-22

動機

こんな感じで計算機シミュレーションを行って結果を求めたい場合がある.

main.rs
fn func() -> i32 {
   // ... (some heavy simulation) ...
}

fn simulation() {
  let mut sum = 0;
  let trial = 10;
  for _ in 0..trial {
     sum += func();
  }
  return sum / trial;
}

C/C++ならOpenMPIPythonならmultiprocessingとかjoblibとかを使う.
Rustではrayoncrossbeamを使うらしい.
本稿では,The Rust Programming Languageを参考にマルチスレッド(スレッドプール)を実装してみる1

実装

まずcargo new --bin poolで新しいプロジェクトを始める.
最終的にはこんな感じの構成になる.

ファイル構成
.
├── Cargo.lock
├── Cargo.toml
└── src
    ├── bin
    │   └── main.rs
    └── lib.rs

ThreadPoolの実装は,おおよそ ここ に記載されている通りである.
上記のリンク先では,詳細に解説がなされているので参照してほしい.
また,下記は @tric さんにいただいたコメントも反映した形となってる.

src/lib.rs
use std::thread;
use std::sync::{mpsc, Arc, Mutex};

enum Message {
    NewJob(Job),
    Terminate,
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    tx: mpsc::Sender<Message>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (tx, rx) = mpsc::channel();

        let rx = Arc::new(Mutex::new(rx));

        let workers = (0..size).map(|_| Worker::new(Arc::clone(&rx))).collect();

        ThreadPool {
            workers,
            tx,
        }
    }

    pub fn execute<F> (&self, f: F)
        where
            F: FnOnce() + Send + 'static
            {
                let job = Box::new(f);

                self.tx.send(Message::NewJob(job)).unwrap();
            }

}

struct Worker {
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(rx: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let msg = rx.lock().unwrap().recv().unwrap();

                match msg {
                    Message::NewJob(job) => {
                        job();
                    },
                    Message::Terminate => {
                        break;
                    },
                }
            }
        });

        Worker {
            thread: Some(thread),
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for _ in &mut self.workers {
            self.tx.send(Message::Terminate).unwrap();
        }
        for worker in &mut self.workers {

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

本稿で実行するジョブは下記のコードに示す通り一瞬で終わってしまうため,並列化の恩恵がわかりづらい.
よって,個々のジョブでthread::sleep(Duration::from_secs(1));を挟んで時間を稼ぐ.
全部で10個のジョブがあるので,通常に処理するならば約10秒が必要なはずだ.

src/bin/main.rs
extern crate pool;
extern crate num_cpus;
use pool::ThreadPool;
use std::sync::{Mutex, Arc};

// For evaluation purpose:
use std::time::Duration;
use std::thread;
use std::time::{Instant};

fn main() {
    let start = Instant::now();
    let num_pool = num_cpus::get();
    let m = Arc::new(Mutex::new(0));
    { 
        // pool内の全スレッドがDropされるまで待つためのスコープ
        // これがないと完了前に次の処理に移ってしまう

        let pool = ThreadPool::new(num_pool); 

        for _ in 0..10 {
            let m = Arc::clone(&m);
            pool.execute(move || {
                thread::sleep(Duration::from_secs(1));
                let mut num = m.lock().unwrap();
                *num += 1;
            });
        }
    }
    let result = *m.lock().unwrap();
    println!("\nw/ Thread:\nResult: {}, elapsed:{:?}", result, start.elapsed());

    let start = Instant::now();
    let mut num = 0;
    {
        for _ in 0..10 {
            thread::sleep(Duration::from_secs(1));
            num += 1;
        }
    }
    println!("\nw/o Thread:\nResult: {}, elapsed:{:?}", num, start.elapsed());
}

また,実行しているマシンで利用可能な論理コア数を取得するために外部crateのnum_cpusを利用している.
そのため,Cargo.toml[dependencies]に追記する.

Cargo.toml
[package]
...

[dependencies]
num_cpus = "1.0"

実行結果


$ cargo run --release
   Compiling pool v0.1.0 (/Users/Scstechr/Works/projects/pool)
    Finished release [optimized] target(s) in 2.62s
     Running `target/release/main`

w/ Thread:
Result: 10, elapsed:3.010929262s

w/o Thread:
Result: 10, elapsed:10.022431213s

ThreadPoolを利用した場合に,(試した自分の環境では)実行速度が約3分の1になっていることがわかる.

  1. 特に16章および20章が参考になる.

8
7
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?