概要
Azure Data Factory のメタデータ駆動のコピータスクにてメタデータを Python (Databricks)環境で生成する方法を共有します。
メタデータ駆動のコピータスクを利用する場合にはメイン制御テーブルに登録する下記のような json 文字列を作成する必要があるのですが、Azure Data Factory 上で GUI で作成する際にはとても退屈なときがあります。
[
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_02"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": "adf_metadata_02",
"folderPath": "adf_metadata_02",
"fileSystem": "adf-copy"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1
}
]
下記のような Excel から json 形式の文字列を生成すれば開発生産性が高くなると考えました。
上記の Excel に記載されている下記のような 3 つのメタデータを生成します。Excel からの生成に加えて、`Id` 列を Excel で管理し、`watermarkColumnStartValue` 列を独立した列とすることで、MERGE 文による Upsert 処理でメイン制御テーブルを更新できるようにします。本記事では、その手順を紹介します。
クリックして json 形式の文字列を展開
[
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_01"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": "adf_metadata_01",
"folderPath": "adf_metadata_01",
"fileSystem": "adf-copy"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1,
"Id": 1,
"watermarkColumnStartValue": null
},
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_02"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": "adf_metadata_02",
"folderPath": "adf_metadata_02",
"fileSystem": "adf-copy"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1,
"Id": 2,
"watermarkColumnStartValue": null
},
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_03"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": "adf_metadata_03",
"folderPath": "adf_metadata_03",
"fileSystem": "adf-copy"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1,
"Id": 3,
"watermarkColumnStartValue": null
}
]
手順
1. Azure Data Factory にてメタデータ駆動コピータスクを作成してメイン制御テーブルに登録する SQL 文を取得
クリックして メイン制御テーブルに登録する SQL 文を展開
/****** Object: Table [dbo].[MainControlTable_m8e] ******/
-- Main control table for the metadata-driven copy task.
-- The *Settings columns and [TriggerName] hold JSON text; [Id] is generated
-- by the database through the IDENTITY property.
CREATE TABLE [dbo].[MainControlTable_m8e] (
    [Id]                           INT IDENTITY(1, 1) NOT NULL PRIMARY KEY,
    [SourceObjectSettings]         NVARCHAR(MAX)      NULL,
    [SourceConnectionSettingsName] VARCHAR(MAX)       NULL,
    [CopySourceSettings]           NVARCHAR(MAX)      NULL,
    [SinkObjectSettings]           NVARCHAR(MAX)      NULL,
    [SinkConnectionSettingsName]   VARCHAR(MAX)       NULL,
    [CopySinkSettings]             NVARCHAR(MAX)      NULL,
    [CopyActivitySettings]         NVARCHAR(MAX)      NULL,
    [TopLevelPipelineName]         VARCHAR(MAX)       NULL,
    [TriggerName]                  NVARCHAR(MAX)      NULL,
    [DataLoadingBehaviorSettings]  NVARCHAR(MAX)      NULL,
    [TaskId]                       INT                NULL,
    [CopyEnabled]                  BIT                NULL
);
-- JSON array describing the copy tasks; each array element becomes one row
-- of the main control table.
DECLARE @MainControlMetadata NVARCHAR(max) = N'[
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_01"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": null,
"folderPath": null,
"fileSystem": "metadata-test"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1
}
]';

-- Shred the JSON array into typed columns and load it into the control table.
-- The select list is spelled out so that the mapping to the INSERT column
-- list does not silently depend on the column order of the WITH clause.
-- Object/array properties are kept as JSON text via AS JSON.
INSERT INTO [dbo].[MainControlTable_m8e] (
    [SourceObjectSettings],
    [SourceConnectionSettingsName],
    [CopySourceSettings],
    [SinkObjectSettings],
    [SinkConnectionSettingsName],
    [CopySinkSettings],
    [CopyActivitySettings],
    [TopLevelPipelineName],
    [TriggerName],
    [DataLoadingBehaviorSettings],
    [TaskId],
    [CopyEnabled]
)
SELECT
    src.[SourceObjectSettings],
    src.[SourceConnectionSettingsName],
    src.[CopySourceSettings],
    src.[SinkObjectSettings],
    src.[SinkConnectionSettingsName],
    src.[CopySinkSettings],
    src.[CopyActivitySettings],
    src.[TopLevelPipelineName],
    src.[TriggerName],
    src.[DataLoadingBehaviorSettings],
    src.[TaskId],
    src.[CopyEnabled]
FROM OPENJSON(@MainControlMetadata)
WITH (
    [SourceObjectSettings]         [nvarchar](max) AS JSON,
    [SourceConnectionSettingsName] [varchar](max),
    [CopySourceSettings]           [nvarchar](max) AS JSON,
    [SinkObjectSettings]           [nvarchar](max) AS JSON,
    [SinkConnectionSettingsName]   [varchar](max),
    [CopySinkSettings]             [nvarchar](max) AS JSON,
    [CopyActivitySettings]         [nvarchar](max) AS JSON,
    [TopLevelPipelineName]         [varchar](max),
    [TriggerName]                  [nvarchar](max) AS JSON,
    [DataLoadingBehaviorSettings]  [nvarchar](max) AS JSON,
    [TaskId]                       [int],
    [CopyEnabled]                  [bit]
) AS src;
2. Excel にて json 形式の文字列における修正箇所の一覧を作成
本記事では下記のような Excel ファイルにより置換します。階層になっている項目についてはドット(`.`)により区切り、後述する Python コードにて json 内の文字列を置換します。
- Excel ファイルにて下記を実施
  - `管理テーブル.xlsx` というファイル名とする
  - `管理テーブル` というシート名とする
  - 表を `テーブル1` というテーブルで保持する
- 下記の列を保持
- Id
- SourceObjectSettings.schema
- SourceObjectSettings.table
- SinkObjectSettings.fileSystem
- SinkObjectSettings.folderPath
- SinkObjectSettings.fileName
- watermarkColumnStartValue
3. Databricks Repos を作成して `conf` というディレクトリを作成
4. `conf` ディレクトリに Excel ファイル(`管理テーブル.xlsx`)をインポート
5. `conf` フォルダの 1 つ上の階層に下記のコードをもつファイルをインポート
クリックして Python コードを展開
# Databricks notebook source
# MAGIC %md
# MAGIC ## データベースオブジェクトを作成
# COMMAND ----------
# MAGIC %md
# MAGIC ## メタデータ駆動コピーアクティビティを追加する方法
# COMMAND ----------
# MAGIC %md
# MAGIC #### ライブラリや変数を定義
# COMMAND ----------
# MAGIC %pip install openpyxl -q
# MAGIC dbutils.library.restartPython()
# COMMAND ----------
import json
import os
import pprint
import pandas as pd
import inspect
from collections import OrderedDict
from openpyxl import load_workbook
from pyspark.sql.functions import expr, max
# COMMAND ----------
# Template record: paste one element of the JSON array assigned to the
# metadata variable in CreateControlTable_xxx.sql.
base_json_str_01 = """{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_01"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": null,
"folderPath": "test06",
"fileSystem": "metadata-test"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1
}"""

# Extra top-level keys that the template JSON does not contain; they are
# appended after the template's own keys.
additional_conf = {
    "Id": 0,
    "watermarkColumnStartValue": "",
}

# Parse the template once, preserving key order at every nesting level,
# then append the additional keys.
sort_json_dict = json.loads(base_json_str_01, object_pairs_hook=OrderedDict)
base_dict = {**sort_json_dict, **additional_conf}

# Top-level key order; used later to emit every generated record with the
# same key layout as the template.
sort_order = list(base_dict.keys())

print(sort_order)
pprint.pprint(base_dict)
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC #### Excel ファイルにより設定値を変更
# COMMAND ----------
# Location of the Excel workbook, relative to the current working directory,
# and the sheet / table inside it that holds the per-task overrides.
folder_name = "conf"
file_name = "管理テーブル.xlsx"
sheet_name = "管理テーブル"
table_name = "テーブル1"

# Columns read from the Excel table. A dot separates nesting levels of the
# JSON document (e.g. "SinkObjectSettings.fileName").
tgt_columns = [
    "Id",
    "SourceObjectSettings.schema",
    "SourceObjectSettings.table",
    "SinkObjectSettings.fileSystem",
    "SinkObjectSettings.folderPath",
    "SinkObjectSettings.fileName",
    "watermarkColumnStartValue",
]

# Top-level keys whose values are copied as-is instead of being treated as
# nested JSON objects when the output records are assembled.
non_json_cols = [
    "Id",
    "TaskId",
    "CopyEnabled",
    "TopLevelPipelineName",
    "TriggerName",
    "DatabricksNotebook",
    "SwapDataLoadingBehaviorSettings",
    "watermarkColumnStartValue",
]

# Build the workbook path from the current working directory.
# os.path.join skips the empty segment when folder_name is "", so no special
# handling of a leading "/" is needed.
current_directory = os.getcwd()
file_path = os.path.join(current_directory, folder_name, file_name)
# COMMAND ----------
# Load the workbook from the path defined in the previous cell.
wb = load_workbook(filename=file_path, data_only=True)
ws = wb[sheet_name]

# Look up the Excel table by its configured name (table_name) rather than a
# hard-coded literal, and take the cell range it covers.
table = ws.tables[table_name]
data = table.ref

# Read the range as a list of rows; the first row is the header row.
values = [[cell.value for cell in row] for row in ws[data]]

# Build a pandas DataFrame and keep only the columns of interest.
pdf = pd.DataFrame(values[1:], columns=values[0])
pdf = pdf[tgt_columns]

# Convert to a Spark DataFrame and normalise column values.
df = spark.createDataFrame(pdf)
for col_s in df.schema:
    col_n = col_s.name

    # Timestamp columns are rendered as strings.
    # NOTE(review): 'Z' is emitted as a literal character; no time-zone
    # conversion happens here — confirm the values are already UTC.
    if col_s.dataType.typeName() == "timestamp":
        df = df.withColumn(
            col_n,
            expr(f"date_format(`{col_n}`, 'yyyy-MM-dd\\'T\\'HH:mm:ss.SSS\\'Z\\'')"),
        )

    # A TriggerName column is turned into an array with 'Sandbox' prepended.
    if col_n == "TriggerName":
        df = df.withColumn(col_n, expr(f"array('Sandbox', `{col_n}`)"))

# One dict per Excel row: {column name: value}.
config_changes_dict = [row.asDict() for row in df.collect()]

# Show the DataFrame contents.
df.display()
# COMMAND ----------
import copy  # imported here so this cell is self-contained

returned_json_dicts = []
for modifed_c in config_changes_dict:
    # Deep-copy the template: a shallow copy would share the nested dicts
    # with base_dict, so the per-row writes below would modify the template
    # itself and leak values from one row into the next.
    modified_dict = copy.deepcopy(base_dict)

    # Apply the overrides of this Excel row to the copy.
    for val_k, val_v in modifed_c.items():
        # A column name such as "A.B.C" addresses modified_dict["A"]["B"]["C"];
        # a name without dots addresses a top-level key directly.
        tgt_items = val_k.split(".")
        value = modified_dict
        for tgt_i in tgt_items[:-1]:
            value = value.setdefault(tgt_i, {})
        value[tgt_items[-1]] = val_v

    # Assemble the output record: scalar/array columns are copied as-is,
    # all other columns are emitted as plain dicts.
    returned_json_dict = {}
    for col_n, col_v in modified_dict.items():
        if col_n in non_json_cols:
            returned_json_dict[col_n] = col_v
        else:
            returned_json_dict[col_n] = dict(col_v)
            # Enable the line below instead when the control table is kept in Databricks
            # returned_json_dict[col_n] = json.dumps(modified_dict[col_n], ensure_ascii=False)
    returned_json_dicts.append(returned_json_dict)

# Emit every record with the key order of the template.
sorted_returned_json_dicts = [{k: d[k] for k in sort_order} for d in returned_json_dicts]
returned_json = json.dumps(
    sorted_returned_json_dicts,
    indent=4,
    ensure_ascii=False,
)
print(returned_json)
# COMMAND ----------
# Destination of the generated JSON, relative to the current working directory.
# NOTE(review): the file name is spelled "metadta.json"; it is kept unchanged
# because the follow-up step opens the file under this exact name.
folder_name = "conf"
json_file_name = "metadta.json"

# Write the JSON text to <cwd>/conf/metadta.json.
current_directory = os.getcwd()
tgt_directory = os.path.join(current_directory, folder_name)
file_path = os.path.join(tgt_directory, json_file_name)

# The JSON is dumped with ensure_ascii=False, so it may contain non-ASCII
# characters; write it as UTF-8 explicitly instead of relying on the
# platform's default encoding.
with open(file_path, "w", encoding="utf-8") as fp:
    fp.write(returned_json)
# COMMAND ----------
# end
6. cmd 6 における `base_json_str_01` 変数を、手順 1 で取得した単一の json 形式の文字列に変更
7. ノートブックを実行し正常終了することを確認
8. `/conf/metadta.json` ファイルを開き、ファイル内容を記録する
9. テーブル作成文にて `Id` 列の IDENTITY プロパティを削除し、`watermarkColumnStartValue` 列を追加したうえでテーブル作成を実行
CREATE TABLE [dbo].[MainControlTable_m8e](
- [Id] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
+ [Id] [int] NOT NULL PRIMARY KEY,
[SourceObjectSettings] [nvarchar](max) NULL,
[SourceConnectionSettingsName] [varchar](max) NULL,
[CopySourceSettings] [nvarchar](max) NULL,
[SinkObjectSettings] [nvarchar](max) NULL,
[SinkConnectionSettingsName] [varchar](max) NULL,
[CopySinkSettings] [nvarchar](max) NULL,
[CopyActivitySettings] [nvarchar](max) NULL,
[TopLevelPipelineName] [varchar](max) NULL,
[TriggerName] [nvarchar](max) NULL,
[DataLoadingBehaviorSettings] [nvarchar](max) NULL,
+ [watermarkColumnStartValue] [nvarchar](max) NULL,
[TaskId] [int] NULL,
[CopyEnabled] [bit] NULL
)
10. INSERT 文を MERGE 文に変更
-- Template: replace the empty array with the generated JSON array before running.
DECLARE @MainControlMetadata NVARCHAR(max) = N'[]';

-- Upsert the control table from the JSON array, matching rows on [Id].
-- Existing rows are updated in place; rows with a new [Id] are inserted.
MERGE INTO [dbo].[MainControlTable_m8e] AS tgt
USING (
    -- Explicit column list so the shape of the source does not depend on
    -- the column order of the WITH clause.
    SELECT
        j.[Id],
        j.[SourceObjectSettings],
        j.[SourceConnectionSettingsName],
        j.[CopySourceSettings],
        j.[SinkObjectSettings],
        j.[SinkConnectionSettingsName],
        j.[CopySinkSettings],
        j.[CopyActivitySettings],
        j.[TopLevelPipelineName],
        j.[TriggerName],
        j.[DataLoadingBehaviorSettings],
        j.[watermarkColumnStartValue],
        j.[TaskId],
        j.[CopyEnabled]
    FROM OPENJSON(@MainControlMetadata)
    WITH (
        [Id]                           [int],
        [SourceObjectSettings]         [nvarchar](max) AS JSON,
        [SourceConnectionSettingsName] [varchar](max),
        [CopySourceSettings]           [nvarchar](max) AS JSON,
        [SinkObjectSettings]           [nvarchar](max) AS JSON,
        [SinkConnectionSettingsName]   [varchar](max),
        [CopySinkSettings]             [nvarchar](max) AS JSON,
        [CopyActivitySettings]         [nvarchar](max) AS JSON,
        [TopLevelPipelineName]         [varchar](max),
        [TriggerName]                  [nvarchar](max) AS JSON,
        [DataLoadingBehaviorSettings]  [nvarchar](max) AS JSON,
        [watermarkColumnStartValue]    [nvarchar](max),
        [TaskId]                       [int],
        [CopyEnabled]                  [bit]
    ) AS j
) AS src
    ON tgt.[Id] = src.[Id]
WHEN MATCHED THEN
    -- NOTE(review): [watermarkColumnStartValue] is written on INSERT only and
    -- is not overwritten here — presumably so that re-running the script does
    -- not reset an existing watermark; confirm this is intended.
    UPDATE SET
        tgt.[SourceObjectSettings]         = src.[SourceObjectSettings],
        tgt.[SourceConnectionSettingsName] = src.[SourceConnectionSettingsName],
        tgt.[CopySourceSettings]           = src.[CopySourceSettings],
        tgt.[SinkObjectSettings]           = src.[SinkObjectSettings],
        tgt.[SinkConnectionSettingsName]   = src.[SinkConnectionSettingsName],
        tgt.[CopySinkSettings]             = src.[CopySinkSettings],
        tgt.[CopyActivitySettings]         = src.[CopyActivitySettings],
        tgt.[TopLevelPipelineName]         = src.[TopLevelPipelineName],
        tgt.[TriggerName]                  = src.[TriggerName],
        tgt.[DataLoadingBehaviorSettings]  = src.[DataLoadingBehaviorSettings],
        tgt.[TaskId]                       = src.[TaskId],
        tgt.[CopyEnabled]                  = src.[CopyEnabled]
WHEN NOT MATCHED BY TARGET THEN
    INSERT (
        [Id],
        [SourceObjectSettings],
        [SourceConnectionSettingsName],
        [CopySourceSettings],
        [SinkObjectSettings],
        [SinkConnectionSettingsName],
        [CopySinkSettings],
        [CopyActivitySettings],
        [TopLevelPipelineName],
        [TriggerName],
        [DataLoadingBehaviorSettings],
        [watermarkColumnStartValue],
        [TaskId],
        [CopyEnabled]
    )
    VALUES (
        src.[Id],
        src.[SourceObjectSettings],
        src.[SourceConnectionSettingsName],
        src.[CopySourceSettings],
        src.[SinkObjectSettings],
        src.[SinkConnectionSettingsName],
        src.[CopySinkSettings],
        src.[CopyActivitySettings],
        src.[TopLevelPipelineName],
        src.[TriggerName],
        src.[DataLoadingBehaviorSettings],
        src.[watermarkColumnStartValue],
        src.[TaskId],
        src.[CopyEnabled]
    );
11. `MainControlMetadata` 変数に生成した json 形式の文字列をセットした上で実行
クリックして メイン制御テーブルに登録する SQL 文を展開
-- JSON array of three copy-task records (Id 1-3, tables adf_metadata_01..03)
-- that the MERGE statement below reads through OPENJSON.
DECLARE @MainControlMetadata NVARCHAR(max) = N'[
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_01"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": "adf_metadata_01",
"folderPath": "adf_metadata_01",
"fileSystem": "adf-copy"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1,
"Id": 1,
"watermarkColumnStartValue": null
},
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_02"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": "adf_metadata_02",
"folderPath": "adf_metadata_02",
"fileSystem": "adf-copy"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1,
"Id": 2,
"watermarkColumnStartValue": null
},
{
"SourceObjectSettings": {
"schema": "dbo",
"table": "adf_metadata_03"
},
"SinkObjectSettings": {
"compressionCodec": "snappy",
"fileName": "adf_metadata_03",
"folderPath": "adf_metadata_03",
"fileSystem": "adf-copy"
},
"CopySourceSettings": {
"partitionOption": "None",
"sqlReaderQuery": null,
"partitionLowerBound": null,
"partitionUpperBound": null,
"partitionColumnName": null,
"partitionNames": null
},
"CopyActivitySettings": {
"translator": {
"type": "TabularTranslator",
"typeConversion": true
}
},
"TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
"TriggerName": [
"Sandbox",
"Manual"
],
"DataLoadingBehaviorSettings": {
"dataLoadingBehavior": "FullLoad"
},
"TaskId": 0,
"CopyEnabled": 1,
"Id": 3,
"watermarkColumnStartValue": null
}
]';
-- Upsert the control table from the JSON array in @MainControlMetadata,
-- matching rows on [Id]. Existing rows are updated in place; rows with a
-- new [Id] are inserted.
MERGE INTO [dbo].[MainControlTable_m8e] AS tgt
USING (
    -- Explicit column list so the shape of the source does not depend on
    -- the column order of the WITH clause.
    SELECT
        j.[Id],
        j.[SourceObjectSettings],
        j.[SourceConnectionSettingsName],
        j.[CopySourceSettings],
        j.[SinkObjectSettings],
        j.[SinkConnectionSettingsName],
        j.[CopySinkSettings],
        j.[CopyActivitySettings],
        j.[TopLevelPipelineName],
        j.[TriggerName],
        j.[DataLoadingBehaviorSettings],
        j.[watermarkColumnStartValue],
        j.[TaskId],
        j.[CopyEnabled]
    FROM OPENJSON(@MainControlMetadata)
    WITH (
        [Id]                           [int],
        [SourceObjectSettings]         [nvarchar](max) AS JSON,
        [SourceConnectionSettingsName] [varchar](max),
        [CopySourceSettings]           [nvarchar](max) AS JSON,
        [SinkObjectSettings]           [nvarchar](max) AS JSON,
        [SinkConnectionSettingsName]   [varchar](max),
        [CopySinkSettings]             [nvarchar](max) AS JSON,
        [CopyActivitySettings]         [nvarchar](max) AS JSON,
        [TopLevelPipelineName]         [varchar](max),
        [TriggerName]                  [nvarchar](max) AS JSON,
        [DataLoadingBehaviorSettings]  [nvarchar](max) AS JSON,
        [watermarkColumnStartValue]    [nvarchar](max),
        [TaskId]                       [int],
        [CopyEnabled]                  [bit]
    ) AS j
) AS src
    ON tgt.[Id] = src.[Id]
WHEN MATCHED THEN
    -- NOTE(review): [watermarkColumnStartValue] is written on INSERT only and
    -- is not overwritten here — presumably so that re-running the script does
    -- not reset an existing watermark; confirm this is intended.
    UPDATE SET
        tgt.[SourceObjectSettings]         = src.[SourceObjectSettings],
        tgt.[SourceConnectionSettingsName] = src.[SourceConnectionSettingsName],
        tgt.[CopySourceSettings]           = src.[CopySourceSettings],
        tgt.[SinkObjectSettings]           = src.[SinkObjectSettings],
        tgt.[SinkConnectionSettingsName]   = src.[SinkConnectionSettingsName],
        tgt.[CopySinkSettings]             = src.[CopySinkSettings],
        tgt.[CopyActivitySettings]         = src.[CopyActivitySettings],
        tgt.[TopLevelPipelineName]         = src.[TopLevelPipelineName],
        tgt.[TriggerName]                  = src.[TriggerName],
        tgt.[DataLoadingBehaviorSettings]  = src.[DataLoadingBehaviorSettings],
        tgt.[TaskId]                       = src.[TaskId],
        tgt.[CopyEnabled]                  = src.[CopyEnabled]
WHEN NOT MATCHED BY TARGET THEN
    INSERT (
        [Id],
        [SourceObjectSettings],
        [SourceConnectionSettingsName],
        [CopySourceSettings],
        [SinkObjectSettings],
        [SinkConnectionSettingsName],
        [CopySinkSettings],
        [CopyActivitySettings],
        [TopLevelPipelineName],
        [TriggerName],
        [DataLoadingBehaviorSettings],
        [watermarkColumnStartValue],
        [TaskId],
        [CopyEnabled]
    )
    VALUES (
        src.[Id],
        src.[SourceObjectSettings],
        src.[SourceConnectionSettingsName],
        src.[CopySourceSettings],
        src.[SinkObjectSettings],
        src.[SinkConnectionSettingsName],
        src.[CopySinkSettings],
        src.[CopyActivitySettings],
        src.[TopLevelPipelineName],
        src.[TriggerName],
        src.[DataLoadingBehaviorSettings],
        src.[watermarkColumnStartValue],
        src.[TaskId],
        src.[CopyEnabled]
    );