Selectively overwrite data with Delta Lake | Databricks on AWS [2023/2/3時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Databricksでは選択的上書き処理で2つの異なるオプションをサポートしているDelta Lakeの機能を活用しています。
-
replaceWhere
は指定された述語にマッチするすべてのレコードを原子性を持って置き換えるオプションです。 - 動的パーティション上書きを用いることで、テーブルがどのようにパーティショニングされているのかに基づいてデータディレクトリを置き換えることができます。
replaceWhere
による任意かつ選択的な上書き
任意のエクスプレッションにマッチするデータのみを選択的に上書きすることができます。この機能はDatabricks Runtime 9.1 LTS以降のデータフレームとDatabricks Runtime 12.0以降のSQLでサポートされています。
以下のコマンドは、start_date
でパーティショニングされているターゲットテーブルにおいて1月のイベントをreplace_dataのデータで置き換えます。
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
)
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
INSERT INTO TABLE events REPLACE WHERE start_data >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
このサンプルコードはreplace_data
のデータを書き出し、すべてが述語にマッチすることを検証し、原子的な置換を実行します。術後にすべてがマッチしないデータを書き出したい場合に、ターゲットテーブルでマッチする行を置き換えるために、spark.databricks.delta.replaceWhere.constraintCheck.enabled
をfalseに設定することでこの制約チェックを無効にすることができます。
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Databricksランタイム9.0以前では、replaceWhere
はパーティションカラムに対する述語にマッチするデータのみを上書きします。以下のコマンドでは、df
のデータを用いて、date
でパーティショニングされているターゲットテーブルの1月のデータを原子的に上書きします。
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
)
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
Databricksランタイム9.1以降で古い挙動に戻したい場合、spark.databricks.delta.replaceWhere.dataColumns.enabled
フラグを無効にすることができます。
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
動的パーティション上書き
プレビュー
本機能はパブリックプレビューです。
Databricksランタイム11.1以降では、パーティショニングされたテーブルで動的パーティション上書きモードをサポートしています。複数のパーティションを持つテーブルにおいては、Databricks12.0以前では、すべてのパーティションカラムが同じデータ型である場合にのみ、動的パーティション上書きをサポートしています。
動的パーティション上書きモードでは、書き込み処理が新規のデータをコミットする論理的パーティションのそれぞれのすべての既存データを上書きします。書き込み処理の対象データを持たないすべての既存論理パーティションは変更されません。このモードは、データが上書きモードで書き込まれる際にのみ適用されます: SQLにおけるINSERT OVERWRITE
、df.write.mode("overwrite")
によるデータフレーム書き込みのいずれかです。
Sparkセッション設定spark.sql.sources.partitionOverwriteMode
をdynamic
に設定することで、動的パーティション上書きモードを設定します。また、DataFrameWriter
のオプションpartitionOverwriteMode
をdynamic
に設定することでこの機能を有効化することもできます。有効化されている場合、クエリー固有の設定がセッション設定で定義されているモードを上書きします。partitionOverwriteMode
のデフォルトはstatic
です。
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
注意
動的パーティション上書きは、パーティショニングされているテーブルのreplaceWhere
オプションと競合します。
- Sparkセッション設定で動的パーティション上書きが有効化されており、
DataFrameWriter
のオプションでreplaceWhere
が設定されていると、Delta LakeはreplaceWhere
エクスプレッションに従ってデータを上書きします(クエリー固有オプションがセッション設定を上書き)。 -
DataFrameWriter
のオプションで動的パーティション上書きとrelplaceWhere
の両方が有効化されているとエラーとなります。
重要!
動的パーテイション上書きで書き込まれたデータが期待されるパーティションのみを更新していることを検証してください。不正なパーティションに単一行があると、パーティション全体に対する意図しない上書きを引き起こすことがあります。どのデータを上書きするのかを指定するためにreplaceWhere
を使用することをお勧めします。
意図せずパーティションが上書きされた場合には、変更を取り消すためにrestoreを活用することができます。