Rustでスレッドを扱う前に
Rustではネイティブスレッドを使うlibnativeと、グリーンスレッドを使うlibgreenが存在していましたが、1.0になるタイミングでlibgreenは削除されて、スレッドはネイティブスレッドとして動作するようになりました。ErlangではN:Mで実行される軽量プロセスが実装されていて、マルチコアCPUで細かいタスクを大量に処理できるようにしています。おそらくGolangもそのようになっていたはずです。
Rustがなぜグリーンスレッドを削除したかというと、Rustはシステムプログラミング言語を目指しており、VMのメモリやCPUのオーバーヘッドをなくして、ネイティブに近い環境で実行されるようにしたためです。Rustではグリーンスレッドを削除することによって "Hello World" のバイナリのサイズを半分にして、低レベルなライブラリのフックをしやすくしたという訳です。
前置きはさておいて、さっそくいくつかのパターンを見ながらRustのスレッド処理を見ていきます。
並列処理の実装パターン
スレッドの中からは適当な時間スリープしてから引数を2倍にして返す関数を呼ぶことにします。
fn do_something(x: i32) -> i32 {
let time = rand::random::<u8>() as u32;
thread::sleep_ms(time);
x * 2
}
スレッド数が1の場合
一番シンプルなパターンです。thread::spawn
にクロージャを渡してスレッドを立ち上げています。
Rustではメモリに所有権があり、ある変数を別の変数に束縛したときにメモリの所有権が移動します。また、メモリ領域を自身のものと所有しているものは常に一つになるようになっており、スコープの外に出るなどして所有権が手放されたタイミングでオブジェクトが破棄されてメモリが解放されます。所有権はコンパイル時に追跡できるようになっており、Rustではこの仕組みによってコンパイル時にメモリ安全性を保証するようになっています。
ということでクロージャ (|| {...}
) の中からdataを参照するために move
で所有権を移動させています。
fn run() {
let data = 1;
thread::spawn(move || {
let result = do_something(data);
println!("{:?}", result); // => 2
});
thread::sleep_ms(500);
}
この仕組みはメモリ安全性を保証するだけでなく、余計なメモリのコピーが発生しなくなるのでパフォーマンス面でも有利になります。
thread::spawn
の戻り値の JoinHandle
でアタッチされているスレッドの終了を待って、タスクの結果をResult型で受け取ることができます。
fn run() {
let data = 1;
let child = thread::spawn(move || {
do_something(data)
});
println!("{:?}", child.join()); // => Ok(2)
}
また、channelを使ったメッセージパッシングでも結果を受け取ることができます。mpsc::channel
は (Sender, Receiver)
のタプルを返します。新しく作ったスレッドの中からsendして、メインスレッドでrecvしています。
fn run() {
let data = 1;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(data * 2);
});
println!("{:?}", rx.recv()); // => Ok(2)
}
Senderからデータを送るときに所有権も一緒に送信されます。
スレッド数がNの場合
次に、要素の数だけスレッドを立ち上げて何かを並列で処理するとします。
fn run() {
let data = vec![1, 2, 3, 4, 5];
for &x in data.iter() {
thread::spawn(move || {
let result = do_something(x);
println!("{:?}", result); // 2, 4, 6, 8, 10 が順不同に表示される
});
}
thread::sleep_ms(500);
}
mpscモジュールのドキュメント (std::sync::mpsc - Rust) に "Multi-producer, single-consumer FIFO queue communication primitives" とあるとおり、Senderはクローン可能で 1:N
で通信することができるので、cloneしたSenderの所有権をクロージャに渡して、複数のスレッドの結果を受け取ることができます。
fn run() {
let data = vec![1, 2, 3, 4, 5];
let (tx, rx) = mpsc::channel();
for &x in data.iter() {
let tx = tx.clone();
thread::spawn(move || {
tx.send(x * 2);
});
}
for _ in data.iter() {
println!("{:?}", rx.recv()); // Ok(2), Ok(4), Ok(6), Ok(8), Ok(10) が順不同に表示される
}
}
より実践的な並列処理
ブロックして時間の掛かる何かの処理をするクライアントがあったとします。
struct Client;
impl Client {
fn fetch(&self, x: i32) -> i32 {
...
}
}
そのstructの関数のスレッドの中から、structのフィールドのクライアントを参照するとします。
struct Runner {
client: Client
}
impl Runner {
fn run(&self, params<Vec<i32>>) {
// ???
}
}
Clientがimmutableな場合
先ほどのように move || {...}
の中でselfを参照しようとすると "captured variable self
does not outlive the enclosing closure" というエラーが出ます。selfの所有権はそのままではmoveできないようです。
異なるスレッド間でimmutableなオブジェクトを共有するにはArc (Atomically Reference Count) という型を使います。"メモリ領域を自身のもの所有しているものは常に一つになるように" と冒頭で書きましたが、リファレンスカウントによって複数の所有権を与えるようにするRc型というのもあります。Arc型はスレッド間でオブジェクトを共有をできるようにしたRc型です。
self.client.clone()
を呼んで、cloneしたオブジェクトをクロージャに渡しています。
struct Runner {
client: Arc<Client>
}
impl Runner {
fn run(&self, data: Vec<i32>) {
let (tx, rx) = mpsc::channel();
for &x in data.iter() {
let client = self.client.clone();
let tx = tx.clone();
thread::spawn(move || {
let result = client.fetch(x);
tx.send(result);
});
}
for i in 0..data.len() {
println!("{:?}", rx.recv());
}
}
cloneの中ではリファレンスカウントのインクリメントをしていて、実際のデータのコピーは行われていません。スコープの外に出たらオブジェクトのdropが呼ばれてリファレンスカウントがデクリメントされるという仕組みになっています。
Clientがmutableな場合
実際にはクライアントで以下のようにmutableでスレッドセーフでない処理が行われるようなことがあります。このような場合にはどのように参照すれば良いでしょうか。
impl Client {
fn destory(&mut self, param: i32) -> Result<(), Error> {
...
}
}
前述のとおりArcは異なるスレッド間で immutable なオブジェクトを共有する仕組みです。このままではデータ競合を引き起こす可能性があります (コンパイル時に検出されてエラーになる)。そこで、クリティカルセクションでアトミック性を確保するのにMutex (std::sync::Mutex - Rust) で排他制御をします。
struct Client {
client: Arc<Mutex<Client>>
}
impl Runner {
fn run(&self, data: Vec<i32>) {
let (tx, rx) = mpsc::channel();
for &x in data.iter() {
let tx = tx.clone();
let client = self.client.clone();
thread::spawn(move || {
let result = client.lock().unwrap().delete(x);
tx.send(result);
});
}
for i in 0..data.len() {
println!("{:?}", rx.recv());
}
}
}
client.lock()
はRAII guardを返して、オブジェクトが利用可能になるまでスレッドをブロックします。このようにしてdestroyは安全にシリアルに処理されます。
まとめ
この記事ではRustでどのように並列処理を実装するかを見てみました。
Rustのスレッドモデルから分かるように、Rustは特にシステムプログラミングに向いています。所有権と借用の仕組みは一見すると複雑で冗長のように見えますが、これによりメモリを効率的に使えるようにしつつ、コンパイル時にデータ競合性を検出し、複数スレッドでも安全に実行できるようにしているので、使いこなせるようになると頼もしいパートナーになってくれることでしょう。