概要
Azure Data Factory にて Salesforce をシンクとした場合のデータ連携パターンを共有します。
基本的な方針
データ連携パターン
Azure Data Factory にて Salesforce をシンクとする場合には、パイプラインの実装方法とデータの操作方法の組み合わせで構築する必要があります。
パイプラインの実装方法としては下記の方法があります。
- Salesforce コネクタによるコピーアクティビティ
- Salesforce REST API による Web アクティビティ
- Salesforce Bulk API 2.0 による Web アクティビティ
データの操作方法としては下記があります。HARD DELETE については、デフォルトで実施することができないため、データ連携パターンの対象外とします。
- INSERT
- UPSERT (外部 ID による UPDATE or INSERT)
- UPDATE (Salesforce ID による UPDATE or INSERT)
- DELETE(論理削除)
- HARD DELETE(物理削除)
Salesforce コネクタによるコピーアクティビティでは、 INSERT と UPSERT のみをサポートされています。
操作の書き込み動作。
使用可能な値: Insert および Upsert。
引用元:Salesforce との間でデータをコピーする - Azure Data Factory & Azure Synapse | Microsoft Learn
以上の情報を整理すると、Azure Data Factory にて Salesforce をシンクとする場合には下記の方法があります。
- コピーアクティビティ(Salesforce コネクタ)による INSERT
- コピーアクティビティ(Salesforce コネクタ)による UPSERT
- Web アクティビティ(Salesforce REST API)による INSERT
- Web アクティビティ(Salesforce REST API)による UPSERT
- Web アクティビティ(Salesforce REST API)による UPDATE
- Web アクティビティ(Salesforce REST API)による DELETE
- Web アクティビティ(Salesforce Bulk API 2.0)による INSERT
- Web アクティビティ(Salesforce Bulk API 2.0)による UPSERT
- Web アクティビティ(Salesforce Bulk API 2.0)による UPDATE
- Web アクティビティ(Salesforce Bulk API 2.0)による DELETE
Salesforce REST API による Web アクティビティの実装方針
ソースから参照アクティビティにて json 形式の配列データを取得し、 ForEach アクティビティにて単一の json 形式をボディに渡して処理を実行します。
参照アクティビティの出力サイズに上限があることに注意が必要です。
引用元:ルックアップ アクティビティ - Azure Data Factory & Azure Synapse | Microsoft Learn
Salesforce Bulk API 2.0 による Web アクティビティの実装方針
Salesforce Bulk API 2.0 を利用する際には、 CSV 形式データを反映する必要があります。
引用元:ステップ 5: 一括更新/挿入 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
CSV 形式のデータを指定するために、ストレージ上の CSV ファイルの内容を REST API で取得した値を Salesforce REST API 実行時の Body に指定します。
Web アクティビティの出力サイズに上限があることに注意が必要です。
引用元:Web アクティビティ - Azure Data Factory & Azure Synapse | Microsoft Learn
Salesforce Bulk API 2.0 の実行方法
Salesforce Bulk API 2.0 により Salseforce へ書き込みを実施する場合には、下記の API を順に実行する必要があります。
- ジョブの作成 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
- ジョブデータのアップロード | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
- アップロードの完了 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
- ジョブ情報の取得 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
ジョブの実行状況をジョブ情報の取得のstate
の値で確認できます。
引用元:ジョブ情報の取得 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
失敗を厳密に管理したい場合には、stae
がFailed
であることを監視するだけでなく、numberRecordsFailed
の値も監視する必要があります。レコードの書き込みが失敗したとしてもstate
がJob Complete
になるためです。
引用元:ジョブ情報の取得 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
データ連携パターン
1. コピーアクティビティ(Salesforce コネクタ)による INSERT
利用する Salesforce の API
Salesforce コネクタを利用する場合には Salesforce Bulk API 2.0 が利用されます。
Salesforce Bulk API 2.0 を使用して、データのクエリと取り込みを行います。
引用元:Salesforce との間でデータをコピーする - Azure Data Factory & Azure Synapse | Microsoft Learn
パイプライン
パイプラインの全体像
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"Name","ExternalId__c"
"A Company","1"
"B Compan","2"
"C Company","3"
パイプラインを実行
データの確認
2. コピーアクティビティ(Salesforce コネクタ)による UPSERT
利用する Salesforce の API
Salesforce コネクタを利用する場合には Salesforce Bulk API 2.0 が利用されます。
Salesforce Bulk API 2.0 を使用して、データのクエリと取り込みを行います。
引用元:Salesforce との間でデータをコピーする - Azure Data Factory & Azure Synapse | Microsoft Learn
パイプライン
パイプラインの全体像
コピーアクティビティのシンクの設定
書き込む動作
をUpsert
に、外部ID フィールド
をADFTable01__c
に設定
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"ExternalId__c","Name"
"3","CCC Company"
"4","D Company"
パイプラインを実行
データの確認
3. Web アクティビティ(Salesforce REST API)による INSERT
利用する Salesforce の API
下記の API を利用します。
パイプライン
パイプラインの全体像
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"Name","ExternalId__c"
"A Company","1"
"B Compan","2"
"C Company","3"
パイプラインを実行
データの確認
4. Web アクティビティ(Salesforce REST API)による UPSERT
利用する Salesforce の API
下記の API を利用します。
パイプライン
パイプラインの全体像
json のデータから外部 ID 項目の除外
Body に渡す json データにて外部 ID 項目があるとエラーとなるため、 下記のような式により外部 ID のキーと値を除外します。
@replace(
string(item()),
concat(
'"',
pipeline().parameters.SalesforceExternalIdFieldName,
'":"',
item()[pipeline().parameters.SalesforceExternalIdFieldName],
'",'
),
''
)
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"ExternalId__c","Name"
"3","CCC Company"
"4","D Company"
パイプラインを実行
データの確認
5. Web アクティビティ(Salesforce REST API)による UPDATE
利用する Salesforce の API
下記の API を利用します。
パイプライン
パイプラインの全体像
json のデータから Salesforce ID 項目の除外
Body に渡す json データにて Salesforce ID 項目があるとエラーとなるため、 下記のような式により Id のキーと値を除外します。
@replace(
string(item()),
concat(
'"Id":"',
item().id,
'",'
),
''
)
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
Id 列は Salesforce ID 項目であるため、既存の ID を指定する必要があります。
"Id","Name"
"a02IT00001x1f9u","BBB Company"
パイプラインを実行
データの確認
6. Web アクティビティ(Salesforce REST API)による DELETE
利用する Salesforce の API
下記の API を利用します。
パイプライン
パイプラインの全体像
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
Id 列は Salesforce ID 項目であるため、既存の ID を指定する必要があります。
"Id"
"a02IT00001x1f9p"
"a02IT00001x1f9u"
パイプラインを実行
データの確認
7. Web アクティビティ(Salesforce Bulk API 2.0)による INSERT
利用する Salesforce の API
ジョブの作成 API にてoperation
にinsert
を指定します。
引用元:ジョブの作成 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
パイプライン
パイプラインの全体像
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"Name","ExternalId__c"
"A Company","1"
"B Compan","2"
"C Company","3"
パイプラインを実行
データの確認
8. Web アクティビティ(Salesforce Bulk API 2.0)による UPSERT
利用する Salesforce の API
ジョブの作成 API にてoperation
にupsert
を指定します。
引用元:ジョブの作成 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
パイプライン
パイプラインの全体像
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"ExternalId__c","Name"
"3","CCC Company"
"4","D Company"
パイプラインを実行
データの確認
9. Web アクティビティ(Salesforce Bulk API 2.0)による UPDATE
利用する Salesforce の API
ジョブの作成 API にてoperation
にupdate
を指定します。
引用元:ジョブの作成 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
パイプライン
パイプラインの全体像
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"Id","Name"
"a02IT00001x1fA5","BBB Company"
パイプラインを実行
データの確認
10. Web アクティビティ(Salesforce Bulk API 2.0)による DELETE
利用する Salesforce の API
ジョブの作成 API にてoperation
にdelete
を指定します。
引用元:ジョブの作成 | Bulk API 2.0 および Bulk API 開発者ガイド | Salesforce Developers
パイプライン
パイプラインの全体像
実行確認
カスタムオブジェクトを準備
CSV ファイルを配置
"Id"
"a02IT00001x1fA6"
"a02IT00001x1fA5"