2
1

More than 3 years have passed since last update.

Best practices: Delta Lake | Databricks on AWS [2021/4/14時点]の翻訳に一部追記したものです。

Delta Lakeを使用する際のベストプラクティスをご紹介します。

データ位置のヒントを指定する

あるカラムがクエリーの検索条件に頻繁に使用され、かつ、カラムが高いカーディナリティ(多くの異なる値が含まれている)を有している場合には、Z-ORDER BYを使いましょう。Delta Lakeはカラムの値に基づき自動的にデータをレイアウトし、クエリーに関係のないデータをスキップするのにレイアウト情報を使用します。

詳細は、Z-Ordering (multi-dimensional clustering)を参照ください。

正しいパーティション列を選択する

カラムを指定してDeltaテーブルのパーティションを作成することができます。一般的にパーティションに用いられるカラムは「日付」となります。

  • 非常にカーディナリティが高いカラムでパーティションを作るべきではありません。例えば、100万人のユーザーが存在する場合、「ユーザーID」でパーティションを作成するのは良いパーティション戦略とは言えません。
  • パーティションのデータ量:あるカラムを指定してパーティションを作成した結果、パーティション毎のデータ量が最低でも1Gバイトになるのであれば、そのカラムを指定してパーティションを作成すべきです。

ファイルをコンパクトに

Deltaテーブルに継続的にデータを書き込んでいくと、大量のファイルが作成されることになります。小規模なバッチ処理でデータを追加していくと、この現象は顕著になります。

これはテーブル読み込みの性能劣化の原因にもなりますし、ファイルシステムの性能にも影響を及ぼします。理想的には、大量の小さいファイルは、少量の大きいファイルにまとめられるべきです。これがファイル圧縮(compaction)です。

OPTIMIZEコマンドを使用することでテーブルをコンパクトにすることができます。

テーブルのコンテンツ、スキーマの置換

データの不具合を解消する、スキーマの変更に対応する(カラムのdrop、カラムのデータタイプの変更)などの目的で、テーブルをリプレースする際に、Deltaファイルが格納されているディレクトリを削除して、新たに同じ場所にテーブルを作成したくなりますが、以下の理由からお勧めできません。

  • ディレクトリの削除は効率的ではありません。ディレクトリには非常に大きファイルが格納されているため、削除に数時間あるいは数日を要します。
  • データは完全に削除されるため、操作を誤った場合のリカバリーが困難です。
  • ディレクトリ削除処理はatomic(原子的)ではないため、削除処理中にもかかわらず不完全なテーブルが参照されてしまいます。
  • 結果的整合性を保証するS3においては、整合性の問題に直面するかもしれません。

スキーマを変更する必要がなければ、Deltaテーブルからレコードを削除(DELETE)し、新たにレコードを追加(INSERT)するか、不正な値を修正するために更新(UPDATE)してください。

スキーマを変更する場合には、以下のように「overWriteSchema」オプションを指定してテーブルを上書きしてください。

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .partitionBy(<your-partition-columns>) \
  .saveAsTable("<your-table>") # Managed table
dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .partitionBy(<your-partition-columns>) \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .partitionBy(<your-partition-columns>)
  .saveAsTable("<your-table>") // Managed table
dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .partitionBy(<your-partition-columns>)
  .saveAsTable("<your-table>") // External table

このアプローチをとることで、大量データの削除が不要になため高速であることに加え、Time TravelによるリカバリやACID特性の担保が可能になります。

また、ストレージのコストを節約するためにテーブル更新後の古いファイルを削除したい場合には、VACUUMを使用することで削除することができます。これはファイル削除に最適化されており、ディレクトリ全体を削除するより高速です。

Sparkのキャッシング

Sparkのキャッシングを利用することもお勧めして居ます。

Python

df = spark.read.delta("/some/path")
df .cache()

ロールアップを繰り返し実行するなど、高コストの集計やJOINの結果を複数回参照する際にのみ、この方法は有効です。

それ以外の場合には、Deltaテーブルに対してこの方法を使わないでください。なぜならば、

  • キャッシュされたデータフレームでは、フィルターが追加された際にデータスキッピングを利用できません。
  • 異なる識別子でアクセスされ、テーブルが更新された場合にはキャッシュされたデータは更新されません。(例えば、spark.table(x).cache()の後にspark.write.save(/some/path)でテーブルを更新)

Databricks 無料トライアル

Databricks 無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1