0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricksクラスターの設定とチューニング

Posted at

Comprehensive Guide to Optimize Data Workloads | DatabricksのセクションDatabricks Cluster Configuration and Tuningの翻訳です。

本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

汎用クラスターと自動クラスター

  • 汎用(All-purpose)クラスターは、開発やテストフェーズにおけるアドホッククエリーの実行やインタラクティブノートブックの実行にのみ使用すべきです。
  • 決して自動化ジョブで汎用クラスターを使わないでください。代わりにジョブには揮発性のジョブ(自動)クラスターを使ってください。
  • これ単体で非常に大きなコスト削減につながります: 汎用クラスターではなく自動クラスターを選択することでDBUコストを削減することができます。
  • また、SQLを実行する際にSQLウェアハウスを活用することができます。SQLウェアハウスはDatabricks SQLにあるデータオブジェクトに対してSQLコマンドを実行できる計算リソースです。また、即座に計算リソースにアクセスできるサーバレスのフレーバーも提供しています。

オートスケーリング

Databricksでは、ユニークな機能であるクラスターのオートスケーリングを提供しています。以下に、オートスケーリングをいつ、どのように活用するのかに関するガイドラインを示します:

  • インタラクティブクラスターを用いたアドホッククエリーの実行、インタラクティブノートブックの実行、パイプラインの開発・テストの際には常にワーカーの最小数を1にしましょう。これらを組み合わせることで、優れたコスト削減を実現できます。
  • プロダクションで使用するオートスケーリングのジョブクラスターでは、確実に1台のワーカー以上のリソースを必要とするジョブでは、最小インスタンス数を1に設定しないでください。代わりに、最小のコンピュート要件よりも大きな値を設定してください。ある程度スケールアップの時間を削減することができます。
  • 特定のジョブに対して全てのワーカーのコアを活用するにように、Sparkのシャッフルパーティションをファインチューニングして、個々のジョブに自分のクラスターが割り当てられている場合には、必ずしもオートスケーリングを活用する必要はありません。しかし、期待しないデータのサージが生じる場合に、計算リソースのバッファーを持っておきたい場合には、依然としてオートスケーリングを活用することができます。このためには、予測される日次のデータロード、必要とされるSLAに基づいた最小ワーカー数を設定し、必要に応じてオートスケールことで追加されるバッファーとして追加のワーカー(例えば、2から4)を維持しておきます。
  • Databricksワークフローを用いることで、同じパイプラインに含まれる複数のタスク(ジョブ)でジョブクラスターを共有することができます。共有されるジョブクラスターで並列に大量のジョブが実行される際、スケールアップして全ての並列ジョブにリソースを提供できるように、そのジョブクラスターのオートスケーリングは有効化すべきです。
  • Spark構造化ストリーミングワークロードにおいては、クラスターがスケールアップしてしまうと、いつスケールダウンしていいのかの判断が困難になるので、オートスケーリングを使用すべきではありません。この観点において、Delta Live Tablesではいくつかの進化がなされており、強化オートスケーリングと呼ばれる機能のおかげで、Delta Live Tablesでのストリーミングワークロードではオートスケーリングが非常にうまく動作します。

ワークロードに基づいたインスタンスタイプ

ワークロードのタイプに基づいてインスタンスファミリーを選択する一般的なガイドラインとなります:

VMカテゴリー ワークロードタイプ
メモリー最適化
  • MLワークロード
  • 大量のシャッフルやスピルが含まれる
  • Sparkキャッシュが必要
コンピュート最適化
  • 構造化ストリーミングジョブ
  • フルスキャンを伴うETL、データの再利用なし
  • OPTIMIZE、Z-orderのDeltaコマンドを実行する
ストレージ最適化
  • Deltaキャッシュを活用する
  • データキャッシュを伴うML、DLワークロード
  • アドホック、インタラクティブな分析
GPU最適化
  • 例外的に高いメモリー要件があるML、DLワークロード
汎用
  • 特定の要件がない状況での使用
  • VACUUM Deltaコマンドを実行する

ワーカーの数

適切なワーカーの数を選択するには、Sparkジョブのコンピュート、メモリー要件を特定するために、いくつかのトライアルとイテレーションが必要です。

  • プロダクションのジョブでは、単一障害点となるシングルワーカーを選択してはいけません。
  • 小規模なワークロード(joinや集計のようなワイドな変換処理を伴わないジョブなど)では2-4ワーカーからスタートします。
  • joinや集計のようなワイドな変換処理を含む中規模、大規模なワークロードでは8-10ワーカーからスタートします。

スポットインスタンス

  • 市場レートより安いスペアのVMインスタンスを使うためにスポットインスタンスを活用します。
  • インタラクティブ、アドホック、共有クラスターに適しています。
  • SLAのないプロダクションジョブでも活用できます。
  • 厳密なSLAがあるプロダクションジョブではお勧めしません。
  • 決してドライバーにスポットインスタンスを使わないでください。

自動停止

  • コスト削減のために、一定期間アクティビティがなかった場合にクラスターを停止します。
  • 夜間や週末にアイドル状態のクラスターが稼働し続けることを避けるために、汎用管スターでは自動停止を有効化します。
  • デフォルトでは、クラスターは120分後に自動停止するように設定されていますが、さらにコストを削減するために10-15分のようにさらに短い時間に変更することができます。

クラスターの使用率

クラスターを最大限活用していることを確実にする必要があります。シャッフルパーティションの数に応じて、ジョブが処理を完了するためにクラスターで大量のイテレーションを実行する場合があるので、経験則としては、全てのワーカーのコアがアクティブに使用されており、全てのイテレーションでアイドル状態になっていないことを確実にすることになります。これを確実なものにする唯一の方法は、シャッフルパーティションの数を、ワーカーコアの合計数の倍数に常に設定するというものです。

-- in SQL
set spark.sql.shuffle.partitions = M*<number of total cores in cluster>
## in PySpark
spark.conf.set("spark.sql.shuffle.partitions", M*sc.defaultParallelism)

ここでMは倍数となります。より詳細については手動でのシャッフルパーティションのチューニングのセクションをご覧ください。

手動でのシャッフルパーティションを行わない場合には、経験則としてMを2や3に設定します。

全てのコアが完全にエンゲージされていることを確認するために、GangliaのUIがあなたの親友となります。メトリクスタブの下にピン留めされているクラスターUIからGanglia UIにアクセスすることができます。

Ganglia UIで注意すべきことは:

  • クラスターの平均負荷が常に80%以上である。
  • クエリー実行時に、(UIの左にある)クラスター負荷分散グラフの(ドライバーノードを除く)すべての四角が、すべてのコアが完全にエンゲージされていることを意味する赤であるべきです。
  • クラスターメモリー使用量が最大(少なくとも60%-70%やそれ以上)であること。
  • GangliaメトリクスはDatabricksランタイム12.2以下でのみ利用できます。

Databricksランタイム13以降で、Gangliaはよりセキュアでパワフル、クリーンなデザインでシンプルなユーザー体験を提供する新たなクラスターメトリクスUIに置き換えられました。このため、DBR 13以降ではクラスター使用率をチェックするためにこの新たなUIを活用することができます。

クラスターポリシー

クラスターポリシーは、クラスターを設定する能力を一連のルールに基づいて制限します。クラスターポリシーを効果的に活用することで、管理者は以下のことを実現できます:

  • 事前定義した設定を用いたクラスターの作成に制限
  • (時間ごとの価格に寄与する属性に制限を設けることで)クラスターごとの最大コストを制限することでコストをコントロール
  • (いくつかの値を固定、非表示にすることで)ユーザーインタフェースをシンプルにし、より多くのユーザーが自分のクラスターを作成できるように

成功につながるクラスターガバナンスのロールアウトを計画、確実にするために、クラスターポリシーのベストプラクティスガイドをご覧ください。

インスタンスプール

Databricksのプールは、アイドル状態で利用可能なインスタンスのセットを維持することで、クラスターの起動時間やオートスケーリングの時間を削減します。クラスターがプールにアタッチされると、プールのアイドルインスタンスを用いてクラスターノードが作成されます。プールにあるインスタンスがアイドル状態になると、クラウドプロバイダーのコストのみが発生し、DBUコストは発生しません。

コストを最小化しつつ処理時間を改善するために、追加される計算リソースへの高速なアクセス(高速オートスケーリング)が要件となる、厳密なSLAを持つワークロードではプールが推奨となります。

Photon

Photonは、低コストで劇的に高速なクエリーパフォーマンスを提供する、Databricksレイクハウスプラットフォームにおける次世代エンジンです。PhotonはApache Spark APIと互換性があるので、既存のSparkコードですぐに動作し、標準的なDatabricksランタイムよりも優れたパフォーマンスを提供します。Photonのメリットのいくつかを以下に示します:

  • 膨大な量(< 100GB)のデータ処理を行うjoinや集計を含むクエリーを高速化
  • Deltaキャッシュから繰り返しデータがアクセスされる際のパフォーマンスを高速化
  • 大量のカラムと大量の小規模ファイルを持つテーブルに対するスキャン/読み込み性能をより堅牢に
  • UPDATE、DELETE、MERGE INTO、INSERT、CREATE TABLE AS SELECTを用いたDeltaの書き込みを高速化
  • Joinの改善

以下の特性を持つワークロードでは、(評価とパフォーマンステストの後で)Photonを有効化することを推奨します:

  • DeltaのMERGEオペレーションを含むETLパイプライン
  • クラウドストレージに大規模なデータ(Delta/Parquet)を書き込む
  • 大規模データセットのスキャン、join、集計処理、数値計算
  • ストレージに到着する新規データをインクリメンタル、効率的に処理するためのAuto Loader
  • SQLを用いたインタラクティブ/アドホックなクエリー

その他

  • 最新のDatabricksランタイムはほとんどのケースで常に以前のバージョンよりも高速なので、最新のLTSバージョンのDatabricksランタイム(DBR)を使いましょう。
  • いくつかの古いDBRではスタックした/ゾンビ化したSparkジョブが残っていることがあるので、最低でも週に一度(あるいはビジーなクラスターにおいては日次)は定期的に汎用クラスターを再起動しましょう。
  • 以下の場合にのみ大規模なドライバーノード(多くの場合、4-8コア、16-32GBで十分です)を設定しましょう:
    • 大規模なデータセットが返却/収集される(通常、spark.driver.maxResultSizeも大きくします)
    • 大規模なブロードキャストジョインが実行される
    • 同じクラスターで大量(50+)のストリームや同時実行ジョブ/ノートブックが実行される
      • このようなワークロードでは、Delta Live Tablesがフィットするかもしれません。DLTはパイプラインの完全なDAGを特定し、可能な限り最適化された方法で実行します。
  • シングルノードライブラリ(Pandasなど)を用いたワークロードでは、そのようなライブラリは分散された方法でクラスターのリソースを活用しないので、シングルノードのデータサイエンスクラスターを使うことが推奨となります。
  • 特定のチームや部署にコストを配賦するために、Databricksクラスターにクラスタータグを関連づけることができます。タグは自動的に背後のクラウドリソース(VM、ストレージ、ネットワークなど)に伝播されます。
  • シャッフルが頻繁に起こるワークロードにおいては、少数の大規模なノードを活用することが推奨となります。これによって、ノード間のデータ転送が起きる際のネットワークI/O削減の助けとなります。
  • RAMのサイズが大きなワーカー(>128GB)を選択することで、ジョブの実行をより効率的にする助けとなりますが、ガーベージコレクション時の長期間の停止につながることもあります。これは、パフォーマンスにネガティブな影響を与え、時にはジョブの失敗を引き起こすことがあります。このため、128GB RAMよりも大きなワーカーを選択することはお勧めしません。ワーカーのメモリーとは別に、長期間のガーベージコレクション(GC)による停止を避ける方法がいくつかあります:
    • ドライバーにデータを収集するためのcollect()関数を呼び出さないでください。
      • toPandas()関数も同様です。
    • SparkキャッシュよりもDeltaキャッシュを使いましょう
    • 上述のソリューションが役に立たない場合、高度なクラスターオプションの中にあるSpark設定で、spark.executor.defaultJavaOptions and spark.executor.extraJavaOptions to -XX:+UseG1GCを設定することで、G1GCガーベージコレクターを使いましょう。

特定のクラスターサイジングの例に関しては、Databricksドキュメントのクラスターサイジングの例をご覧ください。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?