参照カウンタ
所有権が持つジレンマ
所有権は非常に美しい仕組みですが、ガベージコレクションという救済措置なしでメモリ管理を実現するため、ライフタイムという制約を課しています。
この制約が「複数箇所でデータを共有したい」場合に足枷となるケースがままあります。
例を見てみましょう。
#[derive(Debug)]
struct Config {
api_url: String,
}
struct HttpClient<'a> {
config: &'a Config,
}
struct Logger<'a> {
config: &'a Config,
}
fn main() {
let (client, logger) = initialize();
println!("Client uses {}", client.config.api_url);
println!("Logger uses {}", logger.config.api_url);
}
fn initialize() -> (HttpClient, Logger) {
let config = Config { api_url: String::from("https://api.example.com") };
let client = HttpClient { config: &config };
let logger = Logger { config: &config };
(client, logger)
}
この実装はエラーになります。
なぜなら所有権を持つ変数configはinitializeというメソッドの終了と共にスタックメモリから消去され、合わせてヒープメモリも解放されてしまうからです。
Rustは明示されているライフタイム'aがconfigのライフタイムと一致しないことをチェックし、null参照問題を引き起こしてしまうことを検知してエラーを表示します。
じゃあ、clientに所有権を移動させてしまえばいい、となると今度はloggerの方で利用できなくなってしまうという問題を巻き起こします。
ならば仕様がない、もうそれぞれのオブジェクトに別のConfigインスタンスを作ってしまう、であればエラー自体は出なくなります。
#[derive(Debug, Clone)]
struct Config {
api_url: String,
}
struct HttpClient {
config: Config,
}
struct Logger {
config: Config,
}
fn main() {
let (client, logger) = initialize();
println!("Client uses {}", client.config.api_url);
println!("Logger uses {}", logger.config.api_url);
}
fn initialize() -> (HttpClient, Logger) {
let config = Config { api_url: String::from("https://api.example.com") };
let client = HttpClient { config: config.clone() };
let logger = Logger { config: config.clone() };
(client, logger)
}
が、そもそもやりたかったのは共有であって別々に生成ではありません。また、別々に生成してしまうとその分余計にメモリを使ってしまってパフォーマンス観点でも望ましくありません。
このように、複数の独立した箇所で長期的にデータ共有をしたい場合、Rustの所有権だけでは実現が難しい or 逆に非効率になってしまう場合があります。
Rc型による問題解決
こういった時のために、Rustでは参照カウンタ方式のメモリ管理を実現するクラスが提供されています。
参照カウンタというのはつまり、対象となるアドレスを参照しているオブジェクト数を管理しておいてその数が0になるまではヒープメモリをクリアしない、という方式です。
早速例を見てみましょう。
use std::rc::Rc;
#[derive(Debug)]
struct Config {
api_url: String,
}
struct HttpClient {
config: Rc<Config>,
}
struct Logger {
config: Rc<Config>,
}
fn main() {
let (client, logger) = initialize();
println!("Client uses {}", client.config.api_url);
println!("Logger uses {}", logger.config.api_url);
}
fn initialize() -> (HttpClient, Logger) {
let config = Rc::new(Config { api_url: String::from("https://api.example.com") });
let client = HttpClient { config: Rc::clone(&config) };
let logger = Logger { config: Rc::clone(&config) };
(client, logger)
}
RcはReference Countedの略です。
このRcが保持するデータは、参照するオブジェクトが全てメモリ解放されるまで、ヒープメモリに残り続けます。
そのため、ローカル変数configがinitializeというメソッドの終了と共にスタックメモリから消去されたとしても、clientとloggerの所有権を持つ変数が消去されない限り、configはずっとヒープメモリに残り続けます。
これであれば、不必要にヒープメモリを圧迫することなく、データを共有することができますね。
Rcは所有権を無視する機能ではない
一見すると、Rcは所有権というメモリ管理をやめてしまう方式に見えるかもしれません。
しかしながら、Rustは全ての管理を所有権によって行なっています。この大原則は変わっていません。
所有権において複数の参照を作れるのは共有参照でしたね。つまり、読取のみ可能という方式です。これはRcにもきっちり適用されます。
use std::rc::Rc;
#[derive(Debug)]
struct Config {
api_url: String,
}
struct HttpClient {
config: Rc<Config>,
}
struct Logger {
config: Rc<Config>,
}
fn main() {
let (client, logger) = initialize();
client.config.api_url.push_str("/v1/resource"); // これはエラー
println!("Client uses {}", client.config.api_url);
println!("Logger uses {}", logger.config.api_url);
}
fn initialize() -> (HttpClient, Logger) {
let config = Rc::new(Config { api_url: String::from("https://api.example.com") });
let client = HttpClient { config: Rc::clone(&config) };
let logger = Logger { config: Rc::clone(&config) };
(client, logger)
}
Rcの中で管理しているデータは所有権に則って共有参照されており、値の更新権限は付与されません。
そのため、Rcを使ったからといってRustにおけるメモリの安全性が失われる訳ではないのです。
マルチスレッド
概要
マルチスレッドは、その名の通り複数のスレッドに処理を分けることを言います。
まさに先程挙げた「複数の独立した箇所で長期的にデータ共有」が発生しやすい状況ですね!
Rustではマルチスレッドにおいても所有権の仕組みを維持し、高いパフォーマンスと安全性を提供します。
新規スレッドの追加方法
新規スレッドを作成したい場合、thread::spawnを使います。
基本構文は下記の通りです。
// クロージャにて別スレッドで実行させたい処理を記述します
thread::spawn(クロージャ);
spawnメソッドで生成されたスレッドは即座に処理を開始します。
例えば下記のようにspawnの後に少し待機時間を置いておくと、別スレッドでの処理が既に稼働していることがわかります。
use std::thread::{self, sleep};
use std::time::Duration;
fn main() {
// Rustでは戻り値をあえて使用しない場合、_に代入する表現方法を使います
let _ = thread::spawn(|| {
println!("Hello from thread");
42
});
// 待機時間なしだと別スレッドの処理が起こらないままmainが終了してしまうので、少し待機させる
sleep(Duration::from_millis(100));
}
なのでspawnするだけでもマルチスレッド化はできているのですが、気づいたら処理始まって気づいたら処理終わっているだとさすがに困ってしまいますので、基本的にはスレッドの処理完了を明示的に待つことになるかと思います。
この場合、spawnメソッドの戻り値であるJoinHandlerクラスのオブジェクトに対して、joinメソッドを使うことで実現できます。
use std::thread;
fn main() {
let handle = thread::spawn(|| {
println!("Hello from thread");
42
});
// スレッドの完了を待機し、結果を取得する
match handle.join() {
Ok(result) => println!("Thread finished with result: {}", result),
Err(_) => println!("Thread panicked"),
};
}
スレッド間での通信方法
異なるスレッド間で何らかのデータをやり取りしたい場合、Rustではチャネルと呼ばれる仕組みを利用します。
このチャネルを使うと送信者と受信者のオブジェクトが生成され、チャネルを通してデータのやり取りができるようになります。
use std::sync::mpsc;
use std::thread;
fn main() {
// チャネルを作成、送信者と受信者のインスタンスを取得
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hello");
// メッセージを送信
sender.send(val).unwrap();
});
// メッセージを受信するまで待機
let received = receiver.recv().unwrap();
println!("Got: {}", received);
}
注意点として、チャネルではデータの安全性担保のため、下記ルールが適用されます。
- 送信時、所有権を持たない共有参照・可変参照は渡せない。必ず所有権を渡す必要がある(別スレッドのデータが消えてしまってnull参照問題を引き起こしてしまうリスクがあるため)
- 送信と受信は一方通行(値が同時に送信されると順序や衝突が複雑になり、こちらも安全性を担保できなくなるため)。双方向でやり取りしたい場合、チャネルを2つ作って対応が必要
参照カウンタによる共有と注意点
チャネルでは所有権の移動が必須ですが、参照カウンタであれば複数のスレッドで共有することができます。
ただし、Rcはシングルスレッド用の参照カウンタです。マルチスレッドで参照カウンタを利用する場合はArcという別の型を利用する必要がある点に注意です。
試しにRcのデータを別スレッドにmoveさせようとすると、エラーになります。
use std::thread;
use std::rc::Rc;
#[derive(Debug)]
struct Config {
api_url: String,
}
struct HttpClient {
config: Rc<Config>,
}
struct Logger {
config: Rc<Config>,
}
fn main() {
let (client, logger) = initialize();
let handle = thread::spawn(move || { // moveできないのでエラー
println!("Thread started with config: {:?}", client.config);
42
});
println!("Main thread is running with config: {:?}", logger.config);
match handle.join() {
Ok(result) => println!("Thread finished with result: {}", result),
Err(_) => println!("Thread panicked"),
};
}
fn initialize() -> (HttpClient, Logger) {
let config = Rc::new(Config { api_url: String::from("https://api.example.com") });
let client = HttpClient { config: Rc::clone(&config) };
let logger = Logger { config: Rc::clone(&config) };
(client, logger)
}
Arcの場合、スレッドセーフなデータ共有が可能になります。
use std::thread;
use std::sync::Arc;
#[derive(Debug)]
struct Config {
api_url: String,
}
struct HttpClient {
config: Arc<Config>,
}
struct Logger {
config: Arc<Config>,
}
fn main() {
let (client, logger) = initialize();
let handle = thread::spawn(move || {
println!("Thread started with config: {:?}", client.config);
42
});
println!("Main thread is running with config: {:?}", logger.config);
match handle.join() {
Ok(result) => println!("Thread finished with result: {}", result),
Err(_) => println!("Thread panicked"),
};
}
fn initialize() -> (HttpClient, Logger) {
let config = Arc::new(Config { api_url: String::from("https://api.example.com") });
let client = HttpClient { config: Arc::clone(&config) };
let logger = Logger { config: Arc::clone(&config) };
(client, logger)
}
まとめると下記のようになります。
| 型 | 用途 | パフォーマンス | 略の元表記 |
|---|---|---|---|
Rc<T> |
シングルスレッド | 高速 | Reference Counted |
Arc<T> |
マルチスレッド | Rcに比べると低速 | Atomic Reference Counted |
Send・Syncトレイト
RcとArcにどんな違いがあるかというと、Send・Syncというトレイトを実装しているかどうかです。
-
Send: 型の値を別のスレッドに送信できることを示す -
Sync: 型の値を複数のスレッドから安全に共有できることを示す
Send・Syncはいずれもスレッド安全性を保証するトレイトです。
SyncについてはArcのように何かを共有する場合でのみ必要です。が、Sendについてはマルチスレッドで扱いたい場合に必須実装となります。
といっても、このトレイトは実際にはマーカートレイト(コンパイラや型の性質を理解するための印)と呼ばれるもので、自分達で独自に実装する性質のものではないです。
基本的に多くの型でSendは実装されています。
また、独自の構造体の場合、フィールドが全てimpl Sendであれば自動的に構造体もimpl Sendと見なされるため、明示的な実装対応は必要ありません。
ただし、ジェネリック型の値をマルチスレッドで利用したい場合では少しだけ、この存在を意識する必要があります。
use std::thread;
fn spawn_generic<T: std::fmt::Debug>(val: T) {
thread::spawn(move || { // valはジェネリック型なので、Sendを実装しているかわからないのでエラー
println!("value moved: {:?}", val);
});
}
fn main() {
spawn_generic(42);
}
このような場合、Sendというトレイト境界をきちんと示してあげる必要があります。
また、コメント記載のライフタイムに関する制約も付与する必要があります。
use std::thread;
// 'staticはmoveした値がスレッドの寿命よりも長く存在し、null問題を起こさないことを示す制約
fn spawn_generic<T: std::fmt::Debug + Send + 'static>(val: T) {
thread::spawn(move || {
println!("value moved: {:?}", val);
});
}
fn main() {
spawn_generic(42);
}
マルチスレッドでジェネリック型を扱う場合、Send + 'staticの組み合わせはよく使うので覚えておきましょう。
Arcを可変参照にする方法
Arcはデータを共有参照するもので、データの書き換えは許容していません。
しかし実際の実装対応においてデータの更新も可能したい=つまり可変参照としたいパターンはままあることでしょう。
Rustではこれを実現するための方法として2種類の方法が用意されています。
| 型 | 複数スレッドでの同時読み取り可否 | 書き込み中の排他制御有無 |
|---|---|---|
| RwLock | 可(読み取りは複数スレッドで可) | 有(書き込み中、他スレッドは待機) |
| Mutex | 不可(読み取りも1スレッドずつロック制御) | 同上 |
Arc + RwLockによる可変参照
RwLockは読み書き分離で排他制御を行う構造体です。
複数の読み込みアクセスは同時実行可能だが、書き込みは排他的という制御方法になります。
Arcと組み合わせることによって、複数スレッドでデータ共有しつつ排他制御による安全な更新も可能とします(=可変参照化できます)。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let rw_lock = Arc::new(RwLock::new(5));
// 読み取りスレッド1
let rw_lock1 = Arc::clone(&rw_lock);
let reader_handler = thread::spawn(move || {
match rw_lock1.read() {
Ok(number) => {
println!("Reader: {}", *number);
}
Err(e) => {
eprintln!("Failed to acquire read lock: {}", e);
}
}
});
// 書き込みスレッド
let rw_lock2 = Arc::clone(&rw_lock);
let writer_handler = thread::spawn(move || {
let mut number = match rw_lock2.write() {
Ok(guard) => guard,
Err(e) => {
eprintln!("Failed to acquire write lock: {}", e);
return;
}
};
*number = 6;
println!("Writer updated value to: {}", *number);
});
// 読み取りスレッド2
let rw_lock2 = Arc::clone(&rw_lock);
let reader_handler2 = thread::spawn(move || {
match rw_lock2.read() {
Ok(number) => {
println!("Reader 2: {}", *number);
}
Err(e) => {
eprintln!("Failed to acquire read lock: {}", e);
}
}
});
for handler in [reader_handler, writer_handler, reader_handler2] {
match handler.join() {
Ok(_) => {}
Err(e) => {
eprintln!("Thread panicked: {:?}", e);
}
}
}
}
Arc + Mutexによる可変参照
Mutexは読取も書込も排他制御を行う構造体です。
Arcと組み合わせることによって、RwLockよりも強い排他制御を可能とします。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let mutex = Arc::new(Mutex::new(5));
// 読み取りスレッド1
let mutex1 = Arc::clone(&mutex);
let reader_handler = thread::spawn(move || {
match mutex1.lock() {
Ok(number) => {
println!("Reader: {}", *number);
}
Err(e) => {
eprintln!("Failed to acquire read lock: {}", e);
}
}
});
// 書き込みスレッド
let mutex2 = Arc::clone(&mutex);
let writer_handler = thread::spawn(move || {
let mut number = match mutex2.lock() {
Ok(guard) => guard,
Err(e) => {
eprintln!("Failed to acquire write lock: {}", e);
return;
}
};
*number = 6;
println!("Writer updated value to: {}", *number);
});
// 読み取りスレッド2
let mutex3 = Arc::clone(&mutex);
let reader_handler2 = thread::spawn(move || {
match mutex3.lock() {
Ok(number) => {
println!("Reader 2: {}", *number);
}
Err(e) => {
eprintln!("Failed to acquire read lock: {}", e);
}
}
});
for handler in [reader_handler, writer_handler, reader_handler2] {
match handler.join() {
Ok(_) => {}
Err(e) => {
eprintln!("Thread panicked: {:?}", e);
}
}
}
}
RwLockとMutexの使い分け方
複数スレッドで読み取りが多発するならRwLockの方が良いですが、読み取り回数が少ない場合や特定スレッドでしか読み取りが発生しないのであれば、Mutexの方がロック管理がシンプルな分速いです。
rayonクレート
概要
rayonはRustのデータ並列処理におけるデファクトスタンダードとなっているクレートです。
配列やベクタといったコレクションの処理を簡単かつ効率よく(※)並列化することができます。
※スレッドプールを自動管理し、work-stealingアルゴリズム(スレッド負荷の均等化アルゴリズム)によって効率的な負荷分散を実現します。
下記コマンド実行で依存関係を追加できます。
cargo add rayon
以下、いくつの利用例をご紹介します。
並列処理時のパフォーマンス
rayonによってどのぐらい処理速度が変わるのか、比較してみましょう。
use rayon::prelude::*;
fn main() {
let numbers: Vec<i64> = (1..=1000000).collect();
// 逐次処理
let start = std::time::Instant::now();
let sum_sequential: i64 = numbers.iter().map(|x| x * x).sum();
let sequential_time = start.elapsed();
// 並列処理
let start = std::time::Instant::now();
// par_iter()が並列用のイテレータ生成。これにより自動的に並列処理化される
let sum_parallel: i64 = numbers.par_iter().map(|x| x * x).sum();
let parallel_time = start.elapsed();
println!("逐次処理: {} (時間: {:?})", sum_sequential, sequential_time);
println!("並列処理: {} (時間: {:?})", sum_parallel, parallel_time);
}
自分の環境では下記のような結果になりました。
逐次処理: 333333833333500000 (時間: 17.30375ms)
並列処理: 333333833333500000 (時間: 3.63425ms)
並列ソート
per_iter()ではクロージャにてコレクション操作が可能ですが、per_sort()を使うと並列ソートが可能になります。
use rayon::prelude::*;
fn main() {
let mut data: Vec<i32> = (1..=1000000).rev().collect();
let start = std::time::Instant::now();
data.par_sort();
let elapsed = start.elapsed();
println!("並列ソート時間: {:?}", elapsed);
}
joinによる部分的な並列処理化
join()を使うと、部分的な並列処理化が可能になります。
これは他の並列処理機構ではあまり見ないというか、面白いアプローチですね。
use rayon;
fn fibonacci(n: u32) -> u32 {
if n <= 1 {
return n;
}
// 大きな計算の場合のみ並列化
if n > 30 {
let (a, b) = rayon::join(
|| fibonacci(n - 1),
|| fibonacci(n - 2)
);
a + b
} else {
fibonacci(n - 1) + fibonacci(n - 2)
}
}
fn main() {
let n = 35;
let start = std::time::Instant::now();
let result = fibonacci(n);
let elapsed = start.elapsed();
println!("fibonacci({}) = {} (時間: {:?})", n, result, elapsed);
}
お疲れ様でした!
お疲れ様でした。
次回で最後です。Rustの非同期処理・マクロについて学びます。
Rust入門講座⑨(Final)非同期処理・マクロ定義
Hope you enjoy it!