0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Data Factory のメタデータ駆動のコピータスクにて透かし列(`watermarkColumn`)が Datetime の場合に抽出期間を長くする方法

Last updated at Posted at 2024-06-05

概要

Azure Data Factory のメタデータ駆動のコピータスクにて差分読み込み(DeltaLoad)を透かし列(watermarkColumn、基準値列ともいう)を Datetime で実施する場合に抽出期間を長くする方法を共有します。実施概要を説明後に検証手順を記述します。

差分読み込みでは、透かし列(例えばupdate_timestamp列)を使用して、前回連携されたデータより新しいデータのみを取得します。このプロセスは、更新日時が秒単位で正確に記録されている場合には問題ありません。しかし、システムによっては日付のみが記録されることがあり、その場合、適切なデータ連携が行えない可能性があります。

select *
from [dbo].[adf_metadata_01]
where [update_timestamp] > '2024-06-03T00:00:00'
    and [update_timestamp] <= '2024-06-03T00:00:00'

image.png

データ連携期間を延長する目的で、DeltaLoadOneObject アクティビティのクエリ設定を修正し、抽出期間を拡大する方法を確立しました。WaterMarkIncrement パラメータで指定された日数(例:-1日 )を加算して、抽出期間を延長します。

select 
    *
    from [@{json(item().SourceObjectSettings).schema}].[@{json(item().SourceObjectSettings).table}] 
    where [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] > @{
        if(
            contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
        json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
        if(
            equals(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'DateTime'),
            concat(
                'DATEADD(day, ',
                pipeline().parameters.WaterMarkIncrement,
                ' , ',
                '''',
                json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
                '''',
                ')'
            ),
            concat(
                '''',
                json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
                ''''
            )
        )
        )
    }
    and [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] <= @{
        if(
            contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
            activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
            concat(
                '''',
                activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
                ''''
            )
        )
    }

image.png

本手法を適用する主なデメリットは、連携先でソースシステムが保証する主キー制約が失われることです。しかし、コピーアクティビティを通じてデータを連携した後、下記の記事で紹介しているデータアーキテクチャを実装することにより、このデメリットを克服することが可能です。

image.png

引用元:誰も教えてくれないメダリオンアーキテクチャの デザインメソッド:JEDA データエンジニア分科会 #1 #Python - Qiita

検証手順

1. Azure SQL Database にてテーブル作成と初期データを登録

IF  OBJECT_ID(N'dbo.adf_metadata_01', N'U') IS NOT NULL
    DROP TABLE dbo.adf_metadata_01
CREATE TABLE dbo.adf_metadata_01
(
    ID INT IDENTITY(1,1) PRIMARY KEY,
	NAME varchar(10),
    update_timestamp DATETIME
);

INSERT INTO dbo.adf_metadata_01
(
	NAME,
	update_timestamp
)
SELECT 'A' AS NAME,CAST('2024-06-01' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'B' AS NAME,CAST('2024-06-02' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'C' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;

SELECT
	*
	FROM
		dbo.adf_metadata_01

2. Azure Data Factory にて差分読み込みのメタデータ駆動のコピータスクを作成

image.png

3. DataLoadingBehaviorSettings列内のwatermarkColumnStartValueの値がデフォルトであることを確認

SELECT 
	ID,
	JSON_VALUE(DataLoadingBehaviorSettings, '$.watermarkColumnStartValue') AS watermarkColumnStartValue
	FROM
		[adf_metadata].[MainControlTable_fj6]

image.png

4. Azure Data Factory にてメタデータ駆動のコピータスクを実行

image.png

5. DataLoadingBehaviorSettings列内のwatermarkColumnStartValueの値が更新されていることを確認

image.png

6. ソースのテーブルにデータを追加

INSERT INTO dbo.adf_metadata_01
(
	NAME,
	update_timestamp
)
SELECT 'D' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;

SELECT
	*
	FROM
		dbo.adf_metadata_01

image.png

7. Azure Data Factory にてメタデータ駆動のコピータスクを再実行してデータが連携されないことを確認

書き込まれた行0となっているため、データ連携されていない。

image.png

8. MetadataDrivenCopyTask_xxx_ BottomLevelパイプラインにてWaterMarkIncrementパラメータの追加とDeltaLoadOneObjectアクティビティにてクエリの設定値を変更

image.png

select 
    *
    from [@{json(item().SourceObjectSettings).schema}].[@{json(item().SourceObjectSettings).table}] 
    where [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] > @{
        if(
            contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
        json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
        if(
            equals(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'DateTime'),
            concat(
                'DATEADD(day, ',
                pipeline().parameters.WaterMarkIncrement,
                ' , ',
                '''',
                json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
                '''',
                ')'
            ),
            concat(
                '''',
                json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
                ''''
            )
        )
        )
    }
    and [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] <= @{
        if(
            contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
            activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
            concat(
                '''',
                activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
                ''''
            )
        )
    }

image.png

9. Azure Data Factory にてメタデータ駆動のコピータスクを再実行してデータが連携されることを確認

書き込まれた行2となっているため、update_timestamp2024-06-03のレコードである NAME がCDの 2 レコードが連携される。

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?