0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Data Factory のメタデータ駆動のコピータスクにて実行に失敗したコピーアクティビティのみ実行する方法

Last updated at Posted at 2024-06-07

概要

Azure Data Factory のメタデータ駆動のコピータスクにて実行に失敗したコピーアクティビティのみ実行する方法を共有します。メタデータ駆動のコピータスクにおける一部のパイプラインでエラーが発生した場合には、そのパイプラインのみ実行することができず、全体を再実行する必要があります。メタデータ駆動のコピータスクのメイン制御テーブル(MainControlTable)にて、実行タイミングと成功可否を保持させることで失敗したパイプラインのみ再実行するような実装方法を確立しました。

基本的な方針

メイン制御テーブル(MainControlTable)の下記のカラムにて、実行タイミングごとの成功可否を保持させます。その値の更新を行うために、実行タイミングごとにカラムの値の初期化処理の追加、実行対象パイプラインの抽出時に IsLastTriggerSuccessfull 列の追加、実行完了後の IsLastTriggerSuccessfullの更新処理の追加を実施する必要があります。実行タイミングについては、MetadataDrivenCopyTask_xxx_TopLevelに渡すパラメータ(TargetTriggerTimestamp)で設定します。

# カラム 概要
1 LastTriggeredTimestamp メタデータ駆動コピーのタスクの実行タイミングを保持する。
2 IsLastTriggerSuccessfull メタデータ駆動コピーのタスクの実行タイミングにて成功可否を保持する。

タイムスタンプを用いて実行タイミングを記録するため、日付と時刻を調整します。1日に1回の実行が必要な場合は、日付のみを含む値(例: 2024-06-01 00:00:00)を実行タイミングとして使用します。例えば、初回実行が6月1日の3:00に設定されており、エラーが発生した場合、6月1日の9:00に再実行することで、IsLastTriggerSuccessfull=0となっているパイプラインのみを再実行できます。次の日の6月2日の3:00に実行すると、実行タイミングが2024-06-02 00:00:00となり、すべてのパイプラインが実行されます。1日に複数回実行する必要がある場合は、特定の時刻に実行を丸めることで対応可能です。例として、1日に3回実行する場合、2024-06-01 00:00:002024-06-01 08:00:002024-06-01 16:00:00を実行タイミングとして設定します。

実行タイミングごとにカラムの値の初期化処理を行うために、ストアードプロシージャを追加しています。実行対象(TopLevelPipelineName列、TriggerName列、および、CopyEnabled列のフィルタリング結果)のLastTriggeredTimestamp列の値が想定の実行タイミングでない場合には、LastTriggeredTimestampIsLastTriggerSuccessful列の値を更新します。

image.png

<2024/8/13 追記 ここから>

実際に検証を実施したところ機能としては動作したのですが、特定のトリガーで実行することを前提とした場合に、エラー後の再実行にて TriggerName が Manual となってしまう課題が発覚しました。

image.png

その対応策としては下記の方法があり、なるべく 1 の方法を採用することをおすすめします。

  1. TriggerName をパラメータで渡すようにしておき再実行時にも同じ TriggerName を渡すようにする方法
  2. TriggerName でフィルタリングされるロジックをすべて削除する方法

TriggerName をパラメータで渡す方法は下記の記事で紹介しています。

<2024/8/13 追記 ここまで>

構築手順

前提条件

  • Azure SQL Database をソースとしたメタデータ駆動のコピータスクを構築すること
  • mx1のサフィックスがついたオブジェクト(例:MainControlTable_mx1)に対する SQL を記述するー

1. Azure SQL Database にてテーブル作成と初期データを登録

IF  OBJECT_ID(N'dbo.adf_metadata_01', N'U') IS NOT NULL
    DROP TABLE dbo.adf_metadata_01
CREATE TABLE dbo.adf_metadata_01
(
    ID INT IDENTITY(1,1) PRIMARY KEY,
	NAME varchar(10),
    update_timestamp DATETIME
);

INSERT INTO dbo.adf_metadata_01
(
	NAME,
	update_timestamp
)
SELECT 'A' AS NAME,CAST('2024-06-01' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'B' AS NAME,CAST('2024-06-02' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'C' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;



IF  OBJECT_ID(N'dbo.adf_metadata_02', N'U') IS NOT NULL
    DROP TABLE dbo.adf_metadata_02
CREATE TABLE dbo.adf_metadata_02
(
    ID INT IDENTITY(1,1) PRIMARY KEY,
	NAME varchar(10),
    update_timestamp DATETIME
);

INSERT INTO dbo.adf_metadata_02
(
	NAME,
	update_timestamp
)
SELECT 'A' AS NAME,CAST('2024-06-01' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'B' AS NAME,CAST('2024-06-02' AS datetime2) AS update_timestamp
UNION ALL
SELECT 'C' AS NAME,CAST('2024-06-03' AS datetime2) AS update_timestamp
;


SELECT
	'dbo.adf_metadata_01' AS TABLE_NAME,
	*
	FROM
		dbo.adf_metadata_01
UNION ALL
SELECT
	'dbo.adf_metadata_02' AS TABLE_NAME,
	*
	FROM
		dbo.adf_metadata_02;

image.png

2. Azure Data Factory にてメタデータ駆動コピータスクを構築

image.png

3. メイン制御テーブル(MainControlTable)にて2つのカラムを追加してテーブルを作成

    [LastTriggeredTimestamp] datetime2 DEFAULT '1900-01-01T00:00:00.000Z',
    [IsLastTriggerSuccessful] bit DEFAULT 0

image.png

4. UpdateWatermarkColumnValueストアードプロシージャをIsLastTriggerSuccessful列を連携方法に応じて更新するように変更して作成

CREATE PROCEDURE [adf_metadata].[UpdateWatermarkColumnValue_mx1]
@watermarkColumnStartValue nvarchar(max),
@Id [int],
@dataLoadingBehavior nvarchar(max)
AS
IF LOWER(@dataLoadingBehavior) = 'deltaload'
    UPDATE [adf_metadata].[MainControlTable_mx1]
    SET [DataLoadingBehaviorSettings] = JSON_MODIFY(
            [DataLoadingBehaviorSettings],
            '$.watermarkColumnStartValue',
            @watermarkColumnStartValue
        ),
        [IsLastTriggerSuccessful] = 1
    WHERE Id = @Id
ELSE
    UPDATE [adf_metadata].[MainControlTable_mx1]
    SET [IsLastTriggerSuccessful] = 1
    WHERE Id = @Id
GO

image.png

5. LastTriggeredTimestamp列とIsLastTriggerSuccessful列を更新するストアードプロシージャを作成

/*
このストアドプロシージャは、指定されたトリガーのステータスを更新します。

パラメータ:
- @TopLevelPipelineName: トップレベルパイプラインの名前(varchar(max))
- @TriggerName: トリガーの名前(nvarchar(max))
- @TriggerTimestampString: トリガーのタイムスタンプ(nvarchar(25))

処理内容:
1. [adf_metadata].[MainControlTable_fj6] テーブルから、指定されたトップレベルパイプライン名、トリガー名、およびコピーが有効なレコードの最大および最小のトリガータイムスタンプを取得します。
2. 取得した最大および最小のトリガータイムスタンプと、渡されたトリガータイムスタンプを比較します。
3. 最大および最小のトリガータイムスタンプが渡されたトリガータイムスタンプと異なる場合、[adf_metadata].[MainControlTable_fj6] テーブルの該当するレコードの IsLastTriggerSuccessful を 0 に更新します。

注意事項:
- このストアドプロシージャは、[adf_metadata].[MainControlTable_fj6] テーブルを使用します。
- トリガータイムスタンプは、datetime2 型にキャストされる必要があります。
*/

CREATE OR ALTER PROCEDURE [adf_metadata].[UpdateTriggerStatus_mx1]
    @TopLevelPipelineName varchar(max),
    @TriggerName nvarchar(max),
    @TriggerTimestampString nvarchar(25)
AS
BEGIN
    DECLARE @MaxTimestamp datetime2, @MinTimestamp datetime2;
    DECLARE @TriggerTimestamp datetime2;
    SET @TriggerTimestamp = CAST(@TriggerTimestampString AS datetime2);

    -- TriggerTimestampString の時刻に対する更新処理が既に実施されているかを確認
    SELECT
        @MaxTimestamp = MAX([LastTriggeredTimestamp]),
        @MinTimestamp = MIN([LastTriggeredTimestamp])
    FROM
        [adf_metadata].[MainControlTable_mx1]
    WHERE
        [TopLevelPipelineName] = @TopLevelPipelineName
        AND [TriggerName] LIKE '%' + @TriggerName + '%'
        AND [CopyEnabled] = 1;
    IF (@MaxTimestamp IS NULL OR @MaxTimestamp <> @TriggerTimestamp) OR (@MinTimestamp IS NULL OR @MinTimestamp <> @TriggerTimestamp)
    BEGIN
        -- TriggerTimestampString の時刻に対する更新処理が実施されていない場合に更新処理を実施
        UPDATE [adf_metadata].[MainControlTable_mx1]
        SET
            [LastTriggeredTimestamp] = @TriggerTimestamp,
            [IsLastTriggerSuccessful] = 0
        WHERE
            [TopLevelPipelineName] = @TopLevelPipelineName
            AND [TriggerName] LIKE '%' + @TriggerName + '%'
            AND [CopyEnabled] = 1;
    END
END

image.png

6. MetadataDrivenCopyTask_xxx_TopLevelパイプラインにてTargetTriggerTimestampパラメータを追加

image.png

7. MetadataDrivenCopyTask_xxx_TopLevelパイプラインにTargetTriggerTimestampString変数を追加

image.png

8. GetSumOfObjectsToCopyアクティビティの前にストアードプロシージャアクティビティを追加

  • 名前: 任意
  • リンクドサービス: メイン制御テーブルと同じリンクドサービス
  • ストアードプロシージャ名: [adf_metadata].[UpdateTriggerStatus_mx1]
  • ストアードプロシージャのパラメータに下記を設定
    • TopLevelPipelineName: @pipeline().Pipeline
    • TriggerName: @pipeline().TriggerName
    • TriggerTimestampString: @variables('TargetTriggerTimestampString')

image.png

9. 作成したストアードプロシージャアクティビティの前に変数の設定アクティビティを追加

  • 名前: 任意
  • 変数の種類: パイプライン変数
  • 名前: TargetTriggerTimestampString
  • 値: 下記の式
@if(
    empty(pipeline().parameters.TargetTriggerTimestamp)
    ,formatDateTime(
        utcnow()
        , 'yyyy-MM-dd 00:00:00'
    )
    ,pipeline().parameters.TargetTriggerTimestamp
)

image.png

10. MetadataDrivenCopyTask_xxx_TopLevelパイプラインにて、GetObjectsPerGroupToCopyアクティビティとCopyObjectsInOneGroupアクティビティの間に Filter アクティビティを追加

  • 名前: 任意
  • 項目: @activity('GetObjectsPerGroupToCopy').output.value
  • 条件: @equals(item().IsLastTriggerSuccessful, false)

image.png

11. CopyObjectsInOneGroupアクティビティのObjectsPerGroupToCopyパラメータの値を修正

@activity('FilterRecordsForExecution').output.Value

image.png

12. MetadataDrivenCopyTask_xxx_BottomLevelパイプラインにてUpdateWatermarkColumnValueアクティビティのストアードプロシージャのパラメータにdataLoadingBehaviorを追加

@json(item().DataLoadingBehaviorSettings).dataLoadingBehavior

image.png

13. FullLoadOneObjectアクティビティの後にストアードプロシージャを追加

  • 名前: 任意
  • リンクドサービス: メイン制御テーブルと同じリンクドサービス
  • ストアードプロシージャ名: [adf_metadata].[UpdateWatermarkColumnValue_mx1]
  • ストアードプロシージャのパラメータに
    • Id: @item().Id
    • watermarkColumnStartValue: ''
    • dataLoadingBehavior: @json(item().DataLoadingBehaviorSettings).dataLoadingBehavior

image.png

動作検証

基本的な動作確認

1. デバック実行を行い2つのテーブルが連携されたことを確認

image.png

image.png

2. Azure SQL Database にてメイン制御テーブルの列と列が更新されていることを確認

SELECT
	*
	FROM
		[adf_metadata].[MainControlTable_mx1]

image.png

3. Azure Data Factory にて再度デバック実行しデータが連携されないことを確認

image.png

失敗したパイプライン([IsLastTriggerSuccessful] = 0)のみ実行されることを確認

1. Azure SQL Database にてメイン制御テーブルにおける片方のパイプラインのIsLastTriggerSuccessful列を0に設定

UPDATE [adf_metadata].[MainControlTable_mx1]
SET [IsLastTriggerSuccessful] = 0
WHERE Id = 1;
SELECT
	*
	FROM
		[adf_metadata].[MainControlTable_mx1]

image.png

2. Azure Data Factory にてデバック実行し1つのみデータが連携されているを確認

失敗したパイプラインでは下記のように再実行。

image.png

成功済みのパイプラインでは、MetadataDrivenCopyTask_xxx_BottomLevelにて下記のように処理がスキップ。

image.png

3. メイン制御テーブルが更新されていることを確認

SELECT
	*
	FROM
		[adf_metadata].[MainControlTable_mx1]

image.png

変更履歴

1. エラーがあった場合にMetadataDrivenCopyTask_xxx_MiddleLevelパイプラインのGetObjectsPerGroupToCopyにて想定通りにレコードを取得できない事象への変更

事象について

MetadataDrivenCopyTask_xxx_MiddleLevelパイプラインのGetObjectsPerGroupToCopyにて下記のように連番で値を取得するようになっており、成功したパイプラインがある場合に連番のずれにより実行されない設定値があることが判明。

WITH OrderedControlTable AS (
    SELECT *,
        ROW_NUMBER() OVER (ORDER BY [TaskId], [Id] DESC) AS RowNumber
    FROM meta.MainControlTable_pwf
    WHERE TopLevelPipelineName = 'MetadataDrivenCopyTask_pwf_TopLevel'
        AND TriggerName LIKE '%Sandbox%'
        AND CopyEnabled = 1
        AND IsLastTriggerSuccessful = 0
)
SELECT *
FROM OrderedControlTable
WHERE RowNumber BETWEEN 1 AND 1;

image.png

また、メタデータの仕様として1件の処理でエラーが発生しても処理は止まらずに、最後まで実施されることを把握しました。

image.png

変更箇所

GetSumOfObjectsToCopyアクティビティのクエリにて追記したIsLastTriggerSuccessful列のフィルタリングを削除。

image.png

MetadataDrivenCopyTask_xxx_MiddleLevelパイプラインにてGetObjectsPerGroupToCopyアクティビティのクエリにて追記した IsLastTriggerSuccessful列のフィルタリングを削除。

image.png

MetadataDrivenCopyTask_xxx_MiddleLevelパイプラインにて Filter アクティビティを追加し、後続のCopyObjectsInOneGroupアクティビティのパラメータを Filter アクティビティの出力値に変更。

image.png

image.png

変更による影響

従来まではエラーとなったパイプラインのみ実行される仕様であったが、正常終了したパイプラインはデータの取り込みをスキップするように変更

image.png

正常終了したパイプラインは下記のようにデータ取り込みをスキップ。

image.png

エラー修正したパイプラインはデータの取り込みを実施。

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?