概要
Databricks の Merge 文において、WHEN NOT MATCHED
句に該当するレコードが重複している場合は、そのままテーブルに挿入される仕様があります。本来、私は重複を除外してから Merge 文で書き込んでいたため、この仕様については把握していませんでした。
Merge 文を利用する際は、事前に dropDuplicates
メソッドなどでユニーク性を担保することをおすすめします。
ソースデータが重複していた場合の Merge 文の仕様について
ソース側のデータが重複していると、Merge 文で以下のように DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE
エラーが発生します。これはエラーメッセージにある通り、重複が WHEN NOT MATCHED
句で検知された場合に起こるものです。
[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
このようなエラーを回避するため、私は Merge 文を実行する前に dropDuplicates
を実行していました。
引用元:pyspark.sql.DataFrame.dropDuplicates — PySpark 3.5.4 documentation
そのため、WHEN NOT MATCHED THEN
句の仕様を詳しく把握する機会がありませんでしたが、先日、ソース側の重複を排除せずに空のテーブルに対して Merge 文を実行する事例があったため、実際に検証しました。すると、重複したレコードがそのまま挿入されることを確認しました。すなわち、テーブル側での一意性が保証されない仕様であることが判明し、注意すべき重要なポイントといえます。
検証コードと実行結果
1. カタログとスキーマを作成
catalog_name = "confluent_test"
schema_name = "test"
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
2. テーブルの作成
table_name = "test_table_01"
table_full_name = f"{catalog_name}.{schema_name}.{table_name}"
spark.sql(
f"""
CREATE OR REPLACE TABLE {table_full_name}
(
key_col string,
name_col string,
timestamp_col timestamp
)
"""
)
3. ソースデータの作成
temp_view_name = "tmp_updates"
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime
schema = StructType([
StructField("key_col", StringType(), True),
StructField("name_col", StringType(), True),
StructField("timestamp_col", TimestampType(), True)
])
data = [
("key1", "name1", datetime(2021, 1, 1, 12, 0, 0)),
("key1", "name1", datetime(2021, 1, 1, 12, 0, 0)),
("key1", "name1", datetime(2021, 1, 1, 12, 0, 0)),
]
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView(temp_view_name)
df.display()
4. Merge 文の実行
res_df = spark.sql(
f"""
MERGE INTO {table_full_name} AS T
USING {temp_view_name} AS S
ON T.key_col = S.key_col
WHEN MATCHED THEN
UPDATE SET
T.name_col = S.name_col,
T.timestamp_col = S.timestamp_col
WHEN NOT MATCHED THEN
INSERT (key_col, name_col, timestamp_col)
VALUES (S.key_col, S.name_col, S.timestamp_col)
"""
)
res_df.display()
5. テーブルのデータが重複していることを確認
spark.table(table_full_name).display()