Comprehensive Guide to Optimize Data Workloads | DatabricksのセクションDelta Merge — Let’s Speed It Upの翻訳です。
- 本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
- 2023年時点の内容です。一部情報が古いものがあります。
MERGEオペレーションを用いることで、ターゲットのDeltaテーブルにソーステーブル、ビューやデータフレームからのデータをupsertすることができます。Delta LakeではMERGEコマンドで、insert、update、deleteをサポートしています。
日次ベースでDeltaテーブル全体を上書き、insertするのではなく、可能な限りインクリメンタルなロード戦略を用いることをお勧めします。インクリメンタルなロードを実現するには、DeltaのMERGEが非常に重要になります。また、DeltaのmergeはSCD Type 2テーブルやchange data capture (CDC)ユースケースの作成で活用することができます。こちらでは、Deltaのmergeの使い方のいくつかの例を示しています: マージを使用してDelta Lakeテーブルにアップサートする、APPLY CHANGES API : Delta Live Tablesを使用してチェンジデータキャプチャを簡素化。
Mergeの内部処理
Mergeオペレーションの背後では2つのステップで処理が行われています:
- ドライバーにマッチする業を含むターゲット内のファイルの一覧を返却するために、ON句の条件を用いたソースとターゲット間のinner joinを実行。
- このステップでは2つの可能性があります - 条件に基づいて以下のいずれかが実行されます:
- ステップ1でターゲットにマッチする行がない場合には、ソースからターゲットに追記するためにappend-onlyの書き込みが行われます。
- そうでない場合、マージされるソースとステップ1で生成されたマッチファイル間の変更点を統合するために、full outer joinが実行されます。
Mergeの課題
以下の理由から遭遇しうる特定のパフォーマンスの問題が存在します:
- MergeオペレーションのON句でマッチするような特定の十分な条件がない場合、膨大なデータを再度書き込むことなります。これはMergeをスローダウンさせます。
- それぞれのmergeの後で膨大な量のデータを再度書き込む場合、Z-orderによるソートはメチャクチャになり、ソートを維持するために毎回のmergeの後でZ-orderを実行しなくてはならないかもしれません。
Mergeの最適化
上述の課題を解決するために、以下の最適化テクニックを活用することができます:
ターゲットテーブルのデータレイアウト
ターゲットテーブルに大きなファイル(例えば、500MB-1GB)が含まれており、merge処理のステップ1でこれらのファイルの多くがドライバーに返却されると、ファイルが大きいほどに少なくとも1行がマッチする確率が高まります。そして、これによって膨大な量のデータが再書き込みされます。このため、mergeが大量に起きるテーブルにおいては、ケースパイケースかつワークロード依存で変動しますが、ファイルサイズを16MBから64MBのように小規模のものにすることをお勧めします。詳細はファイルサイズのチューニングをご覧ください。
パーティションのプルーニング
不適切なパーティションを除外するように、mergeオペレーションのON句でパーティションのフィルターを指定します。詳細はこちらの例をご覧ください。
ファイルのプルーニング
不適切なファイルを除外するように、mergeオペレーションのON句にフィルターとして(あれば)Z-orderのカラムを指定します。ON句でANDオペレーターを用いることで複数の条件を追加することができます。
ブロードキャストジョイン
Deltaのmergeは背後でjoinを実行するので、ソースのデータフレームが十分小さい場合(<= 200MB)には、ターゲットDeltaテーブルにマージされるソースのデータフレームを明示的にブロードキャストすることでスピードアップすることができます。詳細はブロードキャストジョインをご覧ください。
ローシャッフルマージ
- これは、変更されないデータの(Z-orderクラスタリングを含む)既存のデータ構造を維持することを狙いとした新たなMERGEアルゴリズムであり、パフォーマンスも改善します。
- この「ローシャッフル」MERGEによって、更新された行のみがオペレーションによって再構成されますが、変更されていない行は、オペレーション前に存在したのと同じオーダーとファイルのグループに留まります。
ローシャッフルマージはDatabricksランタイム10.4以降ではデフォルトで有効化されています。以前のバージョンのDatabricksランタイムでは、spark.databricks.delta.merge.enableLowShuffle
をtrueに設定することで有効化することができます。