背景
分散システムでは、データをどのサーバが保持・処理するかを決定する必要があります。
このとき、各データをハッシュ関数で数値に変換し、その結果に基づいてサーバを割り当てる手法が一般的に用いられます。
ここで、データの割り当て方法として、一般的なハッシュ方式以外にもConsistent Hashingアルゴリズムが用いられます。本ブログではこれらのアルゴリズムについて比較する形で説明していきたいと思います。
一般的なハッシュ方式
仕組み
一般的なハッシュ方式では、KV形式のデータを処理するサーバを決定する場合に、Keyのハッシュ値をサーバ数で割った剰余を各サーバのIndexと照らし合わせ決定します。
この方式は、計算がシンプルでデータ探索がO(1)で高速というメリットがあります。
一方で、以下の図のようにサーバ数の増減時に 全データの再割り当て(再ハッシュ) が必要になるというデメリットがあります。
元々4台のサーバがあった場合にKeyのハッシュ値を4で割っていたものの、Server Bが利用できない場合、今度はサーバ数が3台となり3で割る必要があるため、全データを異なるサーバにマッピングし直す必要があります。
実装
Goでの実装の中で主要な処理を抜き出すと以下のようになります。
この処理では、Keyをハッシュ化し、その値をサーバ数で割った剰余を用いて担当サーバを決定しています。
type SimpleHash struct {
mu sync.RWMutex
hash func([]byte) uint32
nodes []string
}
func (sh *SimpleHash) Get(key string) string {
sh.mu.RLock()
defer sh.mu.RUnlock()
h := sh.hash([]byte(key))
idx := int(h) % len(sh.nodes)
return sh.nodes[idx]
}
Consistent Hashing
仕組み
Consistent Hashingは、一般的なハッシュ方式において「サーバ数の増減時に全データを再割り当てする必要がある」という欠点を補うアルゴリズムです。
簡単のために、ハッシュ値が0~10までである場合にKVデータがどのサーバに割り当てられるかを見ていきます。
各サーバについてハッシュ値を計算し、リング上に配置します。
そして、Keyのハッシュ値を計算し、その値から時計回りに進んで最初に見つかるサーバが担当します。
以下の例だと、例えばKeyのハッシュ値が 2 の場合は Server B (ハッシュ値 3)が担当し、Keyのハッシュ値が 7 の場合は Server D (ハッシュ値 8 )が担当するといった流れです。
リング上にサーバを配置することで、サーバ数が増減したとしても、影響を受けるのは近傍のデータのみとなり、全体の再割り当てを避けることができます。
以下の例では、Server B(ハッシュ値 3 )が利用できなくなった場合、影響を受けるのはハッシュ値が2のKeyのみです。このKeyは、次に位置する Server C(ハッシュ値 5 )へ再割り当てされます。一方で、ハッシュ値が 7 のKeyなど、他のサーバに割り当てられているデータには影響がありません。
仮想ノードの導入
一般的に、Consistent Hashingで利用されるハッシュ空間は 32bit 程度で表現され、
範囲は 0 〜 2³²−1 のように非常に広く設定されます。
この広大な空間に対して、サーバ数が少ない場合において特定のサーバに多くのキーが偏ってしまう可能性が十分にあります。
これは、各サーバのハッシュ値がランダムに配置されるため、リング上の距離が均等にならないことが原因です。
例えば以下のような場合です。
Server AはKeyのハッシュ値が3~10までを担当するものの、Server Bはkeyのハッシュ値が1のみを担当するため、偏りが発生してしまっています。
そこで、各サーバを複数の 仮想ノード(Virtual Node) として扱う手法が一般に用いられます。
例えば、ServerA_1, ServerA_2, ServerA_3 のようにサーバAに複数の仮想ノードを割り当て、それぞれに対してハッシュ値を計算しリング上に配置します。
これにより、各サーバがリング上に複数の位置を持つことになり、キーの分布がより均一になって負荷の偏りを防ぐことができます。
以下の例では、ServerA_1, ServerA_2, ServerB_1, ServerB_2 という仮想ノードを作成し、仮想ノードと実際のサーバを対応して管理し配置することで偏りをなくすことができています。
実装
以下の実装では、replicas の数だけ仮想ノードを作成します。
各仮想ノードは ServerA_0", "ServerA_1 のように実ノード名にインデックスを付けた形で識別されます。
次に、各仮想ノードの名前をハッシュ関数に通して得られた値をkey、対応する実ノード名 をvalueとして、ring というマップで管理します。
これにより、リング上のどの仮想ノードがどの実ノードに対応するかを一意に表現できます。
全ノードの情報は HashRing 構造体の中で一括管理されており、
キーの担当ノードを探索する際は sort.Search による 二分探索 を行うことで、O(log N) の計算量で該当ノードを十分高速に見つけることができます。
また、サーバ(ノード)が増減したとしても、影響範囲は 増減したサーバの隣接部分に限定される 点が大きなメリットとなります。
type HashRing struct {
mu sync.RWMutex
hash func([]byte) uint32
replicas int
keys []uint32
ring map[uint32]string
}
func (hr *HashRing) Add(node string) {
hr.mu.Lock()
defer hr.mu.Unlock()
// add virtual nodes
for i := 0; i < hr.replicas; i++ {
virtualKey := fmt.Sprintf("%s_%d", node, i)
h := hr.hash([]byte(virtualKey))
if _, exists := hr.ring[h]; exists {
continue
}
hr.ring[h] = node
hr.keys = append(hr.keys, h)
}
sort.Slice(hr.keys, func(i, j int) bool { return hr.keys[i] < hr.keys[j] })
}
func (hr *HashRing) Get(key string) string {
hr.mu.RLock()
defer hr.mu.RUnlock()
h := hr.hash([]byte(key))
idx := sort.Search(len(hr.keys), func(i int) bool { return hr.keys[i] >= h })
if idx == len(hr.keys) {
idx = 0
}
return hr.ring[hr.keys[idx]]
}
動作確認
Keyが各サーバに分散されているか
一般的なハッシュ方式も、Consistent HashingもAdd, Remove, Getが主な処理になるためインターフェースを定義し抽象化します。
type HashDistributor interface {
Add(node string)
Remove(node string)
Get(key string) string
}
func testHashDistributor(name string, dist distributor.HashDistributor) {
fmt.Printf("=== %s ===\n", name)
dist.Add("NodeA")
dist.Add("NodeB")
dist.Add("NodeC")
keys := []string{"apple", "banana", "cherry", "grape", "melon"}
for _, k := range keys {
fmt.Printf("%s → %s\n", k, dist.Get(k))
}
fmt.Println()
}
func main() {
testHashDistributor("Consistent Hash", consistenthash.New(3))
testHashDistributor("Simple Hash", hash.New())
}
このとき、実行結果は以下のようになり分散ができていることがわかります。
Consistent Hashingの場合は仮想ノードを導入したことでうまく分散されるようになりました。
=== Consistent Hash ===
apple → NodeB
banana → NodeB
cherry → NodeA
grape → NodeC
melon → NodeA
=== Simple Hash ===
apple → NodeC
banana → NodeB
cherry → NodeB
grape → NodeC
melon → NodeA
ベンチマークによりデータ再割り当ての割合を計算
長くなってしまいますが以下のようにして ノード追加・削除時のKeyの再割り当て割合 を計算しています。
全 Key に対して初期状態の Get(key) の結果を記録し、その後ノード追加または削除を行い、再度 Get(key) を呼び出して担当ノードの変化した Key の数を数えることで、再割り当ての割合を測定しています。
func generateTestKeys(count int) []string {
keys := make([]string, count)
for i := 0; i < count; i++ {
keys[i] = fmt.Sprintf("key-%d", i)
}
return keys
}
func recordMapping(keys []string, dist distributor.HashDistributor) map[string]string {
mapping := make(map[string]string, len(keys))
for _, key := range keys {
mapping[key] = dist.Get(key)
}
return mapping
}
func measureRemapping(keys []string, beforeMapping map[string]string, dist distributor.HashDistributor) int {
remapped := 0
for _, key := range keys {
afterNode := dist.Get(key)
if beforeMapping[key] != afterNode {
remapped++
}
}
return remapped
}
func benchmarkHashDistributor(
name string,
dist distributor.HashDistributor,
testKeys []string,
numKeys, numInitialNodes, numNodesToAdd int,
) {
fmt.Printf("--- %s ---\n", name)
// Initialize nodes
initialNodes := make([]string, numInitialNodes)
for i := 0; i < numInitialNodes; i++ {
nodeName := fmt.Sprintf("Node%d", i)
initialNodes[i] = nodeName
dist.Add(nodeName)
}
// Record initial mapping
beforeMapping := recordMapping(testKeys, dist)
// Add new nodes
for i := 0; i < numNodesToAdd; i++ {
nodeName := fmt.Sprintf("Node%d", numInitialNodes+i)
dist.Add(nodeName)
}
// Measure remapping after adding nodes
remappedAfterAdd := measureRemapping(testKeys, beforeMapping, dist)
afterAddMapping := recordMapping(testKeys, dist)
// Remove a node
dist.Remove(initialNodes[0])
// Measure remapping after removing node
remappedAfterRemove := measureRemapping(testKeys, afterAddMapping, dist)
// Print results
fmt.Printf("After adding %d nodes:\n", numNodesToAdd)
fmt.Printf(" Remapped keys: %d (%.2f%%)\n", remappedAfterAdd, float64(remappedAfterAdd)/float64(numKeys)*100)
fmt.Printf("After removing 1 node:\n")
fmt.Printf(" Remapped keys: %d (%.2f%%)\n", remappedAfterRemove, float64(remappedAfterRemove)/float64(numKeys)*100)
fmt.Println()
}
func runBenchmark() {
const (
numKeys = 10000
numInitialNodes = 5
numNodesToAdd = 2
)
testKeys := generateTestKeys(numKeys)
benchmarkHashDistributor(
"Consistent Hash",
consistenthash.New(150),
testKeys,
numKeys,
numInitialNodes,
numNodesToAdd,
)
benchmarkHashDistributor(
"Simple Hash (Normal Hash Function)",
hash.New(),
testKeys,
numKeys,
numInitialNodes,
numNodesToAdd,
)
}
func main() {
runBenchmark()
}
ベンチマークの実行結果は以下のようになりました。
この結果から、Consistent Hashingはノード(サーバ)の増減が発生したとしてもデータ再割り当ての範囲を通常のハッシュ方式と比較して抑えることができていることが読み取れます。
--- Consistent Hash ---
After adding 2 nodes:
Remapped keys: 2252 (22.52%)
After removing 1 node:
Remapped keys: 890 (8.90%)
--- Simple Hash (Normal Hash Function) ---
After adding 2 nodes:
Remapped keys: 8595 (85.95%)
After removing 1 node:
Remapped keys: 8551 (85.51%)
一般的なハッシュ方式は hash(key) % N に依存するため、ノード数が変わると ほとんど全ての Key が再配置されてしまう一方で、Consistent Hashing はリング構造により、影響が変化したノードの近傍に限定されるため、再配置率が劇的に低い値となっています。
まとめ
一般的なハッシュ方式ではKVデータをサーバに割り当てる際の計算量がO(1)であるのに対し、Consistent Hashingは二分探索を行うためO(logN)となります。その一方で、サーバ数が増減する際のデータ再割り当てに関しては大きな違いが生じることを確認することができました。
今回は、実際にDynamo等で使用されているConsistent Hashingのアルゴリズムを実装してみることでより分散システムの理解を深めることができました。
参考までに、実装を行ったリポジトリは以下になります。





