はじめに
Part4 でクエリ実行エンジンを作って、SELECT/INSERT/UPDATE/DELETEができるようになった。
でも今のままだと問題がある:
- プログラムがクラッシュしたらデータが消える
- 複数の操作を「全部成功 or 全部失敗」にできない
今回はトランザクションと**WAL(Write-Ahead Log)**を実装して、この問題を解決する。
WAL(Write-Ahead Log)って何?
名前の通り「先に書くログ」。
データを変更する前に、まずログに「これから○○を△△に変更します」って書いておく。これによって:
- クラッシュしても、ログを見れば何をしてたか分かる
- ログをリプレイすればデータを復元できる
変更要求 → [WALに書く] → [実データ更新]
↓
fsync(ディスクに確実に書く)
ログレコードの設計
まずログに何を記録するか決める:
/// ログレコードの種類
#[derive(Debug, Clone, PartialEq)]
pub enum LogRecordType {
/// トランザクション開始
Begin,
/// データ変更
Update {
table: String,
row_id: u64,
before: Option<Vec<u8>>, // UNDOログ用
after: Vec<u8>, // REDOログ用
},
/// 挿入
Insert {
table: String,
row_id: u64,
data: Vec<u8>,
},
/// 削除
Delete {
table: String,
row_id: u64,
data: Vec<u8>, // UNDO用に元データを保持
},
/// コミット
Commit,
/// ロールバック
Rollback,
/// チェックポイント
Checkpoint,
}
Updateにはbefore(変更前)とafter(変更後)両方持たせてる。これがポイント:
-
REDOするには
afterが必要 -
UNDOするには
beforeが必要
ログレコード本体
/// ログレコード
#[derive(Debug, Clone)]
pub struct LogRecord {
/// ログシーケンス番号(LSN)
pub lsn: u64,
/// トランザクションID
pub txn_id: u64,
/// タイムスタンプ
pub timestamp: u64,
/// レコードタイプ
pub record_type: LogRecordType,
}
LSNは単調増加するID。ログの順序を保証する。
シリアライズ
ログはファイルに保存するので、バイト列に変換する必要がある:
impl LogRecord {
/// バイト列にシリアライズ
pub fn serialize(&self) -> Vec<u8> {
let mut bytes = Vec::new();
// LSN (8 bytes)
bytes.extend_from_slice(&self.lsn.to_le_bytes());
// TXN ID (8 bytes)
bytes.extend_from_slice(&self.txn_id.to_le_bytes());
// Timestamp (8 bytes)
bytes.extend_from_slice(&self.timestamp.to_le_bytes());
// Record type tag (1 byte) + data
match &self.record_type {
LogRecordType::Begin => {
bytes.push(0);
}
LogRecordType::Insert { table, row_id, data } => {
bytes.push(2);
bytes.extend_from_slice(&(*row_id).to_le_bytes());
Self::write_string(&mut bytes, table);
Self::write_bytes(&mut bytes, data);
}
// ... 他も同様
}
// 全体の長さをプレフィックスとして追加
let mut result = Vec::new();
let len = bytes.len() as u32;
result.extend_from_slice(&len.to_le_bytes());
result.extend(bytes);
result
}
}
フォーマット:
[4 bytes: 長さ][8 bytes: LSN][8 bytes: TXN_ID][8 bytes: timestamp][1 byte: タグ][可変: データ]
長さプレフィックスを付けることで、デシリアライズ時に「次のレコードはどこまでか」が分かる。
WAL本体
/// Write-Ahead Log
pub struct WAL {
file: BufWriter<File>,
file_path: String,
current_lsn: u64,
}
impl WAL {
pub fn new(path: &str) -> io::Result<Self> {
let file = OpenOptions::new()
.create(true)
.read(true)
.append(true) // 追記モード
.open(path)?;
// 既存のLSNを読み取る(リカバリ時用)
let current_lsn = Self::find_max_lsn(path)?;
Ok(WAL {
file: BufWriter::new(file),
file_path: path.to_string(),
current_lsn,
})
}
/// ログを書き込み、LSNを返す
pub fn write(&mut self, txn_id: u64, record_type: LogRecordType) -> io::Result<u64> {
self.current_lsn += 1;
let record = LogRecord::new(self.current_lsn, txn_id, record_type);
let bytes = record.serialize();
self.file.write_all(&bytes)?;
Ok(self.current_lsn)
}
/// fsyncでディスクに同期
pub fn sync(&mut self) -> io::Result<()> {
self.file.flush()?;
self.file.get_ref().sync_all()
}
}
重要なのはsync()メソッド。fsyncシステムコールを呼んで、OSのバッファからディスクに確実に書き込む。
トランザクション管理
/// トランザクション状態
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TxnState {
Active,
Committed,
Aborted,
}
/// トランザクション
#[derive(Debug)]
pub struct Transaction {
pub id: u64,
pub state: TxnState,
pub logs: Vec<LogRecord>, // このトランザクションのログ
}
トランザクションマネージャ
pub struct TransactionManager {
wal: WAL,
next_txn_id: u64,
active_txns: HashMap<u64, Transaction>,
}
impl TransactionManager {
/// トランザクション開始
pub fn begin(&mut self) -> io::Result<u64> {
let txn_id = self.next_txn_id;
self.next_txn_id += 1;
self.wal.write(txn_id, LogRecordType::Begin)?;
let txn = Transaction::new(txn_id);
self.active_txns.insert(txn_id, txn);
Ok(txn_id)
}
/// コミット
pub fn commit(&mut self, txn_id: u64) -> io::Result<()> {
self.wal.write(txn_id, LogRecordType::Commit)?;
self.wal.sync()?; // コミット時は必ずfsync
if let Some(txn) = self.active_txns.get_mut(&txn_id) {
txn.state = TxnState::Committed;
}
self.active_txns.remove(&txn_id);
Ok(())
}
}
コミット時にsync()を呼ぶのがポイント。これがないと、「コミットした」と思ってもディスクに書かれてない可能性がある。
ロールバック
コミット前にトランザクションを中止する場合:
/// ロールバック(UNDO)
pub fn rollback(&mut self, txn_id: u64) -> io::Result<Vec<LogRecord>> {
let logs = if let Some(txn) = self.active_txns.get(&txn_id) {
txn.logs.clone()
} else {
Vec::new()
};
self.wal.write(txn_id, LogRecordType::Rollback)?;
self.wal.sync()?;
if let Some(txn) = self.active_txns.get_mut(&txn_id) {
txn.state = TxnState::Aborted;
}
self.active_txns.remove(&txn_id);
// ログを逆順で返す(UNDO用)
Ok(logs.into_iter().rev().collect())
}
ログを逆順で返すのがポイント。UNDOは最後の操作から戻していく必要があるから。
リカバリ(ARIES風)
クラッシュ後の復旧処理。教科書的にはARIESというアルゴリズムが有名:
- Analysis: WALを読んで、どのトランザクションがコミット/アボートされたか調べる
- REDO: コミットされたトランザクションの操作を再実行
- UNDO: 未完了だったトランザクションをロールバック
/// リカバリ(REDO)
pub fn recover(&mut self) -> io::Result<RecoveryResult> {
let records = self.wal.read_all()?;
let mut committed_txns = HashSet::new();
let mut aborted_txns = HashSet::new();
let mut active_txns = HashSet::new();
// 1. Analysis: どのトランザクションがコミット/アボートされたか調べる
for record in &records {
match &record.record_type {
LogRecordType::Begin => {
active_txns.insert(record.txn_id);
}
LogRecordType::Commit => {
active_txns.remove(&record.txn_id);
committed_txns.insert(record.txn_id);
}
LogRecordType::Rollback => {
active_txns.remove(&record.txn_id);
aborted_txns.insert(record.txn_id);
}
_ => {}
}
}
// 2. REDO: コミットされたトランザクションの操作を再実行
let redo_records: Vec<_> = records.iter()
.filter(|r| committed_txns.contains(&r.txn_id))
.filter(|r| matches!(r.record_type,
LogRecordType::Insert { .. } |
LogRecordType::Update { .. } |
LogRecordType::Delete { .. }
))
.cloned()
.collect();
// 3. UNDO: アクティブだったトランザクションをロールバック
let undo_records: Vec<_> = records.iter()
.filter(|r| active_txns.contains(&r.txn_id))
// ... 逆順で取得
.collect();
Ok(RecoveryResult {
redo_records,
undo_records,
committed_txns: committed_txns.len(),
aborted_txns: aborted_txns.len() + active_txns.len(),
})
}
動かしてみる
$ cargo run
=== Transaction & WAL デモ ===
--- Transaction 1: INSERT + COMMIT ---
BEGIN (txn_id=1)
INSERT products row_id=1 (LSN=2)
INSERT products row_id=2 (LSN=3)
COMMIT
--- Transaction 2: INSERT + ROLLBACK ---
BEGIN (txn_id=2)
INSERT products row_id=3 (LSN=6)
ROLLBACK (1 operations to undo)
--- WAL Contents ---
LSN=1 TXN=1 BEGIN
LSN=2 TXN=1 INSERT(products.1)
LSN=3 TXN=1 INSERT(products.2)
LSN=4 TXN=1 COMMIT
LSN=5 TXN=2 BEGIN
LSN=6 TXN=2 INSERT(products.3)
LSN=7 TXN=2 ROLLBACK
--- Recovery Simulation ---
Committed transactions: 1
Aborted transactions: 1
REDO operations: 2
UNDO operations: 0
ログの流れが見える:
- TXN=1は2行INSERTしてCOMMIT → REDOの対象
- TXN=2は1行INSERTしてROLLBACK → 無視(すでにロールバック済み)
チェックポイント
WALが永遠に大きくなるのを防ぐため、定期的にチェックポイントを取る:
pub fn checkpoint(&mut self) -> io::Result<()> {
self.wal.write(0, LogRecordType::Checkpoint)?;
self.wal.sync()?;
Ok(())
}
チェックポイント時に:
- 全てのダーティページをディスクに書き出す
- チェックポイントレコードをWALに書く
- チェックポイント以前のWALは不要になる(削除可能)
これでリカバリ時にチェックポイント以降だけを見ればよくなる。
実装上の注意点
fsyncは遅い
fsyncはディスクI/Oを待つので遅い。でもコミット時は必須。
最適化として:
-
グループコミット: 複数のコミットを一度の
fsyncでまとめる -
非同期コミット: 多少のデータロストを許容できるなら
fsyncを遅延
ログサイズ
ログが大きくなりすぎると:
- ディスク容量を圧迫
- リカバリが遅くなる
定期的なチェックポイントと、古いログの削除が必要。
バッファ管理との連携
本格的な実装では、Part 2で作ったBufferPoolとWALを連携させる:
- ページを変更する前にWALに書く(Write-Ahead)
- ダーティページをフラッシュする前に、そのページのログがfsyncされていることを確認(Force-Log-at-Commit)
ACID特性との関係
WALとトランザクションで実現できるACID特性:
| 特性 | 実現方法 |
|---|---|
| Atomicity | UNDO(ロールバック) |
| Consistency | アプリケーション層 |
| Isolation | 次回のPart 6で |
| Durability | WAL + fsync |
今回の実装でAとDを達成。I(分離性)は次回ロック機構を作る際に。
まとめ
Part 5では:
- WAL: データ変更の前にログを書く
- ログレコード: BEGIN/INSERT/UPDATE/DELETE/COMMIT/ROLLBACK
- シリアライズ: ログをファイルに保存
- トランザクション管理: BEGIN→操作→COMMIT/ROLLBACK
- リカバリ: Analysis→REDO→UNDO
これでクラッシュしても復旧できるデータベースになった。
次回Part 6ではインデックスとクエリ最適化を実装して、大量データでも高速に検索できるようにする予定。