GoでKVSを作るシリーズ
| Part1 HashMap | Part2 永続化 | Part3 LSM-Tree | Part4 Compaction | Part5 分散 |
|---|---|---|---|---|
| ✅ Done | ✅ Done | ✅ Done | 👈 Now | - |
はじめに
前回はLSM-TreeとBloom Filterを実装した。
でもこれ、使い続けるとSSTableがどんどん増えて、ディスクがパンクするよね。削除したはずのデータも実際には残ってるし。
今回はCompactionを実装するよ。SSTableをマージして古いデータを削除し、ストレージ効率と読み取り性能を改善する。
Compactionとは
┌─────────────────────────────────────────────────────────────────┐
│ Why Compaction? │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Problems without Compaction: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 1. Space Amplification │ │
│ │ - Old versions of keys consume disk space │ │
│ │ - Deleted keys still take space (tombstones) │ │
│ │ │ │
│ │ 2. Read Amplification │ │
│ │ - Multiple SSTables may contain same key │ │
│ │ - Must check many files to find latest version │ │
│ │ │ │
│ │ 3. Unbounded SSTable Count │ │
│ │ - Level 0 keeps growing │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Compaction merges SSTables: │
│ - Removes old versions of keys │
│ - Removes tombstones (deleted keys) │
│ - Reduces number of files to search │
│ │
└─────────────────────────────────────────────────────────────────┘
Compaction戦略
Leveled Compaction(LevelDB/RocksDBスタイル)
// compaction.go
package kvs
import (
"fmt"
"os"
"path/filepath"
"sort"
"time"
)
// CompactionStrategy はコンパクション戦略
type CompactionStrategy interface {
SelectCompaction(lm *LevelManager) *CompactionJob
}
// CompactionJob はコンパクションジョブ
type CompactionJob struct {
Level int
InputSSTs []*SSTable
TargetLevel int
}
// LeveledCompaction はレベルベースのコンパクション
type LeveledCompaction struct{}
func (lc *LeveledCompaction) SelectCompaction(lm *LevelManager) *CompactionJob {
lm.mu.RLock()
defer lm.mu.RUnlock()
// Level 0のコンパクションを優先
if len(lm.levels[0].sstables) >= Level0FileLimit {
return &CompactionJob{
Level: 0,
InputSSTs: lm.levels[0].sstables,
TargetLevel: 1,
}
}
// 他のレベルをチェック
for level := 1; level < MaxLevels-1; level++ {
maxSize := int64(BaseLevelSize) * int64(pow(LevelSizeMultiplier, level))
if lm.levels[level].size >= maxSize {
// 最も古いSSTableを選択
if len(lm.levels[level].sstables) > 0 {
return &CompactionJob{
Level: level,
InputSSTs: []*SSTable{lm.levels[level].sstables[0]},
TargetLevel: level + 1,
}
}
}
}
return nil
}
マージイテレータ
複数のSSTableをマージするためのイテレータ。
// merge_iterator.go
package kvs
import (
"bytes"
"container/heap"
)
// MergeIterator は複数のイテレータをマージ
type MergeIterator struct {
iterators []SSTableIterator
heap *iteratorHeap
}
// SSTableIterator はSSTableを走査するイテレータ
type SSTableIterator struct {
sst *SSTable
index int
current *Entry
valid bool
}
func NewSSTableIterator(sst *SSTable) *SSTableIterator {
it := &SSTableIterator{
sst: sst,
index: 0,
}
it.loadCurrent()
return it
}
func (it *SSTableIterator) loadCurrent() {
if it.index < len(it.sst.index) {
entry, err := it.sst.Get(it.sst.index[it.index].Key)
if err == nil {
it.current = entry
it.valid = true
return
}
}
it.valid = false
it.current = nil
}
func (it *SSTableIterator) Valid() bool {
return it.valid
}
func (it *SSTableIterator) Next() {
it.index++
it.loadCurrent()
}
func (it *SSTableIterator) Key() []byte {
if it.current != nil {
return it.current.Key
}
return nil
}
func (it *SSTableIterator) Value() *Entry {
return it.current
}
// iteratorHeap はヒープ用の構造体
type iteratorHeap struct {
items []*SSTableIterator
}
func (h *iteratorHeap) Len() int {
return len(h.items)
}
func (h *iteratorHeap) Less(i, j int) bool {
return bytes.Compare(h.items[i].Key(), h.items[j].Key()) < 0
}
func (h *iteratorHeap) Swap(i, j int) {
h.items[i], h.items[j] = h.items[j], h.items[i]
}
func (h *iteratorHeap) Push(x interface{}) {
h.items = append(h.items, x.(*SSTableIterator))
}
func (h *iteratorHeap) Pop() interface{} {
old := h.items
n := len(old)
x := old[n-1]
h.items = old[:n-1]
return x
}
// NewMergeIterator は新しいMergeIteratorを作成
func NewMergeIterator(sstables []*SSTable) *MergeIterator {
h := &iteratorHeap{
items: make([]*SSTableIterator, 0),
}
for _, sst := range sstables {
it := NewSSTableIterator(sst)
if it.Valid() {
h.items = append(h.items, it)
}
}
heap.Init(h)
return &MergeIterator{
heap: h,
}
}
func (mi *MergeIterator) Valid() bool {
return mi.heap.Len() > 0
}
func (mi *MergeIterator) Key() []byte {
if !mi.Valid() {
return nil
}
return mi.heap.items[0].Key()
}
func (mi *MergeIterator) Value() *Entry {
if !mi.Valid() {
return nil
}
return mi.heap.items[0].Value()
}
func (mi *MergeIterator) Next() {
if !mi.Valid() {
return
}
// 現在のキーを記録
currentKey := mi.Key()
// 同じキーを持つ全てのイテレータを進める
for mi.Valid() && bytes.Equal(mi.Key(), currentKey) {
it := heap.Pop(mi.heap).(*SSTableIterator)
it.Next()
if it.Valid() {
heap.Push(mi.heap, it)
}
}
}
func (mi *MergeIterator) Close() {
mi.heap = nil
}
Compaction実行
// compactor.go
package kvs
import (
"fmt"
"os"
"path/filepath"
"time"
)
// Compactor はコンパクションを実行
type Compactor struct {
dir string
strategy CompactionStrategy
}
// NewCompactor は新しいCompactorを作成
func NewCompactor(dir string) *Compactor {
return &Compactor{
dir: dir,
strategy: &LeveledCompaction{},
}
}
// DoCompaction はコンパクションを実行
func (c *Compactor) DoCompaction(lm *LevelManager, job *CompactionJob) error {
if job == nil {
return nil
}
// ターゲットレベルからオーバーラップするSSTableを選択
targetSSTs := c.findOverlappingSSTs(lm, job)
// 全ての入力SSTableをマージ
allSSTs := append(job.InputSSTs, targetSSTs...)
// 新しいSSTableを作成
newSSTs, err := c.mergeSSTs(allSSTs)
if err != nil {
return err
}
// LevelManagerを更新
lm.ReplaceSSTablesInLevel(job.Level, job.InputSSTs, nil)
lm.ReplaceSSTablesInLevel(job.TargetLevel, targetSSTs, newSSTs)
// 古いSSTableファイルを削除
for _, sst := range allSSTs {
sst.Close()
os.Remove(sst.path)
}
return nil
}
// findOverlappingSSTs はターゲットレベルでオーバーラップするSSTableを見つける
func (c *Compactor) findOverlappingSSTs(lm *LevelManager, job *CompactionJob) []*SSTable {
// 入力SSTableのキー範囲を計算
var minKey, maxKey []byte
for _, sst := range job.InputSSTs {
if minKey == nil || bytes.Compare(sst.MinKey(), minKey) < 0 {
minKey = sst.MinKey()
}
if maxKey == nil || bytes.Compare(sst.MaxKey(), maxKey) > 0 {
maxKey = sst.MaxKey()
}
}
// ターゲットレベルでオーバーラップするSSTableを見つける
lm.mu.RLock()
defer lm.mu.RUnlock()
result := make([]*SSTable, 0)
for _, sst := range lm.levels[job.TargetLevel].sstables {
// キー範囲がオーバーラップするかチェック
if bytes.Compare(sst.MaxKey(), minKey) >= 0 &&
bytes.Compare(sst.MinKey(), maxKey) <= 0 {
result = append(result, sst)
}
}
return result
}
// mergeSSTs はSSTableをマージ
func (c *Compactor) mergeSSTs(sstables []*SSTable) ([]*SSTable, error) {
if len(sstables) == 0 {
return nil, nil
}
// マージイテレータを作成
mi := NewMergeIterator(sstables)
defer mi.Close()
result := make([]*SSTable, 0)
var currentWriter *SSTableWriter
var currentSize int64
const maxSSTableSize = 64 * 1024 * 1024 // 64MB
for mi.Valid() {
entry := mi.Value()
// 削除されたエントリはスキップ(一定期間経過後)
// ここでは簡略化のため即座にスキップ
if entry.Deleted {
mi.Next()
continue
}
// 新しいSSTableを開始
if currentWriter == nil || currentSize >= maxSSTableSize {
if currentWriter != nil {
// 現在のSSTableを完了
if err := currentWriter.Finalize(); err != nil {
return nil, err
}
sst, err := OpenSSTable(currentWriter.file.Name())
if err != nil {
return nil, err
}
result = append(result, sst)
}
// 新しいSSTableを作成
filename := fmt.Sprintf("sst_%d.db", time.Now().UnixNano())
path := filepath.Join(c.dir, filename)
var err error
currentWriter, err = NewSSTableWriter(path)
if err != nil {
return nil, err
}
currentSize = 0
}
// エントリを書き込み
if err := currentWriter.WriteEntry(entry); err != nil {
return nil, err
}
currentSize += int64(len(entry.Key) + len(entry.Value) + 16)
mi.Next()
}
// 最後のSSTableを完了
if currentWriter != nil {
if err := currentWriter.Finalize(); err != nil {
return nil, err
}
sst, err := OpenSSTable(currentWriter.file.Name())
if err != nil {
return nil, err
}
result = append(result, sst)
}
return result, nil
}
LSM-TreeにCompactionを統合
// lsm_compaction.go
package kvs
// doCompaction はコンパクションを実行(lsm.goの実装)
func (lsm *LSMTree) doCompaction(level int) {
compactor := NewCompactor(lsm.dir)
strategy := &LeveledCompaction{}
job := strategy.SelectCompaction(lsm.levelManager)
if job != nil {
compactor.DoCompaction(lsm.levelManager, job)
}
}
// ForceCompaction は強制的にコンパクションを実行
func (lsm *LSMTree) ForceCompaction() error {
lsm.mu.Lock()
defer lsm.mu.Unlock()
for {
level, needs := lsm.levelManager.NeedsCompaction()
if !needs {
break
}
lsm.doCompaction(level)
}
return nil
}
// GetStats は統計情報を返す
func (lsm *LSMTree) GetStats() Stats {
lsm.mu.RLock()
defer lsm.mu.RUnlock()
stats := Stats{
MemTableSize: lsm.memtable.Size(),
ImmutableCount: len(lsm.immutable),
LevelStats: make([]LevelStats, MaxLevels),
}
for i := 0; i < MaxLevels; i++ {
level := lsm.levelManager.levels[i]
stats.LevelStats[i] = LevelStats{
Level: i,
SSTs: len(level.sstables),
Size: level.size,
}
stats.TotalSSTs += len(level.sstables)
stats.TotalSize += level.size
}
return stats
}
// Stats は統計情報
type Stats struct {
MemTableSize int64
ImmutableCount int
TotalSSTs int
TotalSize int64
LevelStats []LevelStats
}
// LevelStats はレベルごとの統計
type LevelStats struct {
Level int
SSTs int
Size int64
}
// PrintStats は統計情報を表示
func (s Stats) PrintStats() {
fmt.Printf("MemTable Size: %d bytes\n", s.MemTableSize)
fmt.Printf("Immutable MemTables: %d\n", s.ImmutableCount)
fmt.Printf("Total SSTables: %d\n", s.TotalSSTs)
fmt.Printf("Total Size: %d bytes\n\n", s.TotalSize)
fmt.Println("Level Stats:")
for _, ls := range s.LevelStats {
fmt.Printf(" Level %d: %d SSTables, %d bytes\n",
ls.Level, ls.SSTs, ls.Size)
}
}
Tombstone GC(墓石のガベージコレクション)
削除マーカーを適切に回収するための仕組み。
// tombstone_gc.go
package kvs
import (
"time"
)
const (
TombstoneRetentionPeriod = 24 * time.Hour // 墓石の保持期間
)
// TombstoneGC は墓石のガベージコレクション
type TombstoneGC struct {
retention time.Duration
}
func NewTombstoneGC() *TombstoneGC {
return &TombstoneGC{
retention: TombstoneRetentionPeriod,
}
}
// ShouldKeep は墓石を保持すべきか判定
func (gc *TombstoneGC) ShouldKeep(entry *Entry) bool {
if !entry.Deleted {
return true
}
// 保持期間を過ぎた墓石は削除可能
age := time.Since(time.Unix(0, entry.Timestamp))
return age < gc.retention
}
// 下位レベルに同じキーがある場合も墓石を保持
// (下位レベルの古い値を隠すため)
func (gc *TombstoneGC) ShouldKeepForLevel(entry *Entry, isBottomLevel bool) bool {
if !entry.Deleted {
return true
}
// 最下位レベルなら削除可能
if isBottomLevel {
age := time.Since(time.Unix(0, entry.Timestamp))
return age < gc.retention
}
// 他のレベルなら保持
return true
}
Compaction統計
// compaction_stats.go
package kvs
import (
"sync/atomic"
"time"
)
// CompactionStats はコンパクションの統計
type CompactionStats struct {
TotalCompactions int64
BytesRead int64
BytesWritten int64
KeysProcessed int64
TombstonesRemoved int64
TotalDuration time.Duration
LastCompactionTime time.Time
}
func (cs *CompactionStats) RecordCompaction(
bytesRead, bytesWritten, keysProcessed, tombstonesRemoved int64,
duration time.Duration,
) {
atomic.AddInt64(&cs.TotalCompactions, 1)
atomic.AddInt64(&cs.BytesRead, bytesRead)
atomic.AddInt64(&cs.BytesWritten, bytesWritten)
atomic.AddInt64(&cs.KeysProcessed, keysProcessed)
atomic.AddInt64(&cs.TombstonesRemoved, tombstonesRemoved)
cs.TotalDuration += duration
cs.LastCompactionTime = time.Now()
}
func (cs *CompactionStats) WriteAmplification() float64 {
if cs.BytesRead == 0 {
return 0
}
return float64(cs.BytesWritten) / float64(cs.BytesRead)
}
func (cs *CompactionStats) Print() {
fmt.Printf("Compaction Statistics:\n")
fmt.Printf(" Total Compactions: %d\n", cs.TotalCompactions)
fmt.Printf(" Bytes Read: %d\n", cs.BytesRead)
fmt.Printf(" Bytes Written: %d\n", cs.BytesWritten)
fmt.Printf(" Write Amplification: %.2f\n", cs.WriteAmplification())
fmt.Printf(" Keys Processed: %d\n", cs.KeysProcessed)
fmt.Printf(" Tombstones Removed: %d\n", cs.TombstonesRemoved)
fmt.Printf(" Total Duration: %v\n", cs.TotalDuration)
fmt.Printf(" Last Compaction: %v\n", cs.LastCompactionTime)
}
使用例
// example_compaction_test.go
package kvs
import (
"fmt"
"math/rand"
"os"
"testing"
)
func TestCompaction(t *testing.T) {
dir := "./compaction_test"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
lsm, err := NewLSMTree(dir, DefaultOptions())
if err != nil {
t.Fatal(err)
}
// 大量のデータを書き込み
fmt.Println("Writing 100,000 entries...")
for i := 0; i < 100000; i++ {
key := []byte(fmt.Sprintf("key-%08d", i))
value := make([]byte, 100)
rand.Read(value)
lsm.Put(key, value)
if i%10000 == 0 {
fmt.Printf(" Written %d entries\n", i)
}
}
fmt.Println("\nBefore compaction:")
lsm.GetStats().PrintStats()
// コンパクションを実行
fmt.Println("\nRunning compaction...")
lsm.ForceCompaction()
fmt.Println("\nAfter compaction:")
lsm.GetStats().PrintStats()
// 削除してから再度コンパクション
fmt.Println("\nDeleting 50% of entries...")
for i := 0; i < 50000; i++ {
key := []byte(fmt.Sprintf("key-%08d", i*2))
lsm.Delete(key)
}
fmt.Println("\nBefore tombstone compaction:")
lsm.GetStats().PrintStats()
lsm.ForceCompaction()
fmt.Println("\nAfter tombstone compaction:")
lsm.GetStats().PrintStats()
lsm.Close()
}
実行結果:
Writing 100,000 entries...
Written 0 entries
Written 10000 entries
Written 20000 entries
...
Before compaction:
MemTable Size: 1234567 bytes
Immutable MemTables: 2
Total SSTables: 8
Total Size: 45678901 bytes
Level Stats:
Level 0: 8 SSTables, 45678901 bytes
Level 1: 0 SSTables, 0 bytes
...
Running compaction...
After compaction:
MemTable Size: 1234567 bytes
Immutable MemTables: 0
Total SSTables: 3
Total Size: 12345678 bytes
Level Stats:
Level 0: 0 SSTables, 0 bytes
Level 1: 3 SSTables, 12345678 bytes
...
まとめ
今回実装したこと:
| コンポーネント | 説明 |
|---|---|
| Leveled Compaction | レベルベースのコンパクション戦略 |
| Merge Iterator | 複数SSTableのマージ |
| Tombstone GC | 削除マーカーの回収 |
| Compaction Stats | 統計情報の収集 |
┌─────────────────────────────────────────────────────────────────┐
│ Compaction Process │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Level 0 │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │SST 1│ │SST 2│ │SST 3│ │SST 4│ (Overlapping) │
│ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │
│ │ │ │ │ │
│ └───────┴───────┴───────┘ │
│ │ │
│ ▼ Merge & Sort │
│ ┌──────────────┐ │
│ │ Compact │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ Level 1 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ SST A │ │ SST B │ │ SST C │ (Non-overlapping) │
│ │[a-f] │ │[g-m] │ │[n-z] │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
次回は最終回、分散システムを実装するよ。複数ノードでのレプリケーションとシャーディングで、スケーラブルなKVSを作る。
この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!