Table deletes, updates, and merges | Databricks on AWS
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Delta Lakeでは、Deltaテーブルからのデータの削除、データの更新を可能にするいくつかのステートメントをサポートしています。
Delta Lakeにおけるデータの削除、更新に関する概要、デモンストレーションに関しては、以下のYoutubeをご覧ください(54分)。
Delta Lakeにおけるチェンジデータキャプチャに関しては、以下のYoutubeをご覧ください(53分)。
テーブルからのdelete
Deltaテーブルから述語にマッチするデータを削除することができます。例えば、/tmp/delta/people-10m
のパスにあるテーブル、あるいはpeople10m
というテーブルにおいて、birthDate
カラムの値が1955
より少ない値を持つ人に対応する全ての行を削除するには以下を実行します。
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
注意
Python APIはDatabricksランタイム6.1以降で利用できます。
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
注意
Scala APIはDatabricksランタイム6.0以降で利用できます。
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
注意
Java APIはDatabricksランタイム6.0以降で利用できます。
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'");
// Declare the predicate by using Spark SQL functions.
deltaTable.delete(functions.col("birthDate").lt(functions.lit("1955-01-01")));
詳細に関してはDelta Lake APIをご覧ください。
重要!
delete
は最新バージョンのDeltaテーブルからデータを削除しますが、明示的に古いバージョンがvacuumされるまでは、物理ストレージからは削除されません。詳細はvacuumをご覧ください。
ティップス
可能であれば、述語がオペレーションを劇的に高速化できるように、パーティショニングされたDeltaテーブルにおけるパーティションカラムに対して述語を指定するようにしてください。
テーブルのupdate
Deltaテーブルにおいて述語にマッチするデータを更新することができます。例えば、/tmp/delta/people-10m
のパスにあるテーブル、あるいはpeople10m
というテーブルにおいて、gender
カラムのMale
、Female
を略字M
、F
に変更するには以下を実行します。
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
注意
Python APIはDatabricksランタイム6.1以降で利用できます。
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
注意
Scala APIはDatabricksランタイム6.0以降で利用できます。
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
注意
Java APIはDatabricksランタイム6.0以降で利用できます。
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
new HashMap<String, String>() {{
put("gender", "'Female'");
}}
);
// Declare the predicate by using Spark SQL functions.
deltaTable.update(
functions.col(gender).eq("M"),
new HashMap<String, Column>() {{
put("gender", functions.lit("Male"));
}}
);
詳細に関してはDelta Lake APIをご覧ください。
ティップス
deleteと同様に、パーティショニングされたDeltaテーブルのパーティションカラムに対してupdateを行うことでオペレーション高速化することができます。
mergeを用いたテーブルへのupsert
SQLオペレーションMERGE
を用いることで、ターゲットDeltaテーブルにソーステーブル、ビュー、データフレームをupsertすることができます。Delta LakeはMERGE
でinsert、update、deleteをサポートしており、高度なユースケースに対応できるようにSQLの標準を超えた拡張文法に対応しています。
people10m
というターゲットテーブル、あるいは/tmp/delta/people-10m
のターゲットパスに対して、新規データを保有するソーステーブルpeople10mupdates
あるいは、ソースパス/tmp/delta/people-10m-updates
があるものとします。これらの新規レコードのいくつかはターゲットデータに既に含まれています。新規データをマージするために、個人のid
が既に存在する行は更新し、マッチするid
が存在しない場合には新規に行を追加したいと考えています。この場合、以下を実行することができます。
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
Dataset<Row> dfUpdates = spark.read("delta").load("/tmp/delta/people-10m-updates")
deltaTable
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched()
.updateExpr(
new HashMap<String, String>() {{
put("id", "updates.id");
put("firstName", "updates.firstName");
put("middleName", "updates.middleName");
put("lastName", "updates.lastName");
put("gender", "updates.gender");
put("birthDate", "updates.birthDate");
put("ssn", "updates.ssn");
put("salary", "updates.salary");
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, String>() {{
put("id", "updates.id");
put("firstName", "updates.firstName");
put("middleName", "updates.middleName");
put("lastName", "updates.lastName");
put("gender", "updates.gender");
put("birthDate", "updates.birthDate");
put("ssn", "updates.ssn");
put("salary", "updates.salary");
}})
.execute();
Scala、Java、Pythonの文法の詳細については、Delta Lake APIをご覧ください。
Delta Lakeのmergeオペレーションでは、通常ソースデータに対する2つのパスが必要となります。ソースデータに非決定論的表現が含まれる場合、ソースデータに対する複数のパスが異なる行を生成し、一貫性のない結果となる場合があります。非決定論的表現の一般的な例としては、current_date
、current_timestamp
関数が挙げられます。非決定論的関数の使用が避けられない場合、ソースデータを例えば一時的なDeltaテーブルのようなストレージに保存することを検討してください。ソースデータのキャッシュはこの問題を解決しません。これは、キャッシュの無効化によってソースデータが部分的、あるいは全体的に再計算される場合があるためです(例えば、スケールダウンによってクラスターがいくつかのエグゼキューターを失った場合)。
オペレーションのセマンティクス
merge
のプログラム上のオペレーションの詳細な説明を以下に示します。
-
任意の数の
whenMatched
、whenNotMatched
句を追加することができます。注意
Databricksランタイム7.2以前では、merge
に指定できるwhenMatched
は最大2つ、whenNotMatched
は最大1つとなります。 -
whenMatched
句は一致条件に基づいてターゲットテーブルにソースの行が一致した際に実行されます。これらの句には以下のセマンティクスが存在します。-
whenMatched
句は最大1つのupdate
と最大1つのdelete
アクションを持つことができます。merge
におけるupdate
アクションは(update
オペレーションと同様に)マッチしたターゲット行の特定のカラムのみを更新します。delete
オペレーションはマッチした行を削除します。 - それぞれの
whenMatched
句にはオプションの条件を含めることができます。この句の条件が存在する際、句の条件がtrueになるソース・ターゲットの行ペアに対してのみupdate
やdelete
アクションが実行されます。 - 複数の
whenMatched
句が存在する場合、これらは指定された順序で評価されます。最後のものを除いた全てのwhenMatched
句には条件を含めなくてはなりません。 - merge条件にマッチするソース、ターゲット行のペアにおいて、すべての
whenMatched
の条件がtureと評価されない場合、ターゲットの行は変更されません。 - ソースデータセットで対応するカラムでターゲットのDeltaテーブルの全てのカラムを更新するには、
whenMatched(...).updateAll()
を使ってください。これは、ターゲットのDeltaテーブルの全てのカラムに対する以下の処理と等価です。
ScalawhenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
このため、このアクションではソーステーブルのカラムはターゲットテーブルのカラムが同じであることを前提とします。そうでない場合、クエリーは解析エラーをスローします。
注意
この挙動は、自動スキーママイグレーションが有効化されている場合には変化します。詳細は自動スキーマエボリューションをご覧下さい。 -
-
whenNotMatched
句は、マッチ条件に基づいてソースの行がターゲットの行にマッチしなかった場合に実行されます。これらの句には以下のセマンティクスが存在します。-
whenNotMatched
句にはinsert
アクションのみを含めることができます。指定されたカラムと対応する表現に基づいて新規の行が生成されます。ターゲットテーブルの全てのカラムを指定する必要はありません。指定されないターゲットカラムはNULL
となります。
注意
Databricksランタイム6.5以下では、INSERT
アクションにターゲットテーブルの全てのカラムを指定しなくてはなりません。- それぞれの
whenNotMatched
句にはオプションの条件を含めることができます。句の条件が存在する際、当該行でこの条件がtrueになった場合になった時のみソースの行がインサートされます。それ以外の場合には、ソースカラムは無視されます。 - 複数の
whenNotMatched
句が存在する場合、指定された順番で評価されます。最後のものを除く全てのwhenNotMatched
には条件を含めなくてはなりません。 - ソースデータセットに対応するカラムを用いて、ターゲットのDeltaテーブルに全てのカラムをインサートするには、
whenNotMatched(...).insertAll()
を使ってください。これは、ターゲットDeltaテーブルの全てのカラムに対する以下の処理と等価です。
ScalawhenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
このため、このアクションではソーステーブルのカラムはターゲットテーブルのカラムが同じであることを前提とします。そうでない場合、クエリーは解析エラーをスローします。
注意
この挙動は、自動スキーママイグレーションが有効化されている場合には変化します。詳細は自動スキーマエボリューションをご覧下さい。 -
重要!
- ソースデータセットの複数行がマッチし、
merge
がターゲットDeltaテーブルの同じ行を更新しようとした際にmerge
オペレーションが失敗する場合があります。mergeのSQLセマンティクスによると、マッチしたターゲットの行をどのソースの行を用いてアップデートすべきかが明確でないため、このようなupdateオペレーションは曖昧なものとなります。複数マッチの可能性を排除するためにソーステーブルの前処理を行うことができます。ターゲットDeltaテーブルに変更を適用する前に、それぞれのキーに対する最新の変更のみを保持するために変更データセットをどのように処理するのかを説明するチェンジデータキャプチャのサンプルをご覧ください。 - ソースデータセットが決定論的でない場合、
merge
オペレーションが不正な結果を引き起こすことがあります。これは、merge
がソースデータセットに対して2回のスキャンを実行することがあり、2つのスキャンによって生成されるデータが異なる場合、テーブルに対して最終的に適用される変更が不正となるためです。ソースにおける非決定論的な状況は多くの形で起こり得ます。いくつかの例を示します。- 非Deltaテーブルからの読み込み。例えば、複数回のスキャンの間にファイルが変更されることがあるCSVテーブルからの読み込み。
- 非決定論的オペレーションの使用。例えば、データをフィルタリングするために現在のタイムスタンプを使用する
Dataset.filter()
オペレーションは、複数回のスキャンにおいて異なる結果を生み出すことがあります。
- ビューが
CREATE VIEW viewName AS SELECT * FROM deltaTable
で定義されている場合にのみ、SQLのビューに対してSQLのMERGE
オペレーションを実行することができます。
注意
Databricksランタイム7.3LTS以降では、マッチしたものが無条件で削除される場合、複数のマッチは許可されます(複数マッチがあったとしても、無条件の削除は曖昧ではないため)。
スキーマ検証
merge
は、insertやupdateによって生成されたデータのスキーマがテーブルのスキーマと互換性があるかどうかを自動で検証します。merge
オペレーションに互換性があるかどうかを決定するために以下のルールを使用します。
-
update
、insert
アクションでは、指定されたターゲットのカラムはターゲットのDeltaテーブルに存在しなくてはなりません。 -
updateAll
、insertAll
アクションでは、ソースデータセットはターゲットのDeltaテーブルの全てのカラムを持っていなくてはなりません。ソースデータセットでは追加のカラムがあっても構いませんが、それらは無視されます。 - 全てのアクションにおいて、表現によって生成されるターゲットカラムのデータ型がターゲットのDeltaテーブルに対応するカラムと異なる場合、
merge
はテーブルの型にキャストしようとします。
自動スキーマエボリューション
注意
merge
におけるスキーマエボリューションはDatabricksランタイム6.6以降で利用できます。
デフォルトでは、updateAll
やinsertAll
はソースデータセットの全てのカラムをターゲットDeltaテーブルの同じ名前のカラムに割り当てます。ターゲットテーブルに存在しないソースデータセットのカラムは無視されます。しかし、いくつかのユースケースにおいては、ターゲットのDeltaテーブルにソースのカラムを追加することが望ましい場合があります。updateAll
やinsertAll
を用いたmerge
オペレーションにおいて自動でテーブルスキーマを更新するために、merge
オペレーションを実行する前にSparkセッション設定のspark.databricks.delta.schema.autoMerge.enabled
をtrue
に設定することができます。
注意
-
updateAll
(UPDATE SET *
) あるいはinsertAll
(INSERT *
)アクション、あるいは両方が存在する場合にのみスキーマエボリューションが行われます。 - (一つの句として
updateAll
、insertAll
が存在したとしても)update
、insert
アクションでは、ターゲットテーブルに存在しないターゲットカラムを明示的に参照することができません。以下のサンプルをご覧ください。
注意
Databricksランタイム7.4以前では、merge
はトップレベルのカラムのスキーマエボリューションのみをサポートしており、ネストされたカラムではサポートしていません。
スキーマエボリューションがある場合、ない場合におけるmerge
オペレーションの影響の例を示します。
サンプルは原文を参照ください。
structの配列を含むスキーマ固有の検討事項
DeltaのMERGE INTO
は、名前を用いたstructフィールドの解決とstructの配列のスキーマの進化をサポートしています。スキーマエボリューションが有効化された際、ターゲットテーブルはstructの配列に対しても進化し、配列内にネストされたstructに対しても動作します。
注意
この機能はDatabricksランタイム9.1以降で利用できます。Databricksランタイム9.0以前では、structのフィールドを位置で解決するためにstructの配列に暗黙的なSparkのキャスト処理が使用され、配列外のstructの挙動によって、配列内のstrcutのスキーマ進化の有無によってmergeオペレーションの効果には一貫性が無いものになります。
structの配列に対するスキーマ進化の有無によるmergeオペレーションの効果の例を示します。
サンプルに関しては原文を参照ください。
パフォーマンスチューニング
以下のアプローチを取ることで、mergeに要する時間を短縮することができます。
-
マッチにおける探索空間を削減する: デフォルトでは、
merge
オペレーションはソーステーブルのマッチングを特定するためにDeltaテーブル全体を探索します。merge
をスピードアップする方法は、マッチ条件に既知の制約を追加することで探索空間を削減するというものです。例えば、テーブルがcountry
とdate
でパーティショニングされており、最新の日付と特定の国に対する情報を更新するためにmerge
を使用するにであれば、以下の条件を追加することで、適切なパーティションにのみマッチするようになるので、クエリーが劇的に高速になります。SQLevents.date = current_date() AND events.country = 'USA'
さらに、他の同時実行オペレーションと競合する可能性を削減することができます。詳細はコンカレンシーコントロールを参照ください。
-
ファイルをコンパクトにする: 大量の小さいファイルにデータが格納されていると、マッチングを検索するためのデータの読み込みが遅くなります。読み込みのスループットを開演するために小さいファイルを大きなファイルにコンパクトにすることができます。詳細はファイルのコンパクト化を参照ください。
-
書き込みにおけるシャッフルパーティションを制御する:
merge
オペレーションは計算処理、更新データの書き込みのために複数回データをシャッフルします。シャッフルに使用されるタスクの数はSparkセッション設定spark.sql.shuffle.partitions
で制御することができます。このパラメーターを設定することで、並列度を制御できるだけではなく、出力ファイルの数を決定することもできます。値を増やすことで並列度を高めることができますが、小さなファイルがたくさん生成されることになります。 -
最適化書き込みを有効化する: パーティショニングされたテーブルに対して、
merge
はシャッフルパーティションの数よりも遥かに多い小さなファイルを作り出します。これは、それぞれのシャッフルタスクが複数のパーティションに複数のファイルを書き込むためであり、パフォーマンス上のボトルネックとなり得ます。最適化書き込みを有効化することでファイルの数を削減することができます。注意
Databricksランタイム7.4以降では、パーティショニングされたテーブルに対するmerge
オペレーションでは最適化書き込みは自動で有効化されます。 -
テーブルのファイルサイズをチューニングする: Databricksランタイム8.2以降では、Databricksが自動でDeltaテーブルにファイルを再書き込みする
merge
オペレーションが頻繁に行われることを検知し、将来的にさらにファイルが再書き込みされることを予測して、再書き込みファイルのサイズを削減する選択をすることがあります。詳細はファイルサイズのチューニングを参照ください。 -
低シャッフルマージ: Databricksランタイム9.0以降では、Low Shuffle Mergeが多くの一般的なワークロードに多いて優れた性能を提供する
MERGE
の最適化された実装を提供します。さらに、変更前のデータに対するZ-orderingのような既存のデータレイアウトの最適化はそのまま保持します。
mergeのサンプル
こちらでは、異なるシナリオでmerge
をどのように使用するのかを説明するサンプルを示します。
Deltaテーブルに書き込む際のデータの重複排除
一般的なETLユースケースは、収集したログをDeltaテーブルに追加するというものです。しかし、多くの場合、ソースでは重複したログレコードを生成し、後段では重複排除のステップが必要となり、取り扱いに注意しなくてはなりません。merge
を用いることで、重複したレコードのインサートを回避することができます。
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
注意
新たなログを含むデータセットは自身の中で重複排除する必要があります。mergeのSQLセマンティクスによって、テーブルの既存のデータとマッチングを行い、新規データの重複を排除しますが、新規のデータセット内に重複データが存在する場合は、それらはインサートされます。このため、テーブルにマージする前に新規データの重複を排除してください。
数日間において重複するレコードを取得することを知っているのであれば、テーブルを日付でパーティショニングし、マッチするターゲットすテーブルの日付レンジを指定することで、さらにクエリーを最適化することができます。
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
これはテーブル全体ではなく、最新7日寛の重複のみを検索するので、前のコマンドよりも効率的です。さらに、ログに対する連続的な重複排除処理を行うために、このインサートのみのmergeを構造化ストリーミングで使用することもできます。
- ストリーミングクエリーでは、重複排除とDeltaテーブルへのストリーミングデータの書き込みを継続的に行うために、
foreachBatch
の中でmergeオペレーションを使用することができます。foreachBatch
の詳細に関しては、以下のストリーミングのサンプルをご覧ください。 - 別のストリーミングクエリーにおいては、このDeltaテーブルから重複排除されたデータを連続的に読み込みを行うことができます。インサートのみのmergeはDeltaテーブルに新規データの追加のみを行うため、このようなことが可能となります。
注意
インサートのみのmergeは、Databricksランタイム6.2以降におけるデータの追加でのみ最適化されています。Databricksランタイム6.1以前では、インサートのみのmergeオペレーションからの書き込みはストリームから読み込みを行うことはできません。
Deltaテーブルに対するSCD(slowly changing data) Type 2のオペレーション
別の一般的なオペレーションは、ディメンジョナルテーブルのそれぞれのキーになされた全ての変更を保持するSCD Type 2です。このようなオペレーションでは、キーの以前の値を古いものとマークするために既存の行の更新と、最新の値として新規の行を追加する必要があります。ソーステーブルを更新内容、ターゲットテーブルをディメンジョナルテーブル富なすことで、SCD Type 2をmerge
と表現することができます。
以下に、顧客の住所の履歴とそれぞれの住所が有効な期間をを保持する完全なサンプルを示します。顧客の住所を更新する必要がある場合、以前の住所は現在のものではないということをマークし、有効期間を更新し、現在のものとして新たな住所を追加しなくてはなりません。
mergeを用いたSCD Type2のノートブック
Deltaテーブルへのチェンジデータの書き込み
SCDと同様、一般的な別のユースケースはチェンジデータキャプチャ(CDC)と呼ばれるものであり、外部データベースで行われた全てのデータ変更をDeltaテーブルに適用するというものです。言い換えると、外部テーブルに適用された一連のupdate、delete、insertをDeltaテーブルに適用する必要があります。以下のようにmerge
を用いることで、これを実現することができます。
mergeを用いたチェンジデータ書き込みのノートブック
foreachBatch
を用いたストリーミングクエリーからのupsert
ストリーミングクエリーからの複雑なupsertをDeltaテーブルに書き込むためにmerge
とforeachBatch
(詳細に関してはforeachBatchをご覧下しあ)を組み合わせて使用することができます。例えば、
- Updateモードでストリーミングの集計結果を書き込む: Completeモードよりもはるかに効率的です。
-
データベースの変更ストリームをDeltaテーブルに書き込む: 変更ストームを連続的にDeltaテーブルに適用するために、チェンジデータを書き込むmergeクエリーを
foreachBatch
の中で使用することができます。 -
重複排除を行なったストリームデータをDeltaテーブルに書き込む: 自動重複排除を行なって連続的にデータをDeltaテーブルに書き込むために、
foreachBatch
の中で重複排除のためのインサートのみのmergeクエリーを使用することができます。
注意
- ストリーミングクエリーの再起動によって同じデータバッチに対してオペレーションが複数回てきようされることがあるため、
foreachBatch
内のmerge
文に冪等性があることを確認してください。 -
foreachBatch
の中でmerge
を使用する際、ストリーミングクエリーの入力データレート(StreamingQueryProgress
としてレポートされ、ノートブック上のレートグラフで確認できます)がソースでデータが生成された実際のレートの倍数として報告されることがあります。これは、merge
が入力データを複数回読み込むことで、入力メトリクスが乗算されることによるものです。これがボトルネックの場合、merge
の前にでバッチのデータフレームをキャッシュし、merge
の後にキャッシュを削除してください。
mergeとforeachBatchを用いたアップデートモードでのストリーミング集計処理の書き込みのノートブック