LoginSignup
0
0
この記事誰得? 私しか得しないニッチな技術で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Azure Data Factory のメタデータ駆動のコピータスクにてメタデータを Python (Databricks)環境で生成する方法

Posted at

概要

Azure Data Factory のメタデータ駆動のコピータスクにてメタデータを Python (Databricks)環境で生成する方法を共有します。

メタデータ駆動のコピータスクを利用する場合にはメイン制御テーブルに登録する下記のような json 文字列を作成する必要があるのですが、Azure Data Factory 上で GUI で作成するの際にはとても退屈なときがあります。

[
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_02"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": "adf_metadata_02",
            "folderPath": "adf_metadata_02",
            "fileSystem": "adf-copy"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1
    }
]

下記のような Excel から json 形式の文字列を生成すれば開発生産性が高くなると考えました。

image.png

上記の Excel に記載されている下記のような 3 つのメタデータを生成します。Excel からの生成に加えて、Id列を Excel で管理しwatermarkColumnStartValue列を独立した列とすることで Merge 文により Upsert 処理でメイン制御テーブルを更新できるようにします。本記事では、その手順を紹介します。

クリックして json 形式の文字列を展開
[
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_01"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": "adf_metadata_01",
            "folderPath": "adf_metadata_01",
            "fileSystem": "adf-copy"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1,
        "Id": 1,
        "watermarkColumnStartValue": null
    },
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_02"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": "adf_metadata_02",
            "folderPath": "adf_metadata_02",
            "fileSystem": "adf-copy"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1,
        "Id": 2,
        "watermarkColumnStartValue": null
    },
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_03"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": "adf_metadata_03",
            "folderPath": "adf_metadata_03",
            "fileSystem": "adf-copy"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_0jc_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1,
        "Id": 3,
        "watermarkColumnStartValue": null
    }
]

手順

1. Azure Data Factory にてメタデータ駆動コピータスクを作成してメイン制御テーブルに登録する SQL 文をを取得

クリックして メイン制御テーブルに登録する SQL 文を展開
            /****** Object:  Table [dbo].[MainControlTable_m8e] ******/
            CREATE TABLE [dbo].[MainControlTable_m8e](
                [Id] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
                [SourceObjectSettings] [nvarchar](max) NULL,
                [SourceConnectionSettingsName] [varchar](max) NULL,
                [CopySourceSettings] [nvarchar](max) NULL,
                [SinkObjectSettings] [nvarchar](max) NULL,
                [SinkConnectionSettingsName] [varchar](max) NULL,
                [CopySinkSettings] [nvarchar](max) NULL,
                [CopyActivitySettings] [nvarchar](max) NULL,
                [TopLevelPipelineName] [varchar](max) NULL,
                [TriggerName] [nvarchar](max) NULL,
                [DataLoadingBehaviorSettings] [nvarchar](max) NULL,
                [TaskId] [int] NULL,
                [CopyEnabled] [bit] NULL
        ) 
            DECLARE @MainControlMetadata NVARCHAR(max)  = N'[
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_01"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": null,
            "folderPath": null,
            "fileSystem": "metadata-test"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1
    }
]';
            INSERT INTO [dbo].[MainControlTable_m8e] (
                [SourceObjectSettings],
                [SourceConnectionSettingsName],
                [CopySourceSettings],
                [SinkObjectSettings],
                [SinkConnectionSettingsName],
                [CopySinkSettings],
                [CopyActivitySettings],
                [TopLevelPipelineName],
                [TriggerName],
                [DataLoadingBehaviorSettings],
                [TaskId],
                [CopyEnabled])
            SELECT * FROM OPENJSON(@MainControlMetadata)
                WITH ([SourceObjectSettings] [nvarchar](max) AS JSON,
                [SourceConnectionSettingsName] [varchar](max),
                [CopySourceSettings] [nvarchar](max) AS JSON,
                [SinkObjectSettings] [nvarchar](max) AS JSON,
                [SinkConnectionSettingsName] [varchar](max),
                [CopySinkSettings] [nvarchar](max) AS JSON,
                [CopyActivitySettings] [nvarchar](max) AS JSON,
                [TopLevelPipelineName] [varchar](max),
                [TriggerName] [nvarchar](max) AS JSON,
                [DataLoadingBehaviorSettings] [nvarchar](max) AS JSON,
                [TaskId] [int],
                [CopyEnabled] [bit])

image.png

2. Excel にて json 形式の文字列における修正箇所の一覧を作成

本記事では下記のような Excel ファイルにより置換します。階層になっている項目についてはドット(.)により区切り、後述する Python コードにて json 内の文字列を置換します。

  • Excel ファイルにて下記を実施
    • 管理テーブル.xlsxというファイル名とする
    • 管理テーブルというシート名とする
    • 表をテーブル1というテーブルで保持する
  • 下記の列を保持
    • Id
    • SourceObjectSettings.schema
    • SourceObjectSettings.table
    • SinkObjectSettings.fileSystem
    • SinkObjectSettings.folderPath
    • SinkObjectSettings.fileName
    • watermarkColumnStartValue

image.png

3. Databricks Repos を作成してconfというディレクトリを作成

image.png

4. Excel ファイルをインポート

image.png

5. confフォルダの 1 つ上の階層に下記のコードをもつファイルをインポート

クリックして Python コードを展開
# Databricks notebook source
# MAGIC %md
# MAGIC ## データベースオブジェクトを作成

# COMMAND ----------

# MAGIC %md
# MAGIC ## メタデータ駆動コピーアクティビティを追加する方法

# COMMAND ----------

# MAGIC %md
# MAGIC #### ライブラリや変数を定義

# COMMAND ----------

# MAGIC %pip install openpyxl -q
# MAGIC dbutils.library.restartPython()

# COMMAND ----------

import json
import os
import pprint
import pandas as pd
import inspect

from collections import OrderedDict
from openpyxl import load_workbook
from pyspark.sql.functions import expr, max

# COMMAND ----------

# CreateControlTable_xxx.sql における変数の一部を貼り付け
base_json_str_01 = """{
    "SourceObjectSettings": {
        "schema": "dbo",
        "table": "adf_metadata_01"
    },
    "SinkObjectSettings": {
        "compressionCodec": "snappy",
        "fileName": null,
        "folderPath": "test06",
        "fileSystem": "metadata-test"
    },
    "CopySourceSettings": {
        "partitionOption": "None",
        "sqlReaderQuery": null,
        "partitionLowerBound": null,
        "partitionUpperBound": null,
        "partitionColumnName": null,
        "partitionNames": null
    },
    "CopyActivitySettings": {
        "translator": {
            "type": "TabularTranslator",
            "typeConversion": true
        }
    },
    "TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
    "TriggerName": [
        "Sandbox",
        "Manual"
    ],
    "DataLoadingBehaviorSettings": {
        "dataLoadingBehavior": "FullLoad"
    },
    "TaskId": 0,
    "CopyEnabled": 1
}"""

# base_json_str に追加すべき設定を追加
additional_conf = {
    "Id": 0,
    "watermarkColumnStartValue": "",
}

base_json_dict = json.loads(base_json_str_01)
base_dict = {**base_json_dict, **additional_conf}

sort_json_dict = json.loads(base_json_str_01, object_pairs_hook=OrderedDict)
base_dict = {**sort_json_dict, **additional_conf}
sort_order = list(base_dict.keys())

print(sort_order)
pprint.pprint(base_dict)

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC #### Excel ファイルにより設定値を変更

# COMMAND ----------

# Excel ファイルに関する情報を定義
folder_name = "conf"
file_name = "管理テーブル.xlsx"
sheet_name = "管理テーブル"
table_name = "テーブル1"

# Excel から取得する列の一覧を定義
tgt_columns = [
    "Id",
    "SourceObjectSettings.schema",
    "SourceObjectSettings.table",
    "SinkObjectSettings.fileSystem",
    "SinkObjectSettings.folderPath",
    "SinkObjectSettings.fileName",
    "watermarkColumnStartValue",
]

# ベースとなる設定値にて json 形式の文字列として保持しない列を定義
non_json_cols = [
    "Id",
    "TaskId",
    "CopyEnabled",
    "TopLevelPipelineName",
    "TriggerName",
    "DatabricksNotebook",
    "SwapDataLoadingBehaviorSettings",
    "watermarkColumnStartValue"
]

# folder_name が指定されている場合には / を追加
if folder_name != "":
    folder_name = f"/{folder_name}"

# 現在のディレクトリに基づきファイルパスを定義
current_directory = os.getcwd()
current_dir = current_directory
file_path = f"{current_dir}{folder_name}/{file_name}"

# COMMAND ----------

# Excelファイルからワークブックを読み込む
wb = load_workbook(filename=file_path, data_only=True)

# テーブルのセル範囲を取得する
table = wb[sheet_name].tables["テーブル1"]
data = table.ref

# セル範囲のデータを2次元のリストとして取得する
values = []
for row in wb[sheet_name][data]:
    row_values = [cell.value for cell in row]
    values.append(row_values)

# Pandas データフレームを作成
pdf = pd.DataFrame(values[1:], columns=values[0])
pdf = pdf[tgt_columns]

# Spark データフレームに変換後、カラムの値を調整
df = spark.createDataFrame(pdf)
for col_s in df.schema:
    col_n = col_s.name
    # タイムスタンプ型の場合に、文字列型に変換する
    if col_s.dataType.typeName() == "timestamp":
        df = df.withColumn(
            col_n,
            expr(f"date_format(`{col_n}`, 'yyyy-MM-dd\\'T\\'HH:mm:ss.SSS\\'Z\\'')"),
        )
    # TriggerName 列にて Sandbox を追加して Array 型に変換
    if col_n == "TriggerName":
        df = df.withColumn(col_n, expr(f"array('Sandbox', `{col_n}`)"))

# Spark データフレームを 辞書型変数に変換
config_changes_dict = []
for row in df.collect():
    config_changes_dict.append(row.asDict())

# データフレームの内容を表示
df.display()

# COMMAND ----------

returned_json_dicts = []
for modifed_c in config_changes_dict:
    modified_dict = base_dict.copy()

    # Excel ファイル内容に基づきベースとなる設定値の内容を変更
    for val_k, val_v in modifed_c.items():
        # `.`を含むカラム名の場合、`.`を区切り文字として分割し、辞書型変数にて値を更新する
        tgt_items = val_k.split(".")
        if len(tgt_items) == 1:
            modified_dict[tgt_items[-1]] = val_v
        else:
            value = modified_dict
            for tgt_i in tgt_items[:-1]:
                value = value.setdefault(tgt_i, {})
            value[tgt_items[-1]] = val_v

    # json 形式の文字列として保持すべきカラムにて文字列化
    returned_json_dict = {}
    for col_n in modified_dict.keys():
        if col_n in non_json_cols:
            returned_json_dict[col_n] = modified_dict[col_n]
        else:
            returned_json_dict[col_n] = dict(modified_dict[col_n])
            # Databricks に制御テーブルを保持する場合には下記を有効化
            # returned_json_dict[col_n] = json.dumps(modified_dict[col_n], ensure_ascii=False)

    returned_json_dicts.append(dict(returned_json_dict))

sorted_returned_json_dicts = [{k: d[k] for k in sort_order} for d in returned_json_dicts]
returned_json = json.dumps(
    sorted_returned_json_dicts,
    indent=4,
    ensure_ascii=False,
)
print(returned_json)

# COMMAND ----------

# 書き込み json の情報を定義
folder_name = "conf"
json_file_name = "metadta.json"

# json ファイルとして書き込み
current_directory = os.getcwd()
tgt_directory = os.path.join(current_directory, folder_name)
file_path = os.path.join(tgt_directory, json_file_name)
with open(file_path, 'w') as file:
    file.write(returned_json)

# COMMAND ----------

# end

image.png

6. cmd 6 におけるbase_json_str_01変数を取得した単一の json 形式の文字列に変更

image.png

7. ノートブックを実行し正常終了することを確認

image.png

8. /conf/metadta.jsonファイルを開き、ファイル内容を記録する

image.png

9. テーブル作成文にてId列のIDENTITY プロパティを削除、及び、watermarkColumnStartValue列を追加後にテーブルを作成を実行

CREATE TABLE [dbo].[MainControlTable_m8e](
-    [Id] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
+    [Id] [int] NOT NULL PRIMARY KEY,
    [SourceObjectSettings] [nvarchar](max) NULL,
    [SourceConnectionSettingsName] [varchar](max) NULL,
    [CopySourceSettings] [nvarchar](max) NULL,
    [SinkObjectSettings] [nvarchar](max) NULL,
    [SinkConnectionSettingsName] [varchar](max) NULL,
    [CopySinkSettings] [nvarchar](max) NULL,
    [CopyActivitySettings] [nvarchar](max) NULL,
    [TopLevelPipelineName] [varchar](max) NULL,
    [TriggerName] [nvarchar](max) NULL,
    [DataLoadingBehaviorSettings] [nvarchar](max) NULL,
+    [watermarkColumnStartValue] [nvarchar](max) NULL,
    [TaskId] [int] NULL,
    [CopyEnabled] [bit] NULL
)

image.png

10. INSERT 文を MERGE 文に変更

DECLARE @MainControlMetadata NVARCHAR(max)  = N'[]';
MERGE INTO [dbo].[MainControlTable_m8e] AS TARGET
USING (
    SELECT * FROM OPENJSON(@MainControlMetadata)
    WITH (
        [Id] [int],
        [SourceObjectSettings] [nvarchar](max) AS JSON,
        [SourceConnectionSettingsName] [varchar](max),
        [CopySourceSettings] [nvarchar](max) AS JSON,
        [SinkObjectSettings] [nvarchar](max) AS JSON,
        [SinkConnectionSettingsName] [varchar](max),
        [CopySinkSettings] [nvarchar](max) AS JSON,
        [CopyActivitySettings] [nvarchar](max) AS JSON,
        [TopLevelPipelineName] [varchar](max),
        [TriggerName] [nvarchar](max) AS JSON,
        [DataLoadingBehaviorSettings] [nvarchar](max) AS JSON,
        [watermarkColumnStartValue] nvarchar(max),
        [TaskId] [int],
        [CopyEnabled] [bit]
    )
) AS SOURCE
ON TARGET.[Id] = SOURCE.[Id]
WHEN MATCHED THEN
    UPDATE SET
        TARGET.[SourceObjectSettings] = SOURCE.[SourceObjectSettings],
        TARGET.[SourceConnectionSettingsName] = SOURCE.[SourceConnectionSettingsName],
        TARGET.[CopySourceSettings] = SOURCE.[CopySourceSettings],
        TARGET.[SinkObjectSettings] = SOURCE.[SinkObjectSettings],
        TARGET.[SinkConnectionSettingsName] = SOURCE.[SinkConnectionSettingsName],
        TARGET.[CopySinkSettings] = SOURCE.[CopySinkSettings],
        TARGET.[CopyActivitySettings] = SOURCE.[CopyActivitySettings],
        TARGET.[TopLevelPipelineName] = SOURCE.[TopLevelPipelineName],
        TARGET.[TriggerName] = SOURCE.[TriggerName],
        TARGET.[DataLoadingBehaviorSettings] = SOURCE.[DataLoadingBehaviorSettings],
        TARGET.[TaskId] = SOURCE.[TaskId],
        TARGET.[CopyEnabled] = SOURCE.[CopyEnabled]
WHEN NOT MATCHED BY TARGET THEN
    INSERT (
        [Id],
        [SourceObjectSettings],
        [SourceConnectionSettingsName],
        [CopySourceSettings],
        [SinkObjectSettings],
        [SinkConnectionSettingsName],
        [CopySinkSettings],
        [CopyActivitySettings],
        [TopLevelPipelineName],
        [TriggerName],
        [DataLoadingBehaviorSettings],
        [watermarkColumnStartValue],
        [TaskId],
        [CopyEnabled]
    )
    VALUES (
        SOURCE.[Id],
        SOURCE.[SourceObjectSettings],
        SOURCE.[SourceConnectionSettingsName],
        SOURCE.[CopySourceSettings],
        SOURCE.[SinkObjectSettings],
        SOURCE.[SinkConnectionSettingsName],
        SOURCE.[CopySinkSettings],
        SOURCE.[CopyActivitySettings],
        SOURCE.[TopLevelPipelineName],
        SOURCE.[TriggerName],
        SOURCE.[DataLoadingBehaviorSettings],
        SOURCE.[watermarkColumnStartValue],
        SOURCE.[TaskId],
        SOURCE.[CopyEnabled]
    );

11. MainControlMetadata変数に生成した json 形式の文字列をセットした上で実行

クリックして メイン制御テーブルに登録する SQL 文を展開
DECLARE @MainControlMetadata NVARCHAR(max)  = N'[
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_01"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": "adf_metadata_01",
            "folderPath": "adf_metadata_01",
            "fileSystem": "adf-copy"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1,
        "Id": 1,
        "watermarkColumnStartValue": null
    },
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_02"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": "adf_metadata_02",
            "folderPath": "adf_metadata_02",
            "fileSystem": "adf-copy"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1,
        "Id": 2,
        "watermarkColumnStartValue": null
    },
    {
        "SourceObjectSettings": {
            "schema": "dbo",
            "table": "adf_metadata_03"
        },
        "SinkObjectSettings": {
            "compressionCodec": "snappy",
            "fileName": "adf_metadata_03",
            "folderPath": "adf_metadata_03",
            "fileSystem": "adf-copy"
        },
        "CopySourceSettings": {
            "partitionOption": "None",
            "sqlReaderQuery": null,
            "partitionLowerBound": null,
            "partitionUpperBound": null,
            "partitionColumnName": null,
            "partitionNames": null
        },
        "CopyActivitySettings": {
            "translator": {
                "type": "TabularTranslator",
                "typeConversion": true
            }
        },
        "TopLevelPipelineName": "MetadataDrivenCopyTask_m8e_TopLevel",
        "TriggerName": [
            "Sandbox",
            "Manual"
        ],
        "DataLoadingBehaviorSettings": {
            "dataLoadingBehavior": "FullLoad"
        },
        "TaskId": 0,
        "CopyEnabled": 1,
        "Id": 3,
        "watermarkColumnStartValue": null
    }
]';
MERGE INTO [dbo].[MainControlTable_m8e] AS TARGET
USING (
    SELECT * FROM OPENJSON(@MainControlMetadata)
    WITH (
        [Id] [int],
        [SourceObjectSettings] [nvarchar](max) AS JSON,
        [SourceConnectionSettingsName] [varchar](max),
        [CopySourceSettings] [nvarchar](max) AS JSON,
        [SinkObjectSettings] [nvarchar](max) AS JSON,
        [SinkConnectionSettingsName] [varchar](max),
        [CopySinkSettings] [nvarchar](max) AS JSON,
        [CopyActivitySettings] [nvarchar](max) AS JSON,
        [TopLevelPipelineName] [varchar](max),
        [TriggerName] [nvarchar](max) AS JSON,
        [DataLoadingBehaviorSettings] [nvarchar](max) AS JSON,
        [watermarkColumnStartValue] nvarchar(max),
        [TaskId] [int],
        [CopyEnabled] [bit]
    )
) AS SOURCE
ON TARGET.[Id] = SOURCE.[Id]
WHEN MATCHED THEN
    UPDATE SET
        TARGET.[SourceObjectSettings] = SOURCE.[SourceObjectSettings],
        TARGET.[SourceConnectionSettingsName] = SOURCE.[SourceConnectionSettingsName],
        TARGET.[CopySourceSettings] = SOURCE.[CopySourceSettings],
        TARGET.[SinkObjectSettings] = SOURCE.[SinkObjectSettings],
        TARGET.[SinkConnectionSettingsName] = SOURCE.[SinkConnectionSettingsName],
        TARGET.[CopySinkSettings] = SOURCE.[CopySinkSettings],
        TARGET.[CopyActivitySettings] = SOURCE.[CopyActivitySettings],
        TARGET.[TopLevelPipelineName] = SOURCE.[TopLevelPipelineName],
        TARGET.[TriggerName] = SOURCE.[TriggerName],
        TARGET.[DataLoadingBehaviorSettings] = SOURCE.[DataLoadingBehaviorSettings],
        TARGET.[TaskId] = SOURCE.[TaskId],
        TARGET.[CopyEnabled] = SOURCE.[CopyEnabled]
WHEN NOT MATCHED BY TARGET THEN
    INSERT (
        [Id],
        [SourceObjectSettings],
        [SourceConnectionSettingsName],
        [CopySourceSettings],
        [SinkObjectSettings],
        [SinkConnectionSettingsName],
        [CopySinkSettings],
        [CopyActivitySettings],
        [TopLevelPipelineName],
        [TriggerName],
        [DataLoadingBehaviorSettings],
        [watermarkColumnStartValue],
        [TaskId],
        [CopyEnabled]
    )
    VALUES (
        SOURCE.[Id],
        SOURCE.[SourceObjectSettings],
        SOURCE.[SourceConnectionSettingsName],
        SOURCE.[CopySourceSettings],
        SOURCE.[SinkObjectSettings],
        SOURCE.[SinkConnectionSettingsName],
        SOURCE.[CopySinkSettings],
        SOURCE.[CopyActivitySettings],
        SOURCE.[TopLevelPipelineName],
        SOURCE.[TriggerName],
        SOURCE.[DataLoadingBehaviorSettings],
        SOURCE.[watermarkColumnStartValue],
        SOURCE.[TaskId],
        SOURCE.[CopyEnabled]
    );

image.png

12. メイン制御テーブルにデータが登録されたことを確認

image.png

動作確認

1. パイプラインを実行し正常終了することを確認

image.png

2. データが書きこまれたことを確認

image.png
image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0