7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GoでKVSを作るシリーズ

Part1 HashMap Part2 永続化 Part3 LSM-Tree Part4 Compaction Part5 分散
✅ Done ✅ Done ✅ Done 👈 Now -

はじめに

前回はLSM-TreeとBloom Filterを実装した。

でもこれ、使い続けるとSSTableがどんどん増えて、ディスクがパンクするよね。削除したはずのデータも実際には残ってるし。

今回はCompactionを実装するよ。SSTableをマージして古いデータを削除し、ストレージ効率と読み取り性能を改善する。

Compactionとは

┌─────────────────────────────────────────────────────────────────┐
│                    Why Compaction?                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Problems without Compaction:                                   │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  1. Space Amplification                                 │    │
│  │     - Old versions of keys consume disk space           │    │
│  │     - Deleted keys still take space (tombstones)        │    │
│  │                                                         │    │
│  │  2. Read Amplification                                  │    │
│  │     - Multiple SSTables may contain same key            │    │
│  │     - Must check many files to find latest version      │    │
│  │                                                         │    │
│  │  3. Unbounded SSTable Count                             │    │
│  │     - Level 0 keeps growing                             │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│  Compaction merges SSTables:                                    │
│    - Removes old versions of keys                               │
│    - Removes tombstones (deleted keys)                          │
│    - Reduces number of files to search                          │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Compaction戦略

Leveled Compaction(LevelDB/RocksDBスタイル)

// compaction.go
package kvs

import (
    "fmt"
    "os"
    "path/filepath"
    "sort"
    "time"
)

// CompactionStrategy はコンパクション戦略
type CompactionStrategy interface {
    SelectCompaction(lm *LevelManager) *CompactionJob
}

// CompactionJob はコンパクションジョブ
type CompactionJob struct {
    Level     int
    InputSSTs []*SSTable
    TargetLevel int
}

// LeveledCompaction はレベルベースのコンパクション
type LeveledCompaction struct{}

func (lc *LeveledCompaction) SelectCompaction(lm *LevelManager) *CompactionJob {
    lm.mu.RLock()
    defer lm.mu.RUnlock()

    // Level 0のコンパクションを優先
    if len(lm.levels[0].sstables) >= Level0FileLimit {
        return &CompactionJob{
            Level:       0,
            InputSSTs:   lm.levels[0].sstables,
            TargetLevel: 1,
        }
    }

    // 他のレベルをチェック
    for level := 1; level < MaxLevels-1; level++ {
        maxSize := int64(BaseLevelSize) * int64(pow(LevelSizeMultiplier, level))
        if lm.levels[level].size >= maxSize {
            // 最も古いSSTableを選択
            if len(lm.levels[level].sstables) > 0 {
                return &CompactionJob{
                    Level:       level,
                    InputSSTs:   []*SSTable{lm.levels[level].sstables[0]},
                    TargetLevel: level + 1,
                }
            }
        }
    }

    return nil
}

マージイテレータ

複数のSSTableをマージするためのイテレータ。

// merge_iterator.go
package kvs

import (
    "bytes"
    "container/heap"
)

// MergeIterator は複数のイテレータをマージ
type MergeIterator struct {
    iterators []SSTableIterator
    heap      *iteratorHeap
}

// SSTableIterator はSSTableを走査するイテレータ
type SSTableIterator struct {
    sst      *SSTable
    index    int
    current  *Entry
    valid    bool
}

func NewSSTableIterator(sst *SSTable) *SSTableIterator {
    it := &SSTableIterator{
        sst:   sst,
        index: 0,
    }
    it.loadCurrent()
    return it
}

func (it *SSTableIterator) loadCurrent() {
    if it.index < len(it.sst.index) {
        entry, err := it.sst.Get(it.sst.index[it.index].Key)
        if err == nil {
            it.current = entry
            it.valid = true
            return
        }
    }
    it.valid = false
    it.current = nil
}

func (it *SSTableIterator) Valid() bool {
    return it.valid
}

func (it *SSTableIterator) Next() {
    it.index++
    it.loadCurrent()
}

func (it *SSTableIterator) Key() []byte {
    if it.current != nil {
        return it.current.Key
    }
    return nil
}

func (it *SSTableIterator) Value() *Entry {
    return it.current
}

// iteratorHeap はヒープ用の構造体
type iteratorHeap struct {
    items []*SSTableIterator
}

func (h *iteratorHeap) Len() int {
    return len(h.items)
}

func (h *iteratorHeap) Less(i, j int) bool {
    return bytes.Compare(h.items[i].Key(), h.items[j].Key()) < 0
}

func (h *iteratorHeap) Swap(i, j int) {
    h.items[i], h.items[j] = h.items[j], h.items[i]
}

func (h *iteratorHeap) Push(x interface{}) {
    h.items = append(h.items, x.(*SSTableIterator))
}

func (h *iteratorHeap) Pop() interface{} {
    old := h.items
    n := len(old)
    x := old[n-1]
    h.items = old[:n-1]
    return x
}

// NewMergeIterator は新しいMergeIteratorを作成
func NewMergeIterator(sstables []*SSTable) *MergeIterator {
    h := &iteratorHeap{
        items: make([]*SSTableIterator, 0),
    }

    for _, sst := range sstables {
        it := NewSSTableIterator(sst)
        if it.Valid() {
            h.items = append(h.items, it)
        }
    }

    heap.Init(h)

    return &MergeIterator{
        heap: h,
    }
}

func (mi *MergeIterator) Valid() bool {
    return mi.heap.Len() > 0
}

func (mi *MergeIterator) Key() []byte {
    if !mi.Valid() {
        return nil
    }
    return mi.heap.items[0].Key()
}

func (mi *MergeIterator) Value() *Entry {
    if !mi.Valid() {
        return nil
    }
    return mi.heap.items[0].Value()
}

func (mi *MergeIterator) Next() {
    if !mi.Valid() {
        return
    }

    // 現在のキーを記録
    currentKey := mi.Key()

    // 同じキーを持つ全てのイテレータを進める
    for mi.Valid() && bytes.Equal(mi.Key(), currentKey) {
        it := heap.Pop(mi.heap).(*SSTableIterator)
        it.Next()
        if it.Valid() {
            heap.Push(mi.heap, it)
        }
    }
}

func (mi *MergeIterator) Close() {
    mi.heap = nil
}

Compaction実行

// compactor.go
package kvs

import (
    "fmt"
    "os"
    "path/filepath"
    "time"
)

// Compactor はコンパクションを実行
type Compactor struct {
    dir      string
    strategy CompactionStrategy
}

// NewCompactor は新しいCompactorを作成
func NewCompactor(dir string) *Compactor {
    return &Compactor{
        dir:      dir,
        strategy: &LeveledCompaction{},
    }
}

// DoCompaction はコンパクションを実行
func (c *Compactor) DoCompaction(lm *LevelManager, job *CompactionJob) error {
    if job == nil {
        return nil
    }

    // ターゲットレベルからオーバーラップするSSTableを選択
    targetSSTs := c.findOverlappingSSTs(lm, job)

    // 全ての入力SSTableをマージ
    allSSTs := append(job.InputSSTs, targetSSTs...)
    
    // 新しいSSTableを作成
    newSSTs, err := c.mergeSSTs(allSSTs)
    if err != nil {
        return err
    }

    // LevelManagerを更新
    lm.ReplaceSSTablesInLevel(job.Level, job.InputSSTs, nil)
    lm.ReplaceSSTablesInLevel(job.TargetLevel, targetSSTs, newSSTs)

    // 古いSSTableファイルを削除
    for _, sst := range allSSTs {
        sst.Close()
        os.Remove(sst.path)
    }

    return nil
}

// findOverlappingSSTs はターゲットレベルでオーバーラップするSSTableを見つける
func (c *Compactor) findOverlappingSSTs(lm *LevelManager, job *CompactionJob) []*SSTable {
    // 入力SSTableのキー範囲を計算
    var minKey, maxKey []byte
    for _, sst := range job.InputSSTs {
        if minKey == nil || bytes.Compare(sst.MinKey(), minKey) < 0 {
            minKey = sst.MinKey()
        }
        if maxKey == nil || bytes.Compare(sst.MaxKey(), maxKey) > 0 {
            maxKey = sst.MaxKey()
        }
    }

    // ターゲットレベルでオーバーラップするSSTableを見つける
    lm.mu.RLock()
    defer lm.mu.RUnlock()

    result := make([]*SSTable, 0)
    for _, sst := range lm.levels[job.TargetLevel].sstables {
        // キー範囲がオーバーラップするかチェック
        if bytes.Compare(sst.MaxKey(), minKey) >= 0 &&
           bytes.Compare(sst.MinKey(), maxKey) <= 0 {
            result = append(result, sst)
        }
    }

    return result
}

// mergeSSTs はSSTableをマージ
func (c *Compactor) mergeSSTs(sstables []*SSTable) ([]*SSTable, error) {
    if len(sstables) == 0 {
        return nil, nil
    }

    // マージイテレータを作成
    mi := NewMergeIterator(sstables)
    defer mi.Close()

    result := make([]*SSTable, 0)
    var currentWriter *SSTableWriter
    var currentSize int64
    const maxSSTableSize = 64 * 1024 * 1024  // 64MB

    for mi.Valid() {
        entry := mi.Value()

        // 削除されたエントリはスキップ(一定期間経過後)
        // ここでは簡略化のため即座にスキップ
        if entry.Deleted {
            mi.Next()
            continue
        }

        // 新しいSSTableを開始
        if currentWriter == nil || currentSize >= maxSSTableSize {
            if currentWriter != nil {
                // 現在のSSTableを完了
                if err := currentWriter.Finalize(); err != nil {
                    return nil, err
                }
                sst, err := OpenSSTable(currentWriter.file.Name())
                if err != nil {
                    return nil, err
                }
                result = append(result, sst)
            }

            // 新しいSSTableを作成
            filename := fmt.Sprintf("sst_%d.db", time.Now().UnixNano())
            path := filepath.Join(c.dir, filename)
            var err error
            currentWriter, err = NewSSTableWriter(path)
            if err != nil {
                return nil, err
            }
            currentSize = 0
        }

        // エントリを書き込み
        if err := currentWriter.WriteEntry(entry); err != nil {
            return nil, err
        }
        currentSize += int64(len(entry.Key) + len(entry.Value) + 16)

        mi.Next()
    }

    // 最後のSSTableを完了
    if currentWriter != nil {
        if err := currentWriter.Finalize(); err != nil {
            return nil, err
        }
        sst, err := OpenSSTable(currentWriter.file.Name())
        if err != nil {
            return nil, err
        }
        result = append(result, sst)
    }

    return result, nil
}

LSM-TreeにCompactionを統合

// lsm_compaction.go
package kvs

// doCompaction はコンパクションを実行(lsm.goの実装)
func (lsm *LSMTree) doCompaction(level int) {
    compactor := NewCompactor(lsm.dir)
    strategy := &LeveledCompaction{}

    job := strategy.SelectCompaction(lsm.levelManager)
    if job != nil {
        compactor.DoCompaction(lsm.levelManager, job)
    }
}

// ForceCompaction は強制的にコンパクションを実行
func (lsm *LSMTree) ForceCompaction() error {
    lsm.mu.Lock()
    defer lsm.mu.Unlock()

    for {
        level, needs := lsm.levelManager.NeedsCompaction()
        if !needs {
            break
        }
        lsm.doCompaction(level)
    }

    return nil
}

// GetStats は統計情報を返す
func (lsm *LSMTree) GetStats() Stats {
    lsm.mu.RLock()
    defer lsm.mu.RUnlock()

    stats := Stats{
        MemTableSize:  lsm.memtable.Size(),
        ImmutableCount: len(lsm.immutable),
        LevelStats:    make([]LevelStats, MaxLevels),
    }

    for i := 0; i < MaxLevels; i++ {
        level := lsm.levelManager.levels[i]
        stats.LevelStats[i] = LevelStats{
            Level:    i,
            SSTs:     len(level.sstables),
            Size:     level.size,
        }
        stats.TotalSSTs += len(level.sstables)
        stats.TotalSize += level.size
    }

    return stats
}

// Stats は統計情報
type Stats struct {
    MemTableSize   int64
    ImmutableCount int
    TotalSSTs      int
    TotalSize      int64
    LevelStats     []LevelStats
}

// LevelStats はレベルごとの統計
type LevelStats struct {
    Level int
    SSTs  int
    Size  int64
}

// PrintStats は統計情報を表示
func (s Stats) PrintStats() {
    fmt.Printf("MemTable Size: %d bytes\n", s.MemTableSize)
    fmt.Printf("Immutable MemTables: %d\n", s.ImmutableCount)
    fmt.Printf("Total SSTables: %d\n", s.TotalSSTs)
    fmt.Printf("Total Size: %d bytes\n\n", s.TotalSize)

    fmt.Println("Level Stats:")
    for _, ls := range s.LevelStats {
        fmt.Printf("  Level %d: %d SSTables, %d bytes\n",
            ls.Level, ls.SSTs, ls.Size)
    }
}

Tombstone GC(墓石のガベージコレクション)

削除マーカーを適切に回収するための仕組み。

// tombstone_gc.go
package kvs

import (
    "time"
)

const (
    TombstoneRetentionPeriod = 24 * time.Hour  // 墓石の保持期間
)

// TombstoneGC は墓石のガベージコレクション
type TombstoneGC struct {
    retention time.Duration
}

func NewTombstoneGC() *TombstoneGC {
    return &TombstoneGC{
        retention: TombstoneRetentionPeriod,
    }
}

// ShouldKeep は墓石を保持すべきか判定
func (gc *TombstoneGC) ShouldKeep(entry *Entry) bool {
    if !entry.Deleted {
        return true
    }

    // 保持期間を過ぎた墓石は削除可能
    age := time.Since(time.Unix(0, entry.Timestamp))
    return age < gc.retention
}

// 下位レベルに同じキーがある場合も墓石を保持
// (下位レベルの古い値を隠すため)
func (gc *TombstoneGC) ShouldKeepForLevel(entry *Entry, isBottomLevel bool) bool {
    if !entry.Deleted {
        return true
    }

    // 最下位レベルなら削除可能
    if isBottomLevel {
        age := time.Since(time.Unix(0, entry.Timestamp))
        return age < gc.retention
    }

    // 他のレベルなら保持
    return true
}

Compaction統計

// compaction_stats.go
package kvs

import (
    "sync/atomic"
    "time"
)

// CompactionStats はコンパクションの統計
type CompactionStats struct {
    TotalCompactions    int64
    BytesRead           int64
    BytesWritten        int64
    KeysProcessed       int64
    TombstonesRemoved   int64
    TotalDuration       time.Duration
    LastCompactionTime  time.Time
}

func (cs *CompactionStats) RecordCompaction(
    bytesRead, bytesWritten, keysProcessed, tombstonesRemoved int64,
    duration time.Duration,
) {
    atomic.AddInt64(&cs.TotalCompactions, 1)
    atomic.AddInt64(&cs.BytesRead, bytesRead)
    atomic.AddInt64(&cs.BytesWritten, bytesWritten)
    atomic.AddInt64(&cs.KeysProcessed, keysProcessed)
    atomic.AddInt64(&cs.TombstonesRemoved, tombstonesRemoved)
    cs.TotalDuration += duration
    cs.LastCompactionTime = time.Now()
}

func (cs *CompactionStats) WriteAmplification() float64 {
    if cs.BytesRead == 0 {
        return 0
    }
    return float64(cs.BytesWritten) / float64(cs.BytesRead)
}

func (cs *CompactionStats) Print() {
    fmt.Printf("Compaction Statistics:\n")
    fmt.Printf("  Total Compactions: %d\n", cs.TotalCompactions)
    fmt.Printf("  Bytes Read: %d\n", cs.BytesRead)
    fmt.Printf("  Bytes Written: %d\n", cs.BytesWritten)
    fmt.Printf("  Write Amplification: %.2f\n", cs.WriteAmplification())
    fmt.Printf("  Keys Processed: %d\n", cs.KeysProcessed)
    fmt.Printf("  Tombstones Removed: %d\n", cs.TombstonesRemoved)
    fmt.Printf("  Total Duration: %v\n", cs.TotalDuration)
    fmt.Printf("  Last Compaction: %v\n", cs.LastCompactionTime)
}

使用例

// example_compaction_test.go
package kvs

import (
    "fmt"
    "math/rand"
    "os"
    "testing"
)

func TestCompaction(t *testing.T) {
    dir := "./compaction_test"
    os.RemoveAll(dir)
    defer os.RemoveAll(dir)

    lsm, err := NewLSMTree(dir, DefaultOptions())
    if err != nil {
        t.Fatal(err)
    }

    // 大量のデータを書き込み
    fmt.Println("Writing 100,000 entries...")
    for i := 0; i < 100000; i++ {
        key := []byte(fmt.Sprintf("key-%08d", i))
        value := make([]byte, 100)
        rand.Read(value)
        lsm.Put(key, value)

        if i%10000 == 0 {
            fmt.Printf("  Written %d entries\n", i)
        }
    }

    fmt.Println("\nBefore compaction:")
    lsm.GetStats().PrintStats()

    // コンパクションを実行
    fmt.Println("\nRunning compaction...")
    lsm.ForceCompaction()

    fmt.Println("\nAfter compaction:")
    lsm.GetStats().PrintStats()

    // 削除してから再度コンパクション
    fmt.Println("\nDeleting 50% of entries...")
    for i := 0; i < 50000; i++ {
        key := []byte(fmt.Sprintf("key-%08d", i*2))
        lsm.Delete(key)
    }

    fmt.Println("\nBefore tombstone compaction:")
    lsm.GetStats().PrintStats()

    lsm.ForceCompaction()

    fmt.Println("\nAfter tombstone compaction:")
    lsm.GetStats().PrintStats()

    lsm.Close()
}

実行結果:

Writing 100,000 entries...
  Written 0 entries
  Written 10000 entries
  Written 20000 entries
  ...

Before compaction:
MemTable Size: 1234567 bytes
Immutable MemTables: 2
Total SSTables: 8
Total Size: 45678901 bytes

Level Stats:
  Level 0: 8 SSTables, 45678901 bytes
  Level 1: 0 SSTables, 0 bytes
  ...

Running compaction...

After compaction:
MemTable Size: 1234567 bytes
Immutable MemTables: 0
Total SSTables: 3
Total Size: 12345678 bytes

Level Stats:
  Level 0: 0 SSTables, 0 bytes
  Level 1: 3 SSTables, 12345678 bytes
  ...

まとめ

今回実装したこと:

コンポーネント 説明
Leveled Compaction レベルベースのコンパクション戦略
Merge Iterator 複数SSTableのマージ
Tombstone GC 削除マーカーの回収
Compaction Stats 統計情報の収集
┌─────────────────────────────────────────────────────────────────┐
│                    Compaction Process                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Level 0                                                        │
│  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                              │
│  │SST 1│ │SST 2│ │SST 3│ │SST 4│  (Overlapping)               │
│  └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘                              │
│     │       │       │       │                                   │
│     └───────┴───────┴───────┘                                   │
│                  │                                              │
│                  ▼  Merge & Sort                                │
│            ┌──────────────┐                                     │
│            │   Compact    │                                     │
│            └──────┬───────┘                                     │
│                   │                                             │
│                   ▼                                             │
│  Level 1                                                        │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐                          │
│  │  SST A  │ │  SST B  │ │  SST C  │  (Non-overlapping)       │
│  │[a-f]    │ │[g-m]    │ │[n-z]    │                          │
│  └─────────┘ └─────────┘ └─────────┘                          │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

次回は最終回、分散システムを実装するよ。複数ノードでのレプリケーションとシャーディングで、スケーラブルなKVSを作る。

この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!

7
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?