0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks へ Azure SQL Database から連携する際の DELETE されたレコードの同期方法:Azure SQL Database の テンポラルテーブル機能の活用

Last updated at Posted at 2024-05-15

概要

本記事では、Databricks を使用して Azure SQL Database から DELETE されたレコードを特定し、それを Databricks に反映する方法を紹介します。通常、ソースシステムでレコードが削除されると、下流のシステムではそのレコードを特定するのが難しくなります。しかし、以前の記事で紹介した Azure SQL Database のテンポラルテーブル機能を利用することで、削除されたレコードを特定し、Databricks に反映することが可能です。

検証コードと実行結果

Databricks にてスキーマとテーブルを作成

Databricks でスキーマとテーブルを作成するコードです。スキーマ名は 'qiita_test_01' とし、テーブル名は 'TempTableTest' としています。

schame_name = 'qiita_test_01'
table_name = f"{schame_name}.TempTableTest"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schame_name}")

spark.sql(
    f"""
    CREATE OR REPLACE TABLE {table_name}
    (
        Id integer,
        Name string
    )
    """
)

image.png

Azure SQL Database にてテーブルの作成と初期データの挿入

Azure SQL Database でテーブルを作成し、初期データを挿入するコードです。サーバー名は 'ads-01'、データベース名も 'ads-01' としています。ユーザー名とパスワードは Databricks のシークレットスコープから取得します。

server = "ads-01"  ## サーバー名
database = "ads-01"  ## データベース名
username = dbutils.secrets.get(scope="qiita", key="sql_authentificate_user")  ## Azure SQL DB のユーザー名
password = dbutils.secrets.get(scope="qiita", key="sql_authentificate_password")  ## Azure SQL DB のパスワード

# Azure SQL Database にてテーブル作成とデータの挿入
sql = """
IF OBJECT_ID('dbo.TempTableTest', 'U') IS NOT NULL
    ALTER TABLE TempTableTest SET (SYSTEM_VERSIONING = OFF);
    DROP TABLE IF EXISTS TempTableTest;
    DROP TABLE IF EXISTS TempTableTestHistory;

CREATE TABLE dbo.TempTableTest (
    Id INT NOT NULL,
    Name VARCHAR(50),
    ValidFrom DATETIME2 GENERATED ALWAYS AS ROW START HIDDEN NOT NULL,
    ValidTo DATETIME2 GENERATED ALWAYS AS ROW END HIDDEN NOT NULL,
    PERIOD FOR SYSTEM_TIME(ValidFrom, ValidTo),
    CONSTRAINT PK_TempTableTest PRIMARY KEY CLUSTERED (Id)
    )
    WITH (
        SYSTEM_VERSIONING = ON
            (
                HISTORY_TABLE = dbo.TempTableTestHistory,
                HISTORY_RETENTION_PERIOD = 2 WEEKS
            )
    );
-- insert
INSERT INTO [dbo].[TempTableTest] (
    Id,
    Name
    )
VALUES (
    1,
    'a'
    );

-- update
INSERT INTO [dbo].[TempTableTest] (
    Id,
    Name
)
VALUES (
    2,
    NULL
);
UPDATE [dbo].[TempTableTest] SET Name = 'b' WHERE Id = 2;

-- data for delete
INSERT INTO [dbo].[TempTableTest] (
    Id,
    Name
    )
VALUES (
    3,
    'c'
    );
"""

df = (
    spark.read.format("sqlserver")
    .option("host", f"{server}.database.windows.net")
    .option("user", username)
    .option("password", password)
    .option("database", database)
    .option("prepareQuery", sql)
    .option("dbTable", "[dbo].[TempTableTest]")
    .load()
)

df.write.mode("overwrite").saveAsTable(table_name)

spark.table(table_name).display()

image.png

Azure SQL Database にて DELETE の実行

DELETE 操作を実行し、特定のレコードを削除するコードです。この例では、Id が 3 のレコードを削除しています。

# DELETE の実行
sql = """
DELETE TempTableTest
WHERE Id = 3;
"""

df = (
    spark.read.format("sqlserver")
    .option("host", f"{server}.database.windows.net")
    .option("user", username)
    .option("password", password)
    .option("database", database)
    .option("prepareQuery", sql)
    .option("dbTable", "[dbo].[TempTableTest]")
    .load()
)

df.display()

image.png

削除されたレコードを Databricks に反映

削除されたレコードを取得するための SQL クエリです。テンポラルテーブルの履歴を利用して、削除されたレコードを特定します。

# 削除されたレコードを取得
delete_records_sql = """
SELECT tgt.*
FROM [dbo].[TempTableTestHistory] tgt
LEFT OUTER JOIN [dbo].[TempTableTest] src
    ON tgt.Id = src.Id
WHERE tgt.ValidTo < '9999-12-31 23:59:59.9999999'
    AND src.Id IS NULL
"""

df = (
    spark.read.format("sqlserver")
    .option("host", f"{server}.database.windows.net")
    .option("user", username)
    .option("password", password)
    .option("database", database)
    .option("prepareQuery", sql)
    .option("query", delete_records_sql)
    .load()
)

image.png

Azure SQL Database から取得したレコードのキーカラムでマッチしたレコードを削除する Merge 文を実行します。

# Merge を実行するするために一意性を保証
key_cols = ["Id"]
df = df.select(key_cols).dropDuplicates(key_cols)

# Azure SQL Database から取得したデータに基づき Merge 文により DELETE を実行
src_table_name = "_temp_merge_source"
df.createOrReplaceTempView(src_table_name)
delete_sql = f"""
MERGE INTO {table_name} AS target 
    USING {src_table_name} AS source 
        ON target.Id = source.Id
    WHEN MATCHED THEN
        DELETE
;
"""
spark.sql(delete_sql)

image.png

Databricks にて DELETE されたことを確認

Databricks で DELETE されたレコードが反映されたことを確認するためのコードです。

# データを確認
spark.table(table_name).display()

image.png

まとめ

この記事では、Databricks と Azure SQL Database を使用して、ソースシステムで削除されたレコードを特定し、Databricks に反映する方法を紹介しました。Azure SQL Database のテンポラルテーブル機能を活用することで、削除されたレコードを効率的に特定し、Databricks でのデータ整合性を保つことが可能です。具体的な手順としては、Databricks でスキーマとテーブルを作成し、Azure SQL Database でテーブルを作成して初期データを挿入します。その後、DELETE 操作を実行し、削除されたレコードを履歴テーブルから取得します。最終的に、Databricks で Merge 文を使用して DELETE されたレコードを反映し、データの整合性を確認します。このプロセスにより、データの削除が下流のシステムに正確に反映され、データの信頼性が向上します。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?