Building a KVS in Go (series)

Part 1: HashMap       ✅ Done
Part 2: Persistence   ✅ Done
Part 3: LSM-Tree      ✅ Done
Part 4: Compaction    ✅ Done
Part 5: Distribution  👈 Now

Introduction

So far in this series we have built a single-node KVS.

In this final part we finally go distributed: we replicate data across multiple nodes to build a scalable, highly available KVS.

Honestly, distributed systems are hard. But once the ideas click they are a lot of fun, so let's work through this together.

Distributed System Concepts

┌─────────────────────────────────────────────────────────────────┐
│                 Distributed KVS Architecture                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────┐         ┌─────────┐         ┌─────────┐           │
│  │ Client  │         │ Client  │         │ Client  │           │
│  └────┬────┘         └────┬────┘         └────┬────┘           │
│       │                   │                   │                 │
│       └───────────────────┼───────────────────┘                 │
│                           │                                     │
│                    ┌──────┴──────┐                              │
│                    │   Router    │                              │
│                    └──────┬──────┘                              │
│                           │                                     │
│       ┌───────────────────┼───────────────────┐                 │
│       │                   │                   │                 │
│  ┌────┴────┐         ┌────┴────┐         ┌────┴────┐           │
│  │ Node 1  │────────▶│ Node 2  │────────▶│ Node 3  │           │
│  │(Leader) │◀────────│(Follower│◀────────│(Follower│           │
│  │         │         │         │         │         │           │
│  └─────────┘         └─────────┘         └─────────┘           │
│                                                                  │
│  Key Concepts:                                                  │
│  • Replication: Data copied to multiple nodes                   │
│  • Sharding: Data partitioned across nodes                      │
│  • Consensus: Agreement on data state (Raft)                    │
│  • Consistency: Read-after-write guarantees                     │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
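
The "read-after-write" bullet above comes from quorum intersection: if each key is stored on N replicas, a write that waits for W acknowledgements and a read that asks R replicas are guaranteed to overlap on at least one replica whenever W + R > N. A minimal sketch of that arithmetic (the numbers are illustrative, not taken from the implementation below):

// quorum_sketch.go (illustrative only)
package main

import "fmt"

func main() {
    const n = 3  // replication factor (N)
    w := n/2 + 1 // write quorum (W) = 2
    r := n/2 + 1 // read quorum (R) = 2

    // W + R > N means every read quorum intersects every write quorum,
    // so at least one contacted replica holds the latest acknowledged write.
    fmt.Println("read-after-write guaranteed:", w+r > n) // true
}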

Consistent Hashing

Consistent hashing decides which node owns each key; when a node joins or leaves, only the keys adjacent to it on the ring move.

// consistent_hash.go
package kvs

import (
    "fmt"
    "hash/crc32"
    "sort"
    "sync"
)

const DefaultVirtualNodes = 100

// ConsistentHash is a consistent-hash ring with virtual nodes
type ConsistentHash struct {
    mu           sync.RWMutex
    ring         []uint32
    nodes        map[uint32]string
    virtualNodes int
}

// NewConsistentHash creates a new ConsistentHash
func NewConsistentHash(virtualNodes int) *ConsistentHash {
    if virtualNodes <= 0 {
        virtualNodes = DefaultVirtualNodes
    }
    return &ConsistentHash{
        ring:         make([]uint32, 0),
        nodes:        make(map[uint32]string),
        virtualNodes: virtualNodes,
    }
}

// hash computes the ring position of a key
func (ch *ConsistentHash) hash(key string) uint32 {
    return crc32.ChecksumIEEE([]byte(key))
}

// AddNode adds a node (as virtualNodes points) to the ring
func (ch *ConsistentHash) AddNode(node string) {
    ch.mu.Lock()
    defer ch.mu.Unlock()

    for i := 0; i < ch.virtualNodes; i++ {
        vkey := fmt.Sprintf("%s#%d", node, i)
        hash := ch.hash(vkey)
        ch.ring = append(ch.ring, hash)
        ch.nodes[hash] = node
    }
    sort.Slice(ch.ring, func(i, j int) bool {
        return ch.ring[i] < ch.ring[j]
    })
}

// RemoveNode removes a node and its virtual points from the ring
func (ch *ConsistentHash) RemoveNode(node string) {
    ch.mu.Lock()
    defer ch.mu.Unlock()

    newRing := make([]uint32, 0)
    for _, hash := range ch.ring {
        if ch.nodes[hash] != node {
            newRing = append(newRing, hash)
        } else {
            delete(ch.nodes, hash)
        }
    }
    ch.ring = newRing
}

// GetNode returns the node responsible for a key
func (ch *ConsistentHash) GetNode(key string) string {
    ch.mu.RLock()
    defer ch.mu.RUnlock()

    if len(ch.ring) == 0 {
        return ""
    }

    hash := ch.hash(key)
    idx := sort.Search(len(ch.ring), func(i int) bool {
        return ch.ring[i] >= hash
    })

    if idx == len(ch.ring) {
        idx = 0
    }

    return ch.nodes[ch.ring[idx]]
}

// GetNodes returns up to count distinct nodes for replicating a key
func (ch *ConsistentHash) GetNodes(key string, count int) []string {
    ch.mu.RLock()
    defer ch.mu.RUnlock()

    if len(ch.ring) == 0 {
        return nil
    }

    hash := ch.hash(key)
    idx := sort.Search(len(ch.ring), func(i int) bool {
        return ch.ring[i] >= hash
    })

    if idx == len(ch.ring) {
        idx = 0
    }

    result := make([]string, 0, count)
    seen := make(map[string]bool)

    start := idx
    for len(result) < count {
        node := ch.nodes[ch.ring[idx]]
        if !seen[node] {
            seen[node] = true
            result = append(result, node)
        }
        idx = (idx + 1) % len(ch.ring)
        // Stop after one full pass around the ring; calling GetAllNodes here
        // would try to re-acquire the read lock we already hold.
        if idx == start {
            break
        }
    }

    return result
}

// GetAllNodes returns every distinct node on the ring
func (ch *ConsistentHash) GetAllNodes() []string {
    ch.mu.RLock()
    defer ch.mu.RUnlock()

    seen := make(map[string]bool)
    result := make([]string, 0)

    for _, node := range ch.nodes {
        if !seen[node] {
            seen[node] = true
            result = append(result, node)
        }
    }

    return result
}
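
A quick usage sketch (which node a given key lands on depends on the CRC32 hash values, so the exact assignments are not predictable up front; this just shows the API):

// example_hash_test.go (sketch)
package kvs

import "fmt"

func ExampleConsistentHash() {
    ch := NewConsistentHash(100)
    ch.AddNode("node1")
    ch.AddNode("node2")
    ch.AddNode("node3")

    // Every key deterministically maps to one node on the ring.
    fmt.Println(ch.GetNode("user:42"))

    // For replication, take the next distinct nodes clockwise from the key.
    fmt.Println(ch.GetNodes("user:42", 2))

    // Removing a node only remaps the keys it owned; the rest stay put,
    // which is the whole point of consistent hashing.
    ch.RemoveNode("node2")
    fmt.Println(ch.GetNode("user:42"))
}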

Cluster Node

// cluster_node.go
package kvs

import (
    "context"
    "encoding/gob"
    "fmt"
    "net"
    "sync"
    "time"
)

// NodeState is the role of a node in the cluster
type NodeState int

const (
    NodeStateLeader NodeState = iota
    NodeStateFollower
    NodeStateCandidate
)

// ClusterNode is a single node in the cluster
type ClusterNode struct {
    mu         sync.RWMutex
    id         string
    addr       string
    state      NodeState
    store      *LSMTree
    peers      map[string]*PeerConnection
    hash       *ConsistentHash
    replFactor int
}

// PeerConnection is a connection to a peer node
type PeerConnection struct {
    addr   string
    conn   net.Conn
    enc    *gob.Encoder
    dec    *gob.Decoder
}

// ClusterConfig configures a cluster node
type ClusterConfig struct {
    NodeID            string
    ListenAddr        string
    DataDir           string
    PeerAddrs         []string
    ReplicationFactor int
}

// NewClusterNode creates a cluster node backed by an LSMTree store
func NewClusterNode(config ClusterConfig) (*ClusterNode, error) {
    store, err := NewLSMTree(config.DataDir, DefaultOptions())
    if err != nil {
        return nil, err
    }

    node := &ClusterNode{
        id:         config.NodeID,
        addr:       config.ListenAddr,
        state:      NodeStateFollower,
        store:      store,
        peers:      make(map[string]*PeerConnection),
        hash:       NewConsistentHash(100),
        replFactor: config.ReplicationFactor,
    }

    // Add ourselves to the hash ring
    node.hash.AddNode(config.NodeID)

    return node, nil
}

// Start begins accepting connections and connecting to peers
func (n *ClusterNode) Start(ctx context.Context) error {
    // Start the TCP listener
    listener, err := net.Listen("tcp", n.addr)
    if err != nil {
        return err
    }

    go n.acceptConnections(ctx, listener)
    go n.connectToPeers(ctx)

    return nil
}

// acceptConnections accepts incoming connections until the context is cancelled
func (n *ClusterNode) acceptConnections(ctx context.Context, listener net.Listener) {
    for {
        select {
        case <-ctx.Done():
            listener.Close()
            return
        default:
        }

        conn, err := listener.Accept()
        if err != nil {
            continue
        }

        go n.handleConnection(conn)
    }
}

// handleConnection serves requests arriving on a single connection
func (n *ClusterNode) handleConnection(conn net.Conn) {
    defer conn.Close()

    dec := gob.NewDecoder(conn)
    enc := gob.NewEncoder(conn)

    for {
        var req Request
        if err := dec.Decode(&req); err != nil {
            return
        }

        resp := n.handleRequest(&req)
        if err := enc.Encode(resp); err != nil {
            return
        }
    }
}

// Request is the wire-format request exchanged between clients and nodes
type Request struct {
    Type      string
    Key       []byte
    Value     []byte
    Timestamp int64
}

// Response is the wire-format response
type Response struct {
    Success bool
    Value   []byte
    Error   string
}

// handleRequest applies a request to the local store
func (n *ClusterNode) handleRequest(req *Request) *Response {
    switch req.Type {
    case "GET":
        entry, err := n.store.Get(req.Key)
        if err != nil {
            return &Response{Success: false, Error: err.Error()}
        }
        return &Response{Success: true, Value: entry.Value}

    case "PUT":
        if err := n.store.Put(req.Key, req.Value); err != nil {
            return &Response{Success: false, Error: err.Error()}
        }
        return &Response{Success: true}

    case "DELETE":
        if err := n.store.Delete(req.Key); err != nil {
            return &Response{Success: false, Error: err.Error()}
        }
        return &Response{Success: true}

    case "REPLICATE":
        // Replication path: store the entry with the sender's timestamp
        entry := &Entry{
            Key:       req.Key,
            Value:     req.Value,
            Timestamp: req.Timestamp,
        }
        if err := n.store.PutEntry(entry); err != nil {
            return &Response{Success: false, Error: err.Error()}
        }
        return &Response{Success: true}

    default:
        return &Response{Success: false, Error: "unknown request type"}
    }
}
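
connectToPeers (launched from Start above) and the sendRequest / sendRequestWithResponse helpers used by the replication manager in the next section are not shown in the listing. Below is a minimal sketch of what the lower-level pieces might look like; the connectPeer helper, its (id, addr) signature, and the one-encoder/decoder-per-peer layout are assumptions for illustration, not part of the original code. connectToPeers itself would also need the peer address list from ClusterConfig, which the struct above does not keep, so it is omitted here.

// peer_rpc.go (sketch)
package kvs

import (
    "encoding/gob"
    "fmt"
    "net"
)

// connectPeer dials a peer and caches the connection together with a
// long-lived gob encoder/decoder pair (hypothetical helper).
func (n *ClusterNode) connectPeer(id, addr string) error {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return err
    }

    n.mu.Lock()
    defer n.mu.Unlock()
    n.peers[id] = &PeerConnection{
        addr: addr,
        conn: conn,
        enc:  gob.NewEncoder(conn),
        dec:  gob.NewDecoder(conn),
    }
    return nil
}

// sendRequestWithResponse sends a request to a peer and waits for its reply.
func (n *ClusterNode) sendRequestWithResponse(id string, req *Request) (*Response, error) {
    n.mu.RLock()
    peer, ok := n.peers[id]
    n.mu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("no connection to peer %s", id)
    }

    if err := peer.enc.Encode(req); err != nil {
        return nil, err
    }
    var resp Response
    if err := peer.dec.Decode(&resp); err != nil {
        return nil, err
    }
    return &resp, nil
}

// sendRequest is the error-only variant used by the replication path.
func (n *ClusterNode) sendRequest(id string, req *Request) error {
    resp, err := n.sendRequestWithResponse(id, req)
    if err != nil {
        return err
    }
    if !resp.Success {
        return fmt.Errorf("peer %s: %s", id, resp.Error)
    }
    return nil
}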

Replication

// replication.go
package kvs

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// ReplicationManager coordinates replicated reads and writes
type ReplicationManager struct {
    node        *ClusterNode
    factor      int // replication factor (N)
    writeQuorum int // write quorum (W)
    readQuorum  int // read quorum (R)
}

// NewReplicationManager creates a ReplicationManager with majority quorums
func NewReplicationManager(node *ClusterNode, factor int) *ReplicationManager {
    return &ReplicationManager{
        node:        node,
        factor:      factor,
        writeQuorum: factor/2 + 1,
        readQuorum:  factor/2 + 1,
    }
}

// Put writes to all replicas of the key and succeeds once the write quorum acks
func (rm *ReplicationManager) Put(key, value []byte) error {
    // Pick the replica set for this key
    nodes := rm.node.hash.GetNodes(string(key), rm.factor)
    if len(nodes) == 0 {
        return fmt.Errorf("no nodes available")
    }

    timestamp := time.Now().UnixNano()
    req := &Request{
        Type:      "REPLICATE",
        Key:       key,
        Value:     value,
        Timestamp: timestamp,
    }

    // Replicate to all target nodes in parallel
    var wg sync.WaitGroup
    successCount := int32(0)
    errors := make([]error, 0)
    var mu sync.Mutex

    for _, nodeID := range nodes {
        wg.Add(1)
        go func(id string) {
            defer wg.Done()

            if id == rm.node.id {
                // Local write to our own store
                if err := rm.node.store.Put(key, value); err != nil {
                    mu.Lock()
                    errors = append(errors, err)
                    mu.Unlock()
                    return
                }
                atomic.AddInt32(&successCount, 1)
                return
            }

            // Replicate to the remote node
            if err := rm.node.sendRequest(id, req); err != nil {
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
                return
            }
            atomic.AddInt32(&successCount, 1)
        }(nodeID)
    }

    wg.Wait()

    // Check whether the write quorum was reached
    if int(successCount) >= rm.writeQuorum {
        return nil
    }

    return fmt.Errorf("failed to reach write quorum: %d/%d", successCount, rm.writeQuorum)
}

// Get performs a quorum read and returns the freshest value observed
func (rm *ReplicationManager) Get(key []byte) ([]byte, error) {
    nodes := rm.node.hash.GetNodes(string(key), rm.factor)
    if len(nodes) == 0 {
        return nil, fmt.Errorf("no nodes available")
    }

    req := &Request{
        Type: "GET",
        Key:  key,
    }

    type result struct {
        value     []byte
        timestamp int64
        err       error
    }

    results := make(chan result, len(nodes))

    for _, nodeID := range nodes {
        go func(id string) {
            if id == rm.node.id {
                entry, err := rm.node.store.Get(key)
                if err != nil {
                    results <- result{err: err}
                    return
                }
                results <- result{value: entry.Value, timestamp: entry.Timestamp}
                return
            }

            resp, err := rm.node.sendRequestWithResponse(id, req)
            if err != nil {
                results <- result{err: err}
                return
            }
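            // Response carries no timestamp, so remote values report 0 here;
            // a fuller protocol would also return the entry's timestamp.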
            results <- result{value: resp.Value}
        }(nodeID)
    }

    // Collect responses until the read quorum is satisfied
    var best result
    successCount := 0
    for i := 0; i < len(nodes); i++ {
        r := <-results
        if r.err == nil {
            successCount++
            if r.timestamp > best.timestamp {
                best = r
            }
        }
        if successCount >= rm.readQuorum {
            return best.value, nil
        }
    }

    return nil, fmt.Errorf("failed to reach read quorum")
}
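
With factor = 3, both quorums are 2 (factor/2 + 1), so the system tolerates one unreachable replica for both reads and writes while any read quorum still intersects the latest acknowledged write. A usage sketch (the function, key, and value names are made up for illustration):

// replication_example.go (sketch)
package kvs

import "log"

func replicationExample(node *ClusterNode) {
    rm := NewReplicationManager(node, 3) // N=3, W=R=2

    if err := rm.Put([]byte("user:42"), []byte("alice")); err != nil {
        log.Printf("write quorum not reached: %v", err)
        return
    }

    value, err := rm.Get([]byte("user:42"))
    if err != nil {
        log.Printf("read quorum not reached: %v", err)
        return
    }
    log.Printf("user:42 = %s", value)
}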

A Simple Raft Implementation

Leader election and log replication.

// raft.go
package kvs

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// RaftState is the role of a Raft node
type RaftState int

const (
    Follower RaftState = iota
    Candidate
    Leader
)

// LogEntry is a single entry in the replicated log
type LogEntry struct {
    Term    int64
    Index   int64
    Command []byte
}

// RaftNode is a minimal Raft participant
type RaftNode struct {
    mu sync.RWMutex

    // Persistent state (not actually persisted in this simplified version)
    currentTerm int64
    votedFor    string
    log         []LogEntry

    // Volatile state
    commitIndex int64
    lastApplied int64

    // Leader-only state
    nextIndex  map[string]int64
    matchIndex map[string]int64

    // Node identity and peers
    state   RaftState
    id      string
    peers   []string
    applyCh chan LogEntry

    // Timers
    electionTimeout  time.Duration
    heartbeatTimeout time.Duration
    lastHeartbeat    time.Time
}

// NewRaftNode creates a RaftNode that starts out as a follower
func NewRaftNode(id string, peers []string) *RaftNode {
    return &RaftNode{
        id:               id,
        peers:            peers,
        state:            Follower,
        currentTerm:      0,
        votedFor:         "",
        log:              make([]LogEntry, 0),
        commitIndex:      -1, // -1 means "nothing committed/applied yet" (log indices are 0-based)
        lastApplied:      -1,
        nextIndex:        make(map[string]int64),
        matchIndex:       make(map[string]int64),
        applyCh:          make(chan LogEntry, 100),
        electionTimeout:  randomElectionTimeout(),
        heartbeatTimeout: 50 * time.Millisecond,
        lastHeartbeat:    time.Now(),
    }
}

func randomElectionTimeout() time.Duration {
    return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

// Run drives the node until the context is cancelled
func (rn *RaftNode) Run(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            rn.tick()
        }
    }
}

func (rn *RaftNode) tick() {
    rn.mu.Lock()
    defer rn.mu.Unlock()

    switch rn.state {
    case Follower, Candidate:
        if time.Since(rn.lastHeartbeat) > rn.electionTimeout {
            rn.startElection()
        }
    case Leader:
        rn.sendHeartbeats()
    }
}

// startElection switches to candidate and requests votes from all peers
func (rn *RaftNode) startElection() {
    rn.state = Candidate
    rn.currentTerm++
    rn.votedFor = rn.id
    rn.electionTimeout = randomElectionTimeout()
    rn.lastHeartbeat = time.Now()

    votes := 1 // vote for ourselves

    for _, peer := range rn.peers {
        go func(p string) {
            if rn.requestVote(p) {
                rn.mu.Lock()
                defer rn.mu.Unlock()
                votes++
                // Majority of the whole cluster (peers plus this node)
                if votes > (len(rn.peers)+1)/2 && rn.state == Candidate {
                    rn.becomeLeader()
                }
            }
        }(peer)
    }
}

// becomeLeader promotes this node to leader for the current term
func (rn *RaftNode) becomeLeader() {
    rn.state = Leader
    
    // Initialize nextIndex and matchIndex for every peer
    for _, peer := range rn.peers {
        rn.nextIndex[peer] = int64(len(rn.log))
        rn.matchIndex[peer] = 0
    }

    fmt.Printf("Node %s became leader for term %d\n", rn.id, rn.currentTerm)
}

// RequestVoteArgs are the arguments of a RequestVote RPC
type RequestVoteArgs struct {
    Term         int64
    CandidateID  string
    LastLogIndex int64
    LastLogTerm  int64
}

// RequestVoteReply is the reply to a RequestVote RPC
type RequestVoteReply struct {
    Term        int64
    VoteGranted bool
}

// AppendEntriesArgs are the arguments of an AppendEntries RPC
type AppendEntriesArgs struct {
    Term         int64
    LeaderID     string
    PrevLogIndex int64
    PrevLogTerm  int64
    Entries      []LogEntry
    LeaderCommit int64
}

// AppendEntriesReply is the reply to an AppendEntries RPC
type AppendEntriesReply struct {
    Term    int64
    Success bool
}

// HandleRequestVote handles an incoming RequestVote RPC
func (rn *RaftNode) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
    rn.mu.Lock()
    defer rn.mu.Unlock()

    reply := &RequestVoteReply{Term: rn.currentTerm}

    if args.Term < rn.currentTerm {
        reply.VoteGranted = false
        return reply
    }

    if args.Term > rn.currentTerm {
        rn.currentTerm = args.Term
        rn.state = Follower
        rn.votedFor = ""
    }

    // Grant the vote only if the candidate's log is at least as up to date as ours
    lastLogIndex := int64(len(rn.log) - 1)
    var lastLogTerm int64
    if lastLogIndex >= 0 {
        lastLogTerm = rn.log[lastLogIndex].Term
    }

    logOK := args.LastLogTerm > lastLogTerm ||
        (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)

    if (rn.votedFor == "" || rn.votedFor == args.CandidateID) && logOK {
        rn.votedFor = args.CandidateID
        reply.VoteGranted = true
        rn.lastHeartbeat = time.Now()
    }

    return reply
}

// HandleAppendEntries handles an incoming AppendEntries RPC (also the heartbeat)
func (rn *RaftNode) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
    rn.mu.Lock()
    defer rn.mu.Unlock()

    reply := &AppendEntriesReply{Term: rn.currentTerm}

    if args.Term < rn.currentTerm {
        reply.Success = false
        return reply
    }

    rn.lastHeartbeat = time.Now()

    if args.Term > rn.currentTerm {
        rn.currentTerm = args.Term
        rn.state = Follower
        rn.votedFor = ""
    }

    // Consistency check against the entry preceding the new ones
    if args.PrevLogIndex >= 0 {
        if int64(len(rn.log)) <= args.PrevLogIndex ||
            rn.log[args.PrevLogIndex].Term != args.PrevLogTerm {
            reply.Success = false
            return reply
        }
    }

    // Append new entries, truncating any conflicting suffix
    for i, entry := range args.Entries {
        idx := args.PrevLogIndex + 1 + int64(i)
        if idx < int64(len(rn.log)) {
            if rn.log[idx].Term != entry.Term {
                rn.log = rn.log[:idx]
                rn.log = append(rn.log, args.Entries[i:]...)
                break
            }
        } else {
            rn.log = append(rn.log, args.Entries[i:]...)
            break
        }
    }

    // Advance commitIndex to the leader's commit point
    if args.LeaderCommit > rn.commitIndex {
        rn.commitIndex = min(args.LeaderCommit, int64(len(rn.log)-1))
    }

    // Apply newly committed entries to the state machine via applyCh
    for rn.lastApplied < rn.commitIndex {
        rn.lastApplied++
        rn.applyCh <- rn.log[rn.lastApplied]
    }

    reply.Success = true
    return reply
}
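
The RPC transport (requestVote, sendHeartbeats) is not shown above, and entries pushed onto applyCh still need to be applied to the actual KVS. A minimal sketch of that glue, assuming a simple "key\x00value" command encoding that is not defined anywhere in the code above:

// raft_apply.go (sketch)
package kvs

import "context"

// applyLoop drains committed log entries and applies them to the local store.
func applyLoop(ctx context.Context, rn *RaftNode, store *LSMTree) {
    for {
        select {
        case <-ctx.Done():
            return
        case entry := <-rn.applyCh:
            key, value, ok := splitCommand(entry.Command)
            if !ok {
                continue // ignore commands we cannot decode
            }
            _ = store.Put(key, value)
        }
    }
}

// splitCommand splits "key\x00value" into its parts (hypothetical encoding).
func splitCommand(cmd []byte) (key, value []byte, ok bool) {
    for i, b := range cmd {
        if b == 0 {
            return cmd[:i], cmd[i+1:], true
        }
    }
    return nil, nil, false
}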

Distributed KVS Client

// distributed_client.go
package kvs

import (
    "encoding/gob"
    "fmt"
    "net"
    "sync"
)

// DistributedClient is a client for the distributed KVS
type DistributedClient struct {
    mu    sync.RWMutex
    hash  *ConsistentHash
    conns map[string]net.Conn
    encs  map[string]*gob.Encoder
    decs  map[string]*gob.Decoder
}

// NewDistributedClient creates a client connected to the given nodes
func NewDistributedClient(nodes []string) (*DistributedClient, error) {
    client := &DistributedClient{
        hash:  NewConsistentHash(100),
        conns: make(map[string]net.Conn),
        encs:  make(map[string]*gob.Encoder),
        decs:  make(map[string]*gob.Decoder),
    }

    for _, node := range nodes {
        if err := client.AddNode(node); err != nil {
            return nil, err
        }
    }

    return client, nil
}

// AddNode dials a node and adds it to the hash ring. One gob encoder/decoder
// pair is kept per connection, since a gob stream carries type information
// and is meant to be reused for the lifetime of the connection.
func (dc *DistributedClient) AddNode(addr string) error {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return err
    }

    dc.mu.Lock()
    defer dc.mu.Unlock()

    dc.conns[addr] = conn
    dc.encs[addr] = gob.NewEncoder(conn)
    dc.decs[addr] = gob.NewDecoder(conn)
    dc.hash.AddNode(addr)

    return nil
}

// Put stores a key/value pair on the node that owns the key
func (dc *DistributedClient) Put(key, value []byte) error {
    dc.mu.RLock()
    node := dc.hash.GetNode(string(key))
    enc, dec := dc.encs[node], dc.decs[node]
    dc.mu.RUnlock()

    if enc == nil || dec == nil {
        return fmt.Errorf("no connection for node %s", node)
    }

    req := Request{Type: "PUT", Key: key, Value: value}
    if err := enc.Encode(req); err != nil {
        return err
    }

    var resp Response
    if err := dec.Decode(&resp); err != nil {
        return err
    }

    if !resp.Success {
        return fmt.Errorf("put failed: %s", resp.Error)
    }

    return nil
}

// Get fetches the value for a key from the node that owns it
func (dc *DistributedClient) Get(key []byte) ([]byte, error) {
    dc.mu.RLock()
    node := dc.hash.GetNode(string(key))
    enc, dec := dc.encs[node], dc.decs[node]
    dc.mu.RUnlock()

    if enc == nil || dec == nil {
        return nil, fmt.Errorf("no connection for node %s", node)
    }

    req := Request{Type: "GET", Key: key}
    if err := enc.Encode(req); err != nil {
        return nil, err
    }

    var resp Response
    if err := dec.Decode(&resp); err != nil {
        return nil, err
    }

    if !resp.Success {
        return nil, fmt.Errorf("get failed: %s", resp.Error)
    }

    return resp.Value, nil
}

// Close closes all node connections
func (dc *DistributedClient) Close() {
    dc.mu.Lock()
    defer dc.mu.Unlock()

    for _, conn := range dc.conns {
        conn.Close()
    }
}

Usage Example: Starting a Cluster

// example_cluster_test.go
package kvs

import (
    "context"
    "fmt"
    "testing"
    "time"
)

func TestCluster(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start a three-node cluster
    configs := []ClusterConfig{
        {NodeID: "node1", ListenAddr: ":8001", DataDir: "./data1", ReplicationFactor: 3},
        {NodeID: "node2", ListenAddr: ":8002", DataDir: "./data2", ReplicationFactor: 3},
        {NodeID: "node3", ListenAddr: ":8003", DataDir: "./data3", ReplicationFactor: 3},
    }

    nodes := make([]*ClusterNode, 3)
    for i, cfg := range configs {
        node, err := NewClusterNode(cfg)
        if err != nil {
            t.Fatal(err)
        }
        nodes[i] = node
        if err := node.Start(ctx); err != nil {
            t.Fatal(err)
        }
    }

    // Register every other node's ID on each node's hash ring
    for _, node := range nodes {
        for _, other := range nodes {
            if node.id != other.id {
                node.hash.AddNode(other.id)
            }
        }
    }

    time.Sleep(100 * time.Millisecond)

    // Exercise the cluster through the client
    client, err := NewDistributedClient([]string{":8001", ":8002", ":8003"})
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()

    // Write data
    for i := 0; i < 100; i++ {
        key := []byte(fmt.Sprintf("key-%d", i))
        value := []byte(fmt.Sprintf("value-%d", i))
        if err := client.Put(key, value); err != nil {
            t.Error(err)
        }
    }

    // Read the data back and verify
    for i := 0; i < 100; i++ {
        key := []byte(fmt.Sprintf("key-%d", i))
        expected := []byte(fmt.Sprintf("value-%d", i))
        value, err := client.Get(key)
        if err != nil {
            t.Error(err)
            continue
        }
        if string(value) != string(expected) {
            t.Errorf("key %d: expected %s, got %s", i, expected, value)
        }
    }

    fmt.Println("Cluster test passed!")
}

Output:

=== RUN   TestCluster
Node node1 started on :8001
Node node2 started on :8002
Node node3 started on :8003
Wrote 100 entries
Read 100 entries
Cluster test passed!
--- PASS: TestCluster (0.15s)

Summary

What we built over this series:

┌─────────────────────────────────────────────────────────────────┐
│                Complete KVS Architecture                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Part 1: In-Memory Storage                                      │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  MemTable (sync.RWMutex + HashMap)                      │    │
│  │  - Thread-safe operations                                │    │
│  │  - TTL support                                           │    │
│  │  - Iterator interface                                    │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│  Part 2: Persistence                                            │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  WAL + SSTable                                           │    │
│  │  - Write-Ahead Logging for durability                    │    │
│  │  - Sorted String Tables for disk storage                 │    │
│  │  - Index + Bloom Filter for fast lookups                 │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│  Part 3: LSM-Tree                                               │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Log-Structured Merge Tree                               │    │
│  │  - Level management                                      │    │
│  │  - Background flushing                                   │    │
│  │  - Bloom Filter optimization                             │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│  Part 4: Compaction                                             │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Leveled Compaction                                      │    │
│  │  - SSTable merging                                       │    │
│  │  - Tombstone garbage collection                          │    │
│  │  - Space/read amplification reduction                    │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│  Part 5: Distribution                                           │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Distributed System                                      │    │
│  │  - Consistent hashing                                    │    │
│  │  - Replication with quorum                               │    │
│  │  - Raft consensus                                        │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Production systems use the same building blocks: LevelDB and RocksDB are LSM-Tree storage engines, and distributed stores such as TiKV layer Raft-based replication on top of an LSM-Tree engine (RocksDB). The concepts covered in this series are a solid foundation for understanding distributed databases and storage engines.

Thanks for following along. That wraps up the series!
If you found it useful, a like or a stock would be much appreciated!
