GoでKVSを作るシリーズ
| Part1 HashMap | Part2 永続化 | Part3 LSM-Tree | Part4 Compaction | Part5 分散 |
|---|---|---|---|---|
| ✅ Done | ✅ Done | 👈 Now | - | - |
はじめに
前回はWALとSSTableで永続化を実装した。
でも、SSTableが増えると読み取りが遅くなる問題がある。全部のファイルを調べないといけないからね。
今回はLSM-Tree(Log-Structured Merge-Tree)を実装するよ。SSTableを階層化し、Bloom Filterで高速化することで、この問題を解決する。
LSM-Treeとは
┌─────────────────────────────────────────────────────────────────┐
│ LSM-Tree Structure │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ MemTable (L0) │ │
│ │ Active in Memory │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ │ Flush when full │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Level 0: [SST] [SST] [SST] [SST] │ │
│ │ (Unsorted, may overlap) │ │
│ │ Size: ~64MB │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ │ Compact │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Level 1: [SST] [SST] [SST] [SST] [SST] [SST] │ │
│ │ (Sorted, non-overlapping) │ │
│ │ Size: ~640MB (10x L0) │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ │ Compact │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Level 2: [SST] [SST] ... [SST] │ │
│ │ (Sorted, non-overlapping) │ │
│ │ Size: ~6.4GB (10x L1) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Bloom Filter
存在しないキーの検索を高速化するための確率的データ構造。
// bloomfilter.go
package kvs
import (
"hash"
"hash/fnv"
"math"
)
// BloomFilter は確率的データ構造
type BloomFilter struct {
bits []bool
size uint64
hashFunc []hash.Hash64
numHash int
}
// NewBloomFilter は新しいBloomFilterを作成
// n: 予想される要素数
// p: 偽陽性率(0.01 = 1%)
func NewBloomFilter(n int, p float64) *BloomFilter {
// 最適なビット数: m = -n * ln(p) / (ln(2)^2)
m := uint64(math.Ceil(-float64(n) * math.Log(p) / math.Pow(math.Log(2), 2)))
// 最適なハッシュ関数数: k = (m/n) * ln(2)
k := int(math.Ceil(float64(m) / float64(n) * math.Log(2)))
return &BloomFilter{
bits: make([]bool, m),
size: m,
numHash: k,
}
}
// ハッシュ関数(ダブルハッシュ法)
func (bf *BloomFilter) hash(key []byte, i int) uint64 {
h1 := fnv.New64a()
h1.Write(key)
hash1 := h1.Sum64()
h2 := fnv.New64()
h2.Write(key)
hash2 := h2.Sum64()
// h(i) = h1 + i * h2
return (hash1 + uint64(i)*hash2) % bf.size
}
// Add はキーを追加
func (bf *BloomFilter) Add(key []byte) {
for i := 0; i < bf.numHash; i++ {
idx := bf.hash(key, i)
bf.bits[idx] = true
}
}
// MayContain はキーが存在する可能性を返す
// falseなら確実に存在しない
// trueなら存在するかもしれない(偽陽性の可能性あり)
func (bf *BloomFilter) MayContain(key []byte) bool {
for i := 0; i < bf.numHash; i++ {
idx := bf.hash(key, i)
if !bf.bits[idx] {
return false
}
}
return true
}
// Serialize はBloomFilterをバイト列に変換
func (bf *BloomFilter) Serialize() []byte {
// サイズ + numHash + ビット列
data := make([]byte, 12+len(bf.bits)/8+1)
// サイズ(8バイト)
for i := 0; i < 8; i++ {
data[i] = byte(bf.size >> (i * 8))
}
// numHash(4バイト)
for i := 0; i < 4; i++ {
data[8+i] = byte(bf.numHash >> (i * 8))
}
// ビット列
for i, b := range bf.bits {
if b {
data[12+i/8] |= 1 << (i % 8)
}
}
return data
}
// DeserializeBloomFilter はバイト列からBloomFilterを復元
func DeserializeBloomFilter(data []byte) *BloomFilter {
// サイズ
var size uint64
for i := 0; i < 8; i++ {
size |= uint64(data[i]) << (i * 8)
}
// numHash
var numHash int
for i := 0; i < 4; i++ {
numHash |= int(data[8+i]) << (i * 8)
}
// ビット列
bits := make([]bool, size)
for i := uint64(0); i < size; i++ {
if data[12+i/8]&(1<<(i%8)) != 0 {
bits[i] = true
}
}
return &BloomFilter{
bits: bits,
size: size,
numHash: numHash,
}
}
Level管理
// level.go
package kvs
import (
"path/filepath"
"sort"
"sync"
)
const (
MaxLevels = 7
Level0FileLimit = 4
LevelSizeMultiplier = 10
BaseLevelSize = 64 * 1024 * 1024 // 64MB
)
// Level はSSTableのレベル
type Level struct {
level int
sstables []*SSTable
size int64
}
// LevelManager はレベルを管理
type LevelManager struct {
mu sync.RWMutex
dir string
levels []*Level
}
// NewLevelManager は新しいLevelManagerを作成
func NewLevelManager(dir string) *LevelManager {
lm := &LevelManager{
dir: dir,
levels: make([]*Level, MaxLevels),
}
for i := 0; i < MaxLevels; i++ {
lm.levels[i] = &Level{
level: i,
sstables: make([]*SSTable, 0),
}
}
return lm
}
// AddSSTable はSSTableをレベル0に追加
func (lm *LevelManager) AddSSTable(sst *SSTable) {
lm.mu.Lock()
defer lm.mu.Unlock()
lm.levels[0].sstables = append(lm.levels[0].sstables, sst)
// サイズを更新
// (実際にはファイルサイズから計算)
}
// NeedsCompaction はコンパクションが必要か判定
func (lm *LevelManager) NeedsCompaction() (int, bool) {
lm.mu.RLock()
defer lm.mu.RUnlock()
// Level 0: ファイル数で判定
if len(lm.levels[0].sstables) >= Level0FileLimit {
return 0, true
}
// Level 1以降: サイズで判定
for i := 1; i < MaxLevels-1; i++ {
maxSize := int64(BaseLevelSize) * int64(pow(LevelSizeMultiplier, i))
if lm.levels[i].size >= maxSize {
return i, true
}
}
return -1, false
}
func pow(base, exp int) int {
result := 1
for i := 0; i < exp; i++ {
result *= base
}
return result
}
// Get はキーを検索
func (lm *LevelManager) Get(key []byte) (*Entry, error) {
lm.mu.RLock()
defer lm.mu.RUnlock()
// Level 0: 全てのSSTableを検索(オーバーラップあり)
for i := len(lm.levels[0].sstables) - 1; i >= 0; i-- {
sst := lm.levels[0].sstables[i]
if entry, err := sst.Get(key); err == nil {
return entry, nil
}
}
// Level 1以降: バイナリサーチで対象SSTableを特定
for level := 1; level < MaxLevels; level++ {
sst := lm.findSSTableForKey(level, key)
if sst != nil {
if entry, err := sst.Get(key); err == nil {
return entry, nil
}
}
}
return nil, ErrKeyNotFound
}
// findSSTableForKey はキーを含む可能性のあるSSTableを見つける
func (lm *LevelManager) findSSTableForKey(level int, key []byte) *SSTable {
sstables := lm.levels[level].sstables
if len(sstables) == 0 {
return nil
}
// 各SSTableはソートされており、キー範囲が重複しない
// バイナリサーチで対象を見つける
idx := sort.Search(len(sstables), func(i int) bool {
// SSTableの最大キーがkey以上か
return string(sstables[i].MaxKey()) >= string(key)
})
if idx < len(sstables) {
sst := sstables[idx]
// 最小キーもチェック
if string(key) >= string(sst.MinKey()) {
return sst
}
}
return nil
}
// GetSSTablesForLevel は指定レベルのSSTableを取得
func (lm *LevelManager) GetSSTablesForLevel(level int) []*SSTable {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.levels[level].sstables
}
// ReplaceSSTablesInLevel はレベルのSSTableを置き換え(コンパクション後)
func (lm *LevelManager) ReplaceSSTablesInLevel(level int, old []*SSTable, new []*SSTable) {
lm.mu.Lock()
defer lm.mu.Unlock()
// 古いSSTableを削除
oldMap := make(map[*SSTable]bool)
for _, sst := range old {
oldMap[sst] = true
}
newList := make([]*SSTable, 0)
for _, sst := range lm.levels[level].sstables {
if !oldMap[sst] {
newList = append(newList, sst)
}
}
// 新しいSSTableを追加
newList = append(newList, new...)
// ソート(Level 1以降)
if level > 0 {
sort.Slice(newList, func(i, j int) bool {
return string(newList[i].MinKey()) < string(newList[j].MinKey())
})
}
lm.levels[level].sstables = newList
}
SSTableのメタデータ拡張
// sstable_meta.go
package kvs
// SSTableMeta はSSTableのメタデータ
type SSTableMeta struct {
MinKey []byte
MaxKey []byte
EntryCount int64
Size int64
Level int
BloomFilter *BloomFilter
}
// SSTable に追加するメソッド
func (s *SSTable) MinKey() []byte {
if len(s.index) == 0 {
return nil
}
return s.index[0].Key
}
func (s *SSTable) MaxKey() []byte {
if len(s.index) == 0 {
return nil
}
return s.index[len(s.index)-1].Key
}
func (s *SSTable) EntryCount() int {
return len(s.index)
}
// SSTableWithMeta はメタデータ付きSSTable
type SSTableWithMeta struct {
*SSTable
meta SSTableMeta
bloomFilter *BloomFilter
}
// NewSSTableWithMeta は新しいSSTableWithMetaを作成
func NewSSTableWithMeta(sst *SSTable) *SSTableWithMeta {
// Bloom Filterを構築
bf := NewBloomFilter(len(sst.index), 0.01)
for _, entry := range sst.index {
bf.Add(entry.Key)
}
return &SSTableWithMeta{
SSTable: sst,
meta: SSTableMeta{
MinKey: sst.MinKey(),
MaxKey: sst.MaxKey(),
EntryCount: int64(len(sst.index)),
},
bloomFilter: bf,
}
}
// Get はBloom Filterでフィルタリングしてから検索
func (s *SSTableWithMeta) Get(key []byte) (*Entry, error) {
// Bloom Filterでチェック
if !s.bloomFilter.MayContain(key) {
return nil, ErrKeyNotFound
}
// 実際に検索
return s.SSTable.Get(key)
}
LSM-Tree統合
// lsm.go
package kvs
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// LSMTree はLSM-Treeベースのストレージエンジン
type LSMTree struct {
mu sync.RWMutex
dir string
options Options
memtable *MemTable
immutable []*MemTable
levelManager *LevelManager
wal *WAL
compacting bool
closeCh chan struct{}
}
// NewLSMTree は新しいLSM-Treeを作成
func NewLSMTree(dir string, opts Options) (*LSMTree, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
lsm := &LSMTree{
dir: dir,
options: opts,
memtable: NewMemTable(opts),
immutable: make([]*MemTable, 0),
levelManager: NewLevelManager(dir),
closeCh: make(chan struct{}),
}
// WALから復元
walPath := filepath.Join(dir, "wal.log")
if err := RecoverFromWAL(walPath, lsm.memtable); err != nil {
return nil, err
}
// WALを開く
wal, err := NewWAL(walPath, opts.SyncWrites)
if err != nil {
return nil, err
}
lsm.wal = wal
// バックグラウンドコンパクション
go lsm.compactionLoop()
return lsm, nil
}
func (lsm *LSMTree) Get(key []byte) ([]byte, error) {
lsm.mu.RLock()
defer lsm.mu.RUnlock()
// 1. Active MemTable
if value, err := lsm.memtable.Get(key); err == nil {
return value, nil
}
// 2. Immutable MemTables
for i := len(lsm.immutable) - 1; i >= 0; i-- {
if value, err := lsm.immutable[i].Get(key); err == nil {
return value, nil
}
}
// 3. SSTable levels
entry, err := lsm.levelManager.Get(key)
if err != nil {
return nil, err
}
if entry.Deleted {
return nil, ErrKeyNotFound
}
return entry.Value, nil
}
func (lsm *LSMTree) Put(key, value []byte) error {
lsm.mu.Lock()
defer lsm.mu.Unlock()
// WALに書き込み
if err := lsm.wal.Put(key, value); err != nil {
return err
}
// MemTableに書き込み
if err := lsm.memtable.Put(key, value); err != nil {
return err
}
// MemTableが一定サイズを超えたらフラッシュ
if lsm.memtable.Size() >= DefaultMemTableSize {
lsm.rotateMemTable()
}
return nil
}
func (lsm *LSMTree) Delete(key []byte) error {
lsm.mu.Lock()
defer lsm.mu.Unlock()
// WALに書き込み
if err := lsm.wal.Delete(key); err != nil {
return err
}
return lsm.memtable.Delete(key)
}
func (lsm *LSMTree) rotateMemTable() {
// 現在のMemTableをimmutableに
lsm.immutable = append(lsm.immutable, lsm.memtable)
// 新しいMemTableを作成
lsm.memtable = NewMemTable(lsm.options)
// WALをローテート
walPath := filepath.Join(lsm.dir, "wal.log")
lsm.wal.Close()
os.Rename(walPath, walPath+fmt.Sprintf(".%d", time.Now().UnixNano()))
lsm.wal, _ = NewWAL(walPath, lsm.options.SyncWrites)
}
func (lsm *LSMTree) compactionLoop() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lsm.maybeFlush()
lsm.maybeCompact()
case <-lsm.closeCh:
return
}
}
}
func (lsm *LSMTree) maybeFlush() {
lsm.mu.Lock()
if len(lsm.immutable) == 0 {
lsm.mu.Unlock()
return
}
// 最も古いimmutableをフラッシュ
toFlush := lsm.immutable[0]
lsm.immutable = lsm.immutable[1:]
lsm.mu.Unlock()
// SSTableに書き込み
sst, err := FlushMemTableToSSTable(toFlush, lsm.dir)
if err != nil {
return
}
lsm.mu.Lock()
lsm.levelManager.AddSSTable(sst)
lsm.mu.Unlock()
}
func (lsm *LSMTree) maybeCompact() {
lsm.mu.Lock()
if lsm.compacting {
lsm.mu.Unlock()
return
}
level, needs := lsm.levelManager.NeedsCompaction()
if !needs {
lsm.mu.Unlock()
return
}
lsm.compacting = true
lsm.mu.Unlock()
// コンパクション実行(次回で詳しく説明)
lsm.doCompaction(level)
lsm.mu.Lock()
lsm.compacting = false
lsm.mu.Unlock()
}
func (lsm *LSMTree) doCompaction(level int) {
// Part4で実装
}
func (lsm *LSMTree) Close() error {
close(lsm.closeCh)
lsm.mu.Lock()
defer lsm.mu.Unlock()
// 残りのMemTableをフラッシュ
if lsm.memtable.Len() > 0 {
sst, _ := FlushMemTableToSSTable(lsm.memtable, lsm.dir)
if sst != nil {
lsm.levelManager.AddSSTable(sst)
}
}
for _, imm := range lsm.immutable {
sst, _ := FlushMemTableToSSTable(imm, lsm.dir)
if sst != nil {
lsm.levelManager.AddSSTable(sst)
}
}
if lsm.wal != nil {
lsm.wal.Close()
}
return nil
}
ベンチマーク
// lsm_bench_test.go
package kvs
import (
"fmt"
"math/rand"
"os"
"testing"
)
func BenchmarkLSMTreePut(b *testing.B) {
dir := "./benchdb"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
lsm, _ := NewLSMTree(dir, DefaultOptions())
defer lsm.Close()
value := make([]byte, 100)
rand.Read(value)
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := []byte(fmt.Sprintf("key-%08d", i))
lsm.Put(key, value)
}
}
func BenchmarkLSMTreeGet(b *testing.B) {
dir := "./benchdb"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
lsm, _ := NewLSMTree(dir, DefaultOptions())
// 先にデータを入れる
for i := 0; i < 100000; i++ {
key := []byte(fmt.Sprintf("key-%08d", i))
value := []byte(fmt.Sprintf("value-%d", i))
lsm.Put(key, value)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := []byte(fmt.Sprintf("key-%08d", i%100000))
lsm.Get(key)
}
lsm.Close()
}
func BenchmarkBloomFilter(b *testing.B) {
bf := NewBloomFilter(1000000, 0.01)
// 追加
for i := 0; i < 1000000; i++ {
key := []byte(fmt.Sprintf("key-%d", i))
bf.Add(key)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := []byte(fmt.Sprintf("key-%d", i%1000000))
bf.MayContain(key)
}
}
結果:
BenchmarkLSMTreePut-8 500000 2341 ns/op
BenchmarkLSMTreeGet-8 2000000 812 ns/op
BenchmarkBloomFilter-8 50000000 28 ns/op
まとめ
今回実装したこと:
| コンポーネント | 説明 |
|---|---|
| Bloom Filter | 存在しないキーの高速フィルタリング |
| Level Manager | SSTableの階層管理 |
| LSM-Tree | 統合されたストレージエンジン |
┌─────────────────────────────────────────────────────────────────┐
│ Read Amplification │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Without Bloom Filter: │
│ Worst case: Check all SSTables at all levels │
│ O(L × N) disk reads (L=levels, N=SSTables per level) │
│ │
│ With Bloom Filter: │
│ Check bloom filter first (O(k) memory ops) │
│ Skip SSTable if definitely not present │
│ ~1% false positive rate │
│ Reduces disk I/O by 99% │
│ │
└─────────────────────────────────────────────────────────────────┘
次回はCompactionを実装するよ。SSTableをマージして、削除されたデータを回収し、読み取り性能を改善する。
この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!