0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ソース側重複がそのまま挿入?Databricks Merge文の`WHEN NOT MATCHED THEN`句の注意点を知ろう

Posted at

概要

Databricks の Merge 文において、WHEN NOT MATCHED 句に該当するレコードが重複している場合は、そのままテーブルに挿入される仕様があります。本来、私は重複を除外してから Merge 文で書き込んでいたため、この仕様については把握していませんでした。
Merge 文を利用する際は、事前に dropDuplicates メソッドなどでユニーク性を担保することをおすすめします。

ソースデータが重複していた場合の Merge 文の仕様について

ソース側のデータが重複していると、Merge 文で以下のように DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE エラーが発生します。これはエラーメッセージにある通り、重複が WHEN NOT MATCHED 句で検知された場合に起こるものです。

[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous

image.png

このようなエラーを回避するため、私は Merge 文を実行する前に dropDuplicates を実行していました。

image.png

引用元:pyspark.sql.DataFrame.dropDuplicates — PySpark 3.5.4 documentation

そのため、WHEN NOT MATCHED THEN 句の仕様を詳しく把握する機会がありませんでしたが、先日、ソース側の重複を排除せずに空のテーブルに対して Merge 文を実行する事例があったため、実際に検証しました。すると、重複したレコードがそのまま挿入されることを確認しました。すなわち、テーブル側での一意性が保証されない仕様であることが判明し、注意すべき重要なポイントといえます。

image.png

検証コードと実行結果

1. カタログとスキーマを作成

catalog_name = "confluent_test"
schema_name = "test"

_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

image.png

2. テーブルの作成

table_name = "test_table_01"
table_full_name =  f"{catalog_name}.{schema_name}.{table_name}"

spark.sql(
    f"""
    CREATE OR REPLACE TABLE {table_full_name}
    (
        key_col string,
        name_col string,
        timestamp_col timestamp
    )
    """
)

image.png

3. ソースデータの作成

temp_view_name = "tmp_updates"

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

schema = StructType([
    StructField("key_col", StringType(), True),
    StructField("name_col", StringType(), True),
    StructField("timestamp_col", TimestampType(), True)
])

data = [
    ("key1", "name1", datetime(2021, 1, 1, 12, 0, 0)),
    ("key1", "name1", datetime(2021, 1, 1, 12, 0, 0)),
    ("key1", "name1", datetime(2021, 1, 1, 12, 0, 0)),
]
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView(temp_view_name)
df.display()

image.png

4. Merge 文の実行

res_df = spark.sql(
    f"""
    MERGE INTO {table_full_name} AS T
    USING {temp_view_name} AS S
    ON T.key_col = S.key_col
    WHEN MATCHED THEN 
      UPDATE SET
        T.name_col = S.name_col,
        T.timestamp_col = S.timestamp_col
    WHEN NOT MATCHED THEN
      INSERT (key_col, name_col, timestamp_col)
      VALUES (S.key_col, S.name_col, S.timestamp_col)
    """
)
res_df.display()

image.png

5. テーブルのデータが重複していることを確認

spark.table(table_full_name).display()

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?