GoでKVSを作るシリーズ
| Part1 HashMap | Part2 永続化 | Part3 LSM-Tree | Part4 Compaction | Part5 分散 |
|---|---|---|---|---|
| 👈 Now | - | - | - | - |
はじめに
RedisとかMemcachedとか、KVS使ったことある人は多いと思う。
でも、中身がどうなってるか、ちゃんと理解してる?
KVS(Key-Value Store)は最もシンプルなデータベースの一つなんだけど、実際に作ってみると意外と奥が深い。
このシリーズでは、Goで本格的なKVSをゼロから実装していく。最終的にはLSM-Treeベースの永続化と分散システムまでカバーするよ。
今回は基礎となるインメモリハッシュマップから始める。
最終目標のアーキテクチャ
┌─────────────────────────────────────────────────────────────────┐
│ KVS Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Client API │ │
│ │ Get / Put / Delete / Scan │ │
│ └────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┴──────────────────────────────────┐ │
│ │ MemTable (Part1) │ │
│ │ In-memory HashMap │ │
│ └────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┴──────────────────────────────────┐ │
│ │ Write-Ahead Log (Part2) │ │
│ └────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┴──────────────────────────────────┐ │
│ │ SSTable (Part2-3) │ │
│ │ Sorted String Table │ │
│ └────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┴──────────────────────────────────┐ │
│ │ Compaction Manager (Part4) │ │
│ │ Level-based Compaction │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
基本インターフェース
// kvs.go
package kvs
import (
"errors"
"time"
)
var (
ErrKeyNotFound = errors.New("key not found")
ErrKeyTooLarge = errors.New("key too large")
ErrValueTooLarge = errors.New("value too large")
)
// Entry はKVSのエントリを表す
type Entry struct {
Key []byte
Value []byte
Timestamp int64
Deleted bool // 削除マーカー(tombstone)
}
// KVStore はKVSのインターフェース
type KVStore interface {
Get(key []byte) ([]byte, error)
Put(key, value []byte) error
Delete(key []byte) error
Close() error
}
// Iterator はキーの範囲スキャン用イテレータ
type Iterator interface {
Valid() bool
Next()
Key() []byte
Value() []byte
Close()
}
// Options はKVSの設定オプション
type Options struct {
MaxKeySize int
MaxValueSize int
SyncWrites bool
}
func DefaultOptions() Options {
return Options{
MaxKeySize: 1024, // 1KB
MaxValueSize: 64 * 1024, // 64KB
SyncWrites: true,
}
}
シンプルなハッシュマップ実装
まずは最もシンプルな実装から。
// memtable.go
package kvs
import (
"sync"
"time"
)
// MemTable はインメモリのキーバリューストア
type MemTable struct {
mu sync.RWMutex
data map[string]*Entry
size int64
options Options
}
// NewMemTable は新しいMemTableを作成
func NewMemTable(opts Options) *MemTable {
return &MemTable{
data: make(map[string]*Entry),
options: opts,
}
}
func (m *MemTable) Get(key []byte) ([]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
entry, ok := m.data[string(key)]
if !ok {
return nil, ErrKeyNotFound
}
if entry.Deleted {
return nil, ErrKeyNotFound
}
return entry.Value, nil
}
func (m *MemTable) Put(key, value []byte) error {
if len(key) > m.options.MaxKeySize {
return ErrKeyTooLarge
}
if len(value) > m.options.MaxValueSize {
return ErrValueTooLarge
}
m.mu.Lock()
defer m.mu.Unlock()
keyStr := string(key)
oldEntry := m.data[keyStr]
entry := &Entry{
Key: key,
Value: value,
Timestamp: time.Now().UnixNano(),
Deleted: false,
}
m.data[keyStr] = entry
// サイズを更新
if oldEntry != nil {
m.size -= int64(len(oldEntry.Key) + len(oldEntry.Value))
}
m.size += int64(len(key) + len(value))
return nil
}
func (m *MemTable) Delete(key []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
keyStr := string(key)
entry, ok := m.data[keyStr]
if !ok {
return ErrKeyNotFound
}
// 論理削除(tombstone)
// サイズ更新は値をnilにする前に行う
m.size -= int64(len(entry.Value))
entry.Deleted = true
entry.Timestamp = time.Now().UnixNano()
entry.Value = nil
return nil
}
func (m *MemTable) Size() int64 {
m.mu.RLock()
defer m.mu.RUnlock()
return m.size
}
func (m *MemTable) Len() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.data)
}
func (m *MemTable) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
m.data = nil
return nil
}
スレッドセーフな実装
sync.Mapを使った別バージョンも作っておく。
// memtable_syncmap.go
package kvs
import (
"sync"
"sync/atomic"
"time"
)
// SyncMemTable はsync.Mapを使った実装
type SyncMemTable struct {
data sync.Map
size atomic.Int64
count atomic.Int64
options Options
}
func NewSyncMemTable(opts Options) *SyncMemTable {
return &SyncMemTable{
options: opts,
}
}
func (m *SyncMemTable) Get(key []byte) ([]byte, error) {
val, ok := m.data.Load(string(key))
if !ok {
return nil, ErrKeyNotFound
}
entry := val.(*Entry)
if entry.Deleted {
return nil, ErrKeyNotFound
}
return entry.Value, nil
}
func (m *SyncMemTable) Put(key, value []byte) error {
if len(key) > m.options.MaxKeySize {
return ErrKeyTooLarge
}
if len(value) > m.options.MaxValueSize {
return ErrValueTooLarge
}
entry := &Entry{
Key: key,
Value: value,
Timestamp: time.Now().UnixNano(),
Deleted: false,
}
keyStr := string(key)
// 既存エントリがあればサイズを減算
if old, loaded := m.data.Swap(keyStr, entry); loaded {
oldEntry := old.(*Entry)
m.size.Add(-int64(len(oldEntry.Key) + len(oldEntry.Value)))
} else {
m.count.Add(1)
}
m.size.Add(int64(len(key) + len(value)))
return nil
}
func (m *SyncMemTable) Delete(key []byte) error {
val, ok := m.data.Load(string(key))
if !ok {
return ErrKeyNotFound
}
entry := val.(*Entry)
oldValueLen := len(entry.Value)
entry.Deleted = true
entry.Timestamp = time.Now().UnixNano()
entry.Value = nil
m.size.Add(-int64(oldValueLen))
return nil
}
func (m *SyncMemTable) Size() int64 {
return m.size.Load()
}
func (m *SyncMemTable) Len() int {
return int(m.count.Load())
}
func (m *SyncMemTable) Close() error {
m.data = sync.Map{}
return nil
}
パフォーマンス比較
// bench_test.go
package kvs
import (
"fmt"
"math/rand"
"testing"
)
func BenchmarkMemTablePut(b *testing.B) {
m := NewMemTable(DefaultOptions())
keys := make([][]byte, b.N)
values := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
keys[i] = []byte(fmt.Sprintf("key-%d", i))
values[i] = make([]byte, 100)
rand.Read(values[i])
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Put(keys[i], values[i])
}
}
func BenchmarkMemTableGet(b *testing.B) {
m := NewMemTable(DefaultOptions())
// 先にデータを入れる
for i := 0; i < 10000; i++ {
key := []byte(fmt.Sprintf("key-%d", i))
value := make([]byte, 100)
m.Put(key, value)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := []byte(fmt.Sprintf("key-%d", i%10000))
m.Get(key)
}
}
func BenchmarkSyncMemTablePut(b *testing.B) {
m := NewSyncMemTable(DefaultOptions())
keys := make([][]byte, b.N)
values := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
keys[i] = []byte(fmt.Sprintf("key-%d", i))
values[i] = make([]byte, 100)
rand.Read(values[i])
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Put(keys[i], values[i])
}
}
func BenchmarkParallelPut(b *testing.B) {
m := NewMemTable(DefaultOptions())
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
key := []byte(fmt.Sprintf("key-%d", i))
value := make([]byte, 100)
m.Put(key, value)
i++
}
})
}
ベンチマーク結果:
$ go test -bench=. -benchmem
BenchmarkMemTablePut-8 5000000 312 ns/op 224 B/op 3 allocs/op
BenchmarkMemTableGet-8 20000000 89 ns/op 0 B/op 0 allocs/op
BenchmarkSyncMemTablePut-8 3000000 456 ns/op 256 B/op 4 allocs/op
BenchmarkParallelPut-8 2000000 734 ns/op 224 B/op 3 allocs/op
イテレータの実装
範囲スキャンのためのイテレータ。
// iterator.go
package kvs
import (
"bytes"
"sort"
)
// MemTableIterator はMemTableを走査するイテレータ
type MemTableIterator struct {
keys []string
entries []*Entry
index int
valid bool
}
func (m *MemTable) NewIterator() *MemTableIterator {
m.mu.RLock()
defer m.mu.RUnlock()
// キーをソート
keys := make([]string, 0, len(m.data))
entries := make([]*Entry, 0, len(m.data))
for k := range m.data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
entries = append(entries, m.data[k])
}
iter := &MemTableIterator{
keys: keys,
entries: entries,
index: 0,
}
iter.valid = len(keys) > 0
return iter
}
func (it *MemTableIterator) Valid() bool {
return it.valid && it.index < len(it.keys)
}
func (it *MemTableIterator) Next() {
it.index++
it.valid = it.index < len(it.keys)
}
func (it *MemTableIterator) Key() []byte {
if !it.Valid() {
return nil
}
return []byte(it.keys[it.index])
}
func (it *MemTableIterator) Value() []byte {
if !it.Valid() {
return nil
}
entry := it.entries[it.index]
if entry.Deleted {
return nil
}
return entry.Value
}
func (it *MemTableIterator) Close() {
it.valid = false
it.keys = nil
it.entries = nil
}
// Seek は指定したキー以上の位置に移動
func (it *MemTableIterator) Seek(target []byte) {
targetStr := string(target)
it.index = sort.SearchStrings(it.keys, targetStr)
it.valid = it.index < len(it.keys)
}
// SeekToFirst は先頭に移動
func (it *MemTableIterator) SeekToFirst() {
it.index = 0
it.valid = len(it.keys) > 0
}
// SeekToLast は末尾に移動
func (it *MemTableIterator) SeekToLast() {
if len(it.keys) > 0 {
it.index = len(it.keys) - 1
it.valid = true
} else {
it.valid = false
}
}
// 範囲スキャン
func (m *MemTable) Scan(start, end []byte, fn func(key, value []byte) bool) {
iter := m.NewIterator()
defer iter.Close()
iter.Seek(start)
for iter.Valid() {
key := iter.Key()
if end != nil && bytes.Compare(key, end) >= 0 {
break
}
value := iter.Value()
if value != nil { // 削除されていない
if !fn(key, value) {
break
}
}
iter.Next()
}
}
使用例
// example_test.go
package kvs
import (
"fmt"
"testing"
)
func TestBasicUsage(t *testing.T) {
// KVSを作成
db := NewMemTable(DefaultOptions())
defer db.Close()
// データを書き込み
db.Put([]byte("name"), []byte("Alice"))
db.Put([]byte("age"), []byte("30"))
db.Put([]byte("city"), []byte("Tokyo"))
// データを読み込み
name, _ := db.Get([]byte("name"))
fmt.Printf("name: %s\n", name) // name: Alice
// データを削除
db.Delete([]byte("age"))
// 削除後は取得できない
_, err := db.Get([]byte("age"))
fmt.Printf("age error: %v\n", err) // age error: key not found
// 範囲スキャン
fmt.Println("\n--- Scan ---")
db.Scan(nil, nil, func(key, value []byte) bool {
fmt.Printf("%s = %s\n", key, value)
return true
})
}
func TestConcurrentAccess(t *testing.T) {
db := NewMemTable(DefaultOptions())
defer db.Close()
done := make(chan bool)
// 複数のgoroutineから同時アクセス
for i := 0; i < 10; i++ {
go func(id int) {
for j := 0; j < 1000; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", id, j))
value := []byte(fmt.Sprintf("value-%d-%d", id, j))
db.Put(key, value)
}
done <- true
}(i)
}
// 完了を待つ
for i := 0; i < 10; i++ {
<-done
}
fmt.Printf("Total entries: %d\n", db.Len()) // Total entries: 10000
}
実行結果:
name: Alice
age error: key not found
--- Scan ---
city = Tokyo
name = Alice
Total entries: 10000
TTL(有効期限)サポート
// ttl.go
package kvs
import (
"sync"
"time"
)
// TTLEntry は有効期限付きエントリ
type TTLEntry struct {
Entry
ExpiresAt int64 // 0なら無期限
}
// TTLMemTable はTTLサポート付きMemTable
type TTLMemTable struct {
mu sync.RWMutex
data map[string]*TTLEntry
options Options
}
func NewTTLMemTable(opts Options) *TTLMemTable {
m := &TTLMemTable{
data: make(map[string]*TTLEntry),
options: opts,
}
// バックグラウンドで期限切れエントリを削除
go m.cleanupLoop()
return m
}
func (m *TTLMemTable) PutWithTTL(key, value []byte, ttl time.Duration) error {
if len(key) > m.options.MaxKeySize {
return ErrKeyTooLarge
}
if len(value) > m.options.MaxValueSize {
return ErrValueTooLarge
}
m.mu.Lock()
defer m.mu.Unlock()
var expiresAt int64
if ttl > 0 {
expiresAt = time.Now().Add(ttl).UnixNano()
}
entry := &TTLEntry{
Entry: Entry{
Key: key,
Value: value,
Timestamp: time.Now().UnixNano(),
Deleted: false,
},
ExpiresAt: expiresAt,
}
m.data[string(key)] = entry
return nil
}
func (m *TTLMemTable) Get(key []byte) ([]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
entry, ok := m.data[string(key)]
if !ok {
return nil, ErrKeyNotFound
}
if entry.Deleted {
return nil, ErrKeyNotFound
}
// 期限切れチェック
if entry.ExpiresAt > 0 && time.Now().UnixNano() > entry.ExpiresAt {
return nil, ErrKeyNotFound
}
return entry.Value, nil
}
func (m *TTLMemTable) cleanupLoop() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
m.cleanup()
}
}
func (m *TTLMemTable) cleanup() {
m.mu.Lock()
defer m.mu.Unlock()
now := time.Now().UnixNano()
for key, entry := range m.data {
if entry.ExpiresAt > 0 && now > entry.ExpiresAt {
delete(m.data, key)
}
}
}
まとめ
今回実装したこと:
| コンポーネント | 説明 |
|---|---|
| MemTable | インメモリハッシュマップ |
| SyncMemTable | sync.Map版 |
| Iterator | 範囲スキャン |
| TTL | 有効期限サポート |
┌─────────────────────────────────────────────────────────────────┐
│ MemTable Structure │
├─────────────────────────────────────────────────────────────────┤
│ │
│ map[string]*Entry │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ "user:1" → Entry{Key, Value, Timestamp, Deleted} │ │
│ │ "user:2" → Entry{Key, Value, Timestamp, Deleted} │ │
│ │ "user:3" → Entry{Key, Value, Timestamp, Deleted} │ │
│ │ ... │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Operations: O(1) average for Get/Put/Delete │
│ │
└─────────────────────────────────────────────────────────────────┘
次回は永続化(WALとSSTable)を実装するよ。メモリだけじゃなくて、ディスクにもデータを保存できるようになる。再起動してもデータが残るってやつ。
この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!