以下の内容の元ネタのスライドを公開しました!
高性能な分析環境の構築
Databricksで採用されているレイクハウスアーキテクチャにおいては、データはファイルとして保持されます。このため、ファイルのレイアウトをどのように最適化するかが非常に重要となります。
つまり、基礎としてファイルのレイアウトを最適化してから、コードを最適化します。最後にクラスターのサイジングを行うというのが最適化の流れとなります。
注意
ここで言及しているパフォーマンスチューニングは、大規模データを対象としたものです。数メガバイトなどラップトップのメモリに収まるデータであればpandasを使ったほうが楽です。
基礎のデザイン
一般的なパフォーマンスのボトルネック
ビッグデータシステムやMPPシステムで遭遇します。
ボトルネック | 詳細 |
---|---|
小さなファイル問題 |
|
データの偏り |
|
過度の処理 | 従来のデータレイクプラットフォームは、多くの場合データセットやパーティション全体の再書き込みを必要とします。 |
リソース競合 | アドホックやBIクエリーとして、大規模な取り込み処理とETLジョブを同時に実行すると、クラスターの分離なしにはクエリーパフォーマンスの低下につながります。 |
小さなファイル問題の回避
Data Lakeではこの一般的なパフォーマンスの問題を自動で対応します。
- 大量の小さなファイルは読み込みのオーバーヘッドを大きく引き上げます
- 少量の大規模ファイルは読み込みの並列性を低下させます
- 過度のパーティショニングは一般的な問題です
- DatabricksではDelta Lakeテーブルのサイズを自動でチューニングします
- Databricksでは、auto-optimizeによって自動で小規模ファイルをコンパクトにします
データ偏りへの対応
データの偏りは不可避ですが、Databricksでは自動でこれに対応します。
- MPPシステムでは、一部のワーカーで大量の処理を行うことになるので、パフォーマンスに大きなインパクトをもたらします
- 多くのクラウドDWでは、データの偏りを解決するために手動、オフラインの再分配を必要とします
- Adaptive Query Executionによって、Sparkは大規模なパーティションを小規模でサイズの揃ったパーティションに自動でブレークダウンします
リキッドクラスタリング - パーティションよさらば
これまでのDatabricksにおける最適化技術を置き換えるのがリキッドクラスタリングです。
- 高速
- 適切にチューニングされパーティショニングされたテーブルよりも高速な書き込みと同等の読み込み性能
- 自己チューニング
- 過度のパーティショニングやパーティショニング不足を回避
- インクリメンタル
- 新規データに対して自動の部分的なクラスタリング
- 偏りへの耐性
- 一貫性のあるファイルサイズを生成し、書き込み量の増幅を抑制
- 柔軟
- クラスタリング列を変更したい?問題ありません!
リキッドクラスタリングのメリットが出るシナリオ
- 頻繁に高いカーディナリティでフィルタリングされるテーブル
- データ分布に大きな偏りのあるテーブル
- 急速に成長し、メンテナンスやチューニングの工数を要するテーブル
- 同時書き込み要件のあるテーブル
- 時間とともにアクセスパターンが変化するテーブル
- 典型的なパーティションキーが多すぎる/少なすぎるパーティションを生成してしまうテーブル
取り込み時間によるクラスタリング
パーティショニングやZ-Orderなしにすぐに利用可能なデータスキッピングを実現します。
テーブルの統計情報
コストベースのオプティマイザーがベストな結果を出せるようにテーブルの統計情報を最新に維持します。
- テーブルのすべてのカラムの統計情報を収集
- Adaptive Query Executionをサポート
- 適切なjoinタイプの選択
- hash-joinにおける適切なビルドサイドの選択
- マルチウェイjoinにおけるjoin順序の調整
ANALYZE TABLE mytable COMPUTE STATISTICS FOR ALL COLUMNS
インクリメンタル処理
ACIDトランザクションとCDCでデータセット全体の再処理を回避しましょう。
- 従来のデータレイクシステムでファイルを操作する際、追記、ファイル全体の上書きや削除しかできません。
- ACIDトランザクションによって、Delta LakeではDWシステムと同じようにレコードレベルでのINSERT、UPDATE、DELETE、UPSERTを可能にします。
- 変更点のみを処理するCDCアーキテクチャパターンに移行することで、全体的な処理時間を劇的に削減します。
基本的な推奨事項
- 自動チューニングのメリットを享受するためにDatabricksとDelta Lakeを活用しましょう:
- 小さなファイル問題を回避するためのファイルサイズの自動チューニングとauto-optimize
- AQEによる自動での偏りへの対応
- 自然なソートオーダーの保持によって、1TBより小さいパーティニングテーブルが不要に
- リキッドクラスタリングとクエリーのフィルターでよく使われるカラムに対するクラスターによるデータスキッピングを活用しましょう(週次のOPTIMIZEジョブ)。
- 特にjoinで使用されるカラムのテーブル統計情報を収集しましょう(週次のANALYZEジョブ)。
- CDCアーキテクチャに移行し、変更データのみを処理するようにするために、Delta LakeのSQL DML機能を活用しましょう。
- リソース競合を回避するために、分離されたジョブクラスターとSQLウェアハウスを活用しましょう。
コードの最適化
コード最適化の推奨事項は以下の通りとなります。
- プロダクションのジョブでは、ファイルの読み書きを伴うアクションを起動するオペレーションは避けましょう。これには、
count()
、display()
、collect()
が含まれます。 - シングルスレッドのpython/pandas/Scalaを用いるなど、ドライバーノードにすべての計算処理を強制するオペレーションを避けましょう。pandas関数を分散処理するためにPandas API on Sparkを使いましょう。
- 行ごとに処理を行うPythonのUDF(ユーザー定義かんす)は避けましょう。代わりに、ネイティブのPySpark関数かベクトル化UDFであるPandas UDFを使いましょう。
- RDDではなくデータフレームかデータセットを使いましょう。RDDではコストベースのオプティマイザーを活用できません。
ユーザー定義関数についてはこちらもご覧ください。
ファインチューニング: 適切なクラスターの選択
こちらのチートシートもご覧ください。
クラスターのタイプ
ワークロードに適したクラスターを選択することが重要です。
ALL PURPOSE COMPUTE
- 分析やアドホックなDE & DSの開発
- 共有クラスターがおすすめですが、チームやワークロードでの分離がベストプラクティスとなります。
- 稼働済みのクラスターを常に利用 (APIやスケジュール処理を含む)
- より高価です
JOBS COMPUTE
- ジョブのために作成され、処理が完了すると停止する揮発性のクラスター
- 事前のスケジュールあるいはAPI経由での作成
- シングルユーザー
- 処理の分離やデバッグに最適
- プロダクションあるいは定期的なワークロード
- 低コスト
SQL WAREHOUSE
- アドホックなSQL分析やBIにおける高い同時実行性を提供
- Photon有効化
- アドホックのSQL分析では、ワークロード固有の別個のウェアハウスではなく、共有ウェアハウスが推奨です。
- 即座に起動するサーバレスを利用できます。
オートスケーリング
- ワークロードに基づきクラスターのサイズを動的に変更します
- 固定サイズやプロビジョン不足のクラスターよりも高速な処理
- 固定サイズのクラスターよりも全体的なコストを削減
- ワーカー数の範囲設定にはいくつかの実験が必要です
ユースケース | オートスケーリングの範囲 |
---|---|
アドホックの利用やビジネス分析 | 広い範囲 |
プロダクションのバッチジョブ | 不要あるいは上限値のバッファ |
ストリーミング | Delta Live Tablesで利用可能 |
スポットインスタンス
- スペアのVMインスタンスを安いマーケット価格で利用するためにスポットインスタンスを活用しましょう
- アドホック/共有クラスターに最適
- ミッションクリティカルなSLAを持つジョブでは非推奨
- ドライバーでは決して使わないでください!
- 様々なユースケースにクラスターを仕立てるためにオンデマンドと(カスタムスポット価格の)スポットインスタンスを組み合わせましょう
SLA | スポットかオンデマンドか |
---|---|
ミッションクリティカルではないジョブ | ドライバーはオンデマンド、ワーカーはスポット |
厳しいSLAのあるワークフロー | オンデマンドにフォールバックするスポットインスタンスを使用 |
フリートクラスター
- 可用性の改善:
- インスタンス取得エラーの最小化
- スポット削除の最小化
- Spot Placement Scoreを用いた自動AZ選択の改善
- コスト削減
- スポットへのアクセス改善はTCOを削減します
- クラスター作成をシンプルに:
- コンピュートタイプとサイズを選択するだけでよく、インスタンスタイプの選択が不要になります
AWS Gravitonインスタンスのサポート (ARM)
- Databricksのコストパフォーマンスを改善し、お客様のTCOを削減
- ARM上のPhotonランタイムをサポート
ワークロードごとのクラスターのお勧め - AWS
- 使えるならGravitonを使いましょう
- 使えるなら最新世代のVMを使いましょう
- General purposeから始めて他のオプションをテストしましょう
ワークロードごとのクラスターのお勧め - Azure
- 使えるならAMD on Azureを使いましょう
- 使えるなら最新世代のVMを使いましょう
- General purposeから始めて他のオプションをテストしましょう
- Deltaキャッシュが有効化されるものを使いましょう
ワークロードごとのクラスターのお勧め - GCP
- 使えるならAMD on GCPを使いましょう
- 使えるなら最新世代のVMを使いましょう
- General purposeから始めて他のオプションをテストしましょう
- Deltaキャッシュが有効化されるものを使いましょう
Photon
チューニングやセットアップ不要の世界記録のクエリーエンジンを活用しましょう。
- コンピュートコストの削減
- ETL利用者はコンピュートコストを最大40%削減
- 高速なクエリーパフォーマンス
- 他のクラウドデータウェアハウスと比較して、最大12倍のコストパフォーマンスを持つモダンなハードウェア向けに開発
- コード変更が不要
- 探索、ETL、大規模データ、小規模データ、低レーテンシー、高い同時実行性、バッチ、ストリーミングを行うSpark API
- 様々な言語をサポート
- SQL、Python、Scala、R、Javaをサポート
Photonのカバレッジ (DBR 11.3)
アーキテクチャの検討事項
リソース競合を避けるためにクラスター & ウェアハウスを分離しましょう。
- 揮発性のジョブクラスター
- Jobs - 取り込み + ETLオブのために分離されたコンピュートは当該ワークロードに対してサイジング/最適化することができ、スケジュールで実行されます
- ジョブ実行中のみ課金が発生します
- 共有の開発クラスター
- All-purpose - チームが活発に開発している際にのみ利用し、必要なリソースが追加されるようにするための自動停止とオートスケーリング
- 完全なデータセットのサブセットで開発、テストすることを推奨します
- アドホック分析のための共有SQLウェアハウス
- SQL warehouse - チームが活発にクエリーを実行している際に利用し、必要なリソーが追加されるようにするための自動停止とオートスケーリング
- 即座きに起動し、アイドル時間でシャットダウンするサーバレスを利用可能
- BIレポートのための別個のSQLウェアハウス
- BI要件に合わせてサイジングし、他のプロセスとの競合を回避
ファインチューニング: 適切なウェアハウスの選択
Databricks SQLのサーバレスコンピュート
即時起動、柔軟 & 管理不要のコンピュートを活用しましょう。
- クイックにセットアップ、即時起動で柔軟なSQLウェアハウス - ストレージからの分離 - Photon有効化
- ベストなコストパフォーマンス(最大12倍)のためにインスタンスタイプと設定を自動で決定
- 高い同時実行性 ビルトインの自動ロードバランス
- インテリジェントなワークロード管理、クラウドストレージからの高速な読み込み
- 即座に起動、優れた可用性、サーバレスで全体のコストを平均で40%削減
AIが提供するシンプルさ
新たなクエリーを即座に実行するために優先度を上げるか、実行中のクエリーを阻害することなしに実行できるようにするためにスケールアップするかを判断するために、ワークロードの履歴を継続的に学習します。
Predictive I/O
インデックスなしに選択的なクエリーを実現します。
SELECT * from events where user_id = 123
- データのコピーを作成する必要が無かったり、高コストなインデックスを作る必要が無かったとしたら?
- システムがあなたのクエリーでどのデータが必要で、次に何を必要とするのかを学習するとしたら?
- あなたのクエリーが調整なしに高速だとしたら?
Predictive I/O for Readsでは、パフォーマンスの手動のチューニングを排除するためにMLを活用します。
- 手動チューニングの必要なしにデータの場所を三角法で計測 (PARTITION BY, CLUSTER BY)
- Predictive I/Oでは、レイクハウスをデータウェアハウスにするために、大規模AI/MLシステムを構築してきたDatabricksの数年に渡る経験を活用しています
クラスター/ウェアハウス最適化の推奨事項
- DS & DEの開発: オートスケールと自動停止を有効化したall-purposeコンピュート、データのサブセットで開発 & テスト
- 取り込み & ETLジョブ: オートスケールが有効化され、ジョブのSLAに適したサイズのjobsコンピュート
- アドホックのSQL分析: (サーバレス) SQLウェアハウス、オートスケールと自動停止を有効化
- BIレポート: BIのSLAに適したサイズの別個のSQLウェアハウス
-
ベストプラクティス:
- ワーカーノードでスポットインスタンスを活用しましょう
- スポットの可用性を高めるためにフリートクラスターやauto-AZを活用しましょう
- 可能な場合にはGravitonインスタンスを使いましょう
- 可能な場合には最新のLTSのDatabricksランタイムを使いましょう
- 適用できる際にはベストなTCOのためのPhotonを使いましょう
- 最新世代のVMのgeneral purposeからスタートし、memory/compute optimizedをテストしましょう
- アイドル時間の削減とAIで強化された最適化のためにサーバレスSQLウェアハウスを使いましょう
DS&DEクラスターのサイジング
クラスターサイジングのスタート地点
- それぞれのワークロードにおけるSLA要件に基づいて選択します
- 最初はタスク(ファイル)の数に基づいてサイズを決め、あとで調整します
- タスクの数の見当をつけるために小規模なクラスターでジョブを実行します
- ベースのサイジングではコア数の2-3倍のタスクを使用します
- 1タスク = 1ファイル。 Databricksはauto-optimizeによって最適なサイズでファイルを書き出します。
ディスク溢れの回避
- 予想したよりもジョブに時間を要する、あるいは、全く完了しない場合、それはディスク溢れによるものである場合があります。
- Spark UIでディスク溢れを特定できます (下をご覧ください)
- インメモリでオペレーションを実行するのに十分なメモリがない場合、ディスクの読み書きが必要となります。これは非常に高コストなオペレーションです。
- メモリー量を増加することでディスク溢れを解決します。
- 他のプラットフォームでは、データの偏りによってディスク溢れが起きることがあります。
- AQE(デフォルトで有効化)によってDatabricksではデータの偏りに自動で対応します。
- インメモリのシャッフルパーティションを調整することで、ディスク溢れを回避できる場合があります。Databricksでは適切なシャッフルパーティション数を自動で特定することができます:
spark.sql.shuffle.partitions = Auto
クラスターのサイジング - ETL、分析ワークロード
- 計算資源に制限を受けていますか?
- CPU使用量をチェック (クラスターのメトリクス)
- 高速にする唯一の方法はコアの追加です
- ネットワークに制限を受けていますか?
- 重い計算ステップのまえの高いスパイクをチェック
- シャッフルを削減するためにより大きく/少ないマシンを使用
- 高速なリモート読み込みのためにSSD搭載インスタンスを使用
- 大量に溢れていますか?
- Spark SQLタブで溢れをチェック (シャッフル前の集計は一般的な溢れの原因です)
- メモリー最適化インスタンスを使用
SQLウェアハウスのサイジング
SQLウェアハウスにおいてはサイズによってワーカー数が変化します。
SQLウェアハウスのオートスケール
- 同時実行処理に対応するために、それぞれのDatabricks SQLウェアハウスの同時実行クエリー数は10に制限されています。
- Databricks SQLウェアハウスでオートスケールを有効化すると、ウェアハウスは同時実行数に応じて自動でクラスターを追加し、不要になったらクラスターを削減します。
ウェアハウスのサイジング
1. どのくらいのコストと時間のクエリーなのかの見当をつけます
- 標準のクラスターのサイジングと同じように通常のランタイムを使います。
- 以下に助けとなるメトリクスを示します:
- 500GB, 3.6B行、小規模テーブルのjoin + 大量のフィルター => Lクラスターで50秒
- 100GB, すべての700M行が600Mとjoin + group by + order => Lクラスターで60秒
- 50GB, 60M行が1Mとjoin => Lクラスターで8秒
- 10GB => 72M行で多数のbroadcast joinとフィルター => Sクラスターで2秒
- “場合によります”がここからスタートできます…
2. あなたのSLAに応じた個々のクラスターサイズを考えます
- レーテンシーはクラスターあたりのクエリー数とほぼ線形関係にあります
- 例: うまく分散された1つのクエリーが10秒かかる場合、10クエリーの実行に100秒要します
3. ユーザー / 同時実行リクエストの数でスケールします
- 大きなクラスターは大規模なクエリーを高速に処理し、スループットを改善します
- クエリーの処理時間が約5秒以下の場合、大きなクラスターはより多くの同時実行クエリーを捌くための助けにはなりません
- 10-15の同時実行クエリーに対して1台のクラスターでうまく動作すると考えましょう
ウェアハウスのサイジングではモニタリングの機能も活用しましょう。