GoでKVSを作るシリーズ
| Part1 HashMap | Part2 永続化 | Part3 LSM-Tree | Part4 Compaction | Part5 分散 |
|---|---|---|---|---|
| ✅ Done | 👈 Now | - | - | - |
はじめに
前回はインメモリのハッシュマップを実装した。
でも、これだけだとプロセスが死んだらデータが全部消える。こわいですねぇ。
今回は永続化を実装するよ。WAL(Write-Ahead Log)でクラッシュリカバリを実現し、SSTable(Sorted String Table)でデータをディスクに保存する。
永続化の仕組み
┌─────────────────────────────────────────────────────────────────┐
│ Write Path Overview │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Write Request │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 1. Write to WAL (sync to disk) │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 2. Write to MemTable (in-memory) │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ │ │
│ MemTable Full? │
│ │ │
│ ┌────────────────┴────────────────┐ │
│ │ Yes │ No │
│ ▼ ▼ │
│ ┌───────────────┐ Return OK │
│ │ 3. Flush to │ │
│ │ SSTable │ │
│ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
WAL(Write-Ahead Log)
WALは書き込みをディスクに記録し、クラッシュ時のリカバリを可能にする。
// wal.go
package kvs
import (
"bufio"
"encoding/binary"
"errors"
"hash/crc32"
"io"
"os"
"sync"
)
const (
WALRecordPut byte = 1
WALRecordDelete byte = 2
)
// WALRecord はWALのレコード形式
// [CRC32(4)] [Type(1)] [KeyLen(4)] [ValueLen(4)] [Key] [Value]
type WALRecord struct {
Type byte
Key []byte
Value []byte
Checksum uint32
}
// WAL はWrite-Ahead Log
type WAL struct {
mu sync.Mutex
file *os.File
writer *bufio.Writer
path string
syncMode bool
}
// NewWAL は新しいWALを作成
func NewWAL(path string, syncMode bool) (*WAL, error) {
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
return &WAL{
file: file,
writer: bufio.NewWriter(file),
path: path,
syncMode: syncMode,
}, nil
}
func (w *WAL) Append(recordType byte, key, value []byte) error {
w.mu.Lock()
defer w.mu.Unlock()
// レコードをバイト列にシリアライズ
data := make([]byte, 0, 13+len(key)+len(value))
// 後でチェックサムを入れるため、最初の4バイトは空けておく
data = append(data, 0, 0, 0, 0)
// Type
data = append(data, recordType)
// KeyLen
keyLen := make([]byte, 4)
binary.LittleEndian.PutUint32(keyLen, uint32(len(key)))
data = append(data, keyLen...)
// ValueLen
valueLen := make([]byte, 4)
binary.LittleEndian.PutUint32(valueLen, uint32(len(value)))
data = append(data, valueLen...)
// Key and Value
data = append(data, key...)
data = append(data, value...)
// チェックサムを計算して先頭に入れる
checksum := crc32.ChecksumIEEE(data[4:])
binary.LittleEndian.PutUint32(data[0:4], checksum)
// 書き込み
if _, err := w.writer.Write(data); err != nil {
return err
}
if w.syncMode {
if err := w.writer.Flush(); err != nil {
return err
}
return w.file.Sync()
}
return nil
}
func (w *WAL) Put(key, value []byte) error {
return w.Append(WALRecordPut, key, value)
}
func (w *WAL) Delete(key []byte) error {
return w.Append(WALRecordDelete, key, nil)
}
func (w *WAL) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if err := w.writer.Flush(); err != nil {
return err
}
return w.file.Close()
}
// WALをリプレイしてMemTableに復元
func RecoverFromWAL(path string, memtable *MemTable) error {
file, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil // WALが存在しない場合は何もしない
}
return err
}
defer file.Close()
reader := bufio.NewReader(file)
for {
record, err := readWALRecord(reader)
if err != nil {
if err == io.EOF {
break
}
return err
}
switch record.Type {
case WALRecordPut:
memtable.Put(record.Key, record.Value)
case WALRecordDelete:
memtable.Delete(record.Key)
}
}
return nil
}
func readWALRecord(reader *bufio.Reader) (*WALRecord, error) {
// チェックサムを読む
checksumBytes := make([]byte, 4)
if _, err := io.ReadFull(reader, checksumBytes); err != nil {
return nil, err
}
expectedChecksum := binary.LittleEndian.Uint32(checksumBytes)
// Typeを読む
recordType, err := reader.ReadByte()
if err != nil {
return nil, err
}
// KeyLenを読む
keyLenBytes := make([]byte, 4)
if _, err := io.ReadFull(reader, keyLenBytes); err != nil {
return nil, err
}
keyLen := binary.LittleEndian.Uint32(keyLenBytes)
// ValueLenを読む
valueLenBytes := make([]byte, 4)
if _, err := io.ReadFull(reader, valueLenBytes); err != nil {
return nil, err
}
valueLen := binary.LittleEndian.Uint32(valueLenBytes)
// Keyを読む
key := make([]byte, keyLen)
if _, err := io.ReadFull(reader, key); err != nil {
return nil, err
}
// Valueを読む
var value []byte
if valueLen > 0 {
value = make([]byte, valueLen)
if _, err := io.ReadFull(reader, value); err != nil {
return nil, err
}
}
// チェックサムを検証
data := make([]byte, 0, 9+keyLen+valueLen)
data = append(data, recordType)
data = append(data, keyLenBytes...)
data = append(data, valueLenBytes...)
data = append(data, key...)
data = append(data, value...)
actualChecksum := crc32.ChecksumIEEE(data)
if actualChecksum != expectedChecksum {
return nil, errors.New("checksum mismatch")
}
return &WALRecord{
Type: recordType,
Key: key,
Value: value,
Checksum: expectedChecksum,
}, nil
}
SSTable(Sorted String Table)
ディスク上のソート済みキーバリューファイル。
// sstable.go
package kvs
import (
"bufio"
"encoding/binary"
"io"
"os"
"sort"
)
// SSTableは以下のフォーマット:
// [Data Block] [Index Block] [Footer]
// Footer: [IndexOffset(8)] [IndexSize(8)] [Magic(8)]
const (
FooterSize = 24
MagicNumber = 0x00DEADBEEF00CAFE
)
// SSTable はディスク上のソート済みテーブル
type SSTable struct {
file *os.File
path string
indexOffset int64
indexSize int64
index []IndexEntry
}
// IndexEntry はインデックスエントリ
type IndexEntry struct {
Key []byte
Offset int64
Size int32
}
// SSTableWriter はSSTableの書き込み用
type SSTableWriter struct {
file *os.File
writer *bufio.Writer
index []IndexEntry
offset int64
}
// NewSSTableWriter は新しいSSTableWriterを作成
func NewSSTableWriter(path string) (*SSTableWriter, error) {
file, err := os.Create(path)
if err != nil {
return nil, err
}
return &SSTableWriter{
file: file,
writer: bufio.NewWriter(file),
index: make([]IndexEntry, 0),
offset: 0,
}, nil
}
// データブロックフォーマット:
// [KeyLen(4)] [ValueLen(4)] [Key] [Value] [Timestamp(8)] [Deleted(1)]
func (w *SSTableWriter) WriteEntry(entry *Entry) error {
startOffset := w.offset
// KeyLen
keyLen := make([]byte, 4)
binary.LittleEndian.PutUint32(keyLen, uint32(len(entry.Key)))
w.writer.Write(keyLen)
// ValueLen
valueLen := make([]byte, 4)
binary.LittleEndian.PutUint32(valueLen, uint32(len(entry.Value)))
w.writer.Write(valueLen)
// Key
w.writer.Write(entry.Key)
// Value
w.writer.Write(entry.Value)
// Timestamp
tsBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(tsBytes, uint64(entry.Timestamp))
w.writer.Write(tsBytes)
// Deleted flag
if entry.Deleted {
w.writer.WriteByte(1)
} else {
w.writer.WriteByte(0)
}
entrySize := int32(4 + 4 + len(entry.Key) + len(entry.Value) + 8 + 1)
w.offset += int64(entrySize)
// インデックスに追加
w.index = append(w.index, IndexEntry{
Key: entry.Key,
Offset: startOffset,
Size: entrySize,
})
return nil
}
// Finalize はSSTableを完成させる
func (w *SSTableWriter) Finalize() error {
// インデックスブロックを書き込み
indexOffset := w.offset
for _, entry := range w.index {
// KeyLen
keyLen := make([]byte, 4)
binary.LittleEndian.PutUint32(keyLen, uint32(len(entry.Key)))
w.writer.Write(keyLen)
// Key
w.writer.Write(entry.Key)
// Offset
offsetBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(offsetBytes, uint64(entry.Offset))
w.writer.Write(offsetBytes)
// Size
sizeBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(sizeBytes, uint32(entry.Size))
w.writer.Write(sizeBytes)
w.offset += int64(4 + len(entry.Key) + 8 + 4)
}
indexSize := w.offset - indexOffset
// Footerを書き込み
footer := make([]byte, FooterSize)
binary.LittleEndian.PutUint64(footer[0:8], uint64(indexOffset))
binary.LittleEndian.PutUint64(footer[8:16], uint64(indexSize))
binary.LittleEndian.PutUint64(footer[16:24], MagicNumber)
w.writer.Write(footer)
// フラッシュ
if err := w.writer.Flush(); err != nil {
return err
}
return w.file.Close()
}
// OpenSSTable は既存のSSTableを開く
func OpenSSTable(path string) (*SSTable, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
// Footerを読む
file.Seek(-FooterSize, io.SeekEnd)
footer := make([]byte, FooterSize)
if _, err := io.ReadFull(file, footer); err != nil {
file.Close()
return nil, err
}
// Magicを確認
magic := binary.LittleEndian.Uint64(footer[16:24])
if magic != MagicNumber {
file.Close()
return nil, errors.New("invalid SSTable magic")
}
indexOffset := int64(binary.LittleEndian.Uint64(footer[0:8]))
indexSize := int64(binary.LittleEndian.Uint64(footer[8:16]))
sst := &SSTable{
file: file,
path: path,
indexOffset: indexOffset,
indexSize: indexSize,
}
// インデックスを読み込み
if err := sst.loadIndex(); err != nil {
file.Close()
return nil, err
}
return sst, nil
}
func (s *SSTable) loadIndex() error {
s.file.Seek(s.indexOffset, io.SeekStart)
reader := bufio.NewReader(s.file)
var read int64
for read < s.indexSize {
// KeyLen
keyLenBytes := make([]byte, 4)
if _, err := io.ReadFull(reader, keyLenBytes); err != nil {
return err
}
keyLen := binary.LittleEndian.Uint32(keyLenBytes)
read += 4
// Key
key := make([]byte, keyLen)
if _, err := io.ReadFull(reader, key); err != nil {
return err
}
read += int64(keyLen)
// Offset
offsetBytes := make([]byte, 8)
if _, err := io.ReadFull(reader, offsetBytes); err != nil {
return err
}
offset := int64(binary.LittleEndian.Uint64(offsetBytes))
read += 8
// Size
sizeBytes := make([]byte, 4)
if _, err := io.ReadFull(reader, sizeBytes); err != nil {
return err
}
size := int32(binary.LittleEndian.Uint32(sizeBytes))
read += 4
s.index = append(s.index, IndexEntry{
Key: key,
Offset: offset,
Size: size,
})
}
return nil
}
// Get はキーに対応する値を取得
func (s *SSTable) Get(key []byte) (*Entry, error) {
// バイナリサーチでインデックスを検索
idx := sort.Search(len(s.index), func(i int) bool {
return string(s.index[i].Key) >= string(key)
})
if idx >= len(s.index) || string(s.index[idx].Key) != string(key) {
return nil, ErrKeyNotFound
}
// データを読み込み
entry := s.index[idx]
s.file.Seek(entry.Offset, io.SeekStart)
data := make([]byte, entry.Size)
if _, err := io.ReadFull(s.file, data); err != nil {
return nil, err
}
return parseEntry(data)
}
func parseEntry(data []byte) (*Entry, error) {
keyLen := binary.LittleEndian.Uint32(data[0:4])
valueLen := binary.LittleEndian.Uint32(data[4:8])
keyStart := 8
keyEnd := keyStart + int(keyLen)
valueStart := keyEnd
valueEnd := valueStart + int(valueLen)
key := data[keyStart:keyEnd]
value := data[valueStart:valueEnd]
timestamp := int64(binary.LittleEndian.Uint64(data[valueEnd : valueEnd+8]))
deleted := data[valueEnd+8] == 1
return &Entry{
Key: key,
Value: value,
Timestamp: timestamp,
Deleted: deleted,
}, nil
}
func (s *SSTable) Close() error {
return s.file.Close()
}
MemTableからSSTableへのフラッシュ
// flush.go
package kvs
import (
"fmt"
"os"
"path/filepath"
"sort"
"time"
)
// FlushMemTableToSSTable はMemTableをSSTableにフラッシュ
func FlushMemTableToSSTable(m *MemTable, dir string) (*SSTable, error) {
m.mu.RLock()
defer m.mu.RUnlock()
// エントリをソート
entries := make([]*Entry, 0, len(m.data))
for _, entry := range m.data {
entries = append(entries, entry)
}
sort.Slice(entries, func(i, j int) bool {
return string(entries[i].Key) < string(entries[j].Key)
})
// SSTableファイル名を生成
filename := fmt.Sprintf("sst_%d.db", time.Now().UnixNano())
path := filepath.Join(dir, filename)
// SSTableを書き込み
writer, err := NewSSTableWriter(path)
if err != nil {
return nil, err
}
for _, entry := range entries {
if err := writer.WriteEntry(entry); err != nil {
os.Remove(path)
return nil, err
}
}
if err := writer.Finalize(); err != nil {
os.Remove(path)
return nil, err
}
// SSTableを開き直して返す
return OpenSSTable(path)
}
統合したKVS
WALとSSTableを使った永続化KVS。
// db.go
package kvs
import (
"os"
"path/filepath"
"sync"
)
const (
DefaultMemTableSize = 4 * 1024 * 1024 // 4MB
)
// DB は永続化対応のKVS
type DB struct {
mu sync.RWMutex
dir string
options Options
memtable *MemTable
immutable *MemTable // フラッシュ中のMemTable
sstables []*SSTable
wal *WAL
memTableSize int64
maxMemTable int64
}
// Open はDBを開く
func Open(dir string, opts Options) (*DB, error) {
// ディレクトリを作成
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
db := &DB{
dir: dir,
options: opts,
memtable: NewMemTable(opts),
sstables: make([]*SSTable, 0),
maxMemTable: DefaultMemTableSize,
}
// 既存のSSTableを読み込み
if err := db.loadSSTables(); err != nil {
return nil, err
}
// WALから復元
walPath := filepath.Join(dir, "wal.log")
if err := RecoverFromWAL(walPath, db.memtable); err != nil {
return nil, err
}
// WALを開く
wal, err := NewWAL(walPath, opts.SyncWrites)
if err != nil {
return nil, err
}
db.wal = wal
return db, nil
}
func (db *DB) loadSSTables() error {
files, err := os.ReadDir(db.dir)
if err != nil {
return err
}
for _, file := range files {
if filepath.Ext(file.Name()) == ".db" {
path := filepath.Join(db.dir, file.Name())
sst, err := OpenSSTable(path)
if err != nil {
continue // 壊れたファイルはスキップ
}
db.sstables = append(db.sstables, sst)
}
}
return nil
}
func (db *DB) Get(key []byte) ([]byte, error) {
db.mu.RLock()
defer db.mu.RUnlock()
// 1. MemTableを検索
if value, err := db.memtable.Get(key); err == nil {
return value, nil
}
// 2. Immutable MemTableを検索
if db.immutable != nil {
if value, err := db.immutable.Get(key); err == nil {
return value, nil
}
}
// 3. SSTableを新しい順に検索
for i := len(db.sstables) - 1; i >= 0; i-- {
entry, err := db.sstables[i].Get(key)
if err == nil {
if entry.Deleted {
return nil, ErrKeyNotFound
}
return entry.Value, nil
}
}
return nil, ErrKeyNotFound
}
func (db *DB) Put(key, value []byte) error {
db.mu.Lock()
defer db.mu.Unlock()
// WALに書き込み
if err := db.wal.Put(key, value); err != nil {
return err
}
// MemTableに書き込み
if err := db.memtable.Put(key, value); err != nil {
return err
}
// MemTableサイズチェック
if db.memtable.Size() >= db.maxMemTable {
db.scheduleFlush()
}
return nil
}
func (db *DB) Delete(key []byte) error {
db.mu.Lock()
defer db.mu.Unlock()
// WALに書き込み
if err := db.wal.Delete(key); err != nil {
return err
}
// MemTableに削除マーカーを書き込み
return db.memtable.Delete(key)
}
func (db *DB) scheduleFlush() {
// 簡略化:同期的にフラッシュ
db.flush()
}
func (db *DB) flush() error {
// 現在のMemTableをimmutableに
db.immutable = db.memtable
db.memtable = NewMemTable(db.options)
// SSTableにフラッシュ
sst, err := FlushMemTableToSSTable(db.immutable, db.dir)
if err != nil {
return err
}
db.sstables = append(db.sstables, sst)
db.immutable = nil
// WALをリセット
walPath := filepath.Join(db.dir, "wal.log")
db.wal.Close()
os.Remove(walPath)
db.wal, _ = NewWAL(walPath, db.options.SyncWrites)
return nil
}
func (db *DB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
// MemTableをフラッシュ
if db.memtable.Len() > 0 {
db.flush()
}
// WALを閉じる
if db.wal != nil {
db.wal.Close()
}
// SSTableを閉じる
for _, sst := range db.sstables {
sst.Close()
}
return nil
}
使用例
// example_test.go
package kvs
import (
"fmt"
"os"
"testing"
)
func TestPersistence(t *testing.T) {
dir := "./testdb"
defer os.RemoveAll(dir)
// DBを開く
db, err := Open(dir, DefaultOptions())
if err != nil {
t.Fatal(err)
}
// データを書き込み
for i := 0; i < 1000; i++ {
key := []byte(fmt.Sprintf("key-%04d", i))
value := []byte(fmt.Sprintf("value-%d", i))
db.Put(key, value)
}
fmt.Printf("Written 1000 entries\n")
// DBを閉じる
db.Close()
fmt.Printf("DB closed\n")
// DBを再度開く
db2, err := Open(dir, DefaultOptions())
if err != nil {
t.Fatal(err)
}
defer db2.Close()
// データを確認
for i := 0; i < 10; i++ {
key := []byte(fmt.Sprintf("key-%04d", i))
value, err := db2.Get(key)
if err != nil {
t.Errorf("Key %s not found", key)
continue
}
fmt.Printf("%s = %s\n", key, value)
}
}
実行結果:
Written 1000 entries
DB closed
key-0000 = value-0
key-0001 = value-1
key-0002 = value-2
key-0003 = value-3
key-0004 = value-4
key-0005 = value-5
key-0006 = value-6
key-0007 = value-7
key-0008 = value-8
key-0009 = value-9
ディスク上のファイル構造
testdb/
├── wal.log # Write-Ahead Log
├── sst_1234567890.db # SSTable #1
├── sst_1234567891.db # SSTable #2
└── ...
まとめ
今回実装したこと:
| コンポーネント | 説明 |
|---|---|
| WAL | Write-Ahead Log、クラッシュリカバリ用 |
| SSTable | ディスク上のソート済みテーブル |
| DB | 統合されたKVSインターフェース |
┌─────────────────────────────────────────────────────────────────┐
│ Read Path │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Get(key) ─────────────────────────────────────────────────────▶│
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ MemTable │ ───▶ Found? Return │
│ └──────────┬──────────┘ │
│ │ Not Found │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Immutable MemTable │ ───▶ Found? Return │
│ └──────────┬──────────┘ │
│ │ Not Found │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ SSTable[n-1] │ ───▶ Found? Return (newest) │
│ └──────────┬──────────┘ │
│ │ Not Found │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ SSTable[n-2] │ ───▶ ... │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ Return "Not Found" │
│ │
└─────────────────────────────────────────────────────────────────┘
次回はLSM-Treeを実装して、SSTableの階層管理を行う。これにより、読み取り性能を大幅に改善できる。
この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!