8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Rustでデータベースを自作する Part5: トランザクションとWAL

Last updated at Posted at 2025-12-25

はじめに

Part4 でクエリ実行エンジンを作って、SELECT/INSERT/UPDATE/DELETEができるようになった。

でも今のままだと問題がある:

  • プログラムがクラッシュしたらデータが消える
  • 複数の操作を「全部成功 or 全部失敗」にできない

今回はトランザクションと**WAL(Write-Ahead Log)**を実装して、この問題を解決する。

WAL(Write-Ahead Log)って何?

名前の通り「先に書くログ」。

データを変更する前に、まずログに「これから○○を△△に変更します」って書いておく。これによって:

  1. クラッシュしても、ログを見れば何をしてたか分かる
  2. ログをリプレイすればデータを復元できる
変更要求 → [WALに書く] → [実データ更新]
             ↓
           fsync(ディスクに確実に書く)

ログレコードの設計

まずログに何を記録するか決める:

/// ログレコードの種類
#[derive(Debug, Clone, PartialEq)]
pub enum LogRecordType {
    /// トランザクション開始
    Begin,
    /// データ変更
    Update {
        table: String,
        row_id: u64,
        before: Option<Vec<u8>>,  // UNDOログ用
        after: Vec<u8>,           // REDOログ用
    },
    /// 挿入
    Insert {
        table: String,
        row_id: u64,
        data: Vec<u8>,
    },
    /// 削除
    Delete {
        table: String,
        row_id: u64,
        data: Vec<u8>,  // UNDO用に元データを保持
    },
    /// コミット
    Commit,
    /// ロールバック
    Rollback,
    /// チェックポイント
    Checkpoint,
}

Updateにはbefore(変更前)とafter(変更後)両方持たせてる。これがポイント:

  • REDOするにはafterが必要
  • UNDOするにはbeforeが必要

ログレコード本体

/// ログレコード
#[derive(Debug, Clone)]
pub struct LogRecord {
    /// ログシーケンス番号(LSN)
    pub lsn: u64,
    /// トランザクションID
    pub txn_id: u64,
    /// タイムスタンプ
    pub timestamp: u64,
    /// レコードタイプ
    pub record_type: LogRecordType,
}

LSNは単調増加するID。ログの順序を保証する。

シリアライズ

ログはファイルに保存するので、バイト列に変換する必要がある:

impl LogRecord {
    /// バイト列にシリアライズ
    pub fn serialize(&self) -> Vec<u8> {
        let mut bytes = Vec::new();
        
        // LSN (8 bytes)
        bytes.extend_from_slice(&self.lsn.to_le_bytes());
        // TXN ID (8 bytes)
        bytes.extend_from_slice(&self.txn_id.to_le_bytes());
        // Timestamp (8 bytes)
        bytes.extend_from_slice(&self.timestamp.to_le_bytes());
        
        // Record type tag (1 byte) + data
        match &self.record_type {
            LogRecordType::Begin => {
                bytes.push(0);
            }
            LogRecordType::Insert { table, row_id, data } => {
                bytes.push(2);
                bytes.extend_from_slice(&(*row_id).to_le_bytes());
                Self::write_string(&mut bytes, table);
                Self::write_bytes(&mut bytes, data);
            }
            // ... 他も同様
        }
        
        // 全体の長さをプレフィックスとして追加
        let mut result = Vec::new();
        let len = bytes.len() as u32;
        result.extend_from_slice(&len.to_le_bytes());
        result.extend(bytes);
        
        result
    }
}

フォーマット:

[4 bytes: 長さ][8 bytes: LSN][8 bytes: TXN_ID][8 bytes: timestamp][1 byte: タグ][可変: データ]

長さプレフィックスを付けることで、デシリアライズ時に「次のレコードはどこまでか」が分かる。

WAL本体

/// Write-Ahead Log
pub struct WAL {
    file: BufWriter<File>,
    file_path: String,
    current_lsn: u64,
}

impl WAL {
    pub fn new(path: &str) -> io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)  // 追記モード
            .open(path)?;
        
        // 既存のLSNを読み取る(リカバリ時用)
        let current_lsn = Self::find_max_lsn(path)?;
        
        Ok(WAL {
            file: BufWriter::new(file),
            file_path: path.to_string(),
            current_lsn,
        })
    }

    /// ログを書き込み、LSNを返す
    pub fn write(&mut self, txn_id: u64, record_type: LogRecordType) -> io::Result<u64> {
        self.current_lsn += 1;
        let record = LogRecord::new(self.current_lsn, txn_id, record_type);
        let bytes = record.serialize();
        
        self.file.write_all(&bytes)?;
        
        Ok(self.current_lsn)
    }

    /// fsyncでディスクに同期
    pub fn sync(&mut self) -> io::Result<()> {
        self.file.flush()?;
        self.file.get_ref().sync_all()
    }
}

重要なのはsync()メソッド。fsyncシステムコールを呼んで、OSのバッファからディスクに確実に書き込む。

トランザクション管理

/// トランザクション状態
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TxnState {
    Active,
    Committed,
    Aborted,
}

/// トランザクション
#[derive(Debug)]
pub struct Transaction {
    pub id: u64,
    pub state: TxnState,
    pub logs: Vec<LogRecord>,  // このトランザクションのログ
}

トランザクションマネージャ

pub struct TransactionManager {
    wal: WAL,
    next_txn_id: u64,
    active_txns: HashMap<u64, Transaction>,
}

impl TransactionManager {
    /// トランザクション開始
    pub fn begin(&mut self) -> io::Result<u64> {
        let txn_id = self.next_txn_id;
        self.next_txn_id += 1;
        
        self.wal.write(txn_id, LogRecordType::Begin)?;
        
        let txn = Transaction::new(txn_id);
        self.active_txns.insert(txn_id, txn);
        
        Ok(txn_id)
    }

    /// コミット
    pub fn commit(&mut self, txn_id: u64) -> io::Result<()> {
        self.wal.write(txn_id, LogRecordType::Commit)?;
        self.wal.sync()?;  // コミット時は必ずfsync
        
        if let Some(txn) = self.active_txns.get_mut(&txn_id) {
            txn.state = TxnState::Committed;
        }
        self.active_txns.remove(&txn_id);
        
        Ok(())
    }
}

コミット時にsync()を呼ぶのがポイント。これがないと、「コミットした」と思ってもディスクに書かれてない可能性がある。

ロールバック

コミット前にトランザクションを中止する場合:

/// ロールバック(UNDO)
pub fn rollback(&mut self, txn_id: u64) -> io::Result<Vec<LogRecord>> {
    let logs = if let Some(txn) = self.active_txns.get(&txn_id) {
        txn.logs.clone()
    } else {
        Vec::new()
    };
    
    self.wal.write(txn_id, LogRecordType::Rollback)?;
    self.wal.sync()?;
    
    if let Some(txn) = self.active_txns.get_mut(&txn_id) {
        txn.state = TxnState::Aborted;
    }
    self.active_txns.remove(&txn_id);
    
    // ログを逆順で返す(UNDO用)
    Ok(logs.into_iter().rev().collect())
}

ログを逆順で返すのがポイント。UNDOは最後の操作から戻していく必要があるから。

リカバリ(ARIES風)

クラッシュ後の復旧処理。教科書的にはARIESというアルゴリズムが有名:

  1. Analysis: WALを読んで、どのトランザクションがコミット/アボートされたか調べる
  2. REDO: コミットされたトランザクションの操作を再実行
  3. UNDO: 未完了だったトランザクションをロールバック
/// リカバリ(REDO)
pub fn recover(&mut self) -> io::Result<RecoveryResult> {
    let records = self.wal.read_all()?;
    
    let mut committed_txns = HashSet::new();
    let mut aborted_txns = HashSet::new();
    let mut active_txns = HashSet::new();
    
    // 1. Analysis: どのトランザクションがコミット/アボートされたか調べる
    for record in &records {
        match &record.record_type {
            LogRecordType::Begin => {
                active_txns.insert(record.txn_id);
            }
            LogRecordType::Commit => {
                active_txns.remove(&record.txn_id);
                committed_txns.insert(record.txn_id);
            }
            LogRecordType::Rollback => {
                active_txns.remove(&record.txn_id);
                aborted_txns.insert(record.txn_id);
            }
            _ => {}
        }
    }
    
    // 2. REDO: コミットされたトランザクションの操作を再実行
    let redo_records: Vec<_> = records.iter()
        .filter(|r| committed_txns.contains(&r.txn_id))
        .filter(|r| matches!(r.record_type, 
            LogRecordType::Insert { .. } | 
            LogRecordType::Update { .. } | 
            LogRecordType::Delete { .. }
        ))
        .cloned()
        .collect();
    
    // 3. UNDO: アクティブだったトランザクションをロールバック
    let undo_records: Vec<_> = records.iter()
        .filter(|r| active_txns.contains(&r.txn_id))
        // ... 逆順で取得
        .collect();
    
    Ok(RecoveryResult {
        redo_records,
        undo_records,
        committed_txns: committed_txns.len(),
        aborted_txns: aborted_txns.len() + active_txns.len(),
    })
}

動かしてみる

$ cargo run
=== Transaction & WAL デモ ===

--- Transaction 1: INSERT + COMMIT ---
  BEGIN (txn_id=1)
  INSERT products row_id=1 (LSN=2)
  INSERT products row_id=2 (LSN=3)
  COMMIT

--- Transaction 2: INSERT + ROLLBACK ---
  BEGIN (txn_id=2)
  INSERT products row_id=3 (LSN=6)
  ROLLBACK (1 operations to undo)

--- WAL Contents ---
  LSN=1 TXN=1 BEGIN
  LSN=2 TXN=1 INSERT(products.1)
  LSN=3 TXN=1 INSERT(products.2)
  LSN=4 TXN=1 COMMIT
  LSN=5 TXN=2 BEGIN
  LSN=6 TXN=2 INSERT(products.3)
  LSN=7 TXN=2 ROLLBACK

--- Recovery Simulation ---
  Committed transactions: 1
  Aborted transactions: 1
  REDO operations: 2
  UNDO operations: 0

ログの流れが見える:

  • TXN=1は2行INSERTしてCOMMIT → REDOの対象
  • TXN=2は1行INSERTしてROLLBACK → 無視(すでにロールバック済み)

チェックポイント

WALが永遠に大きくなるのを防ぐため、定期的にチェックポイントを取る:

pub fn checkpoint(&mut self) -> io::Result<()> {
    self.wal.write(0, LogRecordType::Checkpoint)?;
    self.wal.sync()?;
    Ok(())
}

チェックポイント時に:

  1. 全てのダーティページをディスクに書き出す
  2. チェックポイントレコードをWALに書く
  3. チェックポイント以前のWALは不要になる(削除可能)

これでリカバリ時にチェックポイント以降だけを見ればよくなる。

実装上の注意点

fsyncは遅い

fsyncはディスクI/Oを待つので遅い。でもコミット時は必須

最適化として:

  • グループコミット: 複数のコミットを一度のfsyncでまとめる
  • 非同期コミット: 多少のデータロストを許容できるならfsyncを遅延

ログサイズ

ログが大きくなりすぎると:

  • ディスク容量を圧迫
  • リカバリが遅くなる

定期的なチェックポイントと、古いログの削除が必要。

バッファ管理との連携

本格的な実装では、Part 2で作ったBufferPoolとWALを連携させる:

  1. ページを変更する前にWALに書く(Write-Ahead
  2. ダーティページをフラッシュする前に、そのページのログがfsyncされていることを確認(Force-Log-at-Commit

ACID特性との関係

WALとトランザクションで実現できるACID特性:

特性 実現方法
Atomicity UNDO(ロールバック)
Consistency アプリケーション層
Isolation 次回のPart 6で
Durability WAL + fsync

今回の実装でADを達成。I(分離性)は次回ロック機構を作る際に。

まとめ

Part 5では:

  1. WAL: データ変更の前にログを書く
  2. ログレコード: BEGIN/INSERT/UPDATE/DELETE/COMMIT/ROLLBACK
  3. シリアライズ: ログをファイルに保存
  4. トランザクション管理: BEGIN→操作→COMMIT/ROLLBACK
  5. リカバリ: Analysis→REDO→UNDO

これでクラッシュしても復旧できるデータベースになった。

次回Part 6ではインデックスとクエリ最適化を実装して、大量データでも高速に検索できるようにする予定。

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?