概要
Azure Data Factory のメタデータ駆動のコピータスクにて差分読み込み(DeltaLoad
)を透かし列(watermarkColumn
、基準値列ともいう)を Datetime で実施する場合に抽出期間を長くする方法を共有します。実施概要を説明後に検証手順を記述します。
差分読み込みでは、透かし列(例えばupdate_timestamp
列)を使用して、前回連携されたデータより新しいデータのみを取得します。このプロセスは、更新日時が秒単位で正確に記録されている場合には問題ありません。しかし、システムによっては日付のみが記録されることがあり、その場合、適切なデータ連携が行えない可能性があります。
select *
from [dbo].[adf_metadata_01]
where [update_timestamp] > '2024-06-03T00:00:00'
and [update_timestamp] <= '2024-06-03T00:00:00'
データ連携期間を延長する目的で、DeltaLoadOneObject アクティビティのクエリ設定を修正し、抽出期間を拡大する方法を確立しました。WaterMarkIncrement パラメータで指定された日数(例:-1日
)を加算して、抽出期間を延長します。
select
*
from [@{json(item().SourceObjectSettings).schema}].[@{json(item().SourceObjectSettings).table}]
where [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] > @{
if(
contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
if(
equals(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'DateTime'),
concat(
'DATEADD(day, ',
pipeline().parameters.WaterMarkIncrement,
' , ',
'''',
json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
'''',
')'
),
concat(
'''',
json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
''''
)
)
)
}
and [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] <= @{
if(
contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
concat(
'''',
activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
''''
)
)
}
本手法を適用する主なデメリットは、連携先でソースシステムが保証する主キー制約が失われることです。しかし、コピーアクティビティを通じてデータを連携した後、下記の記事で紹介しているデータアーキテクチャを実装することにより、このデメリットを克服することが可能です。
引用元:誰も教えてくれないメダリオンアーキテクチャの デザインメソッド:JEDA データエンジニア分科会 #1 #Python - Qiita
検証手順
1. Azure SQL Database にてテーブル作成と初期データを登録
IF OBJECT_ID(N'dbo.adf_metadata_01', N'U') IS NOT NULL
DROP TABLE dbo.adf_metadata_01
CREATE TABLE dbo.adf_metadata_01
(
ID INT IDENTITY(1,1) PRIMARY KEY,
NAME varchar(10),
update_timestamp DATETIME
);
INSERT INTO dbo.adf_metadata_01
(
NAME,
update_timestamp
)
SELECT 'A' AS NAME,CAST('2024-06-01' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'B' AS NAME,CAST('2024-06-02' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'C' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;
SELECT
*
FROM
dbo.adf_metadata_01
2. Azure Data Factory にて差分読み込みのメタデータ駆動のコピータスクを作成
3. DataLoadingBehaviorSettings
列内のwatermarkColumnStartValue
の値がデフォルトであることを確認
SELECT
ID,
JSON_VALUE(DataLoadingBehaviorSettings, '$.watermarkColumnStartValue') AS watermarkColumnStartValue
FROM
[adf_metadata].[MainControlTable_fj6]
4. Azure Data Factory にてメタデータ駆動のコピータスクを実行
5. DataLoadingBehaviorSettings
列内のwatermarkColumnStartValue
の値が更新されていることを確認
6. ソースのテーブルにデータを追加
INSERT INTO dbo.adf_metadata_01
(
NAME,
update_timestamp
)
SELECT 'D' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;
SELECT
*
FROM
dbo.adf_metadata_01
7. Azure Data Factory にてメタデータ駆動のコピータスクを再実行してデータが連携されないことを確認
書き込まれた行
が0
となっているため、データ連携されていない。
8. MetadataDrivenCopyTask_xxx_ BottomLevel
パイプラインにてWaterMarkIncrement
パラメータの追加とDeltaLoadOneObject
アクティビティにてクエリの設定値を変更
select
*
from [@{json(item().SourceObjectSettings).schema}].[@{json(item().SourceObjectSettings).table}]
where [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] > @{
if(
contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
if(
equals(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'DateTime'),
concat(
'DATEADD(day, ',
pipeline().parameters.WaterMarkIncrement,
' , ',
'''',
json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
'''',
')'
),
concat(
'''',
json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
''''
)
)
)
}
and [@{json(item().DataLoadingBehaviorSettings).watermarkColumnName}] <= @{
if(
contains(json(item().DataLoadingBehaviorSettings).watermarkColumnType, 'Int'),
activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
concat(
'''',
activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue,
''''
)
)
}
9. Azure Data Factory にてメタデータ駆動のコピータスクを再実行してデータが連携されることを確認
書き込まれた行
が2
となっているため、update_timestamp
が2024-06-03
のレコードである NAME がC
とD
の 2 レコードが連携される。