はじめに
今回は、Muleの標準コンポーネントでSalesforceへBulkAPIのUpsert処理を実装しました。私があまりMuleSoftに詳しく、Muleの標準コンポーネントがどこまで共通化されているのか分からなかったため、Salesforceコネクタの標準モジュールを組み合わせや、実行した際の挙動を確認しました。
メッセージフローの設定
SalesforceコネクタのBulkAPIのUpsertモジュールが、エラーなく動作するところまで設定すると、下記のようになりました。
メッセージフローの処理の流れ
- 『Set Payload』モジュールで、『Upsert bulk』モジュールで送信するデータを設定します。
- 『Upsert bulk』モジュールで、対象オブジェクト、外部ID、データを設定すると、モジュール内で、Jobの作成、処理仕様の登録、バッチ登録を実行してくれます。
- 『Until Successful』モジュールで、バッチが完了するまでポーリングを実施します。
- 『Bulk result』モジュールで、バッチの結果を取得します。
- 『Transform Message』モジュールで、バッチ結果のレコード登録結果のみを取得します。
- そのほかにJobの終了モジュールが必要ですが今回は基本的な流れが確認できればよかったので省略しました。
バッチステータスのポーリング処理
『Until Successful』モジュールは、スコープ内で例外をキャッチすると処理を繰り返します。このフローでは、『Bulk result』モジュールで、バッチステータスが未完了の場合に例外を発生し(下記のエラー)、バッチステータスが完了になるまで繰り返し『Bulk result』モジュールが実行されます。
org.mule.runtime.core.internal.exception.OnErrorPropagateHandler:

レコード処理結果の取得処理
『Transform Message』モジュールで、『Bulk result』モジュールの結果を取得するロジックは以下の通りです。
動作を確認
Salesforceの『一括読み込みジョブ』で実行された結果を確認すると、ジョブが登録されバッチが正常に実行できていることを確認できました。ただ、同じメッセージフロー内でも『Upsert bulk』モジュールが分かれてしまうとJobが複数できてしまうので、運用面を考えると考慮が必要ですね。
外部IDで参照項目設定
JSON形式で、リレーション項目と外部ID項目を下の図のように定義します。
正常にUpsertされると、下記の図のように親レコードと参照関係が設定されます。
最後に
標準モジュールを組み合わせると簡単にBulkAPIの処理を実装する事ができました。
同じメッセージフローないでも複数のJobが分かれてしまうのは、運用面で障害分析や切り戻しが難しくなるので、次は、SalesforceコネクタのJobとBatchモジュールを使って、引き続き調査します。
まずは、動くパターンを一つ見つける事ができたのでよかったです。