RustFSに学ぶRustの実装パターン
これまでローカル開発環境のS3互換のストレージとしてMinIOを利用してきましたが、先日MinIOはメンテナンスモードとなってしまいました。
This project is currently under maintenance and is not accepting new changes.
代替を探した結果、Rustで書かれたS3互換オブジェクトストレージ「RustFS」を見つけました。
RustFSは高性能で信頼性の高いストレージシステムを目指しており、Rustの特徴を活かした設計がなされています。
本記事ではRustFSのコードベースからRustの特徴が効果的に活用されている3つの実装パターンを紹介します。
https://github.com/rustfs/rustfs
1. Arc + RwLock による型安全な並行キャッシング
ファイル: crates/kms/src/manager.rs
KMS(Key Management Service)マネージャーは、複数の非同期タスクから安全にアクセスされるキャッシュを管理しています。Rustの所有権システムと型システムにより、データ競合がコンパイル時に防止されます。
並行処理における共有状態の問題
まず、なぜ並行処理で共有状態を扱うことが難しいのかをおさらいしましょう。
複数のスレッド(またはタスク)が同じデータに同時にアクセスする場合、以下の問題が発生する可能性があります:
スレッドA: カウンター読み取り → 値は 5
スレッドB: カウンター読み取り → 値は 5
スレッドA: 5 + 1 = 6 を書き込み
スレッドB: 5 + 1 = 6 を書き込み ← 本来は 7 になるべき!
これが「データ競合(Data Race)」と呼ばれる問題です。Rustはこの問題をコンパイル時に防止します。
Rustの所有権システム
Rustを理解する上で最も重要な概念が「所有権(Ownership)」です。
// 基本ルール1: 各値には「所有者」が1つだけ存在する
let s1 = String::from("hello");
// 基本ルール2: 所有権は「移動(move)」する
let s2 = s1; // s1 の所有権が s2 に移動
// println!("{}", s1); // コンパイルエラー!s1 はもう使えない
// 基本ルール3: 参照を使えば所有権を移動せずにアクセスできる
let s3 = String::from("world");
let len = calculate_length(&s3); // 参照を渡す(借用)
println!("{} の長さは {}", s3, len); // s3 はまだ使える
この所有権システムにより、「誰がデータを所有しているか」が常に明確になります。しかし、複数のスレッドで同じデータを共有したい場合はどうすればよいでしょうか?
Arc - 複数所有者のための仕組み
Arc(Atomic Reference Counted)は、複数の所有者がいる場合に使用するスマートポインタです。
use std::sync::Arc;
// 通常の所有権: 1つの所有者のみ
let data = vec![1, 2, 3];
// Arc を使うと複数の所有者を持てる
let shared_data = Arc::new(vec![1, 2, 3]);
let clone1 = Arc::clone(&shared_data); // 参照カウント: 2
let clone2 = Arc::clone(&shared_data); // 参照カウント: 3
// 3つの変数が同じデータを「所有」している
なぜ Rc ではなく Arc なのか?
| 型 | 正式名称 | スレッド安全 | 用途 |
|---|---|---|---|
Rc<T> |
Reference Counted | いいえ | シングルスレッド環境 |
Arc<T> |
Atomic Reference Counted | はい | マルチスレッド環境 |
Arcは内部で「アトミック操作」を使用しており、複数スレッドから同時にカウントを増減しても安全です。
use std::sync::Arc;
use std::thread;
let data = Arc::new(vec![1, 2, 3]);
// 複数スレッドで安全に共有できる
let handles: Vec<_> = (0..3).map(|i| {
let data = Arc::clone(&data); // 各スレッドに所有権のコピーを渡す
thread::spawn(move || {
println!("スレッド{}: {:?}", i, data);
})
}).collect();
for handle in handles {
handle.join().unwrap();
}
RwLock - 読み書きの排他制御
Arcだけでは、データを「読む」ことしかできません。データを変更するにはRwLock(Read-Write Lock)が必要です。
use std::sync::Arc;
use tokio::sync::RwLock; // 非同期版 RwLock
let cache = Arc::new(RwLock::new(HashMap::new()));
RwLock の動作原理
読み取りロック (read)
- 複数のスレッドが同時に取得可能
- データの読み取りのみ許可
- 書き込みロックが取得されている間は待機
書き込みロック (write)
- 1つのスレッドのみが取得可能
- データの読み書き両方が可能
- 他のすべてのロックが解放されるまで待機
Mutex との比較
| 特性 | Mutex<T> |
RwLock<T> |
|---|---|---|
| 同時読み取り | 1スレッドのみ | 複数スレッド可能 |
| 書き込み | 1スレッドのみ | 1スレッドのみ |
| 用途 | 読み書きが同程度 | 読み取りが多い場合 |
| オーバーヘッド | 低い | やや高い |
キャッシュは「読み取りが多く、書き込みが少ない」典型的なユースケースなので、RwLockが最適です。
実際のコード解説
RustFSのKmsManager構造体を見てみましょう:
ファイル:
crates/kms/src/manager.rs(31-35行目)
pub struct KmsManager {
backend: Arc<dyn KmsBackend>,
cache: Arc<RwLock<KmsCache>>,
config: KmsConfig,
}
読み取りロックと書き込みロックの使い分け
ファイル:
crates/kms/src/manager.rs(73-100行目)
pub async fn generate_data_key(
&self,
request: GenerateDataKeyRequest,
) -> Result<GenerateDataKeyResponse> {
// ステップ1: キャッシュから読み取り(読み取りロック)
if self.config.enable_cache {
let cache = self.cache.read().await;
if let Some(cached_key) = cache.get_data_key(&request.key_id).await {
if cached_key.key_spec == request.key_spec {
return Ok(GenerateDataKeyResponse {
key_id: request.key_id.clone(),
plaintext_key: cached_key.plaintext.clone(),
ciphertext_blob: cached_key.ciphertext.clone(),
});
}
}
// ここで cache (RwLockReadGuard) がスコープを抜けて自動解放
}
// ステップ2: バックエンドから新しいキーを生成
let response = self.backend.generate_data_key(request).await?;
// ステップ3: キャッシュに書き込み(書き込みロック)
if self.config.enable_cache {
let mut cache = self.cache.write().await;
cache
.put_data_key(&response.key_id, &response.plaintext_key, &response.ciphertext_blob)
.await;
// ここで cache (RwLockWriteGuard) がスコープを抜けて自動解放
}
Ok(response)
}
Send と Sync トレイト - コンパイラによる安全性保証
Rustには特別なマーカートレイトがあり、型がスレッド間で安全に使用できるかをコンパイル時に検証します。
// Send: この型の値を別スレッドに「送信」できる
// Sync: この型への参照を複数スレッドで「共有」できる
コンパイラが安全性を保証する例:
use std::rc::Rc;
// スレッド安全ではない参照カウント
let not_thread_safe = Rc::new(42);
// これはコンパイルエラーになる!
std::thread::spawn(move || {
println!("{}", not_thread_safe);
});
Rustコンパイラは、スレッド安全でない型をスレッド間で共有しようとすると、コンパイル時にエラーを出します。これにより、データ競合が実行時に発生することを防ぎます。
Rustの型システムが防ぐバグの例
use std::sync::Arc;
use std::rc::Rc;
use std::cell::RefCell;
use tokio::sync::RwLock;
// 1. ロック取得忘れ → コンパイルエラー
let cache: Arc<RwLock<Vec<i32>>> = Arc::new(RwLock::new(vec![]));
// cache.push(1); // エラー: Arc<RwLock<Vec<i32>>> に push メソッドはない
// 2. 書き込み時に読み取りロック → コンパイルエラー
let guard = cache.read().await;
// guard.push(1); // エラー: RwLockReadGuard は不変参照のみ提供
// 3. 正しい方法
let mut guard = cache.write().await;
guard.push(1); // OK: RwLockWriteGuard は可変参照を提供
// 4. スレッド安全でない型を共有しようとする → コンパイルエラー
struct UnsafeCache {
data: Rc<RefCell<Vec<i32>>>, // Rc も RefCell も Send/Sync ではない
}
let cache: Arc<RwLock<UnsafeCache>> =
Arc::new(RwLock::new(UnsafeCache { /* ... */ }));
// ^^^^^^^^^^^^^^^^^^^^^^^^
// エラー: `Rc<RefCell<Vec<i32>>>` cannot be shared between threads safely
// `Rc<RefCell<Vec<i32>>>` はスレッド間で安全に共有できません
//
// 理由: UnsafeCache が Rc を含むため Send + Sync を満たさない
// → Arc<RwLock<UnsafeCache>> もスレッド間で共有できない
// → コンパイラが危険なコードを事前に拒否
2. RAIIパターンとDropトレイトによる自動リソース解放
ファイル: crates/lock/src/guard.rs
分散ロックの管理において、Rustの RAII(Resource Acquisition Is Initialization)パターンが活用されています。スコープを抜けると自動的にロックが解放されるため、デッドロックやリソースリークを防止できます。
リソース管理の問題 - なぜ自動解放が重要なのか
プログラミングにおいて、「リソース」とは使用後に解放が必要なもの全般を指します:
- ファイルハンドル(開いたら閉じる必要がある)
- ネットワーク接続(接続したら切断する必要がある)
- ロック(取得したら解放する必要がある)
- メモリ(確保したら解放する必要がある)
リソースリークの例:
問題のあるコード:
lock.acquire() // ロック取得
│
├─→ do_something() // 何か処理
│ └─→ エラー発生!
│ └─→ 関数から早期リターン
│
└─→ lock.release() // ← ここに到達しない!
// ロックが解放されずデッドロック
RAII とは何か
RAII(Resource Acquisition Is Initialization)は、リソースの獲得と解放をオブジェクトのライフサイクルに結びつけるパターンです。
// RAIIの基本概念
{
let guard = lock.acquire(); // リソース獲得(オブジェクト作成時)
do_something()?; // エラーが発生しても...
do_another_thing()?; // 途中で return しても...
} // ← スコープを抜けると guard が自動的に破棄され、ロックが解放される
RAIIの利点:
RAIIパターン(Rust):
{
let guard = lock.acquire(); // ロック取得
│
├─→ do_something()
│ └─→ エラー発生!
│ └─→ 関数から早期リターン
│
└─→ } // スコープ終了時に guard の Drop が自動呼び出し
// → ロックは必ず解放される!
}
Drop トレイトとは
RustのDropトレイトは、値がスコープを抜けるときに自動的に呼び出される特別なトレイトです。
// Drop トレイトの定義
pub trait Drop {
fn drop(&mut self); // スコープ終了時に自動呼び出し
}
// 簡単な例
struct MyResource {
name: String,
}
impl Drop for MyResource {
fn drop(&mut self) {
println!("{} が解放されました", self.name);
}
}
fn main() {
{
let resource = MyResource { name: "テスト".to_string() };
println!("リソースを使用中...");
} // ← ここで自動的に "テスト が解放されました" と表示
println!("スコープを抜けました");
}
// 出力:
// リソースを使用中...
// テスト が解放されました
// スコープを抜けました
分散ロックとは
RustFSのような分散ストレージでは、複数のサーバーが同じリソースにアクセスする可能性があります。分散ロックは、複数のサーバー間で排他制御を行う仕組みです。
分散ロックの概念:
サーバーA ──┐
├──→ ロックサービス ──→ 共有リソース
サーバーB ──┤ (Redis, etcd等) (ファイル等)
│
サーバーC ──┘
- 同時に1つのサーバーのみがロックを取得可能
- ロックを取得したサーバーだけがリソースにアクセス可能
- サーバーがクラッシュした場合もロック解放が必要
Drop トレイトの実装 - 詳細解説
ファイル:
crates/lock/src/guard.rs(87-117行目)
impl Drop for LockGuard {
fn drop(&mut self) {
// 1. 既に解放済みなら何もしない
if self.disarmed {
return;
}
// 2. 解放ジョブを作成
let job = UnlockJob {
lock_id: self.lock_id.clone(),
clients: self.clients.clone(),
};
// 3. バックグラウンドワーカーにジョブを送信
// try_send: ブロックしない送信(Drop内で await は使えないため)
if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) {
// 4. チャネルが満杯または閉じている場合のフォールバック
let lock_id = self.lock_id.clone();
let clients = self.clients.clone();
tracing::warn!(
"LockGuard channel send failed ({}), \
spawning fallback unlock task for {}",
err, lock_id
);
// 5. 新しい非同期タスクを生成してロック解放
let handle = tokio::spawn(async move {
// 全クライアントに対してロック解放を試みる
let futures_iter = clients.into_iter().map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
});
// 全ての解放処理を並行実行
let _ = futures::future::join_all(futures_iter).await;
});
// 6. JoinHandle を明示的にドロップ
// (タスクはバックグラウンドで継続実行)
drop(handle);
}
}
}
LazyLock による遅延初期化
LazyLockは、値が初めてアクセスされたときに一度だけ初期化を行う仕組みです。
ファイル:
crates/lock/src/guard.rs(31-55行目)
use std::sync::LazyLock;
// グローバル変数として定義
static UNLOCK_RUNTIME: LazyLock<UnlockRuntime> = LazyLock::new(|| {
// この中身は最初のアクセス時に一度だけ実行される
// チャネル作成(8192要素のバッファ)
let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);
// バックグラウンドワーカーを起動
tokio::spawn(async move {
// ジョブを受信し続けるループ
while let Some(job) = rx.recv().await {
// 受信したジョブを処理(ロック解放)
let mut any_ok = false;
let lock_id = job.lock_id.clone();
for client in job.clients.into_iter() {
if client.release(&lock_id).await.unwrap_or(false) {
any_ok = true;
}
}
if !any_ok {
tracing::warn!("LockGuard background release failed for {}", lock_id);
}
}
});
UnlockRuntime { tx }
});
Rustの型システムが防ぐ問題
// 1. ムーブ後のアクセス防止
let guard = lock.acquire();
let moved_guard = guard; // 所有権がムーブ
// guard.lock_id; // コンパイルエラー!guard はもう使えない
// 2. ダングリング参照の防止
fn get_lock_id() -> &LockId {
let guard = lock.acquire();
&guard.lock_id
// ^^^^^
// コンパイルエラー!guard は関数終了時に破棄されるため、
// その参照を返すことはできない
}
// 3. 二重解放の防止
let guard = lock.acquire();
drop(guard); // 明示的に解放
// drop(guard); // コンパイルエラー!guard はもう存在しない
3. ジェネリクスとトレイト境界による型安全な並行バッチ処理
ファイル: crates/ecstore/src/batch_processor.rs
Erasure Coding(イレイジャー符号化)ストレージ層では、多数のI/O操作を効率的に並行処理する必要があります。ジェネリクスとトレイト境界を活用することで、型安全性を保ちながら汎用的なバッチ処理を実現しています。
ジェネリクスとは何か
ジェネリクス(Generics)は、具体的な型を指定せずにコードを書く仕組みです。これにより、同じロジックを異なる型に対して再利用できます。
// ジェネリクスなしの場合:型ごとに関数を書く必要がある
fn largest_i32(list: &[i32]) -> i32 { /* ... */ }
fn largest_f64(list: &[f64]) -> f64 { /* ... */ }
fn largest_char(list: &[char]) -> char { /* ... */ }
// ジェネリクスありの場合:1つの関数で全ての型に対応
fn largest<T: PartialOrd>(list: &[T]) -> &T {
// ^^^^^^^^^^^^^
// T は任意の型を表す「型パラメータ」
// PartialOrd は「比較可能」という制約(トレイト境界)
let mut largest = &list[0];
for item in list {
if item > largest {
largest = item;
}
}
largest
}
// 使用例:コンパイラが自動的に型を推論
let numbers = vec![34, 50, 25, 100, 65];
let result = largest(&numbers); // T = i32
let chars = vec!['y', 'm', 'a', 'q'];
let result = largest(&chars); // T = char
トレイト境界とは何か
トレイト境界(Trait Bounds)は、ジェネリック型が満たすべき条件を指定する仕組みです。
// トレイト: 型が持つべき機能を定義
trait Printable {
fn print(&self);
}
// トレイト境界: T は Printable を実装している型のみ許可
fn print_item<T: Printable>(item: T) {
item.print(); // T が Printable を実装していることが保証されている
}
// where 句を使った書き方(複雑な境界の場合に読みやすい)
fn print_items<T>(items: Vec<T>)
where
T: Printable + Clone, // Printable と Clone の両方を実装
{
for item in items {
item.print();
}
}
重要なマーカートレイト:
| トレイト | 意味 | 例 |
|---|---|---|
Send |
別スレッドに所有権を転送できる | ほとんどの型 |
Sync |
複数スレッドから参照できる |
Arc<T>, &T
|
'static |
プログラム全体で有効なライフタイム | 所有された値、静的参照 |
// Send + 'static の意味
fn spawn_task<T>(value: T)
where
T: Send + 'static,
// ^^^^ ^^^^^^^
// | このタスクが終了するまで value が有効である保証
// |
// value を別スレッドに安全に送信できる保証
{
tokio::spawn(async move {
// value を使用
});
}
Future トレイトとは何か
Futureは、Rustの非同期処理の基盤となるトレイトです。
// Future トレイトの簡略化した定義
trait Future {
type Output; // 非同期処理が完了したときの結果の型
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
// async 関数は自動的に Future を返す
async fn fetch_data() -> String {
// 非同期処理
"data".to_string()
}
// 上記は以下と同等
fn fetch_data() -> impl Future<Output = String> {
async {
"data".to_string()
}
}
トレイト境界での Future の使い方:
// F は Future トレイトを実装し、その Output が Result<T> である型
fn execute<T, F>(task: F) -> Result<T>
where
F: Future<Output = Result<T>>,
// ^^^^^^^^^^^^^^^^^^^^^^
// F を .await すると Result<T> が得られることを保証
{
// ...
}
コード全体の詳細解説
ファイル:
crates/ecstore/src/batch_processor.rs(36-82行目)
pub async fn execute_batch<T, F>(&self, tasks: Vec<F>) -> Vec<Result<T>>
where
T: Send + 'static,
F: Future<Output = Result<T>> + Send + 'static,
{
// 1. 空のタスクリストは早期リターン
if tasks.is_empty() {
return Vec::new();
}
// 2. セマフォで同時実行数を制限
// 例: max_concurrent = 4 なら、同時に4タスクまで実行
let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
// 3. JoinSet: 複数の非同期タスクを管理するコレクション
// - タスクの追加/完了待ち/結果収集が可能
// - パニックしたタスクも安全に処理
let mut join_set = JoinSet::new();
// 4. 結果を格納するベクター(初期値はエラー)
let mut results = Vec::with_capacity(tasks.len());
for _ in 0..tasks.len() {
results.push(Err(Error::other("Not completed")));
}
// 5. 全タスクを JoinSet に登録
for (i, task) in tasks.into_iter().enumerate() {
let sem = semaphore.clone(); // Arc なので clone は参照カウント増加のみ
join_set.spawn(async move {
// セマフォを取得(同時実行数を制限)
let _permit = sem.acquire().await
.map_err(|_| Error::other("Semaphore error"))?;
// ^^^^^^^
// 許可証(permit)はスコープ終了時に自動解放(RAII!)
// タスクを実行
let result = task.await;
// インデックスと結果をタプルで返す
Ok::<(usize, Result<T>), Error>((i, result))
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
// 明示的な型注釈(コンパイラの型推論を助ける)
});
}
// 6. 結果を収集(パターンマッチングで網羅的に処理)
while let Some(join_result) = join_set.join_next().await {
match join_result {
// 成功: タスクが正常に完了
Ok(Ok((index, task_result))) => {
if index < results.len() {
results[index] = task_result;
}
}
// システムエラー: セマフォエラーなど
Ok(Err(e)) => {
tracing::warn!("Batch processor system error: {:?}", e);
}
// パニック: タスク内でパニックが発生
Err(join_error) => {
// 他のタスクには影響しない(隔離されている)
tracing::warn!("Task panicked in batch processor: {:?}", join_error);
}
}
}
results
}
パターンマッチングによる網羅的エラーハンドリング
Rustのパターンマッチングは、全てのケースを処理することを強制します。
// JoinSet::join_next() の戻り値の型
// Option<Result<Result<(usize, Result<T>), Error>, JoinError>>
while let Some(join_result) = join_set.join_next().await {
// ^^^^^^^^^^^
// Result<Result<(usize, Result<T>), Error>, JoinError>
match join_result {
// ケース1: タスクは正常に実行完了し、内部の処理も成功
Ok(Ok((index, task_result))) => {
// ^^^^^^^^^^^^^^^^^^^
// (usize, Result<T>)
results[index] = task_result;
}
// ケース2: タスクは正常に実行されたが、内部でエラー
Ok(Err(e)) => {
// ^
// Error(セマフォエラーなど)
tracing::warn!("System error: {:?}", e);
}
// ケース3: タスクがパニックした
Err(join_error) => {
// ^^^^^^^^^^
// JoinError(パニック情報を含む)
tracing::warn!("Task panicked: {:?}", join_error);
}
// コンパイラが全てのケースを処理していることを検証
// 足りないケースがあるとコンパイルエラー!
}
}
ゼロコスト抽象化とモノモーフィゼーション
Rustのジェネリクスは「ゼロコスト抽象化」を実現します。
// ジェネリック関数
fn process<T: Clone>(value: T) -> T {
value.clone()
}
// 使用時
let a: i32 = process(42);
let b: String = process("hello".to_string());
コンパイル時に起こること(モノモーフィゼーション):
// コンパイラが自動生成する具体的な関数
fn process_i32(value: i32) -> i32 {
value.clone()
}
fn process_string(value: String) -> String {
value.clone()
}
// 呼び出しは直接関数呼び出しに変換
let a: i32 = process_i32(42);
let b: String = process_string("hello".to_string());
クォーラム達成による早期終了
分散ストレージでは、冗長性のために複数のノードにデータを書き込みます。「クォーラム」とは、必要最小限の成功数のことです。
クォーラムの概念(例: 5台中3台で成功すればOK)
ノード1 ──→ 成功 ✓
ノード2 ──→ 成功 ✓
ノード3 ──→ 成功 ✓ ← 3台成功!クォーラム達成!
ノード4 ──→ (応答待ち不要)
ノード5 ──→ (応答待ち不要)
- 全ノードの応答を待たずに「成功」を返せる
- レイテンシの改善
ファイル:
crates/ecstore/src/batch_processor.rs(85-109行目)
pub async fn execute_batch_with_quorum<T, F>(
&self,
tasks: Vec<F>,
required_successes: usize, // 必要な成功数(クォーラム)
) -> Result<Vec<T>>
where
T: Send + 'static,
F: Future<Output = Result<T>> + Send + 'static,
{
let results = self.execute_batch(tasks).await;
let mut successes = Vec::new();
// 成功した結果を収集
for value in results.into_iter().flatten() {
// ^^^^^^^
// Result::Ok の値のみを取り出す
// イテレータアダプタによる簡潔な記述
successes.push(value);
// クォーラム達成で早期リターン
if successes.len() >= required_successes {
return Ok(successes);
}
}
// クォーラム未達成
if successes.len() >= required_successes {
Ok(successes)
} else {
Err(Error::other(format!(
"Insufficient successful results: got {}, needed {}",
successes.len(),
required_successes
)))
}
}
まとめ
RustFSでは、Rust言語の以下の特徴が効果的に活用されています。
| 特徴 | RustFSでの活用例 | メリット |
|---|---|---|
| 所有権システム |
Arc<RwLock<T>>による共有状態管理 |
データ競合のコンパイル時防止 |
| RAIIパターン |
LockGuardによる自動ロック解放 |
リソースリークの防止 |
| トレイト境界 |
Send + 'staticによるスレッド安全性保証 |
不正なスレッド間転送のコンパイル時検出 |
| ゼロコスト抽象化 | ジェネリクスによる汎用バッチ処理 | 抽象化と性能の両立 |
これらの実装パターンは、高性能な分散システムにおいてRustがいかに安全性と性能を両立できるかを示す好例です。特に、コンパイル時の検証により実行時エラーを大幅に削減できる点は、ストレージシステムのような信頼性が求められるソフトウェアにおいて非常に重要な利点となっています。