概要
Azure Data Factory のメタデータ駆動のコピータスクにて、DataLoadingBehaviorSettings
列の json 内に保持されているwatermarkColumnStartValue
をwatermarkColumnStartValue
列を単独の列として保持する方法を共有します。
実施手順
1. テーブル作成とデータ挿入のスクリプトを修正
テーブル定義にてwatermarkColumnStartValue
列を追加。
CREATE TABLE [dbo].[MainControlTable_557](
[Id] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[SourceObjectSettings] [nvarchar](max) NULL,
[SourceConnectionSettingsName] [varchar](max) NULL,
[CopySourceSettings] [nvarchar](max) NULL,
[SinkObjectSettings] [nvarchar](max) NULL,
[SinkConnectionSettingsName] [varchar](max) NULL,
[CopySinkSettings] [nvarchar](max) NULL,
[CopyActivitySettings] [nvarchar](max) NULL,
[TopLevelPipelineName] [varchar](max) NULL,
[TriggerName] [nvarchar](max) NULL,
[DataLoadingBehaviorSettings] [nvarchar](max) NULL,
+ watermarkColumnStartValue [nvarchar](max) NULL,
[TaskId] [int] NULL,
[CopyEnabled] [bit] NULL
)
メイン制御テーブルにおけるDataLoadingBehaviorSettings
列の json 内に保持されているwatermarkColumnStartValue
をwatermarkColumnStartValue
列として単独化。
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "DeltaLoad",
"watermarkColumnName": "update_timestamp",
"watermarkColumnType": "DateTime",
- "watermarkColumnStartValue": "2000-01-01T00:00:00.000Z"
},
+ "watermarkColumnStartValue": "2000-01-01T00:00:00.000Z",
"TaskId": 0,
2. メイン制御テーブルへの更新文を修正
UPDATE [dbo].[MainControlTable_557]
- SET [DataLoadingBehaviorSettings]=JSON_MODIFY([DataLoadingBehaviorSettings],'$.watermarkColumnStartValue', @watermarkColumnStartValue) WHERE Id = @Id
+ SET [watermarkColumnStartValue]=@watermarkColumnStartValue WHERE Id = @Id
3. MetadataDrivenCopyTask_xxx_BottomLevel
におけるDeltaLoadOneObject
アクティビティのソースのクエリを変更
- json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
+ item().watermarkColumnStartValue,
- concat('''', json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue, ''''))}
+ concat('''', item().watermarkColumnStartValue, ''''))}
3. MetadataDrivenCopyTask_xxx_BottomLevel
におけるDeltaLoadOneObject
アクティビティのソースのクエリを変更
- json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue,
+ item().watermarkColumnStartValue,
- concat('''', json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue, ''''))}
+ concat('''', item().watermarkColumnStartValue, ''''))}
4. MetadataDrivenCopyTask_xxx_BottomLevel
におけるDeltaLoadOneObject
アクティビティのシンクcw_fileName
のを変更
- @{json(item().SinkObjectSettings).fileName}-@{json(item().DataLoadingBehaviorSettings).watermarkColumnStartValue}-@{activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue}
+ @{json(item().SinkObjectSettings).fileName}-@{item().watermarkColumnStartValue}-@{activity('GetMaxWatermarkValue').output.firstRow.CurrentMaxWaterMarkColumnValue}
動作確認
パイプラインを実行して正常終了することを確認
メイン制御テーブルにおけるwatermarkColumnStartValue
列が更新されていることを確認
SELECT * FROM [dbo].[MainControlTable_557]