目次
- 16.0 概要
- 16.1 スレッドを使用してコードを同時に走らせる
- 16.2 メッセージ受け渡しを使ってスレッド間でデータを転送する
- 16.3 状態共有並行性(複数のスレッド間でのメモリ共有)
-
16.4
SyncとSendトレイトで拡張可能な並行性
16.0 概要
-
std::threadの基本的な使い方(スレッド)- -->
spawnで新規スレッドを生成する,joinハンドルで全スレッドの終了を待つ - 新規スレッド作成時には
moveで環境変数の所有権を奪い取ることが多い
- -->
-
std::sync::mpscの基本的な使い方(スレッド間のメッセージのやり取りにチャンネルを用いる)- 送信機と受信機の対生成
- --> チャンネルの生成
- 送信
- -->
txからの送信
- -->
- 受信
-
txは送信する値の所有権を奪う- --> チャンネルと所有権の転送
-
rxをイテレータとして扱い、txからメッセージを複数回受け取る - 送信機を複数個に増やすには
mpsc::Sender::clone(&tx)する
- 送信機と受信機の対生成
-
Mutex<T>の基本的な使い方(相互排他的なデータアクセス)- ミューテックスはロックシステム経由で保持しているデータを死守する (guarding)
- 生成・ロック・アンロック
- マルチスレッドの場合は
Arc<T>と併用する- --> マルチスレッドの場合
-
Mutex<T>は、RefCell<T>のように内部可変性を提供する - デッドロック
-
SyncとSend(並行性に関連する不変条件を強制するマーカートレイト)
16.1 スレッドを使用してコードを同時に走らせる
Rust ではランタイムをできるだけ小さくするために、1:1 スレッドの実装のみを提供する。(M:N スレッドの実装をしたクレートもある)
用語解説
- プロセス:プログラムのコードの実行単位(1プログラムに対し1プロセス)
- スレッド:プログラム内の独立した部分を走らせる機能(1プログラム内で複数走る)
- プログラム言語によってスレッドの実装法は異なる
- 多くのOSで新規スレッド作成用のAPIが提供されている
- 言語がOSのAPIを呼び出すモデルを "1:1" と呼ぶ. このモデルでは、1つのOSスレッドに対し1つの言語スレッド
- 言語がスレッドの独自の特別な実装を提供し、各言語スレッドに対し異なる数のOSスレッドが実行されるモデルを "M:N" モデルと呼ぶ. また、プログラミング言語が提供するスレッドはグリーンスレッドと呼ばれる
-
ランタイム:、言語によって全てのバイナリに含まれるコード
- ランタイムが小さいと機能も少ないすが、バイナリのサイズも小
さくなる
- ランタイムが小さいと機能も少ないすが、バイナリのサイズも小
spawn で新規スレッドを生成する
-
main関数もスレッドを作成する模様- →
mainスレッドなどの既存スレッドの内部から他のスレッドを生成する
- →
-
新規スレッドを生成するには、
thread::spawn関数を呼び出す- 引数には、新規スレッドで走らせたいコードを含むクロージャを渡す
-
スレッドを一定時間休止するには
thread::sleep関数を用いる- 休止中は(おそらく)他のスレッドが実行される
-
例:以下のコードではスポーンドスレッド内のループが10回実行する前に、メインスレッドが先に実行終了するので、スポーンメソッド内のループはすべて実行されずに動作を終了する
use std::thread; use std::time::Duration; fn main() { thread::spawn(|| { for i in 1..10 { println!("hi number {} from the spawned thread!", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { println!("hi number {} from the main thread!", i); thread::sleep(Duration::from_millis(1)); } }実行結果:
hi number 1 from the main thread! hi number 1 from the spawned thread! hi number 2 from the main thread! hi number 2 from the spawned thread! hi number 3 from the main thread! hi number 3 from the spawned thread! hi number 4 from the main thread! hi number 4 from the spawned thread! hi number 5 from the spawned thread!
join ハンドルで全スレッドの終了を待つ
-
thread::spawnの戻り値を変数に保存することで、立ち上げたスレッドの実行を強制し、完全に実行されるのを待つことができる -
thread::spawnの返り値の型はJoinHandle-
JoinHandleのjoinメソッドを呼び出すと、ハンドルが表すスレッドの終了までjoinメソッド呼び出し元のスレッドをブロックする
-
-
例:
joinメソッドを呼び出すことで、スポーンドスレッドの終了までメインスレッドの終了を先延ばしさせているuse std::thread; use std::time::Duration; fn main() { let handle = thread::spawn(|| { for i in 1..10 { println!("hi number {} from the spawned thread!", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { println!("hi number {} from the main thread!", i); thread::sleep(Duration::from_millis(1)); } handle.join().unwrap(); }実行結果:
hi number 1 from the main thread! hi number 1 from the spawned thread! hi number 2 from the main thread! hi number 2 from the spawned thread! hi number 3 from the spawned thread! hi number 3 from the main thread! hi number 4 from the spawned thread! hi number 4 from the main thread! hi number 5 from the spawned thread! hi number 6 from the spawned thread! hi number 7 from the spawned thread! hi number 8 from the spawned thread! hi number 9 from the spawned thread!
スレッドで move クロージャを使用する
-
moveクロージャは、thread::spawnとともによく使用される- あるスレッドから別のスレッドに値の所有権を移すために新しいスレッドを生成する際に特に有用
-
例:
use std::thread; fn main() { let v = vec![1, 2, 3]; // ここの `move` は必要. なぜなら、spawned thread は v よりも長生きする可能性があるから. let handle = thread::spawn(move || { println!("Here's vector: {:?}", v); }); // 例えば、ここで、`drop(v);` すると、確実に spawned thread は v よりも長生きする handle.join().unwrap(); }
16.2 メッセージ受け渡しを使ってスレッド間でデータを転送する
「メモリを共有することでやり取りするな;代わりにやり取りすることでメモリを共有しろ」
- Rust ではチャンネルでメッセージ送信並行性を達成する
- チャンネルは、転送機(
tx)と受信機(rx)からなる -
txとrxのいずれかがドロップされるとチャンネルは閉じられる
- チャンネルは、転送機(
チャンネルの生成
-
use std::sync::mpsc;で導入 -
let (tx, rx) = mpsc::channel();でチャンネルを作成する -
ここで、
mpscとは multiple producer, single consumer を表す- (1チャンネルにつき複数送信機と単一の受信機が存在可能)
-
例:
use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); }
tx からの送信
-
まず送信側にしたいスレッドに
txを所有権ごと渡す -
tx.sendを呼び出すとResult<T,E>型を返す- すでに
rxがドロップされており、送信先が存在しなければ、Errが返ってくる - 例えば、
tx.send.unwrap();とするとErrの場合にパニックを起こす
- すでに
-
例:
use std::thread; use std::sync::mpsc; fn main() { let (tx, _rx) = mpsc::channel(); // `move` をつけているので、`tx` はクロージャにムーブされる thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); // あえて unwrap することでエラー発生時にパニックを起こすようにしている }); }
rx での受信(recv メソッドと try_recv メソッド)
recv メソッド
-
メッセージを受信したいスレッドで
rx.recvを呼びだすとResult<T,E>が返ってくる-
recvメソッドを呼び出すとそのスレッドの実行をブロックして値がチャンネルに流れてくるのを待機する -
txがドロップされていたらrecvメソッドはErrを返す
-
-
例:
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); // `move` をつけているので、`tx` はクロージャにムーブされる thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
try_recv メソッド
-
try_recvメソッドはスレッドの実行をブロックせず、代わりに即座にResult<T,E>を返す- メッセージがあればそれを含む
Ok値 - なければ
Errを返す
- メッセージがあればそれを含む
- メッセージ待機中に他にやることがある場合に有用
チャンネルと所有権の転送
-
tx.send(hoge)はhogeの所有権を奪う(他のスレッドに移す)- おかげで、送信後に誤って再度値を使用するのが防がれる
-
例えば、以下のコードはコンパイルエラーを起こす:
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); // ここで `val` はムーブ済み println!("val is {}", val); // なので、ここでは `val` にアクセスできない }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
複数の値を送信し、受信側が待機するのを確かめる
-
rxをイテレータとして扱うこともできる- イテレータの繰り返しは、チャンネルが閉じられると終了する
-
この場合、
rxはtxから値を複数回受け取る -
例:
use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }これを実行すると以下のような出力を返す(一秒おきに一行ずつ表示される)
Got: hi Got: from Got: the Got: thread
転送機をクローンして複数の生成器を作成する(multiple producer)
-
txに対しmpsc::Sender::clone(&tx)することでrxの対となる送信機を複製することができる(すべて同一のrxにメッセージを送信する) -
例:
use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }コードを実行すると、出力は以下のようなものになる
Got: hi Got: more Got: from Got: messages Got: the Got: for Got: thread Got: you
16.3 状態共有並行性(複数のスレッド間でのメモリ共有)
Mutex<T>:ミューテックス(同時には1つまでのスレッドにしかアクセスを許可しない)
-
ミューテックス:”mutual exclusion”(相互排他) の省略形
- ミューテックスはロックシステム経由で保持しているデータを死守する (guarding)
- ミューテックス内にアクセスするには、ミューテックスのロックを所望し、アクセスを要請する
- データの使用が終わったらアンロックする
-
生成には
Mutex::newを用いる -
内部のデータにアクセスするにはミューテックスインスタンスの
lockメソッドを呼び出してロックを取得する- この動作は現在のスレッドをブロックする(ロックを得られる順番が来るまで動作を停止する)
- ロックを保持するスレッドがパニックを起こしたら
lockメソッドはErrを返す -
lockメソッドの返り値はMutexGuard<T>というスマートポインタ- --> 参照外しやドロップ時の動作などが実装されている
シングルスレッドの場合
-
例:シングルスレッドでの使用例
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num: MutexGuard<i32> = m.lock().unwrap(); // `lock` メソッドは `Result<MutexGuard<T>, E>` を返すので `unwrap()` して、`Err` 返却時にはパニックするように設定している *num = 6; // `MutexGuard<T>` は `Deref` トレイトを実装しているので参照外しで内部データにアクセスできる } // `MutexGuard<T>` は `Drop` トレイトを実装しているのでスコープを外れるここで自動的にアンロックされる println!("m = {:?}", m); // m = 6;上のスコープで行った変更が反映されている }
マルチスレッドの場合
-
複数のスレッド間で一つのミューテックスを共有する場合は
Arc<T>を用いてMutex<T>を複製・共有する-
Arc<T>は "Atomic refference counter" の略 - マルチスレッドでも使える
Rc<T>のようなもの(Rc<T>はシングルスレッド用)
-
-
スレッドに
moveする前にArc::clone(&mutex)でクローンし -
クローンしたものをスレッドに渡して、
lockするなり煮るなり焼くなりする -
例:複数のスレッド間でミューテックスを共有する
use std::sync::{Mutex, MutexGuard}; use std::thread; use std::sync::Arc; fn main() { let counter: Arc<Mutex<i32>> = Arc::new(Mutex::new(0)); // `Arc::new` で複数の所有者の存在を許す(`Rc::new` でも似たことができるがマルチスレッドに対応していない) let mut handles = vec![]; for _ in 0..10 { let counter: Arc<Mutex<i32>> = Arc::clone(&counter); // thread に `move` する前に複製 let handle = thread::spawn(move || { // ここで `move` キーワードをつけているので `counter` の所有権はクロージャに移動する let mut num: MutexGuard<i32> = counter.lock().unwrap(); // 複製した `Arc<Mutex<i32>>`(`couter`)に対し `unlock` メソッドを呼び出す *num += 1; }); // ここ(クロージャの終り)で複製された `Arc<Mutex<i32>>`(`couter`)はドロップされ、Mutex はアンロックされる handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", counter.lock().unwrap()); }
RefCell<T>/Rc<T>と Mutex<T>/Arc<T> の類似性(内部不変性)
-
Mutex<T>は、RefCell<T>のように内部可変性を提供する-
Arc<Mutex<T>>型の変数をlet(一見して不変)で定義しても、lockメソッドでその内部にある値への可変参照(MutexGuard<T>型)を得ることができる - 例:マルチスレッドの場合 の例の中の
counter
-
-
デッドロックには注意!!
- デッドロック:二つのミューテックス A, B をロックしないと進まない処理が二つあるときに、一方の処理が A をロックし、もう一方が B をロックすると両方の処理が進まなくなる
16.4 Sync と Send トレイトで拡張可能な並行性
- この節の内容はやや発展的
- 基本的には「
Send(Sync)を実装している型からなる型は自動でSend(Sync)になる」ことを押さえておけばよい - これらのマーカートレイトは「並行性に関連する不変条件を強制することに役立つだけ」
- ここでいう「並行性に関連する不変条件」とは、ある型に関する以下のような決め事のこと:
- 「その型は複数のスレッド間での所有権の移動を許可しているか?」
- 「その型は複数のスレッドからのアクセスを許可しているか?」
- ここでいう「並行性に関連する不変条件」とは、ある型に関する以下のような決め事のこと:
- これらのトレイトを手動で実装して、
SendあるいはSyncではない部品からなる新しい並行な型を構成するには unsafe な Rust コードを実装することが必要になる(詳しくは "The Rustonomicon" を参照せよ)
- 基本的には「
Send マーカートレイトでスレッド間の所有権の転送を許可する
-
Sendマーカートレイト:Sendを実装した型の所有権をスレッド間で転送できることを示唆する- Rust のほとんどの型は
Sendを実装している - 生ポインタを除くほとんどの基本型も
Send - 完全に
Sendの型からなる型も全て自動的にSendと印付けされる -
Rc<T>を含む一部の例外では実装されていない
- Rust のほとんどの型は
Sync マーカートレイトで複数のスレッドからのアクセスを許可する
-
Syncマーカートレイト:Syncを実装した型は、複数のスレッドから参照されても安全であることを示唆する-
&T(Tへの参照)がSendなら、型TはSync - 基本型は
Sync -
Syncの型からのみ構成される型もSync -
Rc<T>やCell系などの一部例外では実装されていない - 一方
Mutex<T>はSyncなのでマルチスレッドの場合の例ような実装ができる
-