0
0

More than 1 year has passed since last update.

Databricksにおけるデータファイル管理によるパフォーマンスの最適化

Last updated at Posted at 2022-01-26

Optimize performance with file management | Databricks on AWS [2022/6/2時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

クエリーを高速化するために、Databricks上のDelta Lakeはクラウドストレージに格納されるデータのレイアウトの最適化をサポートしています。DatabricksのDelta Lakeは2つのレイアウトアルゴリズムをサポートしています。bin-packingとZ-Orderingです。

本書では、以下を説明します。

  • 最適化コマンドをどのように実行するのか。
  • 2つのレイアウトアルゴリズムがどのように動作するのか。
  • 古いテーブルスナップショットをどのようにクリーンアップするのか。
  • FAQではなぜ最適化が自動化されないのかを説明し、どの程度の頻度で最適化コマンドを実行すべきかの推奨を説明します。
  • 最適化のメリットを説明するノートブックに関しては、Optimization examplesを参照ください。
  • Databricks SQL上のDelta Lakeの最適化コマンドのリファレンスに関しては、以下を参照ください。

コンパクション(bin-packing)

DatabricksのDelta Lakeでは、小さなファイル群を大きなファイルにまとめることで、テーブルから読み込みを行うクエリーのスピードを改善することができます。OPTIMIZEコマンドを実行することでコンパクションを起動することができます。

SQL
OPTIMIZE delta.`/data/events`
Python
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events")
deltaTable.optimize().executeCompaction()

あるいは、以下を実行します。

SQL
OPTIMIZE events
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events")
deltaTable.optimize().executeCompaction()

大規模データを保持しており、そのサブセットのみを最適化したい場合、オプションのWHEREを用いたパーティション述語を持ちいてサブセットを指定することができます。

SQL
OPTIMIZE events WHERE date >= '2022-11-18'
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()

注意

  • Bin-packingの最適化は冪等性が保証されるので、同じデータセットに対して2回実行したとしても、2回目の実行の効果はありません。
  • Bin-packingは、ディスク上のデータファイルのサイズに基づいて、均等にバランスされたデータファイルを生成しようとしますが、ファイルごとのタプルの数が必ずしも均等になるわけではありません。しかし、これら二つの指標は多くの場合相関します。
  • OPTIMIZEオペレーションのPythonとScala APIはDatabricksランタイム11.0以降で利用できます。

Deltaテーブルのreaderはスナップショット分離を使用ており、OPTIMIZEがトランザクションログから不要なファイルを削除している際に影響を受けないことを意味します。OPTIMIZEはテーブルに対するデータの変更は行いませんので、OPTIMIZEの実行前後での読み取り結果は同じとなります。ストリーミングのソースとなっているテーブルに対するOPTIMIZEの実行は、このテーブルをソースとして取り扱うストリームの現状、未来の状態に影響を及ぼしません。OPTIMIZEはオペレーションによって削除・追加されたファイルに関する統計情報(min、max、totalなど)を返却します。OPTIMIZEの統計情報には、Z-Orderingの統計情報、バッチの数、最適化されたパーティションの数も含まれます。

注意
Databricksランタイム 6.0以降で利用できます。

Auto Optimizeを用いることで、小規模ファイルを自動でコンパクトにすることができます。

データスキッピング

データスキッピングの情報は、Deltaテーブルにデータを書き込む際に自動で収集されます。DatabricksのDelta Lakeは、クエリーを行う際にこの情報(最小値、最大値)を活用してクエリーを高速化します。データスキッピングは設定不要です。利用できる場合には自動で適用されます。しかし、この効果はデータのレイアウトに依存します。ベストな結果を得るためにはZ-Orderingを適用します。

Databricks Delta Lakeのデータスキッピング、Z-Orderingのメリットの例に関しては、Optimization examplesのノートブックをご覧ください。Databricks Delta Lakeはデフォルトで、お使いのテーブルスキーマの最初の32列の統計情報を収集します。この値は、テーブルプロパティdataSkippingNumIndexedColsで変更することができます。統計情報を収集するカラムを増やすと、ファイルを書き込む際のオーバーヘッドが増加します。

長い文字列の統計情報の収集はコストが大きいオペレーションです。長い文字列の統計情報の収集を回避するには、長い文字列を含むカラムを除外するためにテーブルプロパティdataSkippingNumIndexedColsを設定するか、ALTER TABLE ALTER COLUMNを用いて、dataSkippingNumIndexedColsの範囲外の場所に長い文字列を含むカラムを移動するかします。以下を参照ください。

統計情報の収集のために、ネストされたカラムの個々のフィールドは個々のカラムと見なされます。

本書のトピックに関してはブログ記事Databricks Deltaを使って秒でペタバイトデータを処理するもご覧下さい。

Z-Ordering(多次元クラスタリング)

Z-Orderingは関連する情報を同じファイルセットに配置するテクニックです。Databricks Delta Lakeのデータスキッピングアルゴリズムは読み取る必要があるデータ量を劇的に削減するために、自動でこの局所性を活用します。データをZ-Orderするには、ZORDER BY句に並び替えを行うカラムを指定します。

SQL
OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

クエリーの述語に一般的に用いられるカラムがわかっており、当該カラムのカーディナリティが高い場合(すなわち、異なる値の数が多い)にはZORDER BYを使ってください。

カンマ区切りで複数のカラムをZORDER BYに指定することができます。しかし、カラムを追加するほど局所性の効果は減少します。データスキッピングはmin、max、countのようなコラム固有の統計情報を必要とするので、統計情報が収集されていないカラムに対するZ-Orderingは効果がなく、リソースの無駄となります。スキーマ内でカラムを並び替えることで統計情報を収集するカラムを指定するか、統計情報を収集する絡むの数を増やしてください。詳細はデータスキッピングを参照ください。

注意

  • Z-Orderingは冪等性が保証されませんが、インクリメンタルな処理を行おうとします。複数回のZ-Orderingで処理に要する時間が削減されることは保証されません。しかし、Z-Orderingが実行された直後のパーティションに新規データが追加されない場合、当該パーティションに対するさらなるZ-Orderingには効果がありません。

  • Z-Orderingはタプルの数に基づいて均等なデータファイルを作成しようとしますが、ディスク上のデータファイルサイズが必ずしも均等になるわけではありません。これらの二つの指標は多くの場合相関しますが、そうでない場合もありえますし、最適化タスクの時間に偏りが発生することもあります。

    例えば、過去より幅の広い(例えば、より長い配列、文字列)最近のデータに対して、ZORDER BY dateを実行した場合、OPTIMIZEのジョブタスクの時間、生成されるファイルサイズに偏りが生じることが予想されます。しかし、この場合、問題はOPTIMIZEコマンドに対するものであり、以降のクエリーにネガティブなインパクトはありません。

ファイルサイズのチューニング

このセクションでは、Deltaテーブルのサイズをどのようにチューニングするのかを説明します

ターゲットサイズの設定

注意
Databricksランタイム8.2以降で利用できます。

お使いのDeltaテーブルのサイズをチューニングしたい場合には、希望のサイズをテーブルプロパティdelta.targetFileSizeに設定します。プロパティが設定されると、すべてのデータレイアウト最適化オペレーション(例えば、コンパクション(bin-packing)Z-OrderingAuto CompactionOptimized Writes)は指定されたサイズでファイルを生成するようにベストエフォートで処理を行います。

テーブルプロパティ
delta.targetFileSize
Type: バイトあるいはより大きい単位で指定するサイズ
ターゲットのファイルサイズ。例えば、104857600 (バイト) あるいは 100mb
デフォルト値: None

既存のテーブルに対しては、SQLコマンドのALTER TABLE SET TBL PROPERTIESを用いて値の設定、解除が可能です。また、Sparkセッション設定を用いて、新規テーブルに自動的にこれらのプロパティを設定することができます。詳細はTable propertiesを参照ください。

ワークロードに基づく自動チューニング

注意
Databricksランタイム8.2以降で利用できます。

手動でのチューニングの手間を最小化するために、Databricksではテーブルに操作を加えているワークロードに基づいて、Deltaテーブルのファイルサイズを自動でチューニングすることができます。Databricksはファイルの再書き込みを行う頻繁なMERGEを行うテーブルを自動で検知し、将来のさらなるファイルの再書き込みを予想して再書き込みを行うファイルサイズを小さくする選択をする場合があります。例えば、MERGEオペレーションを行う際、テーブルに対する過去の10回のオペレーションのうち9回がMERGEであった場合、MERGEで使用されるOptimized WritesAuto Compactionが有効化されている場合、有効化されていない場合よりも小さいサイズのファイルを生成します。これによって、将来的なMERGEオペレーションの処理時間を削減する助けになります。

再書き込みが数回行われた後でオートチューニングが有効化されます。しかし、Deltaテーブルで頻繁なMERGEUPDATEDELETEオペレーションが行われることが予想され、即座にオートチューニングを有効化したい場合、テーブルプロパティdelta.tuneFileSizesForRewritesを設定することで、再書き込みに対するファイルサイズを明示的にチューニングすることができます。このプロパティをtrueに設定することで、当該テーブルに対する全てのデータレイアウト最適化オペレーションに対して常に小さい際ファイルサイズを使うようになるので、自動検知の起動が不要になります。

テーブルプロパティ
delta.tuneFileSizesForRewrites
Type: Boolean
データレイアウト最適化のファイルサイズをチューニングするかどうか。
デフォルト値: None

既存のテーブルに対しては、SQLコマンドのALTER TABLE SET TBL PROPERTIESを用いて値の設定、解除が可能です。また、Sparkセッション設定を用いて、新規テーブルに自動的にこれらのプロパティを設定することができます。詳細はTable propertiesを参照ください。

テーブルサイズに基づく自動チューニング

注意
Databricksランタイム8.4以降で利用できます。

手動でのチューニングの手間を最小化するために、Databricksではテーブルサイズに基づいてDeltaテーブルのファイルサイズを自動でチューニングすることができます。Databricksは、テーブルのファイル数が膨大にならないように、小さいテーブルには小さいファイルサイズを適用し、大きいテーブルには大きいファイルサイズを適用します。Dataricksはターゲットサイズの設定でチューニングしている場合、あるいは、頻繁な再書き込みを行うワークロードに基づくチューニングをしている場合にはこの自動チューニングを行いません。

現在のDeltaテーブルのサイズに基づいてターゲットファイルサイズが決定されます。2.56TBよりも小さいテーブルに対しては、自動チューニングされたターゲットファイルサイズは256MBとなります。2.56TBと10TBの間のテーブルに対しては、ターゲットファイルサイズは256MBから1GBに線形増加します。10TBより大きいテーブルでは、ターゲットファイルサイズは1GBになります。

注意
テーブルに対するターゲットファイルサイズが増加する際、OPTIMIZEコマンドによって既存のファイルは大きなサイズに再最適化されません。このため、大きなテーブルには常にターゲットサイズよりも小さいサイズのファイルがいくつか存在することになります。これらの小さなファイルも大きなファイルに最適化する必要がある場合には、テーブルプロパティdelta.targetFileSizeを指定することで、テーブルに対する固定のターゲットファイルサイズを設定することができます。

テーブルに対してインクリメンタルな書き込みが行われる際、ターゲットファイルサイズとファイル数はテーブルサイズに基づき以下のように近似されます。このテーブルのファイル数はあくまで例となります。実際の結果は、様々な要因によって変動します。

テーブルサイズ ターゲットファイルサイズ テーブルのファイル数の近似値
10GB 256MB 40
1TB 256MB 4096
2.56TB 256MB 10240
3TB 307MB 12108
5TB 512MB 17339
7TB 716MB 20784
10TB 1GB 24437
20TB 1GB 34437
50TB 1GB 64437
100TB 1GB 114437

ノートブック

最適化のメリットを感じていただく例として、以下のノートブックを参照下さい。

Pythonのノートブックの翻訳版はこちらとなります。

インタラクティブクエリーの性能改善

Deltaエンジンは、クエリー性能を改善するためのいくつかの追加機能を提供しています。

データ鮮度の管理

それぞれのクエリーの最初に、Deltaテーブルはテーブルの最新のバージョンまで自動で更新をかけます。このプロセスはノートブックのコマンドステータスレポートのUpdating the Delta table's stateで確認することができます。しかし、テーブルの過去のデータに対する分析を実施している場合、特にストリーミングで頻繁にデータが取り込まれている場合においては、分単位での最新データを必要としないかもしれません。このような場合、お使いのDeltaテーブルの古いスナップショットに対してクエリーが実行されます。これにより、クエリーの結果を得る際のレーテンシーを低減することができます。

Sparkセッション設定spark.databricks.delta.stalenessLimitに、1h(1時間)、15m(15分)、1d(1日)のように時間を表現する文字列を指定することで、お使いのテーブルデータがどのくらい古くて良いのかを設定することができます。この設定はセッション固有なので他のノートブック、ジョブ、BIツールなどからこのテーブルにアクセスする他のユーザーに影響を与えません。さらに、この設定はテーブルの更新を妨げるものではありません。ただ、クエリーはテーブルの更新を待つ必要があるだけです。バックグラウンドで更新は発生し、クラスターでリソースを公平に共有します。古さの制限を超えた場合、クエリーはテーブル状態の更新でブロックされます。

低レーテンシークエリー向けに強化されたチェックポイント

Delta Lakeは、集約されたDeltaテーブルの状態としてチェックポイントを最適化された頻度で書き込みます。これらのチェックポイントはテーブルの最新の状態を計算する際のスタート地点となります。チェックポイントがないと、Delta Lakeはテーブルの状態を計算するためにトランザクションログに対するコミットを表現するJSONファイル("delta"ファイル)の膨大なコレクションを読み込まなくてはなりません。さらに、データスキッピングを行うためにDelta Lakeが使用するカラムレベルの統計情報はチェックポイントに格納されます。

重要!
Delta Lakeのチェックポイントは構造化ストリーミングのチェックポイントとは別です。

Databricksランタイム7.2以前では、カラムレベルの統計情報はDelta LakeのチェックポイントにJSONカラムとして保存されます。

Databricksランタイム7.3 LTS以降では、カラムレベルの統計情報はstructとして格納されます。structフォーマットにすることで、以下の理由からDelta Lakeはより高速に読み込みを行うことができます。

  • カラムレベルの統計情報を取得するためにDelta Lakeが高コストなJSONパーシングを実行する必要がありません。
  • Parquetのカラムプルーニング機能が、カラムの統計情報を読み込むために必要なI/Oを劇的に削減します。

structフォーマットを用いることで、Delta Lakeの読み込みオペレーションのオーバーヘッドを秒単位から十数ミリ秒に削減する最適化を可能とし、短時間のクエリーのレーテンシーを劇的に削減します。

チェックポイントにおけるカラムレベル統計情報の管理

テーブルプロパティdelta.checkpoint.writeStatsAsJsondelta.checkpoint.writeStatsAsStructを用いて、どのように統計情報をチェックポイントに書き込むのかを管理することができます。両方のプロパティがfalseに設定された場合、Delta Lakeはデータスキッピングを行うことはできません

Databricksランタイム 7.3 LTS以降では:

  • バッチの書き込み処理は統計情報をJSONとstructフォーマットの両方で書き込みます。delta.checkpoint.writeStatsAsJsontrueです。
  • デフォルトではdelta.checkpoint.writeStatsAsStructはundefinedです。
  • Readerは利用可能な場合にはstructカラムを使用し、そうでない場合にはJSONカラムにフォールバックします。

ストリーミング書き込みでは:

  • Databricksランタイム 7.5以降: 統計情報をJSONフォーマット、structフォーマットの両方に書き込みます。
  • Databricksランタイム 7.3 LTSと7.4: (チェックポイントによる書き込みレーテンシーへのインパクトを最小限にするために)統計情報をJSONフォーマットでのみ書き込みます。structフォーマットでも書き込みを行う際には、チェックポイントにおける統計情報のトレードオフを参照ください。

Databricksランタイム 7.2以前では、ReaderはJSONカラムのみを使用します。このため、delta.checkpoint.writeStatsAsJsonfalseの場合、そのようなreaderはデータスキッピングを行うことはできません

重要!
強化チェックポイントはオープンソースのDelta Lakeのreaderとの互換性を損なうものではありません。しかし、delta.checkpoint.writeStatsAsJsonfalseに設定した場合、プロプライエタリのDelta Lakeリーダーに影響を与える可能性があります。パフォーマンスへの影響に関してはベンダーにコンタクトください。

チェックポイントにおける統計情報のトレードオフ

チェックポイントへの統計情報の書き込みにはコストがかかる(通常は大規模テーブルにおいても1分未満)ので、チェックポイントの書き込みに要する時間とDatabricksランタイム 7.2以下との互換性に対するトレードオフが存在します。お使いのワークロードをDatabricksランタイム 7.3 LTS以降にアップグレードすることができるのであれば、レガシーなJSONの統計情報を無効化することで、チェックポイントの書き込みコストを削減することができます。

お使いのアプリケーションでデータスキッピングが不要である場合、両方のプロパティをfalseに設定することで、統計情報が収集、書き込みされなくなります。この設定はお勧めしません。

writeStatsAsStruct
false true
writeStatsAsJson false
  • データスキップなし
  • Databricksランタイム7.3 LTS以降で高速なクエリーを実現
  • チェックポイント作成が若干遅くなる
  • Databricksランタイム 7.2以前ではreaderがデータスキッピングを利用できない
false
  • Databricksランタイム7.2以前
  • クエリーが遅い
  • Databricksランタイム7.3 LTS以降で高速なクエリーを実現
  • Databricksランタイム 7.2以前のreaderとの互換性を維持
  • チェックポイントのレーテンシーは最大(秒オーダー)

構造化ストリーミングクエリーに対する強化チェックポイントの有効化

お使いの構造化ストリーミングのワークロードに低レーテンシーの要件がない場合、以下のSQLコマンドを実行することで強化チェックポイントを有効化することができます。

SQL
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

データにクエリーを実行するためにDatabricksランタイム 7.2以前を使用していない場合には、以下のテーブルプロパティを設定することでチェックポイントのレーテンシーの書き込み性能を改善することができます。

SQL
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

統計情報のstructなしにチェックポイントを書き込むクラスターからの書き込みを無効化する

Databricksランタイム 7.2以前のwriterは、統計情報のstructなしにチェックポイントを書き込みますが、これはDatabricksランタイム 7.3 LTSのreaderに対する最適化を妨げます。

Databricksランタイム 7.2以前が実行されているクラスターによるDeltaテーブルへの書き込みをブロックするために、upgradeTableProtocolメソッドを用いてDeltaテーブルをアップグレードすることができます。

Python
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
Scala
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

警告!
upgradeTableProtocolメソッドを適用することで、お使いのテーブルに対するDatabricksランタイム 7.2以前のクラスターの書き込みはできなくなり、この変更を戻すことはできません。新たなフォーマットに切り替えることをコミットしたあとでのみ、テーブルをアップグレードすることをお勧めします。Databricksランタイム 7.3 LTSを用いてテーブルのシャローCLONEを作成することで、これらの最適化をトライすることができます。

テーブルwriterのバージョンをアップグレードしたら、writerは'delta.checkpoint.writeStatsAsStruct' 'delta.checkpoint.writeStatsAsJson'の設定に従わなくてはなりません。

以下のテーブルでは様々なバージョンのDatabricksランタイム、プロトコルのバージョン、wrtiterのタイプに応じて強化チェックポイントをどのように活用できるのかをまとめています。

注意
テーブルは原文を参照ください。

古いチェックポイントフォーマットを使用するクラスターの書き込みを無効化する

Databricksランタイム 7.2以前のwriterは古いフォーマットでチェックポイントを書き込むことができ、これはDatabricksランタイム 7.3 LTSのwriterの最適化を妨げます。Databricksランタイム 7.2以前が実行されているクラスターによるDeltaテーブルへの書き込みをブロックするために、upgradeTableProtocolメソッドを用いてDeltaテーブルをアップグレードすることができます。

Python
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
Scala
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

警告!
upgradeTableProtocolメソッドを適用することで、お使いのテーブルに対するDatabricksランタイム 7.2以前のクラスターの書き込みはできなくなり、この変更を戻すことはできません。新たなフォーマットに切り替えることをコミットしたあとでのみ、テーブルをアップグレードすることをお勧めします。Databricksランタイム 7.3 LTSを用いてテーブルのシャローCLONEを作成することで、これらの最適化をトライすることができます。

FAQ

なぜOPTIMIZEは自動ではないのか?

OPTIMIZEオペレーションは、コンパクション(オプションとしてZ-Orderingの実行)を通じてファイルのサイズを最適化するために多くのSparkジョブを起動します。OPTIMIZEが実行することの大部分は小規模ファイルのコンパクションなので、オペレーションが効果を発揮する前にまず初めに大量の小規模ファイルを集約する必要があります。このため、OPTIMIZEオペレーションを実行します。

さらに、OPTIMIZEの実行、特にZORDERは時間、リソースの観点で高コストなものになります。DatabricksがOPTIMIZEを自動で実行し、バッチにおいてデータの書き込みを待つことになる、低レーテンシーの(Deltaテーブルがソースとなる)Delta Lakeストリームを実行する能力を損なうことになります。多くのお客様においては、テーブルからデータをストリーミングしていることから、OPTIMIZEが提供するであろうクエリーのメリットがあったとしても、決してDeltaテーブルの最適化を実施していません。

最後に、(OPTIMIZEオペレーションを経由しようがしまいが)Delta Lakeはテーブルに書き込まれたファイルに関する統計情報を自動で収集します。これは、Detlaテーブルからの読み込みにおいては、テーブルやパーティションにOPTIMIZEオペレーションを実行したかどうかにかかわらず、この情報を活用することを意味します。

どのくらいの頻度でOPTIMIZEを実行すべきですか?

OPTIMIZEの実行頻度を検討する際、パフォーマンスとコストのトレードオフが存在します。すぐれたエンドユーザーのクエリーパフォーマンス(リソースの使い方によっては、より高いコストでも必要となります)を必要とするのであれば、高い頻度でOPTIMIZEを実行すべきです。コストを最適化したい場合には頻度を下げて実行すべきです。

日次ベース(スポット価格の安い夜間の実行をお勧めします)でOPTIMIZEを実行するところからスタートすることをお勧めします。その後に更新頻度を更新していきます。

OPTIMIZEを実行するのに最適なインスタンスタイプは何ですか?

両方のオペレーションは、大量のParquetデコーディング、エンコーディングを行うCPUを必要とする処理となります。

これらのワークロードに対しては、c5dシリーズをお勧めします。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0