3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Deltaテーブルのdelete、update、merge

Last updated at Posted at 2022-07-05

Table deletes, updates, and merges | Databricks on AWS

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Delta Lakeでは、Deltaテーブルからのデータの削除、データの更新を可能にするいくつかのステートメントをサポートしています。

Delta Lakeにおけるデータの削除、更新に関する概要、デモンストレーションに関しては、以下のYoutubeをご覧ください(54分)。

Delta Lakeにおけるチェンジデータキャプチャに関しては、以下のYoutubeをご覧ください(53分)。

テーブルからのdelete

Deltaテーブルから述語にマッチするデータを削除することができます。例えば、/tmp/delta/people-10mのパスにあるテーブル、あるいはpeople10mというテーブルにおいて、birthDateカラムの値が1955より少ない値を持つ人に対応する全ての行を削除するには以下を実行します。

SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

注意
Python APIはDatabricksランタイム6.1以降で利用できます。

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

注意
Scala APIはDatabricksランタイム6.0以降で利用できます。

Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

注意
Java APIはDatabricksランタイム6.0以降で利用できます。

Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'");

// Declare the predicate by using Spark SQL functions.
deltaTable.delete(functions.col("birthDate").lt(functions.lit("1955-01-01")));

詳細に関してはDelta Lake APIをご覧ください。

重要!
deleteは最新バージョンのDeltaテーブルからデータを削除しますが、明示的に古いバージョンがvacuumされるまでは、物理ストレージからは削除されません。詳細はvacuumをご覧ください。

ティップス
可能であれば、述語がオペレーションを劇的に高速化できるように、パーティショニングされたDeltaテーブルにおけるパーティションカラムに対して述語を指定するようにしてください。

テーブルのupdate

Deltaテーブルにおいて述語にマッチするデータを更新することができます。例えば、/tmp/delta/people-10mのパスにあるテーブル、あるいはpeople10mというテーブルにおいて、genderカラムのMaleFemaleを略字MFに変更するには以下を実行します。

SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

注意
Python APIはDatabricksランタイム6.1以降で利用できます。

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

注意
Scala APIはDatabricksランタイム6.0以降で利用できます。

Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

注意
Java APIはDatabricksランタイム6.0以降で利用できます。

Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  new HashMap<String, String>() {{
    put("gender", "'Female'");
  }}
);

// Declare the predicate by using Spark SQL functions.
deltaTable.update(
  functions.col(gender).eq("M"),
  new HashMap<String, Column>() {{
    put("gender", functions.lit("Male"));
  }}
);

詳細に関してはDelta Lake APIをご覧ください。

ティップス
deleteと同様に、パーティショニングされたDeltaテーブルのパーティションカラムに対してupdateを行うことでオペレーション高速化することができます。

mergeを用いたテーブルへのupsert

SQLオペレーションMERGEを用いることで、ターゲットDeltaテーブルにソーステーブル、ビュー、データフレームをupsertすることができます。Delta LakeはMERGEでinsert、update、deleteをサポートしており、高度なユースケースに対応できるようにSQLの標準を超えた拡張文法に対応しています。

people10mというターゲットテーブル、あるいは/tmp/delta/people-10mのターゲットパスに対して、新規データを保有するソーステーブルpeople10mupdatesあるいは、ソースパス/tmp/delta/people-10m-updatesがあるものとします。これらの新規レコードのいくつかはターゲットデータに既に含まれています。新規データをマージするために、個人のidが既に存在する行は更新し、マッチするidが存在しない場合には新規に行を追加したいと考えています。この場合、以下を実行することができます。

SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )
Python
from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
Dataset<Row> dfUpdates = spark.read("delta").load("/tmp/delta/people-10m-updates")

deltaTable
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("id", "updates.id");
      put("firstName", "updates.firstName");
      put("middleName", "updates.middleName");
      put("lastName", "updates.lastName");
      put("gender", "updates.gender");
      put("birthDate", "updates.birthDate");
      put("ssn", "updates.ssn");
      put("salary", "updates.salary");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("id", "updates.id");
      put("firstName", "updates.firstName");
      put("middleName", "updates.middleName");
      put("lastName", "updates.lastName");
      put("gender", "updates.gender");
      put("birthDate", "updates.birthDate");
      put("ssn", "updates.ssn");
      put("salary", "updates.salary");
    }})
  .execute();

Scala、Java、Pythonの文法の詳細については、Delta Lake APIをご覧ください。

Delta Lakeのmergeオペレーションでは、通常ソースデータに対する2つのパスが必要となります。ソースデータに非決定論的表現が含まれる場合、ソースデータに対する複数のパスが異なる行を生成し、一貫性のない結果となる場合があります。非決定論的表現の一般的な例としては、current_datecurrent_timestamp関数が挙げられます。非決定論的関数の使用が避けられない場合、ソースデータを例えば一時的なDeltaテーブルのようなストレージに保存することを検討してください。ソースデータのキャッシュはこの問題を解決しません。これは、キャッシュの無効化によってソースデータが部分的、あるいは全体的に再計算される場合があるためです(例えば、スケールダウンによってクラスターがいくつかのエグゼキューターを失った場合)。

オペレーションのセマンティクス

mergeのプログラム上のオペレーションの詳細な説明を以下に示します。

  • 任意の数のwhenMatchedwhenNotMatched句を追加することができます。

    注意
    Databricksランタイム7.2以前では、mergeに指定できるwhenMatchedは最大2つ、whenNotMatchedは最大1つとなります。

  • whenMatched句は一致条件に基づいてターゲットテーブルにソースの行が一致した際に実行されます。これらの句には以下のセマンティクスが存在します。

    • whenMatched句は最大1つのupdateと最大1つのdeleteアクションを持つことができます。mergeにおけるupdateアクションは(updateオペレーションと同様に)マッチしたターゲット行の特定のカラムのみを更新します。deleteオペレーションはマッチした行を削除します。
    • それぞれのwhenMatched句にはオプションの条件を含めることができます。この句の条件が存在する際、句の条件がtrueになるソース・ターゲットの行ペアに対してのみupdatedeleteアクションが実行されます。
    • 複数のwhenMatched句が存在する場合、これらは指定された順序で評価されます。最後のものを除いた全てのwhenMatched句には条件を含めなくてはなりません。
    • merge条件にマッチするソース、ターゲット行のペアにおいて、すべてのwhenMatchedの条件がtureと評価されない場合、ターゲットの行は変更されません。
    • ソースデータセットで対応するカラムでターゲットのDeltaテーブルの全てのカラムを更新するには、whenMatched(...).updateAll()を使ってください。これは、ターゲットのDeltaテーブルの全てのカラムに対する以下の処理と等価です。
    Scala
    whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
    

    このため、このアクションではソーステーブルのカラムはターゲットテーブルのカラムが同じであることを前提とします。そうでない場合、クエリーは解析エラーをスローします。

    注意
    この挙動は、自動スキーママイグレーションが有効化されている場合には変化します。詳細は自動スキーマエボリューションをご覧下さい。

  • whenNotMatched句は、マッチ条件に基づいてソースの行がターゲットの行にマッチしなかった場合に実行されます。これらの句には以下のセマンティクスが存在します。

    • whenNotMatched句にはinsertアクションのみを含めることができます。指定されたカラムと対応する表現に基づいて新規の行が生成されます。ターゲットテーブルの全てのカラムを指定する必要はありません。指定されないターゲットカラムはNULLとなります。

    注意
    Databricksランタイム6.5以下では、INSERTアクションにターゲットテーブルの全てのカラムを指定しなくてはなりません。

    • それぞれのwhenNotMatched句にはオプションの条件を含めることができます。句の条件が存在する際、当該行でこの条件がtrueになった場合になった時のみソースの行がインサートされます。それ以外の場合には、ソースカラムは無視されます。
    • 複数のwhenNotMatched句が存在する場合、指定された順番で評価されます。最後のものを除く全てのwhenNotMatchedには条件を含めなくてはなりません。
    • ソースデータセットに対応するカラムを用いて、ターゲットのDeltaテーブルに全てのカラムをインサートするには、whenNotMatched(...).insertAll()を使ってください。これは、ターゲットDeltaテーブルの全てのカラムに対する以下の処理と等価です。
    Scala
    whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
    

    このため、このアクションではソーステーブルのカラムはターゲットテーブルのカラムが同じであることを前提とします。そうでない場合、クエリーは解析エラーをスローします。

    注意
    この挙動は、自動スキーママイグレーションが有効化されている場合には変化します。詳細は自動スキーマエボリューションをご覧下さい。

重要!

  • ソースデータセットの複数行がマッチし、mergeがターゲットDeltaテーブルの同じ行を更新しようとした際にmergeオペレーションが失敗する場合があります。mergeのSQLセマンティクスによると、マッチしたターゲットの行をどのソースの行を用いてアップデートすべきかが明確でないため、このようなupdateオペレーションは曖昧なものとなります。複数マッチの可能性を排除するためにソーステーブルの前処理を行うことができます。ターゲットDeltaテーブルに変更を適用する前に、それぞれのキーに対する最新の変更のみを保持するために変更データセットをどのように処理するのかを説明するチェンジデータキャプチャのサンプルをご覧ください。
  • ソースデータセットが決定論的でない場合、mergeオペレーションが不正な結果を引き起こすことがあります。これは、mergeがソースデータセットに対して2回のスキャンを実行することがあり、2つのスキャンによって生成されるデータが異なる場合、テーブルに対して最終的に適用される変更が不正となるためです。ソースにおける非決定論的な状況は多くの形で起こり得ます。いくつかの例を示します。
    • 非Deltaテーブルからの読み込み。例えば、複数回のスキャンの間にファイルが変更されることがあるCSVテーブルからの読み込み。
    • 非決定論的オペレーションの使用。例えば、データをフィルタリングするために現在のタイムスタンプを使用するDataset.filter()オペレーションは、複数回のスキャンにおいて異なる結果を生み出すことがあります。
  • ビューがCREATE VIEW viewName AS SELECT * FROM deltaTableで定義されている場合にのみ、SQLのビューに対してSQLのMERGEオペレーションを実行することができます。

注意
Databricksランタイム7.3LTS以降では、マッチしたものが無条件で削除される場合、複数のマッチは許可されます(複数マッチがあったとしても、無条件の削除は曖昧ではないため)。

スキーマ検証

mergeは、insertやupdateによって生成されたデータのスキーマがテーブルのスキーマと互換性があるかどうかを自動で検証します。mergeオペレーションに互換性があるかどうかを決定するために以下のルールを使用します。

  • updateinsertアクションでは、指定されたターゲットのカラムはターゲットのDeltaテーブルに存在しなくてはなりません。
  • updateAllinsertAllアクションでは、ソースデータセットはターゲットのDeltaテーブルの全てのカラムを持っていなくてはなりません。ソースデータセットでは追加のカラムがあっても構いませんが、それらは無視されます。
  • 全てのアクションにおいて、表現によって生成されるターゲットカラムのデータ型がターゲットのDeltaテーブルに対応するカラムと異なる場合、mergeはテーブルの型にキャストしようとします。

自動スキーマエボリューション

注意
mergeにおけるスキーマエボリューションはDatabricksランタイム6.6以降で利用できます。

デフォルトでは、updateAllinsertAllはソースデータセットの全てのカラムをターゲットDeltaテーブルの同じ名前のカラムに割り当てます。ターゲットテーブルに存在しないソースデータセットのカラムは無視されます。しかし、いくつかのユースケースにおいては、ターゲットのDeltaテーブルにソースのカラムを追加することが望ましい場合があります。updateAllinsertAllを用いたmergeオペレーションにおいて自動でテーブルスキーマを更新するために、mergeオペレーションを実行する前にSparkセッション設定のspark.databricks.delta.schema.autoMerge.enabledtrueに設定することができます。

注意

  • updateAll (UPDATE SET *) あるいは insertAll (INSERT *)アクション、あるいは両方が存在する場合にのみスキーマエボリューションが行われます。
  • (一つの句としてupdateAllinsertAllが存在したとしても)updateinsertアクションでは、ターゲットテーブルに存在しないターゲットカラムを明示的に参照することができません。以下のサンプルをご覧ください。

注意
Databricksランタイム7.4以前では、mergeはトップレベルのカラムのスキーマエボリューションのみをサポートしており、ネストされたカラムではサポートしていません。

スキーマエボリューションがある場合、ない場合におけるmergeオペレーションの影響の例を示します。

サンプルは原文を参照ください。

structの配列を含むスキーマ固有の検討事項

DeltaのMERGE INTOは、名前を用いたstructフィールドの解決とstructの配列のスキーマの進化をサポートしています。スキーマエボリューションが有効化された際、ターゲットテーブルはstructの配列に対しても進化し、配列内にネストされたstructに対しても動作します。

注意
この機能はDatabricksランタイム9.1以降で利用できます。Databricksランタイム9.0以前では、structのフィールドを位置で解決するためにstructの配列に暗黙的なSparkのキャスト処理が使用され、配列外のstructの挙動によって、配列内のstrcutのスキーマ進化の有無によってmergeオペレーションの効果には一貫性が無いものになります。

structの配列に対するスキーマ進化の有無によるmergeオペレーションの効果の例を示します。

サンプルに関しては原文を参照ください。

パフォーマンスチューニング

以下のアプローチを取ることで、mergeに要する時間を短縮することができます。

  • マッチにおける探索空間を削減する: デフォルトでは、mergeオペレーションはソーステーブルのマッチングを特定するためにDeltaテーブル全体を探索します。mergeをスピードアップする方法は、マッチ条件に既知の制約を追加することで探索空間を削減するというものです。例えば、テーブルがcountrydateでパーティショニングされており、最新の日付と特定の国に対する情報を更新するためにmergeを使用するにであれば、以下の条件を追加することで、適切なパーティションにのみマッチするようになるので、クエリーが劇的に高速になります。

    SQL
    events.date = current_date() AND events.country = 'USA'
    

    さらに、他の同時実行オペレーションと競合する可能性を削減することができます。詳細はコンカレンシーコントロールを参照ください。

  • ファイルをコンパクトにする: 大量の小さいファイルにデータが格納されていると、マッチングを検索するためのデータの読み込みが遅くなります。読み込みのスループットを開演するために小さいファイルを大きなファイルにコンパクトにすることができます。詳細はファイルのコンパクト化を参照ください。

  • 書き込みにおけるシャッフルパーティションを制御する: mergeオペレーションは計算処理、更新データの書き込みのために複数回データをシャッフルします。シャッフルに使用されるタスクの数はSparkセッション設定spark.sql.shuffle.partitionsで制御することができます。このパラメーターを設定することで、並列度を制御できるだけではなく、出力ファイルの数を決定することもできます。値を増やすことで並列度を高めることができますが、小さなファイルがたくさん生成されることになります。

  • 最適化書き込みを有効化する: パーティショニングされたテーブルに対して、mergeはシャッフルパーティションの数よりも遥かに多い小さなファイルを作り出します。これは、それぞれのシャッフルタスクが複数のパーティションに複数のファイルを書き込むためであり、パフォーマンス上のボトルネックとなり得ます。最適化書き込みを有効化することでファイルの数を削減することができます。

    注意
    Databricksランタイム7.4以降では、パーティショニングされたテーブルに対するmergeオペレーションでは最適化書き込みは自動で有効化されます。

  • テーブルのファイルサイズをチューニングする: Databricksランタイム8.2以降では、Databricksが自動でDeltaテーブルにファイルを再書き込みするmergeオペレーションが頻繁に行われることを検知し、将来的にさらにファイルが再書き込みされることを予測して、再書き込みファイルのサイズを削減する選択をすることがあります。詳細はファイルサイズのチューニングを参照ください。

  • 低シャッフルマージ: Databricksランタイム9.0以降では、Low Shuffle Mergeが多くの一般的なワークロードに多いて優れた性能を提供するMERGEの最適化された実装を提供します。さらに、変更前のデータに対するZ-orderingのような既存のデータレイアウトの最適化はそのまま保持します。

mergeのサンプル

こちらでは、異なるシナリオでmergeをどのように使用するのかを説明するサンプルを示します。

Deltaテーブルに書き込む際のデータの重複排除

一般的なETLユースケースは、収集したログをDeltaテーブルに追加するというものです。しかし、多くの場合、ソースでは重複したログレコードを生成し、後段では重複排除のステップが必要となり、取り扱いに注意しなくてはなりません。mergeを用いることで、重複したレコードのインサートを回避することができます。

SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *
Python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()
Scala
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
Java
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

注意
新たなログを含むデータセットは自身の中で重複排除する必要があります。mergeのSQLセマンティクスによって、テーブルの既存のデータとマッチングを行い、新規データの重複を排除しますが、新規のデータセット内に重複データが存在する場合は、それらはインサートされます。このため、テーブルにマージする前に新規データの重複を排除してください。

数日間において重複するレコードを取得することを知っているのであれば、テーブルを日付でパーティショニングし、マッチするターゲットすテーブルの日付レンジを指定することで、さらにクエリーを最適化することができます。

SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *
Python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()
Scala
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
Java
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

これはテーブル全体ではなく、最新7日寛の重複のみを検索するので、前のコマンドよりも効率的です。さらに、ログに対する連続的な重複排除処理を行うために、このインサートのみのmergeを構造化ストリーミングで使用することもできます。

  • ストリーミングクエリーでは、重複排除とDeltaテーブルへのストリーミングデータの書き込みを継続的に行うために、foreachBatchの中でmergeオペレーションを使用することができます。foreachBatchの詳細に関しては、以下のストリーミングのサンプルをご覧ください。
  • 別のストリーミングクエリーにおいては、このDeltaテーブルから重複排除されたデータを連続的に読み込みを行うことができます。インサートのみのmergeはDeltaテーブルに新規データの追加のみを行うため、このようなことが可能となります。

注意
インサートのみのmergeは、Databricksランタイム6.2以降におけるデータの追加でのみ最適化されています。Databricksランタイム6.1以前では、インサートのみのmergeオペレーションからの書き込みはストリームから読み込みを行うことはできません。

Deltaテーブルに対するSCD(slowly changing data) Type 2のオペレーション

別の一般的なオペレーションは、ディメンジョナルテーブルのそれぞれのキーになされた全ての変更を保持するSCD Type 2です。このようなオペレーションでは、キーの以前の値を古いものとマークするために既存の行の更新と、最新の値として新規の行を追加する必要があります。ソーステーブルを更新内容、ターゲットテーブルをディメンジョナルテーブル富なすことで、SCD Type 2をmergeと表現することができます。

以下に、顧客の住所の履歴とそれぞれの住所が有効な期間をを保持する完全なサンプルを示します。顧客の住所を更新する必要がある場合、以前の住所は現在のものではないということをマークし、有効期間を更新し、現在のものとして新たな住所を追加しなくてはなりません。

mergeを用いたSCD Type2のノートブック

Deltaテーブルへのチェンジデータの書き込み

SCDと同様、一般的な別のユースケースはチェンジデータキャプチャ(CDC)と呼ばれるものであり、外部データベースで行われた全てのデータ変更をDeltaテーブルに適用するというものです。言い換えると、外部テーブルに適用された一連のupdate、delete、insertをDeltaテーブルに適用する必要があります。以下のようにmergeを用いることで、これを実現することができます。

mergeを用いたチェンジデータ書き込みのノートブック

foreachBatchを用いたストリーミングクエリーからのupsert

ストリーミングクエリーからの複雑なupsertをDeltaテーブルに書き込むためにmergeforeachBatch(詳細に関してはforeachBatchをご覧下しあ)を組み合わせて使用することができます。例えば、

  • Updateモードでストリーミングの集計結果を書き込む: Completeモードよりもはるかに効率的です。
  • データベースの変更ストリームをDeltaテーブルに書き込む: 変更ストームを連続的にDeltaテーブルに適用するために、チェンジデータを書き込むmergeクエリーforeachBatchの中で使用することができます。
  • 重複排除を行なったストリームデータをDeltaテーブルに書き込む: 自動重複排除を行なって連続的にデータをDeltaテーブルに書き込むために、foreachBatchの中で重複排除のためのインサートのみのmergeクエリーを使用することができます。

注意

  • ストリーミングクエリーの再起動によって同じデータバッチに対してオペレーションが複数回てきようされることがあるため、foreachBatch内のmerge文に冪等性があることを確認してください。
  • foreachBatchの中でmergeを使用する際、ストリーミングクエリーの入力データレート(StreamingQueryProgressとしてレポートされ、ノートブック上のレートグラフで確認できます)がソースでデータが生成された実際のレートの倍数として報告されることがあります。これは、mergeが入力データを複数回読み込むことで、入力メトリクスが乗算されることによるものです。これがボトルネックの場合、mergeの前にでバッチのデータフレームをキャッシュし、mergeの後にキャッシュを削除してください。

mergeとforeachBatchを用いたアップデートモードでのストリーミング集計処理の書き込みのノートブック

Databricks 無料トライアル

Databricks 無料トライアル

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?