Best practices: Delta Lake | Databricks on AWS [2021/4/14時点]の翻訳に一部追記したものです。
Delta Lakeを使用する際のベストプラクティスをご紹介します。
データ位置のヒントを指定する
あるカラムがクエリーの検索条件に頻繁に使用され、かつ、カラムが高いカーディナリティ(多くの異なる値が含まれている)を有している場合には、Z-ORDER BY
を使いましょう。Delta Lakeはカラムの値に基づき自動的にデータをレイアウトし、クエリーに関係のないデータをスキップするのにレイアウト情報を使用します。
詳細は、Z-Ordering (multi-dimensional clustering)を参照ください。
正しいパーティション列を選択する
カラムを指定してDeltaテーブルのパーティションを作成することができます。一般的にパーティションに用いられるカラムは「日付」となります。
- 非常にカーディナリティが高いカラムでパーティションを作るべきではありません。例えば、100万人のユーザーが存在する場合、「ユーザーID」でパーティションを作成するのは良いパーティション戦略とは言えません。
- パーティションのデータ量:あるカラムを指定してパーティションを作成した結果、パーティション毎のデータ量が最低でも1Gバイトになるのであれば、そのカラムを指定してパーティションを作成すべきです。
ファイルをコンパクトに
Deltaテーブルに継続的にデータを書き込んでいくと、大量のファイルが作成されることになります。小規模なバッチ処理でデータを追加していくと、この現象は顕著になります。
これはテーブル読み込みの性能劣化の原因にもなりますし、ファイルシステムの性能にも影響を及ぼします。理想的には、大量の小さいファイルは、少量の大きいファイルにまとめられるべきです。これがファイル圧縮(compaction)です。
OPTIMIZEコマンドを使用することでテーブルをコンパクトにすることができます。
テーブルのコンテンツ、スキーマの置換
データの不具合を解消する、スキーマの変更に対応する(カラムのdrop、カラムのデータタイプの変更)などの目的で、テーブルをリプレースする際に、Deltaファイルが格納されているディレクトリを削除して、新たに同じ場所にテーブルを作成したくなりますが、以下の理由からお勧めできません。
- ディレクトリの削除は効率的ではありません。ディレクトリには非常に大きファイルが格納されているため、削除に数時間あるいは数日を要します。
- データは完全に削除されるため、操作を誤った場合のリカバリーが困難です。
- ディレクトリ削除処理はatomic(原子的)ではないため、削除処理中にもかかわらず不完全なテーブルが参照されてしまいます。
- 結果的整合性を保証するS3においては、整合性の問題に直面するかもしれません。
スキーマを変更する必要がなければ、Deltaテーブルからレコードを削除(DELETE)し、新たにレコードを追加(INSERT)するか、不正な値を修正するために更新(UPDATE)してください。
スキーマを変更する場合には、以下のように「overWriteSchema」オプションを指定してテーブルを上書きしてください。
Python
dataframe.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy(<your-partition-columns>) \
.saveAsTable("<your-table>") # Managed table
dataframe.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.option("path", "<your-table-path>") \
.partitionBy(<your-partition-columns>) \
.saveAsTable("<your-table>") # External table
SQL
REPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) LOCATION "<your-table-path>" AS SELECT ... -- External table
Scala
dataframe.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.partitionBy(<your-partition-columns>)
.saveAsTable("<your-table>") // Managed table
dataframe.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.option("path", "<your-table-path>")
.partitionBy(<your-partition-columns>)
.saveAsTable("<your-table>") // External table
このアプローチをとることで、大量データの削除が不要になため高速であることに加え、Time TravelによるリカバリやACID特性の担保が可能になります。
また、ストレージのコストを節約するためにテーブル更新後の古いファイルを削除したい場合には、VACUUMを使用することで削除することができます。これはファイル削除に最適化されており、ディレクトリ全体を削除するより高速です。
Sparkのキャッシング
Sparkのキャッシングを利用することもお勧めして居ます。
Python
df = spark.read.delta("/some/path")
df .cache()
ロールアップを繰り返し実行するなど、高コストの集計やJOINの結果を複数回参照する際にのみ、この方法は有効です。
それ以外の場合には、Deltaテーブルに対してこの方法を使わないでください。なぜならば、
- キャッシュされたデータフレームでは、フィルターが追加された際にデータスキッピングを利用できません。
- 異なる識別子でアクセスされ、テーブルが更新された場合にはキャッシュされたデータは更新されません。(例えば、
spark.table(x).cache()
の後にspark.write.save(/some/path)
でテーブルを更新)