概要
本記事では、Databricks を使用して Azure SQL Database から DELETE されたレコードを特定し、それを Databricks に反映する方法を紹介します。通常、ソースシステムでレコードが削除されると、下流のシステムではそのレコードを特定するのが難しくなります。しかし、以前の記事で紹介した Azure SQL Database のテンポラルテーブル機能を利用することで、削除されたレコードを特定し、Databricks に反映することが可能です。
検証コードと実行結果
Databricks にてスキーマとテーブルを作成
Databricks でスキーマとテーブルを作成するコードです。スキーマ名は 'qiita_test_01' とし、テーブル名は 'TempTableTest' としています。
schame_name = 'qiita_test_01'
table_name = f"{schame_name}.TempTableTest"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schame_name}")
spark.sql(
f"""
CREATE OR REPLACE TABLE {table_name}
(
Id integer,
Name string
)
"""
)
Azure SQL Database にてテーブルの作成と初期データの挿入
Azure SQL Database でテーブルを作成し、初期データを挿入するコードです。サーバー名は 'ads-01'、データベース名も 'ads-01' としています。ユーザー名とパスワードは Databricks のシークレットスコープから取得します。
server = "ads-01" ## サーバー名
database = "ads-01" ## データベース名
username = dbutils.secrets.get(scope="qiita", key="sql_authentificate_user") ## Azure SQL DB のユーザー名
password = dbutils.secrets.get(scope="qiita", key="sql_authentificate_password") ## Azure SQL DB のパスワード
# Azure SQL Database にてテーブル作成とデータの挿入
sql = """
IF OBJECT_ID('dbo.TempTableTest', 'U') IS NOT NULL
ALTER TABLE TempTableTest SET (SYSTEM_VERSIONING = OFF);
DROP TABLE IF EXISTS TempTableTest;
DROP TABLE IF EXISTS TempTableTestHistory;
CREATE TABLE dbo.TempTableTest (
Id INT NOT NULL,
Name VARCHAR(50),
ValidFrom DATETIME2 GENERATED ALWAYS AS ROW START HIDDEN NOT NULL,
ValidTo DATETIME2 GENERATED ALWAYS AS ROW END HIDDEN NOT NULL,
PERIOD FOR SYSTEM_TIME(ValidFrom, ValidTo),
CONSTRAINT PK_TempTableTest PRIMARY KEY CLUSTERED (Id)
)
WITH (
SYSTEM_VERSIONING = ON
(
HISTORY_TABLE = dbo.TempTableTestHistory,
HISTORY_RETENTION_PERIOD = 2 WEEKS
)
);
-- insert
INSERT INTO [dbo].[TempTableTest] (
Id,
Name
)
VALUES (
1,
'a'
);
-- update
INSERT INTO [dbo].[TempTableTest] (
Id,
Name
)
VALUES (
2,
NULL
);
UPDATE [dbo].[TempTableTest] SET Name = 'b' WHERE Id = 2;
-- data for delete
INSERT INTO [dbo].[TempTableTest] (
Id,
Name
)
VALUES (
3,
'c'
);
"""
df = (
spark.read.format("sqlserver")
.option("host", f"{server}.database.windows.net")
.option("user", username)
.option("password", password)
.option("database", database)
.option("prepareQuery", sql)
.option("dbTable", "[dbo].[TempTableTest]")
.load()
)
df.write.mode("overwrite").saveAsTable(table_name)
spark.table(table_name).display()
Azure SQL Database にて DELETE の実行
DELETE 操作を実行し、特定のレコードを削除するコードです。この例では、Id が 3 のレコードを削除しています。
# DELETE の実行
sql = """
DELETE TempTableTest
WHERE Id = 3;
"""
df = (
spark.read.format("sqlserver")
.option("host", f"{server}.database.windows.net")
.option("user", username)
.option("password", password)
.option("database", database)
.option("prepareQuery", sql)
.option("dbTable", "[dbo].[TempTableTest]")
.load()
)
df.display()
削除されたレコードを Databricks に反映
削除されたレコードを取得するための SQL クエリです。テンポラルテーブルの履歴を利用して、削除されたレコードを特定します。
# 削除されたレコードを取得
delete_records_sql = """
SELECT tgt.*
FROM [dbo].[TempTableTestHistory] tgt
LEFT OUTER JOIN [dbo].[TempTableTest] src
ON tgt.Id = src.Id
WHERE tgt.ValidTo < '9999-12-31 23:59:59.9999999'
AND src.Id IS NULL
"""
df = (
spark.read.format("sqlserver")
.option("host", f"{server}.database.windows.net")
.option("user", username)
.option("password", password)
.option("database", database)
.option("prepareQuery", sql)
.option("query", delete_records_sql)
.load()
)
Azure SQL Database から取得したレコードのキーカラムでマッチしたレコードを削除する Merge 文を実行します。
# Merge を実行するするために一意性を保証
key_cols = ["Id"]
df = df.select(key_cols).dropDuplicates(key_cols)
# Azure SQL Database から取得したデータに基づき Merge 文により DELETE を実行
src_table_name = "_temp_merge_source"
df.createOrReplaceTempView(src_table_name)
delete_sql = f"""
MERGE INTO {table_name} AS target
USING {src_table_name} AS source
ON target.Id = source.Id
WHEN MATCHED THEN
DELETE
;
"""
spark.sql(delete_sql)
Databricks にて DELETE されたことを確認
Databricks で DELETE されたレコードが反映されたことを確認するためのコードです。
# データを確認
spark.table(table_name).display()
まとめ
この記事では、Databricks と Azure SQL Database を使用して、ソースシステムで削除されたレコードを特定し、Databricks に反映する方法を紹介しました。Azure SQL Database のテンポラルテーブル機能を活用することで、削除されたレコードを効率的に特定し、Databricks でのデータ整合性を保つことが可能です。具体的な手順としては、Databricks でスキーマとテーブルを作成し、Azure SQL Database でテーブルを作成して初期データを挿入します。その後、DELETE 操作を実行し、削除されたレコードを履歴テーブルから取得します。最終的に、Databricks で Merge 文を使用して DELETE されたレコードを反映し、データの整合性を確認します。このプロセスにより、データの削除が下流のシステムに正確に反映され、データの信頼性が向上します。