How to choose the number of topics/partitions in a Kafka cluster? - Confluent を読んだメモ。2015/03/12 の記事なので若干古い。
partition を増やすとスループットが向上する
- 最初に理解しておくべきなのは topic の partition は Kafka における並列処理の単位であるということ
- producer と broker の両側で、異なる partition への書き込みは完全に並列して行うことができる
- よって、圧縮のようなコストの高い操作は、より多くのハードウェア・リソースを利用しうる
- consumer の側では、Kafka はいつでも単一の partition のデータを consumer のスレッドに送る
- それゆえ、(consumer group 内での) consumer における並列性の度合いは、consume される partition の数によって規定される
- それゆえ、一般的には Kafka クラスタ内の partition の数が多ければ多いほど、高いスループットを実現することができる
- partition の数を選ぶためのラフな公式は、スループットに基づいたものとなる
- 1 partition あたりにどれだけの production (以下 p) と consumption (以下 c) のスループットが実現できるかを測定してください
- 達成したいスループットを t としよう
- そのとき少なくとも
max(t/p, t/c)
個の partition が必要となる - producer 側で実現できる partition ごとのスループットは、バッチサイズ・圧縮コーデック・acknowledgement の種類、レプリケーションの因数に依存する
- しかしながら、一般的にはこのベンチで示された通り、ただ 1 つの partition で数 10 MB/sec のスループットで produce することができる
- consumer のスループットは、たいていアプリケーションに依存する、というのは consumer のロジックがどのくらいのスピードで各メッセージを処理できるかに対応しているからだ
- よって、本当に計測が必要なのだ
- 徐々に partition の数を増やすことは可能だが、メッセージがキーつきで produce されている場合には注意が必要だ
- キーつきのメッセージを発行するとき、Kafka はキーのハッシュにもとづきメッセージを partition にマッピングする
- これによって同じキーがついたメッセージは必ず同じ partition に転送されるという保証が提供される
- この保証はアプリケーションの種類によっては重要で、というのは 1 つのパーティション内のメッセージは必ず順番に consumer へ配送されるからだ
- もし partition の数が変わると、そのような保証は持続されなくなってしまうだろう
- この問題を回避するための、一般的なプラクティスとしては少しだけ余分に partition を用意しておくことだ
- 基本的に、未来 (たとえば 1, 2 年後) に目標とするスループットに基づき、partition の数を決めなさい
- 時がたてば、クラスタにより多くの broker を追加し、比例的に既存の partition の一部を新しい broker に移動させることができる (活性で可能)
- このようにすれば、キーが利用されているときでも、アプリケーションのセマンティクスを壊さずに、スループットの増加に追いつくことができる
partition を増やすとより多くのファイルハンドルをオープンしておく必要がある
- 各 partition は broker のファイルシステム上のディレクトリに対応している
- そのログ・ディレクトリの中で、ログ・セグメントごとに 2 つのファイルが存在するだろう (1 つはインデクスで、もう 1 つは実際のデータ)
- 現在、kafka では、各 broker は各ログ・セグメントについてインデクス・ファイルとデータ・ファイル両方のファイルハンドルをオープンする
- よって、partition が増えれば増えるほど、下層 OS で設定すべきオープンできるファイルハンドルの上限が大きくなる
- これはたいていは単なる設定の問題だ
- われわれは本番環境の Kafka クラスタが broker ごとに 3 万のファイルハンドルを開いた上体で動いているのを見たことがある
partition を増やすと可用性が落ちるかも
-
Kafka はクラスタ内レプリをサポートしており、これにより高可用性と持続性を提供している
- 1 つの partition は複数のレプリカをもつことができ、それぞれのレプリカは異なる broker に保存される
- レプリカのうちの 1 つが leader に任命され、残りのレプリカが follower となる
- 内部的に、Kafka はそのようなレプリカのすべてを自動的に管理し、レプリカ間で同期された状態であることを保証する
- producer と consumer の両方からの partition に対するリクエストは、leader が処理する
- 1 人の broker が死ぬと、その broker 上にあった leader である partition 群は一時的に利用不可となる
- Kafka は自動的にそれらの利用できない partition の leader をいくつかの他のレプリカに移動させ、クライアントからのリクエストに対する処理を継続する
- このプロセスは、controller に任命された Kafka の broker のうちの 1 人によって実行される
- このプロセスには、Zookeeper 内で影響のある partition それぞれについていくらかのメタデータを読み書きすることが含まれる
- 現在、Zookeeper に対する操作は controller の中で直列的に実行される
-
一般的に broker がクリーンにシャットダウンされた場合には、controller は先を見越してシャットダウンしようとしている broker から leader を 1 つずつ移動させる
- 1 つの leader の移動だけなら 2,3 ミリ秒で終わる
- だから、クライアントから見ると、クリーンな broker のシャットダウン中に、利用不可となるのはほんの少しの期間だけだ
-
しかしながら、broker が
kill -9
等でアンクリーンにシャットダウンされた場合、観察される利用不可な期間は、partition の数に比例する- broker が合計 2000 個の partition を持ち、それぞれの partition が 2 つのレプリカを持っているとしよう
- 概算で、この broker はだいたい 1000 個の partition の leader となる
- この broker がアンクリーンに落ちたとき、それら 1000 個の partition のすべてがまったく同時に利用不可となる
- 1 つの partition につき新しい leader を選出するのに 5 ミリ秒がかかるとしよう
- 1000 個全部の partition に対して新しい leader を選出するには最大 5 秒がかかることになる
- なので、いくつかの partition にとっては観察される利用不可な期間は、5 秒プラス障害検出にかかった時間ということになるだろう
-
もし不運だった場合には、落ちた broker が controller だったかもしれない
- この場合には、新しい leader の選出のプロセスは、controller が新しい broker へフェイルオーバーしてからでないとはじまらない
- controller のフェイルオーバーは自動的に発生するが、初期化の間に ZooKeeper から各 partition についてのいくつかのメタデータを読む必要がある
- たとえば、もし Kafka クラスタ内に 10,000 個の partition があり、1 partition あたり Zookeeper からのメタデータ初期化に 2 ミリ秒かかるとしたら、利用不可の期間がさらに 20 秒増えることになるだろう
-
一般的に、アンクリーンに落ちるのはまれである
- しかし、このようなレアケースでの可用性を気にかけるのだとしたら、おそらく broker ごとの partition の数は 2~4 千程度、クラスタ内の全 partition 数は数万程度に制限したほうがよい
partition を増やすと end-to-end のレイテンシが増加する
- Kafka における end-to-end のレイテンシというものは、メッセージが producer によって発行されてから consumer によって読まれるまでの時間と定義される
- Kafka はメッセージがコミットされたあと、つまり同期中のレプリカすべてに複製されたあとで、そのメッセージを consumer に露出する
- よってメッセージをコミットする時間が end-to-end のレイテンシのうちで大きな割合を占める
- デフォルトでは、Kafka の broker は、ほかの broker からデータを複製するのに 1 つのスレッドしか使わない (2 つの broker の間でレプリカが共有されるすべての partition について 1 スレッドしか使わない)
- われわれの実験では、1 人の broker から別の broker へ 1000 個の partition を複製するのに、20 ミリ秒のレイテンシが追加でかかり、これは end-to-end のレイテンシが少なくとも 20 ミリ秒であることを示唆している
- これはいくつかのリアルタイム・アプリケーションにとっては高すぎるかもしれない
- この問題はクラスタをより大きくすることで回避することができることに注意してほしい
- たとえば、同じ Kafka クラスタの中で、1 つのブローカー上に 1000 partition leader がいて、10 人のほかの broker がいるとしよう
- 残りの 10 ブローカーのそれぞれは平均すると最初の broker から 100 partition を取ってくればいいだけだ
- それゆえ、メッセージをコミットすることによる追加レイテンシは数 10 ミリ秒ではなく数ミリ秒となる
- 大体の目安で言うと、もしレイテンシを心配しているなら、broker ごとの partition の数を
100 * b * r
に制限するのがよいだろう (b が Kafka クラスタ内の broker の数、r がレプリケーション因数)
パーティションを増やすとクライアント側でより多くのメモリが必要になるかも
- Confluent Platform 1.0 も搭載したもっとも最近のリリースである 0.8.2 で、われわれはより効率的な Java producer を開発した
- 新しい producer のよい機能の 1 つは、入ってくるメッセージをバッファする際に使われるメモリの量についてユーザーが上限を設定できるようになっていることだ
- 内部的には、producer は partition ごとにメッセージをバッファする
- 十分なデータが蓄積するか十分な時間が経過すると、蓄積されたメッセージはバッファから削除され broker へと送信される
- もし partition の数を増やすなら、メッセージは producer の中のより多くの partition に蓄積されるだろう
- 集約された利用メモリの量は設定したメモリの上限を超えてしまうかもしれない
- これが起きるとき、producer はブロックするかどんな新しいメッセージも捨てなければならず、そのどちらも理想的ではない
- これが起こるのを防ぐために、 producer のメモリサイズをより大きく再設定する必要があるだろう
- 大まかに見積もると、よいスループットを実現するには、producer の中で produce されようとしている partition ごとに少なくとも数 10 KB を割り当て、もし partition の数が非常にたくさん増えるなら、合計のメモリ量を調整する必要がある
- 似たような問題が consumer にも同様に存在する
- consumer は partition ごとにメッセージのバッチを取りに行く
- consumer が consume する partition が増えれば増えるほど、より多くのメモリが必要となる
- しかしながら、これはリアルタイムでない consumer のみに存在する問題にすぎない
まとめ
- 一般的に、Kafka クラスタ内で partition を増やすとより高いスループットが得られる
- しかしながら、全体または broker ごとの partition 数を増やしすぎると、可用性やレイテンシのような項目に影響を与える可能性があることに気をつけなければならない
- 将来的には、これらの制約のいくつかを改善し、partition の数という点で Kafka をよりスケーラブルにしようと計画している