Edited at

分散合意アルゴリズム Raft を理解する


概要

Raft は安全な State Machine Replication を実装するためのアルゴリズム。分散システムにおいて一貫性 (分散トランザクション) と可用性 (障害耐性) を実装するための基本的な部品として使用することができる。


TL;DR


  • 一貫性のあるレプリケーションや分散実行を行うことができる。

  • 長期間の停止や大幅に遅延したノードが復帰したり、ノード故障によるリーダー交代が発生しても、合意したデータが破壊されるシナリオが存在しない。

  • 役割もアルゴリズムも構成も簡単 (simplicity) & 簡単 (easiness)。

  • 理解のための最小限の範囲はリーダー選挙、ログ複製の 2 つのみ (Heartbeat はログ複製に含まれる)。


    • この記事ではその 2 つを説明。

    • 実用目的にはスナップショット (ログコンパクション) とノードの追加/削除 (メンバーシップ変更) などが必要。




前書き

「Raft の名前を知っている、仕組みに興味がある」から「英語の論文を読み解くのは時間コストが…」までの間で悶々としている人向けの記事です。パワー系プログラマであれば少し論文から補足すれば「俺が想像する Raft」の実装はできると思います。

この記事は私が Raft の論文 In Search of an Understandable Consensus Algorithm (Extended Version) を読み、自分の言葉で説明することで理解を進めることを目的としています。The Raft Consensus AlgorithmRaft - Understandable Distributed Consensus のアニメーション説明と併せて読むと理解の助けになると思います。


State Machine Replication

State Machine Replication は複数のマシンにデータの複製を作る方法の一つ。複数のサーバがデータのコピーを持ち合うことで 1 台のサーバが故障しても残ったサーバでサービスを継続することができる。このような設計方針は可用性 (Availability) を確保する分散システムの一般的な手段である。一般にこの State Machine Replication の可用性に対して、一貫性 (Consistency) を加えるのであればトランザクションを導入するし、分断耐性 (Partition Tolerance) を加えるのであれば結果整合性を導入する。

State Machine Replication は同一の初期状態から開始した複数の有限状態マシン (FSM) に同じコマンドを同じ順序で適用することで全てのマシンを同じ状態で遷移させる。Raft では全てのマシンにマルチキャストされる命令をログと読んでいる。

State Machine Replication

全てのマシンが同じ状態を共有するためにログの適用結果は決定的でなければならない。例えば乱数や現在時刻、あるいはライブラリ依存の動作など、サーバごとに結果が異なる命令は状態矛盾を引き起こすため使用することはできない。ディスク不足や必須ライブラリの欠落など、実行環境固有の非決定的な失敗が起きたマシンは状態矛盾が発生している可能性があるため意図的にクラスタから離脱させる必要があるかもしれない。

可用性と分断耐性にノードの自立性を加えた設計は非構造型 P2P ネットワークの基盤として利用されている。Bitcoin を始めとする多くのブロックチェーン実装はトランザクションと呼ばれる不可分取引情報を改ざんが不可能な鎖状に連結して P2P ネットワークで共有し、参加しているノードが順に実行することで同じ状態を共有している。これは State Machine Replication と同じ考え方である。ただし、Proof of Work (およびその難易度に比重調整を付けた Proof of Stake のような亜種) に基づく初期のリーダー選出アーキテクチャは選挙が確定的ではなく、これは有限時間内にステートマシンの状態を 100% 確定することができないという結果をもたらしている (DPoS は PoW とは別物で確定的)。


障害モデル

Raft は設計自体に Crash-Recovery (長時間故障/遅延していたノードがクラスタに復帰する) 障害に耐性がある。非同期の分散システムでは故障と遅延の区別がつかないことから、Crash-Recovery 耐性は Performance/Timing 耐性、Omission 耐性、Crash-Stop 耐性があることも意味する。

Failure
Raft

Crash-Stop
突然ネットワーク上から消失し復帰しない。

Omission
偶発的なメッセージのロスト。

Performance, Timing
許容時間 $t$ 内での応答の遅れ。

Crash-Recovery
状態遷移シーケンス上の過去の任意の位置からの復帰。

Byzantine, Arbitrary
状態遷移シーケンスから逸脱した任意のタイミングで任意のメッセージ。悪意の可能性。

Raft は Byzantine 障害に対する耐性がなく、論文を一見して恒久的なリーダーの乗っ取りからのログの改ざん、リーダー選挙の妨害が可能であるところを見ても、完全に管理されたネットワーク向けのアルゴリズムである。Byzantine 耐性が必要であればパフォーマンスは落ちるが pBFT など強い相互結果検証を持ったアルゴリズムの検討が必要だろう。

論文では Crash-Recovery より深刻な障害耐性には言及していないが (論説の範囲を外れるため当然だが)、例えば「テスト環境で使用しているノードの 1 つが事故で本番クラスタに『も』参加してしまった」といったような、運用事故で起きうる障害 (大抵 Arbitrary 障害の一ケースに分類されるタイプ) はクラスタ識別子の導入などで回避できるだろう。もし実際に Raft を実装するなら現実的に想定される障害に対して工夫できる余地が多くある。


基本的な構成

Raft は定足数 (quorum; Raft では過半数と同意) に基づくアルゴリズムのため実働想定のノード数は奇数構成が推奨される (偶数にしても故障個所と定足数が増えるだけで障害許容ノード数は変わらず障害耐性は下がる)。一般的に一貫性を提供する分散システムはノード間の通信頻度が高く、特にリーダーが強い役割を持つ Raft ではノード数が増えればリーダーの負荷も大きくなる。このような強い CA 型の分散システムでは、単独のコンセンサスクラスタとして 3、5、多くても 7 ノードが一般的だろう (論文では 5 ノードが最適としている)。それ以上は一貫性を緩和して複数のクラスタ間で協調するような設計が必要だろう。

Raft のコンセンサスクラスタは、選挙によって選ばれた 1 つのリーダーと、残りのフォロワーによって構成される。各サーバはリーダー選挙時の候補者を加えて 3 つの役割のいずれか 1 つに割り当てられている。Raft クラスタは全てのサーバがフォロワーで「リーダー選挙待ち」の状態から開始する。

Raft の主要な動作はリーダー選挙ログ複製の 2 つだけである。これを行うために実装者が用意する API は RequestVote RPC と AppendEntries RPC の 2 つのみである。

元の論文では Joint Consensus なる方法で新しいノードの追加や離脱 (メンバーシップ変更) のアルゴリズムを紹介していたが、一見してリーダーが機能している前提 -- つまり過半数以上のノードが故障しリーダー選挙ができない状態では手動でクラスタを再構成/再起動する必要があるようで、メンバーシップ変更には別により良いアプローチがあるように感じた。この記事ではメンバーシップ変更についての説明は行わない。

また論文では全てのログをそのまま保持するのではなく、状態スナップショットを作成することでデータ容量を削減することも提案しているが、分散合意とは性質が異なるためこの記事では省略する。


死活監視

フォロワーはリーダーが故障したときに速やかに新しいリーダー選挙を開始する必要がある。Raft ではリーダーから全てのフォロワーに対して定期的に Heartbeat を送信することでサーバが故障していないことを知らせている。

Heartbeat

Heartbeat の実体は、ログ複製のための AppendEntries RPC を複製ログ 0 件で呼び出しているだけである。AppendEntries には選挙やログの世代がどこまで進んでいるかの情報も含まれていることから、長期間停止していたフォロワーが突然復帰したとしても、AppendEntries RPC のやりとりがあった時点で自分の状態が古くなっていることを認識することができる。

フォロワーはリーダーから一定時間 Heartbeat (や通常のログ複製) が到達しなければリーダーが故障したものとして新しい選挙に立候補する。ここで、複数のフォロワーがほぼ同時に立候補してしまうと、選挙を失敗させる split vote (後述) が発生しやすくなるため、Raft ではフォロワーごとに選挙タイムアウトを乱数調整している。例えば Heartbeat 間隔が 1 秒、Heartbeat 到達の最大許容遅延を 0.5 秒として選挙タイムアウトを 1 + 0.5 + 1.5 × rand() とすれば、同時に候補者が発生する確率を減らしつつ最悪でも 3 秒以内にリーダーの故障を検知できるだろう。

Crash Detect

論文では選挙タイムアウトは Heartbeat 間隔より一桁大きい程度の見積もりを推奨している。Heartbeat の現実的な平均ブロードキャスト応答時間 (RTT) を $t_b$ = 0.5~20ms と想定した場合、最短の選挙タイムアウトは $t_e$ = 10~500ms 程度となり、これにサーバごとの乱数調整分が加算すればよいだろう。

なお乱数調整分をサーバごとの優先値 × $t_b$ のように設計すれば split vote も発生しづらく (確定的とは言えないが) リーダーの担当順序を方針づけられるだろう。分散環境でのサーバの順位付けには起動タイムスタンプの古さなどいくつかの方法がある。

リーダーの故障を検知したら次は選挙だ。


リーダー選挙

Raft には選挙が行われるたびに単純増加するタームと呼ばれる数値 (論理クロック) がある。ある時点のタームを担当するリーダーは 1 つないしは 0 (選挙失敗時) であり、2 つ以上のリーダーが同一のタームに発生することはない。

さて、コンセンサスクラスタ内のリーダー不在を検知したフォロワーはリーダーに立候補し候補者となる。候補者は自分の認識しているタームを一つ進め、自分に投票し、クラスタ内の他のサーバの RequestVote RPC (投票要求) を呼び出す。

RequestVote を受けたサーバは、自分がまだそのタームに投票しておらず、候補者の持つログが自分と同じかそれより新しい場合に投票を行う。さもなくば投票を拒否する。クラスタ内のサーバの過半数票を獲得した候補者はリーダーとなり Heartbeat をブロードキャストして自分が新しいタームのリーダーとなったことをクラスタの全サーバに通達する。

Leader Election

全てのフォロワーは、受信した Heartbeat (AppendEntries RPC) のタームが自分の認識しているタームより進んでいた場合、自分がそのタームの投票にどう関与したかにかかわらずその送信元のサーバを最新のリーダーと認識する (従ってもしクラスタ内に Byzantine ノードが存在した場合は選挙を行わずに容易にリーダーとなることが可能である)。

反対に、受信した Heartbeat が自分の認識しているタームより小さい場合、故障して停止していた古いリーダーが復帰して何も知らずに送信したものであると推測できる。この時フォロワーは自分の認識している最新のターム付きで失敗応答を返すため、旧リーダーは自分がすでにリーダーではない事を知ることができる。


選挙の失敗

リーダー選挙はいくつかの理由で失敗することがある。一つの例は応答可能なサーバが過半数を下回ったケースである。このような状況ではどの候補者もリーダーになることはできない (これは Raft で Split Brain が発生した時に片方のクラスタがサービスを継続できない = 分断耐性がないことも意味する)。

もう一つの例は Split Vote である。複数のフォロワーがリーダー不在を検知し、ほぼ同時に RequestVote RPC をブロードキャストした場合、残りのサーバの票が割れてどの候補者も過半数票を獲得できない可能性がある。

Split Vote

Raft は Heartbeat と全く同じ選挙タイムアウト機構を選挙時にも機能させている。RequestVote RPC の呼び出しから選挙タイムアウトの時間が経過しても新しいリーダーからの Heartbeat が到達しなかったサーバ (過半数を獲得できていない候補者も含む) は、そのタームの選挙が失敗したものとみなして、自分のタームを一つ進め、自分が立候補して新しい選挙を開始する。

Raft では同時の立候補が発生し辛いように各ノードの選挙タイムアウト時間はノードごとに乱数を加算して調整している。


機能回復時間

リーダーが故障してからクラスタが機能を回復するまでの時間を考えてみよう。Heartbeat の平均ブロードキャスト応答時間 (RTT) を $t_b$、選挙タイムアウトを $t_e$ とすると、リーダー故障からクラスタ機能回復までの各ステップに要する時間は以下のようになる。

Recovery Time

例えば $t_b$ = 20ms、$t_e$ = 500ms の環境では (リーダー以外の故障や split vote が発生しなければ) 最大 540ms 程度でクラスタ機能を回復することができるだろう。split vote が発生するケースでは 2. を $t_e$ で繰り返すこととなる。つまり split vote のような選挙失敗の発生回数を $n_f$ とした場合 $t_e + n_f \times t_e + 2 t_b$ 程度で回復する。

非同期の分散システムでよく使われるリーダー選挙という手法は split vote のような状況が連続して発生することを完全に阻止することはできない点に注意。言い換えると、Raft において一度リーダー選挙が発生すると機能回復が有限時間内に終わることを保証することはできない。これは安全性/障害耐性 (≒一貫性/可用性) を実装している非同期システムは有限時間内に処理を終了することを保証できないという FLP Impossibility が意味するところでもある。例えばフォロワーが20ノード存在すれば複数の立候補者が連続する確率は現実的に非常に高くなる。


クライアント

クライアントはどのようにしてリーダーを知ればよいだろうか? すべてのフォロワーは現在のリーダーを知っていることから、コンセンサスクラスタ内のいずれかのサーバと接続し、もしそのサーバがリーダーでなければリーダーのアドレス付きで失敗するのは一つの方法だろう (HTTP の 302 Found リダイレクト相当の動き)。


ログ複製

全てのログは AppendEntries RPC を使ってリーダーからフォロワー方向にのみ伝達される。リーダーは自分を含めて過半数のサーバがログを保存できたことを確認するとログの複製を完了したとみなして (commit) クライアントに応答する。少し遅れてフォロワーは次の Heartbeat もしくはログ複製要求を受けたときにリーダーがどこまで commit したかが分かるため、その時点までのログをローカルのステートマシンに適用する。

Log Replication


一貫性レベル

上記のシーケンス図が示すように、リーダーがどの時点を完了とみなしてクライアントに応答するかには考慮の余地がある。いくつかの分散データベースではこのようなパラメータを一貫性レベルとしてクライアント要求ごとに指定が可能になっている。


  1. リーダーがローカルのエントリにログを保存できた時点で

  2. クラスタ内の定足数以上がログの複製できた時点で (クラスタとして確定)

  3. クラスタ内のすべてがログを複製できた時点で

  4. リーダーが定足数のログ複製を確認しローカルの状態を更新した時点で (論文はここを示している)

  5. クラスタ内の定足数以上が状態を更新できた時点で

  6. クラスタ内のすべてが状態を更新できた時点で

上記の 1. は障害耐性が欠落しているが、重要なデータでなく高速な応答が求められるときに有用だろう。Raft の設計では 2. 以上であれば障害耐性を保証できる。応答にログの適用結果が必要なら 4. が必要だし、一貫性を犠牲にして負荷分散のためにフォロワーを Read Only ノードとして利用しているのであれば 6. が必要な場面もあるかもしれない。他にも地理分散を想定して1つのデータセンター障害でも可用性を確保できるよう「すべてのデータセンター内の過半数を満た時点で」といった方法も考えられる (CA型分散システムがデータセンターで分離されることはあまり見ないないが)。


障害時の復旧動作


停止したフォロワーの復帰

リーダーは AppendEntries RPC に応答のないフォロワーを検出すると、比較的短時間で復帰することを期待して定期的に同じ RPC を実行する。ただしこれは単なる投機実行である。

リーダー交代前にフォロワーが復帰した場合、リトライしていたログから順に再送信されフォロワーの状態は復元する。停止中にリーダー交代があった場合、新しいリーダーも停止を検知して同じように送信を繰り返すが、新しいリーダーが送っているログは復旧に必要なログの正しいインデックスではない可能性がある。この場合、フォロワーが復帰してもログ複製は拒否され、応答で正しいログのインデックスを渡される。リーダーはその正しいインデックスから再送信する。

Heartbeat がログ複製と同じ AppendEntiries RPC であることを考えれば、この再送信処理はなくても Heartbeat が到達した時点で再送信処理が開始する。


リーダー停止からの復旧

フォロワーがリーダーの停止を検出すると新しい選挙が始まり次のリーダーが選出される。ではこのとき処理中のログはどうなるだろうか?

前提として、Raft の投票条件は自分より確定済みログの複製が進んでいない候補者を拒否する。つまり過半数に複製されている最新ログを持っていない候補者は過半数票を取り得ない仕組みになっている。さらに言い換えると、過半数票を得た新リーダーは、少なくとも最新の確定済みログの複製を持っていることを意味している (Leader Completeness Property)。

クライアントに応答する前にリーダーがクラッシュした場合、その処理はどうなるだろうか? ログ複製の進行状況によっていくつかのケースが考えられる。


  1. ログが過半数に複製されていれば、ログは確定しており新リーダーもそのログを持っている。

  2. ログが過半数に複製されなければ:


    1. 新リーダーがログを持っていればフォロワーに複製/確定される (旧リーダーの処理を引き継ぐ結果)。

    2. 新リーダーがログを持っていなければ:


      1. 新リーダーが新しいログを受信/複製した時点で、一部のフォロワーに残っていたそのログは上書き/無効化される。

      2. 新リーダーが新しいログを受信/複製する前に停止すれば、新しいリーダー選挙後に 2. へ戻る。





  3. 旧リーダーのローカルエントリに保存される前であれば、クラスタ内には物理的に存在せず無効。

ログが過半数のサーバに複製されていなかったとしても、新しいリーダーが引き継いでログが確定する可能性がある点に注意。Raft は処理中にリーダーが故障したとしても、そのログは最終的に「確定」か「無効」かに収束しようとする。ただし 2.2.1 で新しいログを受信しない場合や、2.2.2 を繰り返すケースがあるため、有限時間内にどちらかに定まる保証はない。

処理中のリーダーがクラッシュした場合、クライアントはそのログがクラスタで確定したかを知るすべはない。論文では、クライアントがログにユニークな番号をつけ、サーバ側でそのログが確定済みかを検査することで、失敗時にクライアントがリトライしても安全な設計を提案している。


実装イメージ

覚え書き程度に書き起こしただけなので詳細は論文の 5 章あたりを参照。この章はしばらくしたら更新するかもしれない。


ステートマシンの内部状態

Raft ステートマシンは以下のような内部状態で選挙とログ複製を実行するだろう。

State Machine

Leader State は自分がリーダーの場合に使用する。サーバごとに次に送るべきログインデックスと、そのサーバと自分とで (タームも含めて) 一致している最も大きいログインデックスを保持している。


AppendEntries RPC

以下に AppendEntiries RPC の処理説明を擬似コードで記述する。

def AppendEntries(

term:Long, // リーダーのターム (0,1,2,...)
leaderId:Int, // このタームのリーダー
prevLogIndex:Long, // このリクエストのログエントリの直前のログインデックス (0,1,2,...)
prevLogTerm:Long, // prevLogIndex のターム (0,1,2,...)
entries:Array[Log], // 複製するログ (効率のため複数可; Heartbeat の場合は空)
leaderCommit:Long // リーダーが確定と認識しているログインデックス (0,1,2,...)
):(
term:Long, // フォロワーが認識している現在のターム (0,1,2,...)
success:Boolean // ログを複製した場合 true
) = {
if(term < this.currentTerm || // 旧世代のリーダーが Crash-Recovery した可能性
this.entries.length <= prevLogIndex || // 自分が Crash-Recovery した可能性
this.entries[prevLogIndex].term != prevLogTerm // 自分が Crash-Recovery した可能性
){
return (this.currentTerm, false)
}

// ログを複製する
for(i = 0; i < entries.length; i++){
this.entries[prevLogIndex + i].term = term
this.entries[prevLogIndex + i].log = entries[i]
}

// リーダーが確定と認識しているログの位置を合わせる
if(leaderCommit > this.commitIndex){
this.commitIndex = min(leaderCommit, prevLogIndex + entries.length)
}
this.currentTerm = term
return (this.currentTerm, true)
}


RequestVote RPC

以下に RequestVote RPC の処理説明を擬似コードで記述する。

def RequestVote(

term:Long, // 候補者のターム (0,1,2,...)
candidateId:Int, // 候補者ID
lastLogIndex:Long, // 候補者の持つ最新のログインデックス (0,1,2,...)
lastLogTerm:Long // 候補者の持つ最新のターム (0,1,2,...)
):(
term:Long, // 投票者が認識している現在のターム (0,1,2,...)
voteGranted:Boolean // 候補者に投票した場合 true
) = {
if(
term < this.currentTerm || // 旧世代のリーダー/候補者が Crash-Recovery した可能性
(this.votedFor != null && this.votedFor != candidateId) || // 別の候補者に投票済み
(論文参照) // 候補者の持つログが自分より古い
){
return (this.currentTerm, false)
}
this.votedFor = candidateId
return (this.currentTerm, true)
}


まとめ

Raft がどのようにして一貫性のある State Machine Replication を設計しているかについて説明した。この記事は元の論文のみならず一部私見も踏まえている。かなり概要よりで動作の細かい条件などは省略しているため、プロダクション向けの分散データベースを実装しようとしている人には全く情報不足だろうが、勉強目的で Raft の動きを理解したい人、実装してみたい人、あるいは etcd のような Raft 実装の API の意味や内部動作が良くわかない人の参考になればよい。