概要
Azure Data Factory のメタデータ駆動のコピータスクにて実行に失敗したコピーアクティビティのみ実行する方法を共有します。メタデータ駆動のコピータスクにおける一部のパイプラインでエラーが発生した場合には、そのパイプラインのみ実行することができず、全体を再実行する必要があります。メタデータ駆動のコピータスクのメイン制御テーブル(MainControlTable
)にて、実行タイミングと成功可否を保持させることで失敗したパイプラインのみ再実行するような実装方法を確立しました。
基本的な方針
メイン制御テーブル(MainControlTable
)の下記のカラムにて、実行タイミングごとの成功可否を保持させます。その値の更新を行うために、実行タイミングごとにカラムの値の初期化処理の追加、実行対象パイプラインの抽出時に IsLastTriggerSuccessfull 列の追加、実行完了後の IsLastTriggerSuccessfull
の更新処理の追加を実施する必要があります。実行タイミングについては、MetadataDrivenCopyTask_xxx_TopLevel
に渡すパラメータ(TargetTriggerTimestamp
)で設定します。
# | カラム | 概要 |
---|---|---|
1 | LastTriggeredTimestamp | メタデータ駆動コピーのタスクの実行タイミングを保持する。 |
2 | IsLastTriggerSuccessfull | メタデータ駆動コピーのタスクの実行タイミングにて成功可否を保持する。 |
タイムスタンプを用いて実行タイミングを記録するため、日付と時刻を調整します。1日に1回の実行が必要な場合は、日付のみを含む値(例: 2024-06-01 00:00:00
)を実行タイミングとして使用します。例えば、初回実行が6月1日の3:00に設定されており、エラーが発生した場合、6月1日の9:00に再実行することで、IsLastTriggerSuccessfull=0
となっているパイプラインのみを再実行できます。次の日の6月2日の3:00に実行すると、実行タイミングが2024-06-02 00:00:00
となり、すべてのパイプラインが実行されます。1日に複数回実行する必要がある場合は、特定の時刻に実行を丸めることで対応可能です。例として、1日に3回実行する場合、2024-06-01 00:00:00
、2024-06-01 08:00:00
、2024-06-01 16:00:00
を実行タイミングとして設定します。
実行タイミングごとにカラムの値の初期化処理を行うために、ストアードプロシージャを追加しています。実行対象(TopLevelPipelineName
列、TriggerName
列、および、CopyEnabled
列のフィルタリング結果)のLastTriggeredTimestamp
列の値が想定の実行タイミングでない場合には、LastTriggeredTimestamp
とIsLastTriggerSuccessful
列の値を更新します。
<2024/8/13 追記 ここから>
実際に検証を実施したところ機能としては動作したのですが、特定のトリガーで実行することを前提とした場合に、エラー後の再実行にて TriggerName が Manual となってしまう課題が発覚しました。
その対応策としては下記の方法があり、なるべく 1 の方法を採用することをおすすめします。
- TriggerName をパラメータで渡すようにしておき再実行時にも同じ TriggerName を渡すようにする方法
- TriggerName でフィルタリングされるロジックをすべて削除する方法
TriggerName をパラメータで渡す方法は下記の記事で紹介しています。
<2024/8/13 追記 ここまで>
構築手順
前提条件
- Azure SQL Database をソースとしたメタデータ駆動のコピータスクを構築すること
-
mx1
のサフィックスがついたオブジェクト(例:MainControlTable_mx1
)に対する SQL を記述するー
1. Azure SQL Database にてテーブル作成と初期データを登録
IF OBJECT_ID(N'dbo.adf_metadata_01', N'U') IS NOT NULL
DROP TABLE dbo.adf_metadata_01
CREATE TABLE dbo.adf_metadata_01
(
ID INT IDENTITY(1,1) PRIMARY KEY,
NAME varchar(10),
update_timestamp DATETIME
);
INSERT INTO dbo.adf_metadata_01
(
NAME,
update_timestamp
)
SELECT 'A' AS NAME,CAST('2024-06-01' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'B' AS NAME,CAST('2024-06-02' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'C' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;
IF OBJECT_ID(N'dbo.adf_metadata_02', N'U') IS NOT NULL
DROP TABLE dbo.adf_metadata_02
CREATE TABLE dbo.adf_metadata_02
(
ID INT IDENTITY(1,1) PRIMARY KEY,
NAME varchar(10),
update_timestamp DATETIME
);
INSERT INTO dbo.adf_metadata_02
(
NAME,
update_timestamp
)
SELECT 'A' AS NAME,CAST('2024-06-01' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'B' AS NAME,CAST('2024-06-02' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'C' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;
SELECT
'dbo.adf_metadata_01' AS TABLE_NAME,
*
FROM
dbo.adf_metadata_01
UNION ALL
SELECT
'dbo.adf_metadata_02' AS TABLE_NAME,
*
FROM
dbo.adf_metadata_02;
2. Azure Data Factory にてメタデータ駆動コピータスクを構築
3. メイン制御テーブル(MainControlTable
)にて2つのカラムを追加してテーブルを作成
[LastTriggeredTimestamp] datetime2 DEFAULT '1900-01-01T00:00:00.000Z',
[IsLastTriggerSuccessful] bit DEFAULT 0
4. UpdateWatermarkColumnValue
ストアードプロシージャをIsLastTriggerSuccessful
列を連携方法に応じて更新するように変更して作成
CREATE PROCEDURE [adf_metadata].[UpdateWatermarkColumnValue_mx1]
@watermarkColumnStartValue nvarchar(max),
@Id [int],
@dataLoadingBehavior nvarchar(max)
AS
IF LOWER(@dataLoadingBehavior) = 'deltaload'
UPDATE [adf_metadata].[MainControlTable_mx1]
SET [DataLoadingBehaviorSettings] = JSON_MODIFY(
[DataLoadingBehaviorSettings],
'$.watermarkColumnStartValue',
@watermarkColumnStartValue
),
[IsLastTriggerSuccessful] = 1
WHERE Id = @Id
ELSE
UPDATE [adf_metadata].[MainControlTable_mx1]
SET [IsLastTriggerSuccessful] = 1
WHERE Id = @Id
GO
5. LastTriggeredTimestamp
列とIsLastTriggerSuccessful
列を更新するストアードプロシージャを作成
/*
このストアドプロシージャは、指定されたトリガーのステータスを更新します。
パラメータ:
- @TopLevelPipelineName: トップレベルパイプラインの名前(varchar(max))
- @TriggerName: トリガーの名前(nvarchar(max))
- @TriggerTimestampString: トリガーのタイムスタンプ(nvarchar(25))
処理内容:
1. [adf_metadata].[MainControlTable_fj6] テーブルから、指定されたトップレベルパイプライン名、トリガー名、およびコピーが有効なレコードの最大および最小のトリガータイムスタンプを取得します。
2. 取得した最大および最小のトリガータイムスタンプと、渡されたトリガータイムスタンプを比較します。
3. 最大および最小のトリガータイムスタンプが渡されたトリガータイムスタンプと異なる場合、[adf_metadata].[MainControlTable_fj6] テーブルの該当するレコードの IsLastTriggerSuccessful を 0 に更新します。
注意事項:
- このストアドプロシージャは、[adf_metadata].[MainControlTable_fj6] テーブルを使用します。
- トリガータイムスタンプは、datetime2 型にキャストされる必要があります。
*/
CREATE OR ALTER PROCEDURE [adf_metadata].[UpdateTriggerStatus_mx1]
@TopLevelPipelineName varchar(max),
@TriggerName nvarchar(max),
@TriggerTimestampString nvarchar(25)
AS
BEGIN
DECLARE @MaxTimestamp datetime2, @MinTimestamp datetime2;
DECLARE @TriggerTimestamp datetime2;
SET @TriggerTimestamp = CAST(@TriggerTimestampString AS datetime2);
-- TriggerTimestampString の時刻に対する更新処理が既に実施されているかを確認
SELECT
@MaxTimestamp = MAX([LastTriggeredTimestamp]),
@MinTimestamp = MIN([LastTriggeredTimestamp])
FROM
[adf_metadata].[MainControlTable_mx1]
WHERE
[TopLevelPipelineName] = @TopLevelPipelineName
AND [TriggerName] LIKE '%' + @TriggerName + '%'
AND [CopyEnabled] = 1;
IF (@MaxTimestamp IS NULL OR @MaxTimestamp <> @TriggerTimestamp) OR (@MinTimestamp IS NULL OR @MinTimestamp <> @TriggerTimestamp)
BEGIN
-- TriggerTimestampString の時刻に対する更新処理が実施されていない場合に更新処理を実施
UPDATE [adf_metadata].[MainControlTable_mx1]
SET
[LastTriggeredTimestamp] = @TriggerTimestamp,
[IsLastTriggerSuccessful] = 0
WHERE
[TopLevelPipelineName] = @TopLevelPipelineName
AND [TriggerName] LIKE '%' + @TriggerName + '%'
AND [CopyEnabled] = 1;
END
END
6. MetadataDrivenCopyTask_xxx_TopLevel
パイプラインにてTargetTriggerTimestamp
パラメータを追加
7. MetadataDrivenCopyTask_xxx_TopLevel
パイプラインにTargetTriggerTimestampString
変数を追加
8. GetSumOfObjectsToCopy
アクティビティの前にストアードプロシージャアクティビティを追加
- 名前: 任意
- リンクドサービス: メイン制御テーブルと同じリンクドサービス
- ストアードプロシージャ名: [adf_metadata].[UpdateTriggerStatus_mx1]
- ストアードプロシージャのパラメータに下記を設定
- TopLevelPipelineName:
@pipeline().Pipeline
- TriggerName:
@pipeline().TriggerName
- TriggerTimestampString:
@variables('TargetTriggerTimestampString')
- TopLevelPipelineName:
9. 作成したストアードプロシージャアクティビティの前に変数の設定アクティビティを追加
- 名前: 任意
- 変数の種類: パイプライン変数
- 名前:
TargetTriggerTimestampString
- 値: 下記の式
@if(
empty(pipeline().parameters.TargetTriggerTimestamp)
,formatDateTime(
utcnow()
, 'yyyy-MM-dd 00:00:00'
)
,pipeline().parameters.TargetTriggerTimestamp
)
10. MetadataDrivenCopyTask_xxx_TopLevel
パイプラインにて、GetObjectsPerGroupToCopy
アクティビティとCopyObjectsInOneGroup
アクティビティの間に Filter アクティビティを追加
- 名前: 任意
- 項目:
@activity('GetObjectsPerGroupToCopy').output.value
- 条件:
@equals(item().IsLastTriggerSuccessful, false)
11. CopyObjectsInOneGroup
アクティビティのObjectsPerGroupToCopy
パラメータの値を修正
@activity('FilterRecordsForExecution').output.Value
12. MetadataDrivenCopyTask_xxx_BottomLevel
パイプラインにてUpdateWatermarkColumnValue
アクティビティのストアードプロシージャのパラメータにdataLoadingBehavior
を追加
@json(item().DataLoadingBehaviorSettings).dataLoadingBehavior
13. FullLoadOneObject
アクティビティの後にストアードプロシージャを追加
- 名前: 任意
- リンクドサービス: メイン制御テーブルと同じリンクドサービス
- ストアードプロシージャ名:
[adf_metadata].[UpdateWatermarkColumnValue_mx1]
- ストアードプロシージャのパラメータに
- Id:
@item().Id
- watermarkColumnStartValue:
''
- dataLoadingBehavior:
@json(item().DataLoadingBehaviorSettings).dataLoadingBehavior
- Id:
動作検証
基本的な動作確認
1. デバック実行を行い2つのテーブルが連携されたことを確認
2. Azure SQL Database にてメイン制御テーブルの列と
列が更新されていることを確認
SELECT
*
FROM
[adf_metadata].[MainControlTable_mx1]
3. Azure Data Factory にて再度デバック実行しデータが連携されないことを確認
失敗したパイプライン([IsLastTriggerSuccessful] = 0
)のみ実行されることを確認
1. Azure SQL Database にてメイン制御テーブルにおける片方のパイプラインのIsLastTriggerSuccessful
列を0
に設定
UPDATE [adf_metadata].[MainControlTable_mx1]
SET [IsLastTriggerSuccessful] = 0
WHERE Id = 1;
SELECT
*
FROM
[adf_metadata].[MainControlTable_mx1]
2. Azure Data Factory にてデバック実行し1つのみデータが連携されているを確認
失敗したパイプラインでは下記のように再実行。
成功済みのパイプラインでは、MetadataDrivenCopyTask_xxx_BottomLevel
にて下記のように処理がスキップ。
3. メイン制御テーブルが更新されていることを確認
SELECT
*
FROM
[adf_metadata].[MainControlTable_mx1]
変更履歴
1. エラーがあった場合にMetadataDrivenCopyTask_xxx_MiddleLevel
パイプラインのGetObjectsPerGroupToCopy
にて想定通りにレコードを取得できない事象への変更
事象について
MetadataDrivenCopyTask_xxx_MiddleLevel
パイプラインのGetObjectsPerGroupToCopy
にて下記のように連番で値を取得するようになっており、成功したパイプラインがある場合に連番のずれにより実行されない設定値があることが判明。
WITH OrderedControlTable AS (
SELECT *,
ROW_NUMBER() OVER (ORDER BY [TaskId], [Id] DESC) AS RowNumber
FROM meta.MainControlTable_pwf
WHERE TopLevelPipelineName = 'MetadataDrivenCopyTask_pwf_TopLevel'
AND TriggerName LIKE '%Sandbox%'
AND CopyEnabled = 1
AND IsLastTriggerSuccessful = 0
)
SELECT *
FROM OrderedControlTable
WHERE RowNumber BETWEEN 1 AND 1;
また、メタデータの仕様として1件の処理でエラーが発生しても処理は止まらずに、最後まで実施されることを把握しました。
変更箇所
GetSumOfObjectsToCopyアクティビティのクエリにて追記したIsLastTriggerSuccessful列のフィルタリングを削除。
MetadataDrivenCopyTask_xxx_MiddleLevel
パイプラインにてGetObjectsPerGroupToCopy
アクティビティのクエリにて追記した IsLastTriggerSuccessful列のフィルタリングを削除。
MetadataDrivenCopyTask_xxx_MiddleLevel
パイプラインにて Filter アクティビティを追加し、後続のCopyObjectsInOneGroup
アクティビティのパラメータを Filter アクティビティの出力値に変更。
変更による影響
従来まではエラーとなったパイプラインのみ実行される仕様であったが、正常終了したパイプラインはデータの取り込みをスキップするように変更
正常終了したパイプラインは下記のようにデータ取り込みをスキップ。
エラー修正したパイプラインはデータの取り込みを実施。