** 本エントリはDistributed computing Advent Calendar 2021の12/22エントリです。
はじめに
Apache Kafka v.2.8はKIP-500という特別な機能改善が含まれたリリースでした。この変更はKafkaの全体構成への大規模な変更であり、Cloud Nativeな環境で動くKafkaにとって大きな前進となる変更です。KIP-500は現時点 (2021年クリスマス時期) でもまだ本番稼働で利用できるステータスとはなっていませんが、最も新しいKafka 3.0では主要な機能は既に含まれた状態となっています。
本エントリではKIP-500の概要ならびにこの変更がもたらすCloud Nativeな世界におけるKafkaについてご紹介します。
本エントリは先日 (2021/12/17) 実施されたApache Kafka Meetup Japan #10でのLT登壇内容をなぞらえたものとなっています。
参考
Apache Kafka Meetup Japan #10
登壇資料:「Cloud Native Kafka - KIP-500および関連KIPについて」
Cloud Nativeとは
Cloud Nativeという単語を聞くと直感的にKubernetesを連想するのが一般的です。もちろんKakfaをKuberntesで稼働させるという取り組みは進んでおり、StrimziやKUDO Kafka Operator、商用製品ではConfluent for Kubernetesの様にマネージドサービスで大規模に利用されているものもあります。しかしながら本エントリの主旨はKafkaをKubernetesで動かす事ではなく、Kafka自体がCloud Nativeになる事です。
クラウドネイティブ技術は、パブリッククラウド、プライベートクラウド、ハイブリッドクラウドなどの近代的でダイナミックな環境において、スケーラブルなアプリケーションを構築および実行するための能力を組織にもたらします。 このアプローチの代表例に、コンテナ、サービスメッシュ、マイクロサービス、イミュータブルインフラストラクチャ、および宣言型APIがあります。
これらの手法により、回復性、管理力、および可観測性のある疎結合システムが実現します。 これらを堅牢な自動化と組み合わせることで、エンジニアはインパクトのある変更を最小限の労力で頻繁かつ予測どおりに行うことができます。
Cloud Native Computing Foundationは、オープンソースでベンダー中立プロジェクトのエコシステムを育成・維持して、このパラダイムの採用を促進したいと考えてます。 私たちは最先端のパターンを民主化し、これらのイノベーションを誰もが利用できるようにします。
Cloud Native Definition - Cloud Native Computing Foundation
この定義が唯一無二のCloud Native定義という訳でもありません。ただKubernetesのエコシステムとしての定義であっても、ランタイムだけでなく、その上で動くものであり、運用方法も変わる事を示唆しています。そして、最終的にはそのプラクティスがもたらすビジネスメリットを含有した大きな定義となっています。本エントリの焦点は中でも「その上で動くもの」としてのKafkaです。
KafkaとCloud Native
Kafkaは2011年にOSSとなりましたが、元々Cloud Nativeの価値観が存在する前に生まれた技術です。極端に高いスループットと急激な負荷上昇、そして順序通りイベントを消化し保全するという目的で、LinkedIn社内で開発されました。KafkaはJavaで書かれた技術ながら、極めてリソース利用効率の高いソフトウェアとして機能するよう意図的に原始的な設計思想に基づいています。この為、ComputeとStorageの結び付きは一般的なデータベースより強くなっており、あまりCloud Nativeではないとも言えます。
また、KafkaはHDFSの様にデータを自動的に複製することにより障害耐性を高めるモデルを採用しています。このモデルは高いデータ耐性と引き換えに、ランタイム障害に弱いという課題も併せ持っています。 (そのノードにあるPartitionは全て別のNodeに別途複製し、かつLeader Partitionであれば新たなLeaderの選出が必要。)
もう一つはZookeeperです。Zookeeperはこれまで、Kafkaが分散システムとしての健全性を維持する上で重要な役割を担ってきました。一方、Zookeeperという外部メタデータストアに依存する為、Kafkaは分散システムとして複雑なものとなっています。
Kafkaのデータ保全という機能性は、Kafkaがそれまでのメッセージブローカーと一線を画す重要な特性です。その為Cloud Nativeの為と言えども簡単に捨て去るわけにはいきません。一方、Zookeeperへの依存は、簡単では無いながらも対応は可能であり、その成果がKIP-500です。
#KIP-500 : Replace Zookeeper with a Self-Managed Metadata Quorum
KIP-500は、そのタイトルが示す通りメタデータの合意形成をZookeeperではなくBroker内で完結するという機能改善です。通常KIP (Kafka Improvement Proposal) は一つの明確な機能改善についての要件と仕様を纏めたものですが、Zookeeperとの依存関係を断ち切るとなると影響範囲が極めて大きく、1つのKIPで定義するには大きすぎます。この為KIP-500はいくつかのKIPを包括し要点を纏めた親KIPの様な少し特殊なKIPです。 (他にはKIP-4 - Command line and centralized administrative operationsもこの部類に該当します。)
Zookeeperは決して厄介者ではなく、メタデータへの線形化アクセスの提供の他に、これまでKafkaクラスタに対して様々な機能を提供してきた重要なストレージです。その中でもHeatbeatによるBrokerの死活監視、Controllerの選出やBrokerの状態管理等、分散システムにおいてコンセンサスが必要な多くの機能を提供してきました。
これまでZookeeperがクラスタ (Zookeeperの用語ではアンサンブルと呼ばれます) として提供してきた機能をBrokerが提供するモデルとなります。具体的には、これまで1つだったControllerがグループとして合意形成を行うモデルとなります。一方クラスタメタデータへのリクエストは、Controllerの中でもActive Controllerと呼ばれるリーダーが一手に担う為、リクエストする側観点ではこれまでの1Controllerの構成と大きな違いはありません。
肝心のメタデータですが、これまではZookeeperのNodeで永続化されていました。それがKafka Nativeな、つまりKafkaのログとして管理するモデルと変わります。KafkaがKafka自身の情報をKafkaらしい形で管理する様になります。
KIP-631 : The quorum-based Kafka Controller
KIP-500によってControllerの役割が大きく変わりますが、その詳細がKIP-631に定義されています。
これまでクラスタ全体に関するメタデータ (各BrokerおよびPartitionの情報) は別途Zookeeperで管理してきましたが、メタデータへのリクエストはBroker内のControllerが担当してきました。Controllerは同期したデータをメモリ内に保持し、Zookeeperでのデータ変更時にはその更新を反映しつつ、他のBrokerに通知する役割も果たしていました。非常に多くの機能をControllerが一手に引き受けていた事になります。
このControllerがダウンした場合、クラスタは新たなControllerを選出しなければなリません。別のBrokerが新たにControllerとして機能する為には:
- Brokerの中から新たなControllerを選出する。
- クラスタ全体に新Controllerの選出を通知する。
- Zookeeperに格納されたクラスタメタデータをメモリに読み込む。
これら全てが為されるまでControllerはリクエストに答える事が出来ません。またZookeeperの持つクラスタメタデータを読み込みにかかる時間はクラスタの規模 (より具体的にはクラスタ内で管理するPartitionの数) が増えれば増えるだけ時間がかかります。小規模なクラスタではさほど長くはありませんが、大規模、特にPartition数が数十万から数百万に達する規模になるとこの読み込みに数分から数十分の時間を要します。
新たなControllerによる合意形成モデルでは、合意形成の為全てのController (3以上) が常にクラスタメタデータを保持する状態となります。実際にリクエストに応えるのはActive Controllerのみですが、他のControllerも若干のOffsetの消化ですぐにActive Controllerに昇格できる状態となります。また、Brokerへの状態通知も、これまでのRPCベースのpush通知ではなく、内部Topic __cluster_metadata
を基にしたpullモデルとなりControllerの処理負荷は軽減されます。
Registered, Unregistered, and Fenced.
これまでBrokerの状態はRegisteredとUnregisteredのいずれかでした。 (他にも状態遷移上いくつかありますが割愛します。) つまりBrokerは「クラスタの一員である」か「クラスタの一員では無い」のいずれかだったのですが、KIP-631により新たに「一時的に一員ではない (Fenced)」という状態が追加されました。この状態の追加により、一時的にBrokerとしての責務が果たせない (ネットワーク分断、I/Oブロック、もしくは何かしらの復旧可能なランタイム障害) 状態であっても即時に追放はされずメタデータは温存できる様になりました。Brokerの離脱は多くのデータ移動と新たなPartitionリーダーの選出という負荷の高いイベントですが、場合によっては一から再度メタデータを構築せず元の状態を保ったまま復帰出来る機会が増える事になります。
このFencedという状態を維持する事はZookeeperを利用したモデルでは困難でした。Zookeeperはメタデータをznodeと呼ばれるノードをツリー構造で保持するKVストアです。またZookeeperは適切なznode構成と実際のクラスタ状態の同期を取る事により効率的にクラスタの状態を把握しています。この為、クラスタに参加できないBrokerのznodeは即座に削除され、当然その配下にぶら下がるBrokerのメタデータも全て削除されます。クラスタメタデータ管理にZookeeperのストレージとしての特性を利用しなくなった事により、アクティビティには参加しないBrokerのメタデータの一時的な保持が可能となりました。
KIP-595 - A Raft Protocol for Metadata Quorum
これまでZookeeperの無い中でどの様にメタデータの合意形成を取るかについて言及してきませんでしたが、その詳細がKIP-595で説明されています。合意形成にはKRaftと呼ばれるRaftプロトコルの実装が採用されており、3以上で形成されたControllerがその役割を担います。Raftはこのアニメーションが世界一分かり易いので是非ご覧ください。複数Node間で「何が正か?」という問いに答える為の仕組みです。
Raftプロトコルではログとコミットをベースとしたモデルとなっていますが、この思想は一般的にKafka上でのデータ利用のモデルとかなり近いものです。クラスタメタデータを内部Topicにする事はRaftベースのモデルに移行する上で理に適っています。また、RaftではTermと呼ばれるLeaderの世代を司るバージョンはKafkaではEpochと呼ばれ既に採用されています。 (KIP-380: Detect outdated control requests and bounced brokers using broker generation)
一方、RaftはLeaderからのpushによる通知モデルですが、KRaftでは内部Topicを利用したpullモデルとなります。この仕様差異は大きく、pullモデルだからこそ複雑な処理が必要なレアケースの対応も必要となりますが、KRaftとしてはKafka思想により近いpullモデルの採用となりました。
他にはLeader Electionが挙げられます。これまではZookeeperのControllerのznodeに自分のBroker IDを真っ先に書いたBrokerがControllerとなっていました。今回RaftのLeader Electionの仕様に基づいて新たに実装する必要がありました。また、上記アニメーションでお気付きの方もいるかと思いますが、ackの返信タイミングがKafka Producerへの返信のそれとは異なります。こちらもRaftの仕様に合わせてFollowerのコミットを待たずackを返す実装となっています。
KIP-630 - Kafka Raft Snapshot
最後に、Kafka 3.0になって導入されたのがKIP-630です。このKIPはController間のメタデータ同期に関わる改善で、Active Controllerの持つ最新状態のクラスタメタデータのスナップショットを取り共有するというものです。
Controller障害時には、機能しなくなったノードの代わりに新たにControllerを追加する必要があります。この際他のControllerと同等の状態を保持する為には__cluster_metadata
を最初から順序通り読み込む必要がありますが、このTopicが全て揃っていないと状態を再現する事が出来なくなってしまいます。このKIPによって、より直近のスナップショットをスタートとして差分のOffsetの消化のみで最新状態を再現する事が出来ます。
Kafkaのクラスタサイズには限界がありますが、既知のものとして最も直近に遭遇するもの(それでも実際には滅多に遭遇しませんが)がPartition数です。構成にもよりますが、Partition数が数十万ほど大きくなるとControllerがZookeeperから取得しメモリ内で構成するのに数分を要してしまい、100万件を超えると十数分かかる場合もあります。このスナップショットの導入により、クラスタメタデータのサイズにほとんど影響を受けずに状態を再現する事が可能となります。
さらにこの改善は古いログの処理方法が追加される事であり、概念的にはより大きな変更です。これまでは古いログは捨てる (Delete) か、もしくは圧縮 (Compact:キー毎の最新以外を捨てる) という二択だったのですが、新たにその時点での最新状態を取得 (Snapshot) 出来る様になります。現時点ではあくまで内部Topicの、それも__cluster_metada
限定の機能ではありますが、概念として導入される為今後別の用途にも広がると思われます。
おわりに
KIP-500は一言で言えばZookeeperが不要になるというものですが、クラスタ構成がシンプルになるだけでなく、より障害耐性の強いKafkaへの大きなステップアップとなるKIPです。元々ハードウェアの構成にはシビアだったKafkaですが、様々な機能改善によってより容易に稼働させる事ができる様になり、今回さらにランタイム障害にも強くなりました。
クラウド、そしてコンテナといった「どこからどう調達されて、いつ無くなるかも分からない」ランタイム上でも安定して稼働するKafkaというのが、KafkaのCloud Nativeというエコシステムに対する大きなコミットメントであるよう感じています。