Salesforce プラットフォームのイベントをデータクラウド取り込み API に直接ストリーミングする完全な「オンプラットフォーム」アプローチ。
Data Cloud にリアルタイムでストリーミングしたい複数のプラットフォーム イベント タイプがあるとします。これは複数の組織からのものであり、複数のデータ クラウド インスタンスにストリーミングされる可能性があります。
これには、柔軟で再利用可能なソリューションが必要です。これは、あらゆる種類のプラットフォーム イベントに使用して、任意のデータ クラウド インジェスト ストリーミング API にマッピングできる単一の実装です。そこで、ミニイベントブリッジを作ることにしました…
さらに… プラットフォーム イベント オブジェクトをマッピングできたら、あらゆる種類の Salesforce オブジェクトにまったく同じロジックを実装できます。これは、フロー、LWC、Apex が最小限のコードでイベントをデータ クラウドに直接ストリーミングできるため、非常に優れています。
要件
正確に達成したいことの概要を説明することから始めましょう。
- 任意のタイプのプラットフォーム イベントまたは sObject を任意の Data Lake オブジェクトにマップします
- 複数の組織から再利用できる構成可能なセットアップ
- 必要なコードは最小限でありながら非常に柔軟なソリューション
プロセスは次のようになります。
私たちは何が必要なのか?
セキュリティ上の理由から名前付き認証情報を利用する、任意のスポーク組織から 1 つ以上のデータクラウド インスタンスへの安全な接続。これを実現するためにカスタム認証プロバイダー パッケージを
作成しました。設定方法については説明書をご覧ください。
-
sObject フィールドと取り込み API オブジェクト フィールド間のマッピングを保持するカスタム メタデータ設定。
構成では、ターゲットの取り込み API コネクタとターゲットのオブジェクト名も追跡する必要があります。
最後に、名前付き資格情報を定義する必要があります。 -
Ingestion API を使用すると、複数のレコードを同時に取り込むことができるため、Apex のベスト プラクティスに従って、ソリューションを一括化する必要があります。
-
コードは次のようになります。
trigger PlatformEventTrigger on PlatformEvent ( after insert ) {
// ユーザー定義の構成に基づいてイベント レコードをストリーミングします
// このロジックは任意のクラス/メソッドから呼び出すことができます
// Async メソッドはトリガーのみに必要です
// これがすべてのコードですエンド ユーザーは
utl.Dc.streamRecordsToDataCloudAsync( '[CONFIG_NAME]' ,trigger.New);について心配する必要があります。
考慮すべき限界
-
インジェスト API ストリーミング ペイロードは 200 kb を超えることができないため、大量のレコードを送信することはできません。ただし、通常は 1 つだけである必要があります。
-
Ingestion API を呼び出すと、データ クラウドの制限が侵食されます。
-
アウトバウンドコールは Salesforce の API 制限を消費しません
-
apex トリガーを使用してプラットフォーム イベントをサブスクライブしても、サブスクライバーの制限にはカウントされません
-
プラットフォーム イベントの作成は、引き続き (時間ごとの) 制限にカウントされます
-
Apex トリガーを使用するため、Data Cloud への API 呼び出しは非同期で起動する必要があります。これは将来のメソッドを使用します。将来のメソッドがいつ起動されるかは、リソースの可用性に基づいています。通常、これらは即座に実行されますが、理論的には数分かかる場合があり、これはほぼリアルタイムのソリューションとなります。ストリーミング取り込み API は 2 分ごとにレコードを処理するため、これが実際の問題であるとは考えませんが、留意してください。
プラットフォームのイベント トリガー コールアウト アーキテクチャに関する注意事項
Web サービス呼び出しを介してイベントを中継するのは、少しアンチパターンであると考えられます。Cometd や gRPC のような (比較的複雑な) サブスクライバ クライアントが必要なく、サブスクライバの制限を侵食しないという利点がありますが、いくつかの欠点もあります。
コールアウトが失敗した場合、ReplayId を利用して送信されたイベントを再度取得する方法はありません。ソリューションの堅牢性がどの程度必要かによっては、カスタムのエラー処理および再試行メカニズムの実装が必要になる場合があります。私は個人的に、多くのシナリオではメリットがデメリットを上回ると考えていますが、この種の実装の影響については十分に認識してください。
重要
セキュリティは簡単なテーマではありません。この (または他の) ソリューションを実装する前に、認定されたセキュリティの専門家および認定された実装パートナーと協力して、常に何を行っているかを検証してください。
TL;DR :: データ クラウド ユーティリティ
マッピング オブジェクトと、Apex または Apex トリガーからデータ クラウドにレコードをストリーミングするための同期および非同期メソッドを含むデータ クラウド ユーティリティを作成しました。
このユーティリティには、一括取り込みと YAML 作成用のツール セットも付属しています。詳細なセットアップ全体を通じて、ユーティリティのメソッドを参照します。
G itHubリポジトリはここにあります:
https://github.com/jfwberg/lightweight-data-cloud-util
ステップバイステップのセットアップ例
さて、話は十分にしましたので、例を設定しましょう。この例では、スマート デバイス イベントを表すプラットフォーム イベントがあります。これには、照明を点灯したり、電力が安い時間に洗濯機を稼働させるようにスケジュールを設定したり、温度が設定されたしきい値を下回ったときに暖房を有効にしたりすることができます。
注: この例全体はプラットフォーム イベントに基づいていますが、あらゆる種類の Salesforce オブジェクトや、フローや LWC で簡単に動作させることができる Apex でも動作します。
完全に構成されたデモは、Git リポジトリで入手できます。
01 :: スポーク組織 :: パッケージをインストールする
更新: セットアップに関する詳細なステップバイステップガイドをここに書きました: Salesforce Data Cloud Utility および Ingestion Api UI のセットアップ手順これらの手順の最初のステップにリストされているすべてのパッケージをインストールしてください。
02 :: 接続アプリと名前付き認証情報をセットアップする
接続するには、スポーク組織に名前付き認証情報を設定し、データクラウド組織に接続アプリを設定する必要があります。指示に従ってください。
複数のスポーク組織または複数のデータクラウドインスタンスが必要な場合は、すべてのターゲットで手順を繰り返すだけです。
03 :: スポーク組織 :: プラットフォーム イベントを作成する
セットアップでは、プラットフォーム イベントを作成します。この例では、イベントは Smart_Device_Event__e と呼ばれ、スマート ホーム デバイスから送信されるイベントを表します。
実例を示すためにいくつかのフィールドを作成します。Action__c はオンになっていますか、それともオフになっていますか? Device_Type__c (つまり電球)、数値フィールドをテストするためのStatus_Code__c 、および日付/時刻フィールド値をテストするためのTimestamp__c 。
イベントを一意にするために、EventUuIdフィールドを使用できます。プッシュ動作を「すぐに公開」に設定します。
04 :: データクラウド :: インジェスト API コネクタの作成
- データクラウドを開き、設定歯車からデータクラウドのセットアップに移動します
- 左側のメニューバーで「Ingestion API」ボタンをクリックします
- 右上隅にある「新規」ボタンをクリックし、取り込みコネクタに名前を付けます。取り込みソースはプラットフォーム イベントからのスマート デバイス イベントになるため、これを「スマート デバイス イベント コネクタ」と呼びます。
- こで、イベント データをマップするオブジェクトを表す YAML ファイルを作成します。フォーマットの詳細については、ドキュメントを参照してください。ファイルをドライブ上のどこかに保存します。ユーティリティには SObject 2 YAML ジェネレーターがあることに注意してください。それも活用できます。
openapi: 3.0.3
components:
schemas:
Smart_Device_Event:
type: object
properties:
ReplayId:
type: string
EventUuid:
type: string
Action:
type: string
Device_Type:
type: string
Status_Code:
type: number
Status_Reason:
type: string
Timestamp:
type: string
format: date-time
- セットアップでは、新しく作成した取り込み API コネクタの詳細ページで [ファイルをアップロード] ボタンを押して、YAML ファイルをアップロードします。フィールドのプレビューが表示されるはずです。「保存」を押します
。これらのフィールド名は、後でマッピング設定で設定する予定のフィールド名と同じなのでメモしてください。
05 :: データクラウド :: 新しいデータストリームを作成する
-
これで初期設定は完了です。Data Cloud アプリに戻り、[データ ストリーム] タブに移動します。
-
「新規」をクリックし、「取り込みAPI」タイルを選択して「次へ」をクリックします。
-
作成したばかりの取り込み API を選択し、YAML ファイルからオブジェクトを選択します。「次へ」をクリックしてください
- 希望するイベント カテゴリの種類を選択します。この場合はエンゲージメント データなので、[エンゲージメント]を選択します。プライマリにはイベントの一意の識別子であるEventUuidフィールドを選択し、タイムスタンプにはカスタムのTimestampフィールドを選択します。準備ができたら「次へ」をクリックします
- 必要に応じてデータスペース名を設定するか、「デフォルト」のままにして「デプロイ」を押します。
- これで、データ クラウドがすべてセットアップされ、ストリーミング データを受信する準備が整いました。
06 :: スポーク組織 :: 構成とマッピングを作成する
次に、スポーク組織に戻り、カスタム メタデータを設定して、設定を作成し、カスタム メタデータ レコードをマッピングします。
- 「Data Cloud Ingestion API Configuration」カスタムメタデータタイプの横にある「Manage Records」をクリックし、「New」をクリックします。
- ラベルとして、プラットフォーム イベントに一致するものを選択します。この場合は「スマート デバイス イベント構成」です。この API 名が、後で Apex コードで識別子として必要になるわけではありません。
- [名前付き認証情報名] で、データ クラウドの名前付き認証情報の API 名を指定します。(動作テストを行ったことを確認してください)。複数のデータ クラウド インスタンスにストリーミングする場合は、これらすべてのレコードを複製する必要があります。これは一般的な使用例ではないと思うので、特にそのために何も構築していません。
- インジェスト API コネクタ名を、先ほど設定したコネクタ名に設定します。この場合は「Smart_Device_Event_Connector」
- 最後に、ターゲット オブジェクト名を、取り込み API / YAML ファイルで指定されている名前に設定します。この場合は「Smart_Device_Event」です。
-
「保存」を押します
-
[設定] のカスタム メタデータ タイプに戻り、「データ クラウド インジェスト API フィールド マッピング」メタデータ タイプの横にあるレコードの管理をクリックします。
-
「新規」をクリックします
-
フィールドごとにマッピング レコードを作成します。マッピング名が一意のキーであることを確認してください (残念ながら、これは必須であり、手動です)
-
ソース (プラットフォーム イベント フィールド) とターゲット (取り込み API コネクタ フィールド) を選択し、それに応じて構成レコードへのルックアップを設定します。
- すべて完了すると、マッピング テーブルは次のようになります。
07 :: スポーク組織 :: トリガーの作成
設定が完了したので、プラットフォーム イベントにトリガーを作成します。
trigger SmartDeviceEventTrigger on Smart_Device_Event__e (after insert) {
// データをデータ クラウドにストリーミングします
utl.Dc.streamRecordsToDataCloudAsync(
JSON.serialize(trigger.new),
'Smart_Device_Event_Configuration'
);
}
- トリガーではすべて、utl.Dc.streamRecordsToDataCloudAsync()メソッドを呼び出します。これは将来的な方法です。(したがって非同期です)
- 最初の引数は、プラットフォーム イベントの JSON シリアル化バージョンです。これは、future メソッドがオブジェクトを引数として受け取ることができないためです。ということで往復連載をします。非同期メソッドを使用する場合は、sObject リストを渡します。
- 2 番目の引数はメタデータ構成レコードの名前であり、おそらくカスタム ラベル内にある必要があります。しかし、デモの場合はこれで十分です。
08 :: スポーク組織 :: PlatformEventSubscriberConfig メタデータを作成する
外部資格情報では、名前付き資格情報を介して外部資格情報を呼び出すユーザーが、プロファイルまたは権限セットを通じて名前付きプリンシパルにアクセスできる必要があります。
残念ながら、プラットフォーム イベントは、アクセス権を割り当てることができない自動化されたユーザーとして実行されるため、将来のメソッドを通じてコールアウトを行うことは不可能になります。
この問題を解決する唯一の方法は、プラットフォーム イベントを実行する実行 (システム) ユーザーを指定することです。これは、PlatformEventSubscriberConfigメタデータ タイプを使用して行われ、セットアップを通じて行うことはできません。メタデータ API を通じて行う必要があります。
リポジトリ内のデモ フォルダーにはサンプルが含まれています。デプロイする前に必ずユーザー名を更新してください。
実行中のユーザー情報と手順 07 で作成したプラットフォーム イベント トリガーの名前を含む XML ファイルを作成します。
ファイルにSmartDeviceEventTrigger.platformEventSubscriberConfig-meta.xml という名前を付け、メタデータ API または VSCode を使用してデプロイします。
<?xml version= "1.0" encoding= "UTF-8" ?>
< PlatformEventSubscriberConfig xmlns = "http://soap.sforce.com/2006/04/metadata" >
< platformEventConsumer > SmartDeviceEventTrigger </ platformEventConsumer >
<バッチサイズ> 100 </batchSize> <masterLabel> SmartDeviceEventTriggerConfig </masterLabel> <user>
test-382rc1gvovzk@example.com </user> <isProtected> false
</isProtected> </PlatformEventSubscriberConfig>
プラットフォーム イベントで [サブスクリプション]に移動し、[管理] をクリックして [一時停止]をクリックし、新しいユーザーを有効にすることが重要です。デモのようにすべてを一度にデプロイすると、すぐに機能するはずです。しかし、それが機能しない理由を理解することで、頭痛の種を軽減できるかもしれません。実行中のユーザーが外部認証情報にアクセスできることを確認してください。
09 :: スポーク組織 :: テスト
これですべての設定が完了しました。あとは、いくつかのプラットフォーム イベントを起動して、それらがデータ クラウドにストリーミングされるのを監視するだけです。イベントを公開する例を次に示します。EventUuid と ReplayId は自動的に設定されます。
EventBus.publish(
new Smart_Device_Event__e(
Action__c = 'Switch State',
Device_Type__c = 'Lightbulb',
Status_Code__c = 1,
Status_Reason__c = 'Switched On (1)',
Timestamp__c = Datetime.now()
)
);
データ クラウドで[データ エクスプローラー]タブに移動し、選択したデータ スペースでスマート デバイス イベント コネクタデータ レイク オブジェクトを選択すると、イベントがストリーミングされているのを確認します。
10 :: スポーク組織 :: 例
完全な構成例は GitHub リポジトリで入手できます。これには、プラットフォーム イベントの発生をテストするための LWC が含まれています。試してみて、何ができるかをよりよく理解することができます。
結論
これで完了です。これは、カスタム メタデータ構成を使用して、1 行のコード (およびいくつかのパッケージ マジック) でデータを Data Cloud にストリーミングする方法の明確な例です。
セットアップには少し時間がかかりますが、柔軟性があり、定型的なコードを記述する必要がなく、接続は名前付き資格情報によって処理されます。