6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GoでKVSを作るシリーズ

Part1 HashMap Part2 永続化 Part3 LSM-Tree Part4 Compaction Part5 分散
👈 Now - - - -

はじめに

RedisとかMemcachedとか、KVS使ったことある人は多いと思う。

でも、中身がどうなってるか、ちゃんと理解してる?

KVS(Key-Value Store)は最もシンプルなデータベースの一つなんだけど、実際に作ってみると意外と奥が深い。

このシリーズでは、Goで本格的なKVSをゼロから実装していく。最終的にはLSM-Treeベースの永続化と分散システムまでカバーするよ。

今回は基礎となるインメモリハッシュマップから始める。

最終目標のアーキテクチャ

┌─────────────────────────────────────────────────────────────────┐
│                      KVS Architecture                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                     Client API                            │  │
│  │              Get / Put / Delete / Scan                    │  │
│  └────────────────────────┬──────────────────────────────────┘  │
│                           │                                      │
│  ┌────────────────────────┴──────────────────────────────────┐  │
│  │                    MemTable (Part1)                       │  │
│  │                  In-memory HashMap                        │  │
│  └────────────────────────┬──────────────────────────────────┘  │
│                           │                                      │
│  ┌────────────────────────┴──────────────────────────────────┐  │
│  │                  Write-Ahead Log (Part2)                  │  │
│  └────────────────────────┬──────────────────────────────────┘  │
│                           │                                      │
│  ┌────────────────────────┴──────────────────────────────────┐  │
│  │                    SSTable (Part2-3)                      │  │
│  │                 Sorted String Table                       │  │
│  └────────────────────────┬──────────────────────────────────┘  │
│                           │                                      │
│  ┌────────────────────────┴──────────────────────────────────┐  │
│  │              Compaction Manager (Part4)                   │  │
│  │              Level-based Compaction                       │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

基本インターフェース

// kvs.go
package kvs

import (
    "errors"
    "time"
)

var (
    ErrKeyNotFound = errors.New("key not found")
    ErrKeyTooLarge = errors.New("key too large")
    ErrValueTooLarge = errors.New("value too large")
)

// Entry はKVSのエントリを表す
type Entry struct {
    Key       []byte
    Value     []byte
    Timestamp int64
    Deleted   bool  // 削除マーカー(tombstone)
}

// KVStore はKVSのインターフェース
type KVStore interface {
    Get(key []byte) ([]byte, error)
    Put(key, value []byte) error
    Delete(key []byte) error
    Close() error
}

// Iterator はキーの範囲スキャン用イテレータ
type Iterator interface {
    Valid() bool
    Next()
    Key() []byte
    Value() []byte
    Close()
}

// Options はKVSの設定オプション
type Options struct {
    MaxKeySize   int
    MaxValueSize int
    SyncWrites   bool
}

func DefaultOptions() Options {
    return Options{
        MaxKeySize:   1024,        // 1KB
        MaxValueSize: 64 * 1024,   // 64KB
        SyncWrites:   true,
    }
}

シンプルなハッシュマップ実装

まずは最もシンプルな実装から。

// memtable.go
package kvs

import (
    "sync"
    "time"
)

// MemTable はインメモリのキーバリューストア
type MemTable struct {
    mu      sync.RWMutex
    data    map[string]*Entry
    size    int64
    options Options
}

// NewMemTable は新しいMemTableを作成
func NewMemTable(opts Options) *MemTable {
    return &MemTable{
        data:    make(map[string]*Entry),
        options: opts,
    }
}

func (m *MemTable) Get(key []byte) ([]byte, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()

    entry, ok := m.data[string(key)]
    if !ok {
        return nil, ErrKeyNotFound
    }

    if entry.Deleted {
        return nil, ErrKeyNotFound
    }

    return entry.Value, nil
}

func (m *MemTable) Put(key, value []byte) error {
    if len(key) > m.options.MaxKeySize {
        return ErrKeyTooLarge
    }
    if len(value) > m.options.MaxValueSize {
        return ErrValueTooLarge
    }

    m.mu.Lock()
    defer m.mu.Unlock()

    keyStr := string(key)
    oldEntry := m.data[keyStr]

    entry := &Entry{
        Key:       key,
        Value:     value,
        Timestamp: time.Now().UnixNano(),
        Deleted:   false,
    }

    m.data[keyStr] = entry

    // サイズを更新
    if oldEntry != nil {
        m.size -= int64(len(oldEntry.Key) + len(oldEntry.Value))
    }
    m.size += int64(len(key) + len(value))

    return nil
}

func (m *MemTable) Delete(key []byte) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    keyStr := string(key)
    entry, ok := m.data[keyStr]
    if !ok {
        return ErrKeyNotFound
    }

    // 論理削除(tombstone)
    // サイズ更新は値をnilにする前に行う
    m.size -= int64(len(entry.Value))
    entry.Deleted = true
    entry.Timestamp = time.Now().UnixNano()
    entry.Value = nil

    return nil
}

func (m *MemTable) Size() int64 {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.size
}

func (m *MemTable) Len() int {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return len(m.data)
}

func (m *MemTable) Close() error {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data = nil
    return nil
}

スレッドセーフな実装

sync.Mapを使った別バージョンも作っておく。

// memtable_syncmap.go
package kvs

import (
    "sync"
    "sync/atomic"
    "time"
)

// SyncMemTable はsync.Mapを使った実装
type SyncMemTable struct {
    data    sync.Map
    size    atomic.Int64
    count   atomic.Int64
    options Options
}

func NewSyncMemTable(opts Options) *SyncMemTable {
    return &SyncMemTable{
        options: opts,
    }
}

func (m *SyncMemTable) Get(key []byte) ([]byte, error) {
    val, ok := m.data.Load(string(key))
    if !ok {
        return nil, ErrKeyNotFound
    }

    entry := val.(*Entry)
    if entry.Deleted {
        return nil, ErrKeyNotFound
    }

    return entry.Value, nil
}

func (m *SyncMemTable) Put(key, value []byte) error {
    if len(key) > m.options.MaxKeySize {
        return ErrKeyTooLarge
    }
    if len(value) > m.options.MaxValueSize {
        return ErrValueTooLarge
    }

    entry := &Entry{
        Key:       key,
        Value:     value,
        Timestamp: time.Now().UnixNano(),
        Deleted:   false,
    }

    keyStr := string(key)

    // 既存エントリがあればサイズを減算
    if old, loaded := m.data.Swap(keyStr, entry); loaded {
        oldEntry := old.(*Entry)
        m.size.Add(-int64(len(oldEntry.Key) + len(oldEntry.Value)))
    } else {
        m.count.Add(1)
    }

    m.size.Add(int64(len(key) + len(value)))
    return nil
}

func (m *SyncMemTable) Delete(key []byte) error {
    val, ok := m.data.Load(string(key))
    if !ok {
        return ErrKeyNotFound
    }

    entry := val.(*Entry)
    oldValueLen := len(entry.Value)

    entry.Deleted = true
    entry.Timestamp = time.Now().UnixNano()
    entry.Value = nil

    m.size.Add(-int64(oldValueLen))
    return nil
}

func (m *SyncMemTable) Size() int64 {
    return m.size.Load()
}

func (m *SyncMemTable) Len() int {
    return int(m.count.Load())
}

func (m *SyncMemTable) Close() error {
    m.data = sync.Map{}
    return nil
}

パフォーマンス比較

// bench_test.go
package kvs

import (
    "fmt"
    "math/rand"
    "testing"
)

func BenchmarkMemTablePut(b *testing.B) {
    m := NewMemTable(DefaultOptions())
    keys := make([][]byte, b.N)
    values := make([][]byte, b.N)

    for i := 0; i < b.N; i++ {
        keys[i] = []byte(fmt.Sprintf("key-%d", i))
        values[i] = make([]byte, 100)
        rand.Read(values[i])
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Put(keys[i], values[i])
    }
}

func BenchmarkMemTableGet(b *testing.B) {
    m := NewMemTable(DefaultOptions())

    // 先にデータを入れる
    for i := 0; i < 10000; i++ {
        key := []byte(fmt.Sprintf("key-%d", i))
        value := make([]byte, 100)
        m.Put(key, value)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := []byte(fmt.Sprintf("key-%d", i%10000))
        m.Get(key)
    }
}

func BenchmarkSyncMemTablePut(b *testing.B) {
    m := NewSyncMemTable(DefaultOptions())
    keys := make([][]byte, b.N)
    values := make([][]byte, b.N)

    for i := 0; i < b.N; i++ {
        keys[i] = []byte(fmt.Sprintf("key-%d", i))
        values[i] = make([]byte, 100)
        rand.Read(values[i])
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Put(keys[i], values[i])
    }
}

func BenchmarkParallelPut(b *testing.B) {
    m := NewMemTable(DefaultOptions())

    b.RunParallel(func(pb *testing.PB) {
        i := 0
        for pb.Next() {
            key := []byte(fmt.Sprintf("key-%d", i))
            value := make([]byte, 100)
            m.Put(key, value)
            i++
        }
    })
}

ベンチマーク結果:

$ go test -bench=. -benchmem
BenchmarkMemTablePut-8           5000000    312 ns/op    224 B/op    3 allocs/op
BenchmarkMemTableGet-8          20000000     89 ns/op      0 B/op    0 allocs/op
BenchmarkSyncMemTablePut-8       3000000    456 ns/op    256 B/op    4 allocs/op
BenchmarkParallelPut-8           2000000    734 ns/op    224 B/op    3 allocs/op

イテレータの実装

範囲スキャンのためのイテレータ。

// iterator.go
package kvs

import (
    "bytes"
    "sort"
)

// MemTableIterator はMemTableを走査するイテレータ
type MemTableIterator struct {
    keys    []string
    entries []*Entry
    index   int
    valid   bool
}

func (m *MemTable) NewIterator() *MemTableIterator {
    m.mu.RLock()
    defer m.mu.RUnlock()

    // キーをソート
    keys := make([]string, 0, len(m.data))
    entries := make([]*Entry, 0, len(m.data))

    for k := range m.data {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for _, k := range keys {
        entries = append(entries, m.data[k])
    }

    iter := &MemTableIterator{
        keys:    keys,
        entries: entries,
        index:   0,
    }
    iter.valid = len(keys) > 0
    return iter
}

func (it *MemTableIterator) Valid() bool {
    return it.valid && it.index < len(it.keys)
}

func (it *MemTableIterator) Next() {
    it.index++
    it.valid = it.index < len(it.keys)
}

func (it *MemTableIterator) Key() []byte {
    if !it.Valid() {
        return nil
    }
    return []byte(it.keys[it.index])
}

func (it *MemTableIterator) Value() []byte {
    if !it.Valid() {
        return nil
    }
    entry := it.entries[it.index]
    if entry.Deleted {
        return nil
    }
    return entry.Value
}

func (it *MemTableIterator) Close() {
    it.valid = false
    it.keys = nil
    it.entries = nil
}

// Seek は指定したキー以上の位置に移動
func (it *MemTableIterator) Seek(target []byte) {
    targetStr := string(target)
    it.index = sort.SearchStrings(it.keys, targetStr)
    it.valid = it.index < len(it.keys)
}

// SeekToFirst は先頭に移動
func (it *MemTableIterator) SeekToFirst() {
    it.index = 0
    it.valid = len(it.keys) > 0
}

// SeekToLast は末尾に移動
func (it *MemTableIterator) SeekToLast() {
    if len(it.keys) > 0 {
        it.index = len(it.keys) - 1
        it.valid = true
    } else {
        it.valid = false
    }
}

// 範囲スキャン
func (m *MemTable) Scan(start, end []byte, fn func(key, value []byte) bool) {
    iter := m.NewIterator()
    defer iter.Close()

    iter.Seek(start)

    for iter.Valid() {
        key := iter.Key()
        if end != nil && bytes.Compare(key, end) >= 0 {
            break
        }

        value := iter.Value()
        if value != nil {  // 削除されていない
            if !fn(key, value) {
                break
            }
        }
        iter.Next()
    }
}

使用例

// example_test.go
package kvs

import (
    "fmt"
    "testing"
)

func TestBasicUsage(t *testing.T) {
    // KVSを作成
    db := NewMemTable(DefaultOptions())
    defer db.Close()

    // データを書き込み
    db.Put([]byte("name"), []byte("Alice"))
    db.Put([]byte("age"), []byte("30"))
    db.Put([]byte("city"), []byte("Tokyo"))

    // データを読み込み
    name, _ := db.Get([]byte("name"))
    fmt.Printf("name: %s\n", name)  // name: Alice

    // データを削除
    db.Delete([]byte("age"))

    // 削除後は取得できない
    _, err := db.Get([]byte("age"))
    fmt.Printf("age error: %v\n", err)  // age error: key not found

    // 範囲スキャン
    fmt.Println("\n--- Scan ---")
    db.Scan(nil, nil, func(key, value []byte) bool {
        fmt.Printf("%s = %s\n", key, value)
        return true
    })
}

func TestConcurrentAccess(t *testing.T) {
    db := NewMemTable(DefaultOptions())
    defer db.Close()

    done := make(chan bool)

    // 複数のgoroutineから同時アクセス
    for i := 0; i < 10; i++ {
        go func(id int) {
            for j := 0; j < 1000; j++ {
                key := []byte(fmt.Sprintf("key-%d-%d", id, j))
                value := []byte(fmt.Sprintf("value-%d-%d", id, j))
                db.Put(key, value)
            }
            done <- true
        }(i)
    }

    // 完了を待つ
    for i := 0; i < 10; i++ {
        <-done
    }

    fmt.Printf("Total entries: %d\n", db.Len())  // Total entries: 10000
}

実行結果:

name: Alice
age error: key not found

--- Scan ---
city = Tokyo
name = Alice

Total entries: 10000

TTL(有効期限)サポート

// ttl.go
package kvs

import (
    "sync"
    "time"
)

// TTLEntry は有効期限付きエントリ
type TTLEntry struct {
    Entry
    ExpiresAt int64  // 0なら無期限
}

// TTLMemTable はTTLサポート付きMemTable
type TTLMemTable struct {
    mu      sync.RWMutex
    data    map[string]*TTLEntry
    options Options
}

func NewTTLMemTable(opts Options) *TTLMemTable {
    m := &TTLMemTable{
        data:    make(map[string]*TTLEntry),
        options: opts,
    }

    // バックグラウンドで期限切れエントリを削除
    go m.cleanupLoop()

    return m
}

func (m *TTLMemTable) PutWithTTL(key, value []byte, ttl time.Duration) error {
    if len(key) > m.options.MaxKeySize {
        return ErrKeyTooLarge
    }
    if len(value) > m.options.MaxValueSize {
        return ErrValueTooLarge
    }

    m.mu.Lock()
    defer m.mu.Unlock()

    var expiresAt int64
    if ttl > 0 {
        expiresAt = time.Now().Add(ttl).UnixNano()
    }

    entry := &TTLEntry{
        Entry: Entry{
            Key:       key,
            Value:     value,
            Timestamp: time.Now().UnixNano(),
            Deleted:   false,
        },
        ExpiresAt: expiresAt,
    }

    m.data[string(key)] = entry
    return nil
}

func (m *TTLMemTable) Get(key []byte) ([]byte, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()

    entry, ok := m.data[string(key)]
    if !ok {
        return nil, ErrKeyNotFound
    }

    if entry.Deleted {
        return nil, ErrKeyNotFound
    }

    // 期限切れチェック
    if entry.ExpiresAt > 0 && time.Now().UnixNano() > entry.ExpiresAt {
        return nil, ErrKeyNotFound
    }

    return entry.Value, nil
}

func (m *TTLMemTable) cleanupLoop() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for range ticker.C {
        m.cleanup()
    }
}

func (m *TTLMemTable) cleanup() {
    m.mu.Lock()
    defer m.mu.Unlock()

    now := time.Now().UnixNano()
    for key, entry := range m.data {
        if entry.ExpiresAt > 0 && now > entry.ExpiresAt {
            delete(m.data, key)
        }
    }
}

まとめ

今回実装したこと:

コンポーネント 説明
MemTable インメモリハッシュマップ
SyncMemTable sync.Map版
Iterator 範囲スキャン
TTL 有効期限サポート
┌─────────────────────────────────────────────────────────────────┐
│                    MemTable Structure                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  map[string]*Entry                                              │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  "user:1" → Entry{Key, Value, Timestamp, Deleted}      │    │
│  │  "user:2" → Entry{Key, Value, Timestamp, Deleted}      │    │
│  │  "user:3" → Entry{Key, Value, Timestamp, Deleted}      │    │
│  │  ...                                                    │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│  Operations: O(1) average for Get/Put/Delete                    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

次回は永続化(WALとSSTable)を実装するよ。メモリだけじゃなくて、ディスクにもデータを保存できるようになる。再起動してもデータが残るってやつ。

この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?