QuintoAndarにおけるデータエンジニアリングチームの日常においては、常に新たなデータフローの課題に直面しています。これら全てを処理するために、我々は大規模データ処理、変換、探索のためにDatabricksを活用しています。ワークフロー管理としては、DAG(有効非循環グラフ)の組み合わせとApache Airflowを使用しており、これによって、我々のETLステップを関係性、依存関係を考慮した形で整理することができます。もし、Airflowがどのようにして我々のデータパイプラインの開発を支援しているのかを知りたいのであれば、こちらを参照ください。
以下の図では、QuintoAndarにおいて、どれだけAirflowとDatabricksが密接にやりとりをしているのかを示しています。
我々のデータレイクアーキテクチャにおける、各ステップにAirflow、Databricksを含むフロー図
新たなDAGの高知右派、我々のチームルーチンにおける一般的な作業となっており、より効率的に構築できるようにいくつかのテンプレートを持っていました。これらのテンプレートには、他の項目に加え、ETLパイプラインを実行する一連の計算リソース、設定を含むデフォルトSparkクラスターが含まれていました。
対象データのサイズによってフローが異なるような我々のシナリオに取り組んでいる中で、いくつかのケースでは不必要な大規模なクラスターを用いることで不要なコストが発生していたため、デフォルトクラスターの設定を用いる必要性を調査する決断を下しました。
クラスターのマシンがどのように動作しているのかを理解するために、我々はCPU、萌モリー使用料、ネットワークなどのメトリクスを確認できる高性能コンピューティングのモニタリングシステムであるGangliaのダッシュボードを調査することができます。Gangliaを分析することで、クラスターの設定が十分なものであるのか、あるいは、特定のDAGにおいては若干増強しなくてはならないのかを理解することができます。
以下のケースでは、Gangliaダッシュボードはメモリー、CPU使用量が少なく、サーバー負荷の分布がバランスしていることから、クラスターの設定が必要以上のものであることを我々に教えてくれています。
メモリー、CPU使用量が少なく、サーバー負荷の分布がバランスしている状態のGangliaダッシュボード
また、単一のクラスター設定は常に黄金律であるとは限りません。いくつかのDAGは完璧に動作するかもしれませんが、他のケースでは同じ設定では不十分であり、パフォーマンスの劣化を引き起こし、データロードを完了することすらできなくなります。
あらゆる大企業においては、特定のサービスの最適化は常に重要な目標となり、特にそのサービスが広く活用され、重要かつ効果である場合にはその重要性は高まります。Databricksではインスタンスタイプを幅広くサポートしているので、目的により適合するマシンを選択することで、我々自身のクラスター設定を容易に設計することができます。すでに我々は4つのvCPU、31GBのメモリーを搭載したi3.xlargeマシンをデフォルトクラスター設定として選択していたため、多くのジョブでは50%のリソースすら活用しておらず、以下の状況をもたらしていました。
- 必要以上のCPU/RAMの使用
- 不要なコスト
そして、適切なDAGの実行を保証するために、我々は小規模なクラスター設定をテストし、ベストなパフォーマンス設定を得るために徐々に規模を増やしました。そして、デフォルトクラスター設定が大きすぎるかどうかをテストするために使用する、最小リソースのSparkクラスターを2つのvCPUと8GBメモリーを搭載するm4.largeに決定しました。クラスターの挙動を理解するために、新たなクラスターでDAGを実行し、Gangliaダッシュボードで挙動を確認しました。
ステップバイステップ
はじめに、デフォルトクラスター設定を用いて、従来通りのDAGのデータを取得して分析する必要があります。このためには、実行の際にGangliaダッシュボードを見て時間を追跡する必要があります。
Databricks Clusters > MetricsからGangliaにアクセスすることができ、以下のような内容が表示されます。
Gangliaダッシュボードの例:サーバー負荷、ホスト、メモリー、CPU、ネットワーク使用量に関する情報
ダッシュボードにおいて、このDAGを実行するために割り当てられたメモリー量を理解するためには、cluster Memory last hour
レポートのUse
に注意することが重要です。cluster CPU last hour
ダッシュボードでは、どれだけのCPUが使用されたのかを理解するためにUser
を確認することができ、CPUがどれだけ活用されていないのかをみるためにIdle
を確認することができます。
このダッシュボードにおいては、CPUの最大利用率は約52%であり、平均アイドル時間は約82%となっていることに気づきます。メモリーに関しては、最大使用量は10GBしかなく、リソースを使い切っていません。さらに、2つのノード、Server Load Distribution
における四角はバランスしており、これは良好と言えます。
Gangliaダッシュボードにおいて、クラスターによって割り当てられるメモリーは、プログラムによる事前割り当てのものであり、割り当てられるデータの量を直接反映するわけではないことに注意する必要があります。(OSによってメモリーが必要となるオペレーションとの競合を回避するために)マシンサイズに応じて事前に割り当てられるメモリー量に基づくクラスターでの計算処理に加えて、Memory
に含まれる他のオペレーションが存在します。
DAG全体の処理を完了すると、我々のデフォルトクラスターによるAirflowパイプラインの実行時間は約14分となりました。
小規模のクラスター設定のテストを開始するために、上述した最小リソースのクラスターとして2つのvCPUと8GBメモリーを搭載したm4.largeを使用することに決めました。
新規クラスターで再度DAGを実行すると、Gangliaダッシュボードには以下のように表示されました。
2ノードで動作しているGangliaダッシュボード:最大で3.4GBを使用、CPUの利用率は62%、平均アイドル時間
cluster Memory last hour
ダッシュボードにおいて最大メモリー使用料は3.4GB、cluster CPU last hour
ダッシュボードにおいて最大CPU使用率は62%、平均アイドル時間は64.5%となりました。また、この場合においては、我々はオートスケーリングの設定を有効化したので、クラスターが必要に応じて3ノードまで利用できるようにしました。上の図においては、2ノードのみが使用され、同時にバランスも良いものとなっています。
DAG全体の処理を完了すると、我々の最小クラスターによるAirflowパイプラインの実行時間は約17分となりました。
結果の比較
異なるタイプのクラスターによるDAGのサンプルの実行後、これらの結果を比較することができます。
デフォルトクラスター、最小クラスターにおいて、メモリー、CPU使用率、平均アイドル時間、処理時間合計を比較したグラフ
**メモリー使用量(memory usage)**に関しては、デフォルト設定を用いた際、キャパシティを使い切っておらず、62GB利用できるにもかかわらず、10GBも使用しておらず、16%程度の利用率となっていました。最小のリソース設定にすることで、シナリオは変わりました。今では8GB中3.5GB程度を使用しており、利用率は44%程度となり、キャパシティの利用率を改善できたことを示しています。
ここでもう一つ気づくべき点としてメモリーの使用量の差異があります。これはおそらく、バッファ領域によるものと考えられます。大規模マシンは割り当てられた領域に比例してより大きな領域を予約するのでしょう。
**最大CPU利用率(maximum CPU usage)**を見てみると、デフォルト設定では、最大約52%となっておりまずまずと言えます。しかし、最小リソース設定に変えることで、利用率を62%にすることができ、こちらも改善が認められます。
**平均CPUアイドル時間(CPU average idle time)**に関しては、デフォルト設定では、実行時間の約82%がアイドル時間となっています。最小リソース設定においては、64.5%となり、我々はより効率的にCPUを活用できており、結果として平均アイドル時間も改善されたことになります。
最後に、我々のDAGの**時間あたり処理時間合計(total runtime/hour)**に関しては、デフォルト設定では14分、一時間の23%となりました。最小リソース設定においては、17分となり、28%となります。ここでは、処理時間合計が増加することは良いことではありませんが、わずか3分の増加であり、依然として問題はありません。
シナリオ全体を考えると、最小リソース設定は、処理時間は5%増加したものの、利用可能なリソースの利用率を改善させていると言えます。
他のシナリオ
新規クラスターのセットアップ
いくつかのDAGにおいては、デフォルト設定の方が優れており、最小リソース設定では不十分となる場合があります。我々が遭遇したケースの一つとして、最小リソース設定に変更した際に、以下のGangliaダッシュボードに示すように、メモリーのスワップが発生したというものがあります。スワップはcluster Memory last hour
ダッシュボードの赤い線の上にある小さい紫の線で表現されています。
メモリーのスワップを示すGangliaダッシュボード、赤い線で表現される利用可能なメモリーの上の紫の線がスワップを示している
この場合、121MBのメモリースワップがありました。CPUに関しては、最大約62%の使用率となり、平均アイドル時間は約62%であり、利用可能なキャパシティを使い切ってはいないと言う結論に達しました。
しかし、メモリーのスワップとは別として、依然としてデフォルトクラスターの設定は必要以上に大きいものとなっています。メモリーに関してはより大規模ですが、CPUに関しては最小リソース設定と同じ規模のもの、そして、デフォルト設定よりもメモリー、CPUが小規模のものが必要となっていると言う状況に遭遇しています。この場合、r5.largeマシンを用いたクラスターを新規の設定として作成すべきです。これは最小リソース設定と同じCPUを使用しつつ、より多くのメモリー(16GB)を搭載しながらも、デフォルト設定よりも小さいものとなり、スワップ問題を解決しつつも、処理時間の増加は1分のみとなり、このDAGも問題なく稼働させることができています。
また、同じようにデフォルト設定では十分ではない場合には、リソースを余らせず、無駄なコストを発生させないように、直接大規模な設定を選択するのではなく、より多くのメモリー、CPUを保証できるクラスターを探索することができます。
混成クラスターのセットアップ
いくつかのDAGのシナリオにおいては、例えば、並列化の前にドライバーノードの処理を必要とする大量のオペレーションを実行するジョブなど、ジョブのSLAを達成するためにリソースの混成設定が必要になる場合があります。これらのケースにおいては、ドライバーノードにより多くのリソースを割り当て、ワーカーノードのリソースは少なめにするクラスターの構成を組むことができます。
より詳細な分析のためには、以下に示すようにGangliaのそれぞれのサマリーをクリックすることで、ノード間のワークロード分析を展開することが可能です。
ノードのワークロードを展開したGangliaダッシュボード
Gangliaの初期ビューはクラスターリソースの合計となっています。それぞれのノードを個別に分析する際には、ドライバーノードとワーカーノードのリソース使用量の詳細を確認することができます。
ドライバーノードシナリオ
まとめ
DAGの例において、デフォルト設定から最小リソース設定にクラスターを設定を変更することには問題がないと言うことを見てきました。いくつかの評価指標が増加したことは別として、これらの結果は依然として良好です。処理時間合計の3分の増加は問題とはなっていません。
おそらくですが、データ増加などに伴い、異なるクラスター設定を用いる必要が出てくるかもしれないことに留意する必要があります。しかし、げんじてでは、デフォルトと最小リソース設定の中間の設定が完璧にうまく動作しています。
QuintoAndarにおけるクラスターの最適化による経済性に関しては、以下が我々の結果となっています。
- vCPU利用率においては55%以上の削減
- RAM使用率においては62%以上の削減
- 約50%のコスト削減
本書では、より効率的なデータ処理を実現し、リソース、コストの削減に加え、最適化に対する労力の投資は非常に重要であることを示しました。バランスを取るのが非常に難しい問題ではありますが、数多くのメリットを提供するに違いありません!