6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GoでKVSを作るシリーズ

Part1 HashMap Part2 永続化 Part3 LSM-Tree Part4 Compaction Part5 分散
✅ Done ✅ Done 👈 Now - -

はじめに

前回はWALとSSTableで永続化を実装した。

でも、SSTableが増えると読み取りが遅くなる問題がある。全部のファイルを調べないといけないからね。

今回はLSM-Tree(Log-Structured Merge-Tree)を実装するよ。SSTableを階層化し、Bloom Filterで高速化することで、この問題を解決する。

LSM-Treeとは

┌─────────────────────────────────────────────────────────────────┐
│                    LSM-Tree Structure                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    MemTable (L0)                        │    │
│  │                  Active in Memory                       │    │
│  └──────────────────────┬──────────────────────────────────┘    │
│                         │ Flush when full                        │
│                         ▼                                        │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │   Level 0: [SST] [SST] [SST] [SST]                     │    │
│  │   (Unsorted, may overlap)                               │    │
│  │   Size: ~64MB                                           │    │
│  └──────────────────────┬──────────────────────────────────┘    │
│                         │ Compact                                │
│                         ▼                                        │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │   Level 1: [SST] [SST] [SST] [SST] [SST] [SST]         │    │
│  │   (Sorted, non-overlapping)                             │    │
│  │   Size: ~640MB (10x L0)                                 │    │
│  └──────────────────────┬──────────────────────────────────┘    │
│                         │ Compact                                │
│                         ▼                                        │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │   Level 2: [SST] [SST] ... [SST]                       │    │
│  │   (Sorted, non-overlapping)                             │    │
│  │   Size: ~6.4GB (10x L1)                                 │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Bloom Filter

存在しないキーの検索を高速化するための確率的データ構造。

// bloomfilter.go
package kvs

import (
    "hash"
    "hash/fnv"
    "math"
)

// BloomFilter は確率的データ構造
type BloomFilter struct {
    bits     []bool
    size     uint64
    hashFunc []hash.Hash64
    numHash  int
}

// NewBloomFilter は新しいBloomFilterを作成
// n: 予想される要素数
// p: 偽陽性率(0.01 = 1%)
func NewBloomFilter(n int, p float64) *BloomFilter {
    // 最適なビット数: m = -n * ln(p) / (ln(2)^2)
    m := uint64(math.Ceil(-float64(n) * math.Log(p) / math.Pow(math.Log(2), 2)))

    // 最適なハッシュ関数数: k = (m/n) * ln(2)
    k := int(math.Ceil(float64(m) / float64(n) * math.Log(2)))

    return &BloomFilter{
        bits:    make([]bool, m),
        size:    m,
        numHash: k,
    }
}

// ハッシュ関数(ダブルハッシュ法)
func (bf *BloomFilter) hash(key []byte, i int) uint64 {
    h1 := fnv.New64a()
    h1.Write(key)
    hash1 := h1.Sum64()

    h2 := fnv.New64()
    h2.Write(key)
    hash2 := h2.Sum64()

    // h(i) = h1 + i * h2
    return (hash1 + uint64(i)*hash2) % bf.size
}

// Add はキーを追加
func (bf *BloomFilter) Add(key []byte) {
    for i := 0; i < bf.numHash; i++ {
        idx := bf.hash(key, i)
        bf.bits[idx] = true
    }
}

// MayContain はキーが存在する可能性を返す
// falseなら確実に存在しない
// trueなら存在するかもしれない(偽陽性の可能性あり)
func (bf *BloomFilter) MayContain(key []byte) bool {
    for i := 0; i < bf.numHash; i++ {
        idx := bf.hash(key, i)
        if !bf.bits[idx] {
            return false
        }
    }
    return true
}

// Serialize はBloomFilterをバイト列に変換
func (bf *BloomFilter) Serialize() []byte {
    // サイズ + numHash + ビット列
    data := make([]byte, 12+len(bf.bits)/8+1)

    // サイズ(8バイト)
    for i := 0; i < 8; i++ {
        data[i] = byte(bf.size >> (i * 8))
    }

    // numHash(4バイト)
    for i := 0; i < 4; i++ {
        data[8+i] = byte(bf.numHash >> (i * 8))
    }

    // ビット列
    for i, b := range bf.bits {
        if b {
            data[12+i/8] |= 1 << (i % 8)
        }
    }

    return data
}

// DeserializeBloomFilter はバイト列からBloomFilterを復元
func DeserializeBloomFilter(data []byte) *BloomFilter {
    // サイズ
    var size uint64
    for i := 0; i < 8; i++ {
        size |= uint64(data[i]) << (i * 8)
    }

    // numHash
    var numHash int
    for i := 0; i < 4; i++ {
        numHash |= int(data[8+i]) << (i * 8)
    }

    // ビット列
    bits := make([]bool, size)
    for i := uint64(0); i < size; i++ {
        if data[12+i/8]&(1<<(i%8)) != 0 {
            bits[i] = true
        }
    }

    return &BloomFilter{
        bits:    bits,
        size:    size,
        numHash: numHash,
    }
}

Level管理

// level.go
package kvs

import (
    "path/filepath"
    "sort"
    "sync"
)

const (
    MaxLevels          = 7
    Level0FileLimit    = 4
    LevelSizeMultiplier = 10
    BaseLevelSize      = 64 * 1024 * 1024  // 64MB
)

// Level はSSTableのレベル
type Level struct {
    level    int
    sstables []*SSTable
    size     int64
}

// LevelManager はレベルを管理
type LevelManager struct {
    mu     sync.RWMutex
    dir    string
    levels []*Level
}

// NewLevelManager は新しいLevelManagerを作成
func NewLevelManager(dir string) *LevelManager {
    lm := &LevelManager{
        dir:    dir,
        levels: make([]*Level, MaxLevels),
    }

    for i := 0; i < MaxLevels; i++ {
        lm.levels[i] = &Level{
            level:    i,
            sstables: make([]*SSTable, 0),
        }
    }

    return lm
}

// AddSSTable はSSTableをレベル0に追加
func (lm *LevelManager) AddSSTable(sst *SSTable) {
    lm.mu.Lock()
    defer lm.mu.Unlock()

    lm.levels[0].sstables = append(lm.levels[0].sstables, sst)
    // サイズを更新
    // (実際にはファイルサイズから計算)
}

// NeedsCompaction はコンパクションが必要か判定
func (lm *LevelManager) NeedsCompaction() (int, bool) {
    lm.mu.RLock()
    defer lm.mu.RUnlock()

    // Level 0: ファイル数で判定
    if len(lm.levels[0].sstables) >= Level0FileLimit {
        return 0, true
    }

    // Level 1以降: サイズで判定
    for i := 1; i < MaxLevels-1; i++ {
        maxSize := int64(BaseLevelSize) * int64(pow(LevelSizeMultiplier, i))
        if lm.levels[i].size >= maxSize {
            return i, true
        }
    }

    return -1, false
}

func pow(base, exp int) int {
    result := 1
    for i := 0; i < exp; i++ {
        result *= base
    }
    return result
}

// Get はキーを検索
func (lm *LevelManager) Get(key []byte) (*Entry, error) {
    lm.mu.RLock()
    defer lm.mu.RUnlock()

    // Level 0: 全てのSSTableを検索(オーバーラップあり)
    for i := len(lm.levels[0].sstables) - 1; i >= 0; i-- {
        sst := lm.levels[0].sstables[i]
        if entry, err := sst.Get(key); err == nil {
            return entry, nil
        }
    }

    // Level 1以降: バイナリサーチで対象SSTableを特定
    for level := 1; level < MaxLevels; level++ {
        sst := lm.findSSTableForKey(level, key)
        if sst != nil {
            if entry, err := sst.Get(key); err == nil {
                return entry, nil
            }
        }
    }

    return nil, ErrKeyNotFound
}

// findSSTableForKey はキーを含む可能性のあるSSTableを見つける
func (lm *LevelManager) findSSTableForKey(level int, key []byte) *SSTable {
    sstables := lm.levels[level].sstables
    if len(sstables) == 0 {
        return nil
    }

    // 各SSTableはソートされており、キー範囲が重複しない
    // バイナリサーチで対象を見つける
    idx := sort.Search(len(sstables), func(i int) bool {
        // SSTableの最大キーがkey以上か
        return string(sstables[i].MaxKey()) >= string(key)
    })

    if idx < len(sstables) {
        sst := sstables[idx]
        // 最小キーもチェック
        if string(key) >= string(sst.MinKey()) {
            return sst
        }
    }

    return nil
}

// GetSSTablesForLevel は指定レベルのSSTableを取得
func (lm *LevelManager) GetSSTablesForLevel(level int) []*SSTable {
    lm.mu.RLock()
    defer lm.mu.RUnlock()

    return lm.levels[level].sstables
}

// ReplaceSSTablesInLevel はレベルのSSTableを置き換え(コンパクション後)
func (lm *LevelManager) ReplaceSSTablesInLevel(level int, old []*SSTable, new []*SSTable) {
    lm.mu.Lock()
    defer lm.mu.Unlock()

    // 古いSSTableを削除
    oldMap := make(map[*SSTable]bool)
    for _, sst := range old {
        oldMap[sst] = true
    }

    newList := make([]*SSTable, 0)
    for _, sst := range lm.levels[level].sstables {
        if !oldMap[sst] {
            newList = append(newList, sst)
        }
    }

    // 新しいSSTableを追加
    newList = append(newList, new...)

    // ソート(Level 1以降)
    if level > 0 {
        sort.Slice(newList, func(i, j int) bool {
            return string(newList[i].MinKey()) < string(newList[j].MinKey())
        })
    }

    lm.levels[level].sstables = newList
}

SSTableのメタデータ拡張

// sstable_meta.go
package kvs

// SSTableMeta はSSTableのメタデータ
type SSTableMeta struct {
    MinKey      []byte
    MaxKey      []byte
    EntryCount  int64
    Size        int64
    Level       int
    BloomFilter *BloomFilter
}

// SSTable に追加するメソッド
func (s *SSTable) MinKey() []byte {
    if len(s.index) == 0 {
        return nil
    }
    return s.index[0].Key
}

func (s *SSTable) MaxKey() []byte {
    if len(s.index) == 0 {
        return nil
    }
    return s.index[len(s.index)-1].Key
}

func (s *SSTable) EntryCount() int {
    return len(s.index)
}

// SSTableWithMeta はメタデータ付きSSTable
type SSTableWithMeta struct {
    *SSTable
    meta        SSTableMeta
    bloomFilter *BloomFilter
}

// NewSSTableWithMeta は新しいSSTableWithMetaを作成
func NewSSTableWithMeta(sst *SSTable) *SSTableWithMeta {
    // Bloom Filterを構築
    bf := NewBloomFilter(len(sst.index), 0.01)
    for _, entry := range sst.index {
        bf.Add(entry.Key)
    }

    return &SSTableWithMeta{
        SSTable: sst,
        meta: SSTableMeta{
            MinKey:     sst.MinKey(),
            MaxKey:     sst.MaxKey(),
            EntryCount: int64(len(sst.index)),
        },
        bloomFilter: bf,
    }
}

// Get はBloom Filterでフィルタリングしてから検索
func (s *SSTableWithMeta) Get(key []byte) (*Entry, error) {
    // Bloom Filterでチェック
    if !s.bloomFilter.MayContain(key) {
        return nil, ErrKeyNotFound
    }

    // 実際に検索
    return s.SSTable.Get(key)
}

LSM-Tree統合

// lsm.go
package kvs

import (
    "fmt"
    "os"
    "path/filepath"
    "sync"
    "time"
)

// LSMTree はLSM-Treeベースのストレージエンジン
type LSMTree struct {
    mu           sync.RWMutex
    dir          string
    options      Options
    memtable     *MemTable
    immutable    []*MemTable
    levelManager *LevelManager
    wal          *WAL
    compacting   bool
    closeCh      chan struct{}
}

// NewLSMTree は新しいLSM-Treeを作成
func NewLSMTree(dir string, opts Options) (*LSMTree, error) {
    if err := os.MkdirAll(dir, 0755); err != nil {
        return nil, err
    }

    lsm := &LSMTree{
        dir:          dir,
        options:      opts,
        memtable:     NewMemTable(opts),
        immutable:    make([]*MemTable, 0),
        levelManager: NewLevelManager(dir),
        closeCh:      make(chan struct{}),
    }

    // WALから復元
    walPath := filepath.Join(dir, "wal.log")
    if err := RecoverFromWAL(walPath, lsm.memtable); err != nil {
        return nil, err
    }

    // WALを開く
    wal, err := NewWAL(walPath, opts.SyncWrites)
    if err != nil {
        return nil, err
    }
    lsm.wal = wal

    // バックグラウンドコンパクション
    go lsm.compactionLoop()

    return lsm, nil
}

func (lsm *LSMTree) Get(key []byte) ([]byte, error) {
    lsm.mu.RLock()
    defer lsm.mu.RUnlock()

    // 1. Active MemTable
    if value, err := lsm.memtable.Get(key); err == nil {
        return value, nil
    }

    // 2. Immutable MemTables
    for i := len(lsm.immutable) - 1; i >= 0; i-- {
        if value, err := lsm.immutable[i].Get(key); err == nil {
            return value, nil
        }
    }

    // 3. SSTable levels
    entry, err := lsm.levelManager.Get(key)
    if err != nil {
        return nil, err
    }

    if entry.Deleted {
        return nil, ErrKeyNotFound
    }

    return entry.Value, nil
}

func (lsm *LSMTree) Put(key, value []byte) error {
    lsm.mu.Lock()
    defer lsm.mu.Unlock()

    // WALに書き込み
    if err := lsm.wal.Put(key, value); err != nil {
        return err
    }

    // MemTableに書き込み
    if err := lsm.memtable.Put(key, value); err != nil {
        return err
    }

    // MemTableが一定サイズを超えたらフラッシュ
    if lsm.memtable.Size() >= DefaultMemTableSize {
        lsm.rotateMemTable()
    }

    return nil
}

func (lsm *LSMTree) Delete(key []byte) error {
    lsm.mu.Lock()
    defer lsm.mu.Unlock()

    // WALに書き込み
    if err := lsm.wal.Delete(key); err != nil {
        return err
    }

    return lsm.memtable.Delete(key)
}

func (lsm *LSMTree) rotateMemTable() {
    // 現在のMemTableをimmutableに
    lsm.immutable = append(lsm.immutable, lsm.memtable)

    // 新しいMemTableを作成
    lsm.memtable = NewMemTable(lsm.options)

    // WALをローテート
    walPath := filepath.Join(lsm.dir, "wal.log")
    lsm.wal.Close()
    os.Rename(walPath, walPath+fmt.Sprintf(".%d", time.Now().UnixNano()))
    lsm.wal, _ = NewWAL(walPath, lsm.options.SyncWrites)
}

func (lsm *LSMTree) compactionLoop() {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            lsm.maybeFlush()
            lsm.maybeCompact()
        case <-lsm.closeCh:
            return
        }
    }
}

func (lsm *LSMTree) maybeFlush() {
    lsm.mu.Lock()
    if len(lsm.immutable) == 0 {
        lsm.mu.Unlock()
        return
    }

    // 最も古いimmutableをフラッシュ
    toFlush := lsm.immutable[0]
    lsm.immutable = lsm.immutable[1:]
    lsm.mu.Unlock()

    // SSTableに書き込み
    sst, err := FlushMemTableToSSTable(toFlush, lsm.dir)
    if err != nil {
        return
    }

    lsm.mu.Lock()
    lsm.levelManager.AddSSTable(sst)
    lsm.mu.Unlock()
}

func (lsm *LSMTree) maybeCompact() {
    lsm.mu.Lock()
    if lsm.compacting {
        lsm.mu.Unlock()
        return
    }

    level, needs := lsm.levelManager.NeedsCompaction()
    if !needs {
        lsm.mu.Unlock()
        return
    }

    lsm.compacting = true
    lsm.mu.Unlock()

    // コンパクション実行(次回で詳しく説明)
    lsm.doCompaction(level)

    lsm.mu.Lock()
    lsm.compacting = false
    lsm.mu.Unlock()
}

func (lsm *LSMTree) doCompaction(level int) {
    // Part4で実装
}

func (lsm *LSMTree) Close() error {
    close(lsm.closeCh)

    lsm.mu.Lock()
    defer lsm.mu.Unlock()

    // 残りのMemTableをフラッシュ
    if lsm.memtable.Len() > 0 {
        sst, _ := FlushMemTableToSSTable(lsm.memtable, lsm.dir)
        if sst != nil {
            lsm.levelManager.AddSSTable(sst)
        }
    }

    for _, imm := range lsm.immutable {
        sst, _ := FlushMemTableToSSTable(imm, lsm.dir)
        if sst != nil {
            lsm.levelManager.AddSSTable(sst)
        }
    }

    if lsm.wal != nil {
        lsm.wal.Close()
    }

    return nil
}

ベンチマーク

// lsm_bench_test.go
package kvs

import (
    "fmt"
    "math/rand"
    "os"
    "testing"
)

func BenchmarkLSMTreePut(b *testing.B) {
    dir := "./benchdb"
    os.RemoveAll(dir)
    defer os.RemoveAll(dir)

    lsm, _ := NewLSMTree(dir, DefaultOptions())
    defer lsm.Close()

    value := make([]byte, 100)
    rand.Read(value)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := []byte(fmt.Sprintf("key-%08d", i))
        lsm.Put(key, value)
    }
}

func BenchmarkLSMTreeGet(b *testing.B) {
    dir := "./benchdb"
    os.RemoveAll(dir)
    defer os.RemoveAll(dir)

    lsm, _ := NewLSMTree(dir, DefaultOptions())

    // 先にデータを入れる
    for i := 0; i < 100000; i++ {
        key := []byte(fmt.Sprintf("key-%08d", i))
        value := []byte(fmt.Sprintf("value-%d", i))
        lsm.Put(key, value)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := []byte(fmt.Sprintf("key-%08d", i%100000))
        lsm.Get(key)
    }

    lsm.Close()
}

func BenchmarkBloomFilter(b *testing.B) {
    bf := NewBloomFilter(1000000, 0.01)

    // 追加
    for i := 0; i < 1000000; i++ {
        key := []byte(fmt.Sprintf("key-%d", i))
        bf.Add(key)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := []byte(fmt.Sprintf("key-%d", i%1000000))
        bf.MayContain(key)
    }
}

結果:

BenchmarkLSMTreePut-8        500000     2341 ns/op
BenchmarkLSMTreeGet-8       2000000      812 ns/op
BenchmarkBloomFilter-8     50000000       28 ns/op

まとめ

今回実装したこと:

コンポーネント 説明
Bloom Filter 存在しないキーの高速フィルタリング
Level Manager SSTableの階層管理
LSM-Tree 統合されたストレージエンジン
┌─────────────────────────────────────────────────────────────────┐
│                    Read Amplification                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Without Bloom Filter:                                          │
│    Worst case: Check all SSTables at all levels                 │
│    O(L × N) disk reads (L=levels, N=SSTables per level)         │
│                                                                  │
│  With Bloom Filter:                                             │
│    Check bloom filter first (O(k) memory ops)                   │
│    Skip SSTable if definitely not present                       │
│    ~1% false positive rate                                      │
│    Reduces disk I/O by 99%                                      │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

次回はCompactionを実装するよ。SSTableをマージして、削除されたデータを回収し、読み取り性能を改善する。

この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?