こんにちは!分散システムの重要な技術である「一貫性のあるハッシュ(Consistent Hashing)」について解説した記事を紹介します。
はじめに:なぜConsistent Hashingが必要なのか?
現代のWebサービスでは、大量のデータを複数のサーバーに分散して保存する必要があります。たとえば、ユーザーがアップロードしたファイルや、キャッシュデータなどをどのサーバーに保存するか決める必要があります。
通常のハッシュ関数を使う場合、以下のように実装するかもしれません:
def hash_fn(key):
"""キーをバイト合計し、サーバー数(5台)で割ったあまりを返す"""
return sum(bytearray(key.encode('utf-8'))) % 5
しかし、この方法には大きな問題があります。サーバーの台数が変わったとき、多くのデータの再配置が必要になるのです。
例えば、5台のサーバーがあるとします。ファイル「file1.txt」は hash_fn("file1.txt") = 2
なので、3番目のサーバーに保存されます。しかし、1台サーバーを追加して6台になると、ハッシュ関数は % 6
に変わり、多くのファイルの配置先が変わってしまいます。
これは大規模なデータ移動を引き起こし、システムに大きな負荷をかけます。この問題を解決するのが「Consistent Hashing」なのです。
Consistent Hashingの基本概念
Consistent Hashingの基本的なアイデアは以下の通りです:
- 巨大で固定サイズのハッシュ空間(たとえば0〜2^256-1)を考える
- サーバーもデータも同じハッシュ空間上にマッピングする
- データは「右回りで最も近いサーバー」に割り当てる
これにより、サーバーが追加・削除されても影響を受けるのは、そのサーバーの近くにあるデータだけになります。
ハッシュリングを理解する
Consistent Hashingは「ハッシュリング」と呼ばれる円環状の構造で説明されることが多いです。
このリング上に、サーバーとデータの両方がマッピングされます。データはリング上で時計回りに進み、最初に見つかるサーバーに割り当てられます。
Consistent Hashingの実装方法
実装方法としては、以下のアプローチが考えられます:
- 巨大な配列を使う方法(メモリ効率が悪い)
- 二分探索を使った効率的な方法(実用的)
今回は2番目の方法で実装してみましょう。Pythonのコードで説明します。
基本構造
import hashlib
import bisect
class ConsistentHash:
def __init__(self, total_slots=2**256):
"""
ConsistentHashを初期化
total_slots: ハッシュ空間のサイズ
"""
self.total_slots = total_slots
self._keys = [] # サーバーのハッシュ値を保存するリスト
self.nodes = [] # サーバーノードを保存するリスト
def hash_fn(self, key):
"""
キーをハッシュ値に変換する関数
SHA-256ハッシュを計算し、total_slotsで割ったあまりを返す
"""
hsh = hashlib.sha256()
hsh.update(bytes(key.encode('utf-8')))
return int(hsh.hexdigest(), 16) % self.total_slots
ノードの追加
def add_node(self, node):
"""
新しいノードをハッシュリングに追加する
node: 追加するStorageNodeオブジェクト
"""
if len(self._keys) == self.total_slots:
raise Exception("ハッシュ空間がいっぱいです")
# ノードの位置をハッシュ関数で計算
key = self.hash_fn(node.host)
# キーを挿入する位置を二分探索で見つける
index = bisect.bisect(self._keys, key)
# 同じキーが既にある場合はエラー
if index > 0 and self._keys[index - 1] == key:
raise Exception("衝突が発生しました")
# この時点でデータ移行が必要になる
# (実際のコードではここでデータ移行処理を行う)
# ノードとキーを挿入
self.nodes.insert(index, node)
self._keys.insert(index, key)
return key
ノードの削除
def remove_node(self, node):
"""
ハッシュリングからノードを削除する
node: 削除するStorageNodeオブジェクト
"""
if len(self._keys) == 0:
raise Exception("ハッシュ空間が空です")
# ノードの位置をハッシュ関数で計算
key = self.hash_fn(node.host)
# キーの位置を二分探索で検索
index = bisect.bisect_left(self._keys, key)
# キーが見つからない場合はエラー
if index >= len(self._keys) or self._keys[index] != key:
raise Exception("ノードが存在しません")
# この時点でデータ移行が必要になる
# (実際のコードではここでデータ移行処理を行う)
# ノードとキーを削除
self._keys.pop(index)
self.nodes.pop(index)
return key
アイテムの割り当て
def assign(self, item):
"""
アイテムを適切なノードに割り当てる
item: 割り当てるアイテムの識別子
戻り値: 割り当てられたノード
"""
if len(self._keys) == 0:
raise Exception("ノードがありません")
# アイテムの位置をハッシュ関数で計算
key = self.hash_fn(item)
# アイテムの右側にある最初のノードのインデックスを見つける
# bisect_rightはキー以上の最小のインデックスを返す
index = bisect.bisect_right(self._keys, key) % len(self._keys)
# そのインデックスにあるノードを返す
return self.nodes[index]
Golangによる実装
Consistent Hashingを実際にGolangで実装してみましょう。以下の実装では、より効率的なmurmur3ハッシュ関数を使用しています。
基本的な構造体
まず、ノードとハッシュリングを表す構造体を定義します:
package main
import (
"errors"
"fmt"
"sort"
"sync"
"github.com/spaolacci/murmur3"
)
// Node represents a server in our distributed system
type Node struct {
ID string
Keys map[string]string
}
// ConsistentHashRing manages the hash ring and node assignments
type ConsistentHashRing struct {
mu sync.RWMutex
nodes map[uint32]*Node
hashes []uint32
}
// NewConsistentHashRing creates a new consistent hash ring
func NewConsistentHashRing() *ConsistentHashRing {
return &ConsistentHashRing{
nodes: make(map[uint32]*Node),
hashes: []uint32{},
}
}
// ハッシュ関数としてmurmur3を使用
func hashFunction(key string) uint32 {
return murmur3.Sum32([]byte(key))
}
ノードの追加
ハッシュリングにノードを追加する関数を実装します:
// AddNode adds a new node to the hash ring
func (chr *ConsistentHashRing) AddNode(id string) {
chr.mu.Lock()
defer chr.mu.Unlock()
hash := hashFunction(id)
chr.nodes[hash] = &Node{
ID: id,
Keys: make(map[string]string),
}
chr.hashes = append(chr.hashes, hash)
sort.Slice(chr.hashes, func(i, j int) bool { return chr.hashes[i] < chr.hashes[j] })
}
キーの割り当て先となるノードを見つける
キーに対応するノードを見つける処理は次のように実装します:
// GetNode returns the node that should handle the given key
func (chr *ConsistentHashRing) GetNode(key string) *Node {
chr.mu.RLock()
defer chr.mu.RUnlock()
if len(chr.hashes) == 0 {
return nil
}
hash := hashFunction(key)
idx := chr.GetNextNodeIndex(hash)
return chr.nodes[chr.hashes[idx]]
}
// GetNextNodeIndex finds the next node in the ring after the given hash
func (chr *ConsistentHashRing) GetNextNodeIndex(hash uint32) int {
for i, h := range chr.hashes {
if h >= hash {
return i
}
}
return 0 // 最初のノードに戻る(リングの一周)
}
データの保存と取得
キーと値のペアをノードに保存し、取得する関数を実装します:
// StoreKey stores a key-value pair in the appropriate node
func (chr *ConsistentHashRing) StoreKey(key, val string) {
node := chr.GetNode(key)
if node != nil {
chr.mu.Lock()
node.Keys[key] = val
chr.mu.Unlock()
}
}
// RetrieveKey retrieves a value for a given key
func (chr *ConsistentHashRing) RetrieveKey(key string) (string, error) {
node := chr.GetNode(key)
if node == nil {
return "", errors.New("no node found")
}
chr.mu.RLock()
val, ok := node.Keys[key]
chr.mu.RUnlock()
if !ok {
return "", errors.New("key not found")
}
return val, nil
}
ノードの削除
ノードを削除する際は、そのノードが持っていたキーを次のノードに移す処理も行います:
// RemoveNode removes a node from the hash ring and redistributes its keys
func (chr *ConsistentHashRing) RemoveNode(id string) {
chr.mu.Lock()
defer chr.mu.Unlock()
hash := hashFunction(id)
node, exists := chr.nodes[hash]
if !exists {
return
}
// このノードが持っていたキーを次のノードに移す
idx := 0
for i, h := range chr.hashes {
if h == hash {
idx = i
break
}
}
nextIdx := (idx + 1) % len(chr.hashes)
nextNode := chr.nodes[chr.hashes[nextIdx]]
// すべてのキーを次のノードにコピー
for k, v := range node.Keys {
nextNode.Keys[k] = v
}
// ノードの削除
delete(chr.nodes, hash)
// ハッシュ配列からも削除
chr.hashes = append(chr.hashes[:idx], chr.hashes[idx+1:]...)
}
リング情報の表示
デバッグやモニタリングのために、ハッシュリングの情報を表示する関数も実装しておきます:
// PrintRing displays the current state of the hash ring
func (chr *ConsistentHashRing) PrintRing() {
chr.mu.RLock()
defer chr.mu.RUnlock()
for _, h := range chr.hashes {
fmt.Printf("Node: %s \t\t Hash: %d \t\t Total Keys: %d\n",
chr.nodes[h].ID, h, len(chr.nodes[h].Keys))
}
}
使用例
実際にこの実装を使用してみましょう:
func main() {
ring := NewConsistentHashRing()
// いくつかのノードを追加
servers := []string{"server1", "server2", "server3", "server4", "server5"}
for _, server := range servers {
ring.AddNode(server)
}
// いくつかのキーを保存
keys := []string{"user:1001", "user:1002", "product:5001", "order:8001", "order:8002"}
for i, key := range keys {
ring.StoreKey(key, fmt.Sprintf("value-%d", i+1))
}
// ノードとキーの分布状況を表示
fmt.Println("Initial distribution:")
ring.PrintRing()
// キーを取得
for _, key := range keys {
val, err := ring.RetrieveKey(key)
if err != nil {
fmt.Printf("Key %s not found: %v\n", key, err)
} else {
fmt.Printf("Retrieved %s = %s\n", key, val)
}
}
// 新しいノードを追加
fmt.Println("\nAdding a new node:")
ring.AddNode("server6")
ring.PrintRing()
// ノードを削除
fmt.Println("\nRemoving a node:")
ring.RemoveNode("server3")
ring.PrintRing()
}
このコードを実行すると、ノードの追加や削除を行っても、すべてのキーを再配置する必要がないことがわかります。これが通常のハッシュ関数と比較したConsistent Hashingの大きな利点です。
Consistent Hashingの実用的な応用例
Consistent Hashingは様々な実際のシステムで広く活用されています:
1. 分散キャッシュシステム
Memcachedや、Redis Clusterなどの分散キャッシュでは、キーをどのサーバーに保存するかを決めるために使われています。サーバーが追加・削除されてもキャッシュミスを最小限に抑えられるため、パフォーマンスの安定性が向上します。
2. ロードバランシング
API呼び出しやウェブリクエストを複数のサーバーに均等に分散するために使用されます。同じクライアントからの複数のリクエストを同じサーバーに割り当てる「スティッキーセッション」を実現するのに適しています。
3. データベースシャーディング
大規模データベースを複数のサーバーに分割(シャーディング)する際にも利用されます。レコードをどのデータベースサーバーに保存するかを決定するのに役立ちます。
4. コンテンツ配信ネットワーク(CDN)
Akamaiなどのサービスでは、コンテンツをどのエッジサーバーから配信するかを決める際に使用されています。
5. P2Pネットワーク
BitTorrentなどのP2Pネットワークでは、データの配置やピア検索にConsistent Hashingが使われています。
実装上の注意点と改善方法
基本的なConsistent Hashingには、ノードの追加・削除時のデータ移動を最小限に抑えるという利点がありますが、いくつかの課題もあります:
1. 仮想ノード(Virtual Nodes)の導入
基本的な実装では、ハッシュリング上のノード分布が偏る可能性があります。これを解決するために「仮想ノード(バーチャルノード)」という概念が導入されています。
各物理サーバーを複数の仮想ノードとしてリングに配置することで、より均一な分布を実現できます:
// 各サーバーに対して複数の仮想ノードを作成
func (chr *ConsistentHashRing) AddNodeWithReplicas(id string, replicas int) {
for i := 0; i < replicas; i++ {
virtualID := fmt.Sprintf("%s-%d", id, i)
hash := hashFunction(virtualID)
chr.nodes[hash] = &Node{
ID: id, // 元のサーバーIDを保持
Keys: make(map[string]string),
}
chr.hashes = append(chr.hashes, hash)
}
sort.Slice(chr.hashes, func(i, j int) bool { return chr.hashes[i] < chr.hashes[j] })
}
2. 負荷分散の最適化
サーバー間の負荷を均等に保つために、単純に「最も近いノード」ではなく、負荷状況も考慮した割り当てを行うことができます:
// 負荷状況も考慮したノード選択
func (chr *ConsistentHashRing) GetLeastLoadedNode(key string, maxLoad int) *Node {
chr.mu.RLock()
defer chr.mu.RUnlock()
if len(chr.hashes) == 0 {
return nil
}
hash := hashFunction(key)
// ハッシュリングを一周して、負荷が閾値以下のノードを探す
startIdx := chr.GetNextNodeIndex(hash)
idx := startIdx
for {
node := chr.nodes[chr.hashes[idx]]
if len(node.Keys) < maxLoad {
return node
}
idx = (idx + 1) % len(chr.hashes)
if idx == startIdx {
// 一周してきたら、最初に見つかったノードを返す
return chr.nodes[chr.hashes[startIdx]]
}
}
}
まとめ
Consistent Hashingは、分散システムにおけるデータ配置の問題を効率的に解決する重要なアルゴリズムです。
このアルゴリズムの主な利点は以下の通りです:
- サーバーの追加・削除時にデータ移動が最小限に抑えられる(通常は再配置が必要なのは全体のK/N程度、Kはキー数、Nはノード数)
- スケールアップ・ダウンが容易で、システムの可用性が向上する
- 仮想ノードの導入により、より均一な負荷分散が可能