5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Rust勉強中 - その25 -> 並列処理

Last updated at Posted at 2019-11-04

自己紹介

出田 守と申します。
しがないPythonプログラマです。
情報セキュリティに興味があり現在勉強中です。CTFやバグバウンティなどで腕を磨いています。主に低レイヤの技術が好きで、そっちばかり目が行きがちです。

Rustを勉強していくうえで、読んで学び、手を動かし、記録し、楽しく学んでいけたらと思います。

環境

新しい言語を学ぶということで、普段使わないWindowsとVimという新しい開発環境で行っています。
OS: Windows10 Home 64bit 1903
CPU: Intel Core i5-3470 @ 3.20GHz
Rust: 1.38.0
RAM: 8.00GB
Editor: Vim 8.1.1
Terminal: PowerShell

前回

前回はI/Oについて学びました。
Rust勉強中 - その24

fork-join

fork-joinはあるジョブを独立した複数のスレッドとして分割し、それぞれのスレッドで分割されたジョブを実行します。
forkは新たなスレッドを作成し、joinはスレッドの終了を待ちます。
実装が単純で、共有資源のロックがなくスレッドが完全に独立していれば決定的に動作します。ただし、ジョブをCPU負荷的に均等に分配できない可能性があることや、計算結果の統合に時間を要する場合があります。

std::thread::spawnとjoin

spawnはスレッドを新たに作成するために使用します。
spawnの定義は以下のようになっています。

pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T, F: Send + 'static, T: Send + 'static
{
    Builder::new().spawn(f).expect("failed to spawn thread")
}

関数またはクロージャfはFnOnceを制約としています。戻り値にはJoinHandle型を返します。
JoinHandle型のメソッドにjoinが実装されています。
joinはスレッドの終了を待ちます。

pub fn join(mut self) -> Result<T> {
    self.0.join()
}

joinはstd::thread::Resultを返し、子スレッドがパニックを起こした場合エラーになります。さらに、子スレッドで返された値を親スレッドに返します。

以下はfork-joinの例です。

use std::io;
use std::io::prelude::*;
use std::fs::File;
fn passwords(filepath: &str) -> io::Result<Vec<String>> {
    let fd = File::open(filepath)?;
    let reader = io::BufReader::new(fd);
    Ok(reader.lines().filter(|l| l.is_ok()).collect::<io::Result<Vec<String>>>()?)
}

use std::fmt::Display;
fn do_something<T>(passwords: T) -> io::Result<()> 
    where T: IntoIterator,
          T::Item: Display
{
    let fd         = io::stdout();
    let stdout     = fd.lock();
    let mut writer = io::BufWriter::new(stdout);
    for password in passwords {
        writeln!(writer, "{}", password)?;
    }
    Ok(())
}

use std::thread::spawn;
fn parallel(passwords: Vec<String>) -> io::Result<()> {
    const N_THREADS: usize = 2;
    let harf = passwords.len()/N_THREADS;
    let mut chunks: Vec<Vec<String>> = vec![vec![]; 2];
    passwords
        .into_iter()
        .enumerate()
        .for_each(|(i, x)| {
            if i<harf {
                chunks[0].push(x);
            } else {
                chunks[1].push(x);
            }
        });
    
    // Fork
    let mut threads = vec![];
    for chunk in chunks {
        threads.push(
            spawn(move || do_something(chunk))
        );
    }

    // Join
    for thread in threads {
        thread.join().unwrap()?;
    }

    Ok(())
}

fn main() {
    let filepath = "rockyou.txt";
    let passwords = passwords(filepath).expect("Failed to read file");
    // do_something(passwords);

    parallel(passwords).unwrap();
}

rockyou.txtというパスワードリストをpasswords関数で読み込みます。do_somethingはそのパスワードリストに対して何かを行います。この例の場合は単純に一行ずつ表示しています。
これをparallel関数でfork-join並列を実現してみました。
前半部分は単純にパスワードリストを2分割しています。つまりスレッドを2つだけ作成します。
fork部分では、spawnにクロージャとジョブとそれに必要なデータを渡しています。moveとしているのは、値のコピーによるオーバーヘッドを防ぐためです。
join部分では、各スレッドの終了を待ちます。この時.unwrap()はjoinメソッドのthread::Resultに対するもので、子スレッドがパニックを起こせば親スレッドもパニックとして扱うようにしています。次の?は子スレッドの戻り値io::Resultに対してのものです。
また、Arcを用いることで、スレッド間で不変な値を共有することができます。

...
use std::fmt::Display;
fn do_something<T>(known_id: &String, passwords: T) -> io::Result<()> 
    where T: IntoIterator,
          T::Item: Display
{
    let fd         = io::stdout();
    let stdout     = fd.lock();
    let mut writer = io::BufWriter::new(stdout);
    for password in passwords {
        writeln!(writer, "{}, {}", known_id, password)?;
    }
    Ok(())
}

use std::sync::Arc;
use std::thread::spawn;
fn parallel(known_id: String, passwords: Vec<String>) -> io::Result<()> {
    ...
    let arc_id = Arc::new(known_id);
    ...
    // Fork
    let mut threads = vec![];
    for chunk in chunks {
        let cloned_id = arc_id.clone();
        threads.push(
            spawn(move || do_something(&cloned_id, chunk))
        );
    }

    ...
}

fn main() {
    ...
    let known_id = String::from("root");
    ...
    parallel(known_id, passwords).unwrap();
}

チャネル

チャネルは値を、あるスレッドから別のスレッドに送信します。
以下に簡単な例を示します。これはほとんどドキュメントの例と同じです。

use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, reciever) = channel();
    let sender2 = sender.clone();

    spawn(move || {
        sender.send("channel1").unwrap();
    });

    spawn(move || {
        sender2.send("channel2").unwrap();
    });

    for data in reciever {
        println!("{}", data);
    }
}

チャネルはstd::sync::mpsc::channelメソッドを使用して、senderとrecieverを作成します。sender.send(item)とすることで、値をチャネルに置きます。値の送信は所有権を移動させることを意味します。sender自体はクローンすることで、一つのreciverに対して複数のsenderを持つことができます。
reciever.recv()とすることでチャネルの値を一つ取り出します。reciever.recv()とすると、チャネルが空ならsenderから値が送信されるまでブロックされます。またrecieverはクローン出来ません。
senderもしくはrecieverどちらか一方がドロップされるとエラーを返します。

senderが値を送りすぎてメモリの無駄にメモリの消費が大きい場合は、std::sync::mpsc::sync_channelメソッドを用いることで、送信容量を制限し送信をブロックすることができます。これを同期チャネルと呼ばれます。

Mutex<T>

pub struct Mutex<T: ?Sized> {
    inner: Box<sys::Mutex>,
    poison: poison::Flag,
    data: UnsafeCell<T>,
}

あるデータをスレッド間で排他アクセスさせたい場合にMutexを使います。Mutexは排他ロックを用いて、排他アクセスのロックを取得した際にデータにmutなアクセスを可能にします。
以下に簡単な例を示します。

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let n_threads = 3;
    let thread_ids = Arc::new(Mutex::new(vec![]));

    let mut handles = vec![];
    for i in 0..n_threads {
        let cloned_thread_ids = thread_ids.clone();
        handles.push(thread::spawn(move || {
            let mut mutex_data = cloned_thread_ids.lock().unwrap();
            mutex_data.push(i);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", thread_ids); // Mutex { data: [0, 1, 2] } 
}

これは各スレッドでスレッドの番号を可変共有データのベクタにpushしています。
Mutex::newで可変共有データを作ります。Arcはスレッド間でデータを共有するために使用しています。
排他ロックを得るにはlockメソッドを使います。lockはResultを返します。この時もし、他のスレッドで何らかのエラーがあった場合は毒されたMutexとしてマークされ、ResultのErrを返します。

...
        handles.push(thread::spawn(move || {
            let mut mutex_data = cloned_thread_ids.lock().unwrap();
            mutex_data.push(i);
            panic!("error");
        }));
...

意図的にパニックを起こしてみました。lock().unwrap()としているので、以下のようにパニックが発生しています。

thread '<unnamed>' panicked at 'error', src\bin\mutex.rs:14:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
thread '<unnamed>thread '' panicked at 'maincalled `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"' pa
nicked at '', called `Result::unwrap()` on an `Err` value: Anysrc\libcore\result.rs', :src\libcore\result.rs1084::10845:
5thread '
<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"', src\libcore\result.rs
:1084:5

並列処理はまだまだ深い部分もあるみたいですが、さわりだけということで、今回はここまでとします。

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?