8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GoでKVSを作るシリーズ

Part1 HashMap Part2 永続化 Part3 LSM-Tree Part4 Compaction Part5 分散
✅ Done 👈 Now - - -

はじめに

前回はインメモリのハッシュマップを実装した。

でも、これだけだとプロセスが死んだらデータが全部消える。こわいですねぇ。

今回は永続化を実装するよ。WAL(Write-Ahead Log)でクラッシュリカバリを実現し、SSTable(Sorted String Table)でデータをディスクに保存する。

永続化の仕組み

┌─────────────────────────────────────────────────────────────────┐
│                    Write Path Overview                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Write Request                                                   │
│       │                                                          │
│       ▼                                                          │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │    1. Write to WAL (sync to disk)                       │    │
│  └──────────────────────┬──────────────────────────────────┘    │
│                         │                                        │
│                         ▼                                        │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │    2. Write to MemTable (in-memory)                     │    │
│  └──────────────────────┬──────────────────────────────────┘    │
│                         │                                        │
│            MemTable Full?                                        │
│                         │                                        │
│        ┌────────────────┴────────────────┐                      │
│        │ Yes                              │ No                   │
│        ▼                                  ▼                      │
│  ┌───────────────┐                  Return OK                   │
│  │ 3. Flush to   │                                              │
│  │    SSTable    │                                              │
│  └───────────────┘                                              │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

WAL(Write-Ahead Log)

WALは書き込みをディスクに記録し、クラッシュ時のリカバリを可能にする。

// wal.go
package kvs

import (
    "bufio"
    "encoding/binary"
    "errors"
    "hash/crc32"
    "io"
    "os"
    "sync"
)

const (
    WALRecordPut    byte = 1
    WALRecordDelete byte = 2
)

// WALRecord はWALのレコード形式
// [CRC32(4)] [Type(1)] [KeyLen(4)] [ValueLen(4)] [Key] [Value]
type WALRecord struct {
    Type     byte
    Key      []byte
    Value    []byte
    Checksum uint32
}

// WAL はWrite-Ahead Log
type WAL struct {
    mu       sync.Mutex
    file     *os.File
    writer   *bufio.Writer
    path     string
    syncMode bool
}

// NewWAL は新しいWALを作成
func NewWAL(path string, syncMode bool) (*WAL, error) {
    file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        return nil, err
    }

    return &WAL{
        file:     file,
        writer:   bufio.NewWriter(file),
        path:     path,
        syncMode: syncMode,
    }, nil
}

func (w *WAL) Append(recordType byte, key, value []byte) error {
    w.mu.Lock()
    defer w.mu.Unlock()

    // レコードをバイト列にシリアライズ
    data := make([]byte, 0, 13+len(key)+len(value))

    // 後でチェックサムを入れるため、最初の4バイトは空けておく
    data = append(data, 0, 0, 0, 0)

    // Type
    data = append(data, recordType)

    // KeyLen
    keyLen := make([]byte, 4)
    binary.LittleEndian.PutUint32(keyLen, uint32(len(key)))
    data = append(data, keyLen...)

    // ValueLen
    valueLen := make([]byte, 4)
    binary.LittleEndian.PutUint32(valueLen, uint32(len(value)))
    data = append(data, valueLen...)

    // Key and Value
    data = append(data, key...)
    data = append(data, value...)

    // チェックサムを計算して先頭に入れる
    checksum := crc32.ChecksumIEEE(data[4:])
    binary.LittleEndian.PutUint32(data[0:4], checksum)

    // 書き込み
    if _, err := w.writer.Write(data); err != nil {
        return err
    }

    if w.syncMode {
        if err := w.writer.Flush(); err != nil {
            return err
        }
        return w.file.Sync()
    }

    return nil
}

func (w *WAL) Put(key, value []byte) error {
    return w.Append(WALRecordPut, key, value)
}

func (w *WAL) Delete(key []byte) error {
    return w.Append(WALRecordDelete, key, nil)
}

func (w *WAL) Close() error {
    w.mu.Lock()
    defer w.mu.Unlock()

    if err := w.writer.Flush(); err != nil {
        return err
    }
    return w.file.Close()
}

// WALをリプレイしてMemTableに復元
func RecoverFromWAL(path string, memtable *MemTable) error {
    file, err := os.Open(path)
    if err != nil {
        if os.IsNotExist(err) {
            return nil  // WALが存在しない場合は何もしない
        }
        return err
    }
    defer file.Close()

    reader := bufio.NewReader(file)

    for {
        record, err := readWALRecord(reader)
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }

        switch record.Type {
        case WALRecordPut:
            memtable.Put(record.Key, record.Value)
        case WALRecordDelete:
            memtable.Delete(record.Key)
        }
    }

    return nil
}

func readWALRecord(reader *bufio.Reader) (*WALRecord, error) {
    // チェックサムを読む
    checksumBytes := make([]byte, 4)
    if _, err := io.ReadFull(reader, checksumBytes); err != nil {
        return nil, err
    }
    expectedChecksum := binary.LittleEndian.Uint32(checksumBytes)

    // Typeを読む
    recordType, err := reader.ReadByte()
    if err != nil {
        return nil, err
    }

    // KeyLenを読む
    keyLenBytes := make([]byte, 4)
    if _, err := io.ReadFull(reader, keyLenBytes); err != nil {
        return nil, err
    }
    keyLen := binary.LittleEndian.Uint32(keyLenBytes)

    // ValueLenを読む
    valueLenBytes := make([]byte, 4)
    if _, err := io.ReadFull(reader, valueLenBytes); err != nil {
        return nil, err
    }
    valueLen := binary.LittleEndian.Uint32(valueLenBytes)

    // Keyを読む
    key := make([]byte, keyLen)
    if _, err := io.ReadFull(reader, key); err != nil {
        return nil, err
    }

    // Valueを読む
    var value []byte
    if valueLen > 0 {
        value = make([]byte, valueLen)
        if _, err := io.ReadFull(reader, value); err != nil {
            return nil, err
        }
    }

    // チェックサムを検証
    data := make([]byte, 0, 9+keyLen+valueLen)
    data = append(data, recordType)
    data = append(data, keyLenBytes...)
    data = append(data, valueLenBytes...)
    data = append(data, key...)
    data = append(data, value...)

    actualChecksum := crc32.ChecksumIEEE(data)
    if actualChecksum != expectedChecksum {
        return nil, errors.New("checksum mismatch")
    }

    return &WALRecord{
        Type:     recordType,
        Key:      key,
        Value:    value,
        Checksum: expectedChecksum,
    }, nil
}

SSTable(Sorted String Table)

ディスク上のソート済みキーバリューファイル。

// sstable.go
package kvs

import (
    "bufio"
    "encoding/binary"
    "io"
    "os"
    "sort"
)

// SSTableは以下のフォーマット:
// [Data Block] [Index Block] [Footer]

// Footer: [IndexOffset(8)] [IndexSize(8)] [Magic(8)]
const (
    FooterSize  = 24
    MagicNumber = 0x00DEADBEEF00CAFE
)

// SSTable はディスク上のソート済みテーブル
type SSTable struct {
    file        *os.File
    path        string
    indexOffset int64
    indexSize   int64
    index       []IndexEntry
}

// IndexEntry はインデックスエントリ
type IndexEntry struct {
    Key    []byte
    Offset int64
    Size   int32
}

// SSTableWriter はSSTableの書き込み用
type SSTableWriter struct {
    file   *os.File
    writer *bufio.Writer
    index  []IndexEntry
    offset int64
}

// NewSSTableWriter は新しいSSTableWriterを作成
func NewSSTableWriter(path string) (*SSTableWriter, error) {
    file, err := os.Create(path)
    if err != nil {
        return nil, err
    }

    return &SSTableWriter{
        file:   file,
        writer: bufio.NewWriter(file),
        index:  make([]IndexEntry, 0),
        offset: 0,
    }, nil
}

// データブロックフォーマット:
// [KeyLen(4)] [ValueLen(4)] [Key] [Value] [Timestamp(8)] [Deleted(1)]
func (w *SSTableWriter) WriteEntry(entry *Entry) error {
    startOffset := w.offset

    // KeyLen
    keyLen := make([]byte, 4)
    binary.LittleEndian.PutUint32(keyLen, uint32(len(entry.Key)))
    w.writer.Write(keyLen)

    // ValueLen
    valueLen := make([]byte, 4)
    binary.LittleEndian.PutUint32(valueLen, uint32(len(entry.Value)))
    w.writer.Write(valueLen)

    // Key
    w.writer.Write(entry.Key)

    // Value
    w.writer.Write(entry.Value)

    // Timestamp
    tsBytes := make([]byte, 8)
    binary.LittleEndian.PutUint64(tsBytes, uint64(entry.Timestamp))
    w.writer.Write(tsBytes)

    // Deleted flag
    if entry.Deleted {
        w.writer.WriteByte(1)
    } else {
        w.writer.WriteByte(0)
    }

    entrySize := int32(4 + 4 + len(entry.Key) + len(entry.Value) + 8 + 1)
    w.offset += int64(entrySize)

    // インデックスに追加
    w.index = append(w.index, IndexEntry{
        Key:    entry.Key,
        Offset: startOffset,
        Size:   entrySize,
    })

    return nil
}

// Finalize はSSTableを完成させる
func (w *SSTableWriter) Finalize() error {
    // インデックスブロックを書き込み
    indexOffset := w.offset

    for _, entry := range w.index {
        // KeyLen
        keyLen := make([]byte, 4)
        binary.LittleEndian.PutUint32(keyLen, uint32(len(entry.Key)))
        w.writer.Write(keyLen)

        // Key
        w.writer.Write(entry.Key)

        // Offset
        offsetBytes := make([]byte, 8)
        binary.LittleEndian.PutUint64(offsetBytes, uint64(entry.Offset))
        w.writer.Write(offsetBytes)

        // Size
        sizeBytes := make([]byte, 4)
        binary.LittleEndian.PutUint32(sizeBytes, uint32(entry.Size))
        w.writer.Write(sizeBytes)

        w.offset += int64(4 + len(entry.Key) + 8 + 4)
    }

    indexSize := w.offset - indexOffset

    // Footerを書き込み
    footer := make([]byte, FooterSize)
    binary.LittleEndian.PutUint64(footer[0:8], uint64(indexOffset))
    binary.LittleEndian.PutUint64(footer[8:16], uint64(indexSize))
    binary.LittleEndian.PutUint64(footer[16:24], MagicNumber)
    w.writer.Write(footer)

    // フラッシュ
    if err := w.writer.Flush(); err != nil {
        return err
    }

    return w.file.Close()
}

// OpenSSTable は既存のSSTableを開く
func OpenSSTable(path string) (*SSTable, error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }

    // Footerを読む
    file.Seek(-FooterSize, io.SeekEnd)
    footer := make([]byte, FooterSize)
    if _, err := io.ReadFull(file, footer); err != nil {
        file.Close()
        return nil, err
    }

    // Magicを確認
    magic := binary.LittleEndian.Uint64(footer[16:24])
    if magic != MagicNumber {
        file.Close()
        return nil, errors.New("invalid SSTable magic")
    }

    indexOffset := int64(binary.LittleEndian.Uint64(footer[0:8]))
    indexSize := int64(binary.LittleEndian.Uint64(footer[8:16]))

    sst := &SSTable{
        file:        file,
        path:        path,
        indexOffset: indexOffset,
        indexSize:   indexSize,
    }

    // インデックスを読み込み
    if err := sst.loadIndex(); err != nil {
        file.Close()
        return nil, err
    }

    return sst, nil
}

func (s *SSTable) loadIndex() error {
    s.file.Seek(s.indexOffset, io.SeekStart)
    reader := bufio.NewReader(s.file)

    var read int64
    for read < s.indexSize {
        // KeyLen
        keyLenBytes := make([]byte, 4)
        if _, err := io.ReadFull(reader, keyLenBytes); err != nil {
            return err
        }
        keyLen := binary.LittleEndian.Uint32(keyLenBytes)
        read += 4

        // Key
        key := make([]byte, keyLen)
        if _, err := io.ReadFull(reader, key); err != nil {
            return err
        }
        read += int64(keyLen)

        // Offset
        offsetBytes := make([]byte, 8)
        if _, err := io.ReadFull(reader, offsetBytes); err != nil {
            return err
        }
        offset := int64(binary.LittleEndian.Uint64(offsetBytes))
        read += 8

        // Size
        sizeBytes := make([]byte, 4)
        if _, err := io.ReadFull(reader, sizeBytes); err != nil {
            return err
        }
        size := int32(binary.LittleEndian.Uint32(sizeBytes))
        read += 4

        s.index = append(s.index, IndexEntry{
            Key:    key,
            Offset: offset,
            Size:   size,
        })
    }

    return nil
}

// Get はキーに対応する値を取得
func (s *SSTable) Get(key []byte) (*Entry, error) {
    // バイナリサーチでインデックスを検索
    idx := sort.Search(len(s.index), func(i int) bool {
        return string(s.index[i].Key) >= string(key)
    })

    if idx >= len(s.index) || string(s.index[idx].Key) != string(key) {
        return nil, ErrKeyNotFound
    }

    // データを読み込み
    entry := s.index[idx]
    s.file.Seek(entry.Offset, io.SeekStart)

    data := make([]byte, entry.Size)
    if _, err := io.ReadFull(s.file, data); err != nil {
        return nil, err
    }

    return parseEntry(data)
}

func parseEntry(data []byte) (*Entry, error) {
    keyLen := binary.LittleEndian.Uint32(data[0:4])
    valueLen := binary.LittleEndian.Uint32(data[4:8])

    keyStart := 8
    keyEnd := keyStart + int(keyLen)
    valueStart := keyEnd
    valueEnd := valueStart + int(valueLen)

    key := data[keyStart:keyEnd]
    value := data[valueStart:valueEnd]
    timestamp := int64(binary.LittleEndian.Uint64(data[valueEnd : valueEnd+8]))
    deleted := data[valueEnd+8] == 1

    return &Entry{
        Key:       key,
        Value:     value,
        Timestamp: timestamp,
        Deleted:   deleted,
    }, nil
}

func (s *SSTable) Close() error {
    return s.file.Close()
}

MemTableからSSTableへのフラッシュ

// flush.go
package kvs

import (
    "fmt"
    "os"
    "path/filepath"
    "sort"
    "time"
)

// FlushMemTableToSSTable はMemTableをSSTableにフラッシュ
func FlushMemTableToSSTable(m *MemTable, dir string) (*SSTable, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()

    // エントリをソート
    entries := make([]*Entry, 0, len(m.data))
    for _, entry := range m.data {
        entries = append(entries, entry)
    }

    sort.Slice(entries, func(i, j int) bool {
        return string(entries[i].Key) < string(entries[j].Key)
    })

    // SSTableファイル名を生成
    filename := fmt.Sprintf("sst_%d.db", time.Now().UnixNano())
    path := filepath.Join(dir, filename)

    // SSTableを書き込み
    writer, err := NewSSTableWriter(path)
    if err != nil {
        return nil, err
    }

    for _, entry := range entries {
        if err := writer.WriteEntry(entry); err != nil {
            os.Remove(path)
            return nil, err
        }
    }

    if err := writer.Finalize(); err != nil {
        os.Remove(path)
        return nil, err
    }

    // SSTableを開き直して返す
    return OpenSSTable(path)
}

統合したKVS

WALとSSTableを使った永続化KVS。

// db.go
package kvs

import (
    "os"
    "path/filepath"
    "sync"
)

const (
    DefaultMemTableSize = 4 * 1024 * 1024  // 4MB
)

// DB は永続化対応のKVS
type DB struct {
    mu           sync.RWMutex
    dir          string
    options      Options
    memtable     *MemTable
    immutable    *MemTable  // フラッシュ中のMemTable
    sstables     []*SSTable
    wal          *WAL
    memTableSize int64
    maxMemTable  int64
}

// Open はDBを開く
func Open(dir string, opts Options) (*DB, error) {
    // ディレクトリを作成
    if err := os.MkdirAll(dir, 0755); err != nil {
        return nil, err
    }

    db := &DB{
        dir:         dir,
        options:     opts,
        memtable:    NewMemTable(opts),
        sstables:    make([]*SSTable, 0),
        maxMemTable: DefaultMemTableSize,
    }

    // 既存のSSTableを読み込み
    if err := db.loadSSTables(); err != nil {
        return nil, err
    }

    // WALから復元
    walPath := filepath.Join(dir, "wal.log")
    if err := RecoverFromWAL(walPath, db.memtable); err != nil {
        return nil, err
    }

    // WALを開く
    wal, err := NewWAL(walPath, opts.SyncWrites)
    if err != nil {
        return nil, err
    }
    db.wal = wal

    return db, nil
}

func (db *DB) loadSSTables() error {
    files, err := os.ReadDir(db.dir)
    if err != nil {
        return err
    }

    for _, file := range files {
        if filepath.Ext(file.Name()) == ".db" {
            path := filepath.Join(db.dir, file.Name())
            sst, err := OpenSSTable(path)
            if err != nil {
                continue  // 壊れたファイルはスキップ
            }
            db.sstables = append(db.sstables, sst)
        }
    }

    return nil
}

func (db *DB) Get(key []byte) ([]byte, error) {
    db.mu.RLock()
    defer db.mu.RUnlock()

    // 1. MemTableを検索
    if value, err := db.memtable.Get(key); err == nil {
        return value, nil
    }

    // 2. Immutable MemTableを検索
    if db.immutable != nil {
        if value, err := db.immutable.Get(key); err == nil {
            return value, nil
        }
    }

    // 3. SSTableを新しい順に検索
    for i := len(db.sstables) - 1; i >= 0; i-- {
        entry, err := db.sstables[i].Get(key)
        if err == nil {
            if entry.Deleted {
                return nil, ErrKeyNotFound
            }
            return entry.Value, nil
        }
    }

    return nil, ErrKeyNotFound
}

func (db *DB) Put(key, value []byte) error {
    db.mu.Lock()
    defer db.mu.Unlock()

    // WALに書き込み
    if err := db.wal.Put(key, value); err != nil {
        return err
    }

    // MemTableに書き込み
    if err := db.memtable.Put(key, value); err != nil {
        return err
    }

    // MemTableサイズチェック
    if db.memtable.Size() >= db.maxMemTable {
        db.scheduleFlush()
    }

    return nil
}

func (db *DB) Delete(key []byte) error {
    db.mu.Lock()
    defer db.mu.Unlock()

    // WALに書き込み
    if err := db.wal.Delete(key); err != nil {
        return err
    }

    // MemTableに削除マーカーを書き込み
    return db.memtable.Delete(key)
}

func (db *DB) scheduleFlush() {
    // 簡略化:同期的にフラッシュ
    db.flush()
}

func (db *DB) flush() error {
    // 現在のMemTableをimmutableに
    db.immutable = db.memtable
    db.memtable = NewMemTable(db.options)

    // SSTableにフラッシュ
    sst, err := FlushMemTableToSSTable(db.immutable, db.dir)
    if err != nil {
        return err
    }

    db.sstables = append(db.sstables, sst)
    db.immutable = nil

    // WALをリセット
    walPath := filepath.Join(db.dir, "wal.log")
    db.wal.Close()
    os.Remove(walPath)
    db.wal, _ = NewWAL(walPath, db.options.SyncWrites)

    return nil
}

func (db *DB) Close() error {
    db.mu.Lock()
    defer db.mu.Unlock()

    // MemTableをフラッシュ
    if db.memtable.Len() > 0 {
        db.flush()
    }

    // WALを閉じる
    if db.wal != nil {
        db.wal.Close()
    }

    // SSTableを閉じる
    for _, sst := range db.sstables {
        sst.Close()
    }

    return nil
}

使用例

// example_test.go
package kvs

import (
    "fmt"
    "os"
    "testing"
)

func TestPersistence(t *testing.T) {
    dir := "./testdb"
    defer os.RemoveAll(dir)

    // DBを開く
    db, err := Open(dir, DefaultOptions())
    if err != nil {
        t.Fatal(err)
    }

    // データを書き込み
    for i := 0; i < 1000; i++ {
        key := []byte(fmt.Sprintf("key-%04d", i))
        value := []byte(fmt.Sprintf("value-%d", i))
        db.Put(key, value)
    }

    fmt.Printf("Written 1000 entries\n")

    // DBを閉じる
    db.Close()
    fmt.Printf("DB closed\n")

    // DBを再度開く
    db2, err := Open(dir, DefaultOptions())
    if err != nil {
        t.Fatal(err)
    }
    defer db2.Close()

    // データを確認
    for i := 0; i < 10; i++ {
        key := []byte(fmt.Sprintf("key-%04d", i))
        value, err := db2.Get(key)
        if err != nil {
            t.Errorf("Key %s not found", key)
            continue
        }
        fmt.Printf("%s = %s\n", key, value)
    }
}

実行結果:

Written 1000 entries
DB closed
key-0000 = value-0
key-0001 = value-1
key-0002 = value-2
key-0003 = value-3
key-0004 = value-4
key-0005 = value-5
key-0006 = value-6
key-0007 = value-7
key-0008 = value-8
key-0009 = value-9

ディスク上のファイル構造

testdb/
├── wal.log          # Write-Ahead Log
├── sst_1234567890.db  # SSTable #1
├── sst_1234567891.db  # SSTable #2
└── ...

まとめ

今回実装したこと:

コンポーネント 説明
WAL Write-Ahead Log、クラッシュリカバリ用
SSTable ディスク上のソート済みテーブル
DB 統合されたKVSインターフェース
┌─────────────────────────────────────────────────────────────────┐
│                       Read Path                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Get(key) ─────────────────────────────────────────────────────▶│
│       │                                                          │
│       ▼                                                          │
│  ┌─────────────────────┐                                        │
│  │     MemTable        │  ───▶ Found? Return                    │
│  └──────────┬──────────┘                                        │
│             │ Not Found                                          │
│             ▼                                                    │
│  ┌─────────────────────┐                                        │
│  │ Immutable MemTable  │  ───▶ Found? Return                    │
│  └──────────┬──────────┘                                        │
│             │ Not Found                                          │
│             ▼                                                    │
│  ┌─────────────────────┐                                        │
│  │    SSTable[n-1]     │  ───▶ Found? Return (newest)           │
│  └──────────┬──────────┘                                        │
│             │ Not Found                                          │
│             ▼                                                    │
│  ┌─────────────────────┐                                        │
│  │    SSTable[n-2]     │  ───▶ ...                              │
│  └──────────┬──────────┘                                        │
│             │                                                    │
│             ▼                                                    │
│     Return "Not Found"                                          │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

次回はLSM-Treeを実装して、SSTableの階層管理を行う。これにより、読み取り性能を大幅に改善できる。

この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?