概要
Databricks にて MERGE 文を実行する場合には、書き込むデータセットをもつデータフレームで処理を完結させてから書き込むことで、Merge 文がシンプルとなります。Merge 文内でロジックを持たせてしまうとレビューやテストが困難となり、処理の共通化の実施が困難となります。
下記のような Merge 文のプログラムに遭遇しました。Merge 文内でtrigger_ts
列がcurrent_timestamp
関数でセットされています。コードが長くなることや処理の共通化が実施されてないことコードレビューが困難になることや事前にテーブルを作成して Merge 文を実行しないとロジックの妥当性を確認できないなどのテストができないことなどの課題があります。
事前にロジックを含むデータフレームを定義して、そのデータフレームをベースに Merge 文を実行することでシンプルになります。
本記事では、2 つの方法の実行コードとその結果を共有します。
事前準備
1. スキーマとテーブルを作成
%sql
CREATE SCHEMA IF NOT EXISTS hive_metastore.merge_test_01;
CREATE OR REPLACE TABLE hive_metastore.merge_test_01.table_01
(
id INT,
string_col string,
trigger_ts TIMESTAMP
);
2. ベースとなるデータフレームを定義
import datetime
from pyspark.sql.functions import current_timestamp
schema = """
id INT,
string_col string
"""
data = [
{"id": 1, "string_col": "aaa"},
{"id": 2, "string_col": "bbb"},
{"id": 3, "string_col": "ccc"},
]
df = spark.createDataFrame(data, schema)
Merge 文の実行
1. Merge 文内にロジックを保持する方法
df.createOrReplaceTempView('_tmp_table_01')
# Merge処理を実行
spark.sql('''
MERGE INTO hive_metastore.merge_test_01.table_01 AS tgt
USING _tmp_table_01 AS src
ON tgt.id = src.id
WHEN MATCHED THEN
UPDATE
SET
tgt.id = src.id,
tgt.string_col = src.string_col,
tgt.trigger_ts = current_timestamp()
WHEN NOT MATCHED THEN
INSERT
(
id,
string_col,
trigger_ts
)
VALUES
(
src.id,
src.string_col,
current_timestamp()
)
''')
2. 事前に定義したデータフレームで Merge 処理を実行する方法
# データフレーム操作でカラムを追加
df_2 = df.withColumn("trigger_ts", current_timestamp())
df_2.createOrReplaceTempView("_tmp_table_01")
# Merge処理を実行
spark.sql(f'''
MERGE INTO hive_metastore.merge_test_01.table_01 AS tgt
USING _tmp_table_01 AS src
ON tgt.id = src.id
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
''')
事後処理
1. スキーマを削除
%sql
DROP SCHEMA IF EXISTS hive_metastore.merge_test_01 CASCADE;