LoginSignup
0
0

SalesforceのChange Data Capture (CDC) を試してみる

Last updated at Posted at 2023-08-07

1. やってみたこと

  • SalesforceのChange Data CaptureでPub/Sub APIを使用してサブスクライブしてみる
  • Pub/Sub APIを試してみる
  • この記事でやらないこと
    • プラットフォームイベント
    • Apexトリガー
    • CometD

2. バージョンや背景など

  • Summer '23 Patch 15.3 (2023/8/7時点)
  • @salesforce/cli/2.0.2 darwin-x64 node-v18.15.0
% sf version
@salesforce/cli/2.0.2 darwin-x64 node-v18.15.0
  • スクラッチ組織で試してみる

3. 試したこと

Change Data Capture Developer Guide
Pub/Sub API
こちらを参照しながら試していきます

3.1. スクラッチ組織を作成する

CDCを試すためのスクラッチ組織を作成します
以下のようなコマンドでスクラッチ組織を作成します
デフォルトのDevHub組織が既に作成および認証済みとします

sf org create scratch --edition developer --alias demo

Your scratch org is ready.というメッセージが表示され、sf org listで作成したスクラッチ組織を確認できます

以下のコマンドで作成したスクラッチ組織をブラウザで表示します

sf org open -o demo

3.2. Java Quick Start for Pub/Sub API

Java Quick Start for Pub/Sub API
こちらを参照しながらクイックスタートを試してみます

3.2.1. Step 1: Install Prerequisites

  • Javaをインストールします
    • 参照ページにはJava 11と記載されていますが、ここではJava 17をインストールしました
% java --version
openjdk 17.0.7 2023-04-18
OpenJDK Runtime Environment Temurin-17.0.7+7 (build 17.0.7+7)
OpenJDK 64-Bit Server VM Temurin-17.0.7+7 (build 17.0.7+7, mixed mode, sharing)
  • Apache Mavenをインストールします
% mvn --version
Apache Maven 3.9.4 (dfbb324ad4a7c8fb0bf182e6d91b0ae20e3d2dd9)
  • 今回はプラットフォームイベントは扱わないため、カスタムプラットフォームイベントCarMaintenanceは作成しません

3.2.2. Step 2: Clone and Build the Client

git clone https://github.com/developerforce/pub-sub-api.git
output
Cloning into 'pub-sub-api'...
remote: Enumerating objects: 337, done.
remote: Counting objects: 100% (124/124), done.
remote: Compressing objects: 100% (74/74), done.
remote: Total 337 (delta 65), reused 69 (delta 48), pack-reused 213
Receiving objects: 100% (337/337), 166.54 KiB | 7.57 MiB/s, done.
Resolving deltas: 100% (136/136), done.
  • pub-sub-api/javaに移動します
cd pub-sub-api/java
  • Mavenでビルドできることを確認します
    • 参照ページにはmvn clean installと記載されていますが、installは不要のためpackageを指定します
mvn clean package
  • 以下のようにBUILD SUCCESSと表示され、targetフォルダにpubsub-java-1.0-SNAPSHOT-jar-with-dependencies.jarが作成されていることを確認します
output
~~~ 省略 ~~~
[INFO] Building jar: ${demo_directory}/pub-sub-api/java/target/pubsub-java-1.0-SNAPSHOT.jar
[INFO] 
[INFO] --- assembly:3.3.0:single (default) @ pubsub-java ---
[INFO] Building jar: ${demo_directory}/pub-sub-api/java/target/pubsub-java-1.0-SNAPSHOT-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  18.889 s
[INFO] Finished at: 2023-08-07T11:42:06+09:00
[INFO] ------------------------------------------------------------------------
% ls -ln target
total 35392
drwxr-xr-x   2 501  20        64  8  7 11:41 archive-tmp
drwxr-xr-x  10 501  20       320  8  7 11:41 classes
drwxr-xr-x   4 501  20       128  8  7 11:41 generated-sources
drwxr-xr-x   3 501  20        96  8  7 11:41 maven-archiver
drwxr-xr-x   3 501  20        96  8  7 11:41 maven-status
drwxr-xr-x   4 501  20       128  8  7 11:41 protoc-dependencies
drwxr-xr-x   4 501  20       128  8  7 11:41 protoc-plugins
-rw-r--r--   1 501  20  17296039  8  7 11:42 pubsub-java-1.0-SNAPSHOT-jar-with-dependencies.jar
-rw-r--r--   1 501  20    225639  8  7 11:41 pubsub-java-1.0-SNAPSHOT.jar

3.2.3. Step 3: Configure Client Parameters

  • src/main/resources/arguments.yamlを編集します
    • LOGIN_URL: 接続する組織のインスタンスURLを設定します
      • https://MyDomain.my.salesforce.com形式のURLです
      • sf org display -o demoを実行して、KEYがInstance UrlのVALUEを取得します
    • USERNAME: 接続するユーザーのユーザー名を設定します
      • sf org display -o demoを実行して、KEYがUsernameのVALUEを取得します
    • PASSWORD: 接続するユーザーのパスワードを設定します。
      • sf org generate password -o demoを実行して、パスワードを生成します
      • ログインIPアドレスの制限が設定されていない場合は、セキュリティトークンをパスワードに連結したものを設定します
      • セキュリティトークンはスクラッチ組織にログインして、[プロファイルを参照(プロファイルアイコン)
      • 設定 > 私の個人情報 > 私のセキュリティトークンのリセット]でセキュリティトークンのリセットを実行すると、設定されているメールアドレスに送信されます

  • 他のパラメータはデフォルトのままにします
  • mvn clean packageを実行して、ビルドし直します

3.2.4. Step 5: Subscribe to Change Events

  • Opportunityの変更をサブスクライブできるように設定します

    • スクラッチ組織で[設定 > インテグレーション > 変更データキャプチャ]をクリックします
    • 使用可能なエンティティ商談 (Opportunity)を選択します
      image.png
    • ▶︎をクリックします
    • 選択されたエンティティ商談 (Opportunity)が表示されます
      image.png
    • 保存します
  • 以下のコマンドを実行します

./run.sh processchangeeventheader.ProcessChangeEventHeader
  • 正常に接続できたら、以下のようなログが出力されます
output
% ./run.sh processchangeeventheader.ProcessChangeEventHeader
2023-08-07 11:56:36,431 [main] java.lang.Class - Setting Up Subscriber
2023-08-07 11:56:36,435 [main] java.lang.Class - Using grpcHost api.pubsub.salesforce.com and grpcPort 7443
2023-08-07 11:56:41,063 [main] java.lang.Class - Subscription started for topic: /data/OpportunityChangeEvent.
2023-08-07 11:56:46,069 [main] java.lang.Class - Subscription Active. Received 0 events.
2023-08-07 11:56:51,074 [main] java.lang.Class - Subscription Active. Received 0 events.
  • スクラッチ組織で、新規の商談を作成します
    image.png
  • 以下のようなログが出力されます
  • "entityName": "Opportunity""changeType": "CREATE"から商談が作成されたことが分かります
output
2023-08-07 11:58:03,934 [grpc-default-executor-1] java.lang.Class - Received event with Payload: {"ChangeEventHeader": {"entityName": "Opportunity", "recordIds": ["0060l00000b7cojAAA"], "changeType": "CREATE", "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/", "transactionKey": "00060cc5-b2dc-2036-45df-e764adec93e6", "sequenceNumber": 1, "commitTimestamp": 1691377080000, "commitNumber": 11070297359277, "commitUser": "0050l000007kzJvAAI", "nulledFields": [], "diffFields": [], "changedFields": []}, "AccountId": null, "IsPrivate": false, "Name": "商談 1", "Description": null, "StageName": "Prospecting", "Amount": null, "Probability": 10.0, "ExpectedRevenue": null, "TotalOpportunityQuantity": null, "CloseDate": 1691452800000, "Type": null, "NextStep": null, "LeadSource": null, "IsClosed": false, "IsWon": false, "ForecastCategory": "Pipeline", "ForecastCategoryName": "Pipeline", "CampaignId": null, "HasOpportunityLineItem": false, "Pricebook2Id": null, "OwnerId": "0050l000007kzJvAAI", "CreatedDate": 1691377080000, "CreatedById": "0050l000007kzJvAAI", "LastModifiedDate": 1691377080000, "LastModifiedById": "0050l000007kzJvAAI", "LastStageChangeDate": null, "ContactId": null, "ContractId": null, "LastAmountChangedHistoryId": null, "LastCloseDateChangedHistoryId": null}
Payloadの部分を抽出
{
  "ChangeEventHeader": {
    "entityName": "Opportunity",
    "recordIds": ["0060l00000b7cojAAA"],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "00060cc5-b2dc-2036-45df-e764adec93e6",
    "sequenceNumber": 1,
    "commitTimestamp": 1691377080000,
    "commitNumber": 11070297359277,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "AccountId": null,
  "IsPrivate": false,
  "Name": "商談 1",
  "Description": null,
  "StageName": "Prospecting",
  "Amount": null,
  "Probability": 10.0,
  "ExpectedRevenue": null,
  "TotalOpportunityQuantity": null,
  "CloseDate": 1691452800000,
  "Type": null,
  "NextStep": null,
  "LeadSource": null,
  "IsClosed": false,
  "IsWon": false,
  "ForecastCategory": "Pipeline",
  "ForecastCategoryName": "Pipeline",
  "CampaignId": null,
  "HasOpportunityLineItem": false,
  "Pricebook2Id": null,
  "OwnerId": "0050l000007kzJvAAI",
  "CreatedDate": 1691377080000,
  "CreatedById": "0050l000007kzJvAAI",
  "LastModifiedDate": 1691377080000,
  "LastModifiedById": "0050l000007kzJvAAI",
  "LastStageChangeDate": null,
  "ContactId": null,
  "ContractId": null,
  "LastAmountChangedHistoryId": null,
  "LastCloseDateChangedHistoryId": null
}
  • スクラッチ組織で、商談を更新します
    image.png
  • 以下のようなログが出力されます
  • "entityName": "Opportunity""changeType": "UPDATE"から商談が更新されたことが分かります
  • 変更された項目がNameTypeLeadSourceLastModifiedDateであることが分かります
output
2023-08-07 12:00:14,850 [grpc-default-executor-2] java.lang.Class - Received event with Payload: {"ChangeEventHeader": {"entityName": "Opportunity", "recordIds": ["0060l00000b7cojAAA"], "changeType": "UPDATE", "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/", "transactionKey": "00060ce4-915d-c591-0f8f-32657c642562", "sequenceNumber": 1, "commitTimestamp": 1691377212000, "commitNumber": 11070298018555, "commitUser": "0050l000007kzJvAAI", "nulledFields": [], "diffFields": [], "changedFields": ["0x01002808"]}, "AccountId": null, "IsPrivate": null, "Name": "商談 1 (変更)", "Description": null, "StageName": null, "Amount": null, "Probability": null, "ExpectedRevenue": null, "TotalOpportunityQuantity": null, "CloseDate": null, "Type": "New Customer", "NextStep": null, "LeadSource": "Web", "IsClosed": null, "IsWon": null, "ForecastCategory": null, "ForecastCategoryName": null, "CampaignId": null, "HasOpportunityLineItem": null, "Pricebook2Id": null, "OwnerId": null, "CreatedDate": null, "CreatedById": null, "LastModifiedDate": 1691377212000, "LastModifiedById": null, "LastStageChangeDate": null, "ContactId": null, "ContractId": null, "LastAmountChangedHistoryId": null, "LastCloseDateChangedHistoryId": null}
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class - ============================
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class -        Changed Fields       
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class - ============================
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class - Name
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class - Type
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class - LeadSource
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class - LastModifiedDate
2023-08-07 12:00:14,854 [grpc-default-executor-2] java.lang.Class - ============================
Payloadの部分を抽出
{
  "ChangeEventHeader": {
    "entityName": "Opportunity",
    "recordIds": ["0060l00000b7cojAAA"],
    "changeType": "UPDATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "00060ce4-915d-c591-0f8f-32657c642562",
    "sequenceNumber": 1,
    "commitTimestamp": 1691377212000,
    "commitNumber": 11070298018555,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": ["0x01002808"]
  },
  "AccountId": null,
  "IsPrivate": null,
  "Name": "商談 1 (変更)",
  "Description": null,
  "StageName": null,
  "Amount": null,
  "Probability": null,
  "ExpectedRevenue": null,
  "TotalOpportunityQuantity": null,
  "CloseDate": null,
  "Type": "New Customer",
  "NextStep": null,
  "LeadSource": "Web",
  "IsClosed": null,
  "IsWon": null,
  "ForecastCategory": null,
  "ForecastCategoryName": null,
  "CampaignId": null,
  "HasOpportunityLineItem": null,
  "Pricebook2Id": null,
  "OwnerId": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": 1691377212000,
  "LastModifiedById": null,
  "LastStageChangeDate": null,
  "ContactId": null,
  "ContractId": null,
  "LastAmountChangedHistoryId": null,
  "LastCloseDateChangedHistoryId": null
}
  • スクラッチ組織で、商談を削除します
  • 以下のようなログが出力されます
  • "entityName": "Opportunity""changeType": "DELETE"から商談が削除されたことが分かります
output
2023-08-07 12:03:37,391 [grpc-default-executor-5] java.lang.Class - Received event with Payload: {"ChangeEventHeader": {"entityName": "Opportunity", "recordIds": ["0060l00000b7cojAAA"], "changeType": "DELETE", "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/", "transactionKey": "00060d13-c72f-f594-9d6e-e72646693be9", "sequenceNumber": 1, "commitTimestamp": 1691377416000, "commitNumber": 11070299160440, "commitUser": "0050l000007kzJvAAI", "nulledFields": [], "diffFields": [], "changedFields": []}, "AccountId": null, "IsPrivate": null, "Name": null, "Description": null, "StageName": null, "Amount": null, "Probability": null, "ExpectedRevenue": null, "TotalOpportunityQuantity": null, "CloseDate": null, "Type": null, "NextStep": null, "LeadSource": null, "IsClosed": null, "IsWon": null, "ForecastCategory": null, "ForecastCategoryName": null, "CampaignId": null, "HasOpportunityLineItem": null, "Pricebook2Id": null, "OwnerId": null, "CreatedDate": null, "CreatedById": null, "LastModifiedDate": null, "LastModifiedById": null, "LastStageChangeDate": null, "ContactId": null, "ContractId": null, "LastAmountChangedHistoryId": null, "LastCloseDateChangedHistoryId": null}
Payloadの部分を抽出
{
  "ChangeEventHeader": {
    "entityName": "Opportunity",
    "recordIds": ["0060l00000b7cojAAA"],
    "changeType": "DELETE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "00060d13-c72f-f594-9d6e-e72646693be9",
    "sequenceNumber": 1,
    "commitTimestamp": 1691377416000,
    "commitNumber": 11070299160440,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "AccountId": null,
  "IsPrivate": null,
  "Name": null,
  "Description": null,
  "StageName": null,
  "Amount": null,
  "Probability": null,
  "ExpectedRevenue": null,
  "TotalOpportunityQuantity": null,
  "CloseDate": null,
  "Type": null,
  "NextStep": null,
  "LeadSource": null,
  "IsClosed": null,
  "IsWon": null,
  "ForecastCategory": null,
  "ForecastCategoryName": null,
  "CampaignId": null,
  "HasOpportunityLineItem": null,
  "Pricebook2Id": null,
  "OwnerId": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": null,
  "LastModifiedById": null,
  "LastStageChangeDate": null,
  "ContactId": null,
  "ContractId": null,
  "LastAmountChangedHistoryId": null,
  "LastCloseDateChangedHistoryId": null
}

Salesforceでの変更内容を受け取ることができました

3.2.5. アクセストークンでの認証

3.2.4.ではユーザー名/パスワードによる認証でしたが、アクセストークンでの認証も試してみます

  • src/main/resources/arguments.yamlを編集します
    • LOGIN_URL: 接続する組織のインスタンスURLを設定します
    • USERNAME: 念の為、nullに設定します
    • PASSWORD: 念の為、nullに設定します
    • TENANT_ID: 以下の手順で値を取得して、設定します
      • スクラッチ組織で[設定 > 会社の設定 > 組織情報]をクリックします
      • salesforce.com 組織 IDの設定値を取得します
    • ACCESS_TOKEN: 以下の手順で値を取得して、設定します
      • sf org display -o demoを実行します
      • KEYAccess TokenVALUEを取得します
  • mvn clean packageを実行して、ビルドし直します
  • 以下のコマンドを実行します
./run.sh processchangeeventheader.ProcessChangeEventHeader
  • 3.2.4.と同じように商談を新規作成したり、更新や削除してみます

Salesforceでの変更内容を受け取ることができました

3.3. カスタムオブジェクトの変更イベントを試してみる

カスタムオブジェクトでも変更イベントを受け取ることができるか試してみます

3.3.1. カスタムオブジェクトを作成する

  • 以下のような定義で、カスタムオブジェクトを作成します
    • ラベル: Demo
    • API参照名: demo__c
    • 項目
      • Name: Name
      • description__c: テキスト(255)
      • type__c: 選択リスト [Foo|Bar]
        image.png
        image.png

3.3.2. カスタムオブジェクトの変更をサブスクライブできるようにする

  • [設定 > インテグレーション > 変更データキャプチャ]でDemo (demo__c)を選択して保存します
    image.png

3.3.3. pub-sub-apiのソースを修正する

  • 使用しているpub-sub-apiでは、サブスクライブするトピック名がソースにハードコードされているため、その部分をカスタムオブジェクトのものに修正します
    します
  • Subscription Channelsを参照すると、カスタムオブジェクトの場合は以下のようなチャネル名になります。今回の場合は/data/demo__ChangeEventになります

Single-Entity Channel for a Custom Object
/data/__ChangeEvent
For example, the channel to subscribe to change events for Employee__c custom object records is:
/data/Employee__ChangeEvent

  • src/main/java/processchangeeventheader/ProcessChangeEventHeader.javaを編集
    • SUBSCRIBER_TOPICで検索して、その行を以下のように修正します
private static final String SUBSCRIBER_TOPIC = "/data/demo__ChangeEvent";
  • mvn clean packageを実行して、ビルドし直します
  • 以下のコマンドを実行します
./run.sh processchangeeventheader.ProcessChangeEventHeader
  • 3.2.4.3.2.5.と同じようにDemoを新規作成したり、更新や削除してみます
  • 今回は開発者コンソールで[Debug > Open Execute Anonymous Window]から、Apexでinsertやupdate、deleteを行いました

Salesforceでの変更内容を受け取ることができました

3.4. 複数レコードを1つのトランザクションで変更した場合を試してみる

これまでは1つのトランザクションで1つのレコードの変更でしたが、複数レコードの変更の場合はどうなるかを試してみます
3.3.で作成したカスタムオブジェクトの変更イベントを見てみます
以降では、変更イベントのPayloadの部分を抽出して見てみます

3.4.1. 複数レコードを新規作成した場合

  • transactionKey00061528-ef13-32ab-d116-1f3c44375e35であり、同じ値になっています
  • sequenceNumber12と連番になっています
  • 上記の2項目により、どのようにトランザクション処理すればよいかが分かります
1つ目の新規作成レコード
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2aOXAAZ"],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00061528-ef13-32ab-d116-1f3c44375e35",
    "sequenceNumber": 1,
    "commitTimestamp": 1691386302000,
    "commitNumber": 11070340699356,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": "0050l000007kzJvAAI",
  "Name": "Foo",
  "CreatedDate": 1691386302000,
  "CreatedById": "0050l000007kzJvAAI",
  "LastModifiedDate": 1691386302000,
  "LastModifiedById": "0050l000007kzJvAAI",
  "description__c": null,
  "type__c": null
}
2つ目の新規作成レコード
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2aOYAAZ"],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00061528-ef13-32ab-d116-1f3c44375e35",
    "sequenceNumber": 2,
    "commitTimestamp": 1691386302000,
    "commitNumber": 11070340699356,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": "0050l000007kzJvAAI",
  "Name": "Bar",
  "CreatedDate": 1691386302000,
  "CreatedById": "0050l000007kzJvAAI",
  "LastModifiedDate": 1691386302000,
  "LastModifiedById": "0050l000007kzJvAAI",
  "description__c": null,
  "type__c": "Bar"
}

3.4.2. 新規作成、更新、削除を同時に行なった場合

1つ目のレコードを削除
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2aOXAAZ"],
    "changeType": "DELETE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00061578-d102-aea6-35dd-2a853b86109a",
    "sequenceNumber": 1,
    "commitTimestamp": 1691386646000,
    "commitNumber": 11070342077296,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": null,
  "Name": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": null,
  "LastModifiedById": null,
  "description__c": null,
  "type__c": null
}
2つ目のレコードを更新
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2aOYAAZ"],
    "changeType": "UPDATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00061578-d102-aea6-35dd-2a853b86109a",
    "sequenceNumber": 2,
    "commitTimestamp": 1691386646000,
    "commitNumber": 11070342077296,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": ["0xA0"]
  },
  "OwnerId": null,
  "Name": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": 1691386646000,
  "LastModifiedById": null,
  "description__c": "Bar description",
  "type__c": null
}
3つ目のレコードを新規作成
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2aOmAAJ"],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00061578-d102-aea6-35dd-2a853b86109a",
    "sequenceNumber": 3,
    "commitTimestamp": 1691386646000,
    "commitNumber": 11070342077296,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": "0050l000007kzJvAAI",
  "Name": "Baz",
  "CreatedDate": 1691386646000,
  "CreatedById": "0050l000007kzJvAAI",
  "LastModifiedDate": 1691386646000,
  "LastModifiedById": "0050l000007kzJvAAI",
  "description__c": null,
  "type__c": null
}

3.4.3. 複数レコードに対して同じ内容で更新

  • 複数レコードのdescription__cxyzに更新しました
  • recordIdsに複数レコードのIdの値が設定されています
  • Change Event Body Fieldsは、同じ変更内容のため1つのみになっています
複数レコードを同じ内容で更新
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2aOYAAZ", "a000l00000L2aOmAAJ"],
    "changeType": "UPDATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00061d4e-718e-4197-06cc-5de979fe27cf",
    "sequenceNumber": 1,
    "commitTimestamp": 1691386930000,
    "commitNumber": 11070343349297,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": ["0xA0"]
  },
  "OwnerId": null,
  "Name": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": 1691386930000,
  "LastModifiedById": null,
  "description__c": "xyz",
  "type__c": null
}

3.4.4. 複数レコードを削除

  • recordIdsに削除したレコードのId値が設定されています
複数レコードを削除
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2aOYAAZ", "a000l00000L2aOmAAJ"],
    "changeType": "DELETE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "000615c8-3ffb-d887-bb96-2fba05fbbf4b",
    "sequenceNumber": 1,
    "commitTimestamp": 1691386986000,
    "commitNumber": 11070343575133,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": null,
  "Name": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": null,
  "LastModifiedById": null,
  "description__c": null,
  "type__c": null
}

3.5. 権限関連を試してみる

上記までは、スクラッチ組織でのシステム管理者ユーザーで接続していたため、強い権限を持っていました
権限を制限したユーザーで接続した場合にどうなるか試してみます

3.5.1. ユーザーを作成する

  • プロファイルSalesforce API Only System Integrationsでユーザーを作成します
  • 以下のようなコマンドを実行します
sf org create user --set-unique-username email=demo@example.com profileName='Salesforce API Only System Integrations' -o demo -a demo-apiuser --json
output
{
  "status": 0,
  "result": {
    "orgId": "00D0l000000HbbyEAC",
    "permissionSetAssignments": [],
    "fields": {
      "id": "0050l000007lD2rAAE",
      "username": "1691392785247_test-wcs2obwzvb6k@example.com",
      "alias": "User",
      "email": "demo@example.com",
      "emailencodingkey": "ISO-2022-JP",
      "languagelocalekey": "ja",
      "localesidkey": "ja_JP",
      "profileid": "00e0l000000g4LlAAI",
      "lastname": "User",
      "timezonesidkey": "Asia/Tokyo",
      "profilename": "Salesforce API Only System Integrations"
    }
  },
  "warnings": [
    "The --target-dev-hub flag is deprecated and is no longer used by this command. The flag will be removed in API version 57.0 or later."
  ]
}
  • 以下のコマンドで作成したユーザーの接続情報を確認します
sf org list users -o demo 
output
Warning: The --target-dev-hub flag is deprecated and is no longer used by this command. The flag will be removed in API version 57.0 or later.
=== Users in org 00D0l000000HbbyEAC

 Default Alias        Username                                    Profile Name                            User Id            
 ─────── ──────────── ─────────────────────────────────────────── ─────────────────────────────────────── ────────────────── 
 (A)     demo         test-wcs2obwzvb6k@example.com               システム管理者                          0050l000007kzJvAAI 
         demo-apiuser 1691392785247_test-wcs2obwzvb6k@example.com Salesforce API Only System Integrations 0050l000007lD2rAAE 
  • 以下のコマンドで作成したユーザーのアクセストークンを取得します
sf org display -o demo-apiuser
  • pub-sub-apiのsrc/main/resources/arguments.yamlを修正する
    • ACCESS_TOKENを作成したユーザーのAccess Tokenに変更します
  • mvn clean packageを実行して、ビルドし直します
  • これで準備ができたので、次以降でいくつか試していきます

3.5.2. 権限が何もない場合

  • 権限セットライセンスもオブジェクトに対する権限も何も付与されていない状態で、接続を試みます
% ./run.sh processchangeeventheader.ProcessChangeEventHeader
2023-08-07 16:32:49,246 [main] java.lang.Class - Setting Up Subscriber
2023-08-07 16:32:49,250 [main] java.lang.Class - Using grpcHost api.pubsub.salesforce.com and grpcPort 7443
2023-08-07 16:32:53,871 [main] java.lang.Class - Error during fetching topic
io.grpc.StatusRuntimeException: PERMISSION_DENIED: An error occurred while getting the metadata for org CORE/prod/00D0l000000Hbby and topic /data/demo__ChangeEvent. Ensure the credentials and topic name are correct. rpcId: 1aba76e2-6aec-4651-8ea4-ebac6b9963b3
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at com.salesforce.eventbus.protobuf.PubSubGrpc$PubSubBlockingStub.getTopic(PubSubGrpc.java:443)
	at utility.CommonContext.setupTopicDetails(CommonContext.java:133)
	at genericpubsub.Subscribe.<init>(Subscribe.java:72)
	at processchangeeventheader.ProcessChangeEventHeader.<init>(ProcessChangeEventHeader.java:48)
	at processchangeeventheader.ProcessChangeEventHeader.main(ProcessChangeEventHeader.java:106)
2023-08-07 16:32:53,895 [main] java.lang.Class - Error while processing Change events
2023-08-07 16:32:53,895 [main] java.lang.Class -  === GRPC Exception ===
io.grpc.StatusRuntimeException: PERMISSION_DENIED: An error occurred while getting the metadata for org CORE/prod/00D0l000000Hbby and topic /data/demo__ChangeEvent. Ensure the credentials and topic name are correct. rpcId: 1aba76e2-6aec-4651-8ea4-ebac6b9963b3
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at com.salesforce.eventbus.protobuf.PubSubGrpc$PubSubBlockingStub.getTopic(PubSubGrpc.java:443)
	at utility.CommonContext.setupTopicDetails(CommonContext.java:133)
	at genericpubsub.Subscribe.<init>(Subscribe.java:72)
	at processchangeeventheader.ProcessChangeEventHeader.<init>(ProcessChangeEventHeader.java:48)
	at processchangeeventheader.ProcessChangeEventHeader.main(ProcessChangeEventHeader.java:106)
2023-08-07 16:32:53,896 [main] java.lang.Class -  === Trailers ===
2023-08-07 16:32:53,897 [main] java.lang.Class - [Trailer] = date [Value] = Mon, 07 Aug 2023 07:32:53 GMT
2023-08-07 16:32:53,897 [main] java.lang.Class - [Trailer] = content-type [Value] = application/grpc
2023-08-07 16:32:53,897 [main] java.lang.Class - [Trailer] = rpc-id [Value] = 1aba76e2-6aec-4651-8ea4-ebac6b9963b3
2023-08-07 16:32:53,897 [main] java.lang.Class - [Trailer] = error-code [Value] = sfdc.platform.eventbus.grpc.topic.meta.permission
2023-08-07 16:32:53,898 [main] java.lang.Class - [Trailer] = type [Value] = GetTopic
  • トピックの取得に失敗してエラーになったようです
  • 以下のエラーメッセージが見えます
Error during fetching topic
io.grpc.StatusRuntimeException: PERMISSION_DENIED: An error occurred while getting the metadata for org CORE/prod/00D0l000000Hbby and topic /data/demo__ChangeEvent. Ensure the credentials and topic name are correct.

3.5.3. 権限セットライセンスを付与して、カスタムオブジェクトへの参照権限を付与した場合

  • 権限セットライセンスSalesforce API Integrationを割り当てます
    image.png
  • demo__cのオブジェクト権限のうち、参照のみ有効にした権限セットを割り当てます
    image.png
    image.png
% ./run.sh processchangeeventheader.ProcessChangeEventHeader
2023-08-07 16:39:51,420 [main] java.lang.Class - Setting Up Subscriber
2023-08-07 16:39:51,429 [main] java.lang.Class - Using grpcHost api.pubsub.salesforce.com and grpcPort 7443
2023-08-07 16:39:56,352 [main] java.lang.Class - Error during fetching topic
io.grpc.StatusRuntimeException: NOT_FOUND: The topic information for org CORE/prod/00D0l000000Hbby and topic /data/demo__ChangeEvent not found. Ensure the topic name is correct and that the topic exists. rpcId: bde2301f-c974-48e2-bf60-1f120da510aa
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at com.salesforce.eventbus.protobuf.PubSubGrpc$PubSubBlockingStub.getTopic(PubSubGrpc.java:443)
	at utility.CommonContext.setupTopicDetails(CommonContext.java:133)
	at genericpubsub.Subscribe.<init>(Subscribe.java:72)
	at processchangeeventheader.ProcessChangeEventHeader.<init>(ProcessChangeEventHeader.java:48)
	at processchangeeventheader.ProcessChangeEventHeader.main(ProcessChangeEventHeader.java:106)
2023-08-07 16:39:56,376 [main] java.lang.Class - Error while processing Change events
2023-08-07 16:39:56,376 [main] java.lang.Class -  === GRPC Exception ===
io.grpc.StatusRuntimeException: NOT_FOUND: The topic information for org CORE/prod/00D0l000000Hbby and topic /data/demo__ChangeEvent not found. Ensure the topic name is correct and that the topic exists. rpcId: bde2301f-c974-48e2-bf60-1f120da510aa
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at com.salesforce.eventbus.protobuf.PubSubGrpc$PubSubBlockingStub.getTopic(PubSubGrpc.java:443)
	at utility.CommonContext.setupTopicDetails(CommonContext.java:133)
	at genericpubsub.Subscribe.<init>(Subscribe.java:72)
	at processchangeeventheader.ProcessChangeEventHeader.<init>(ProcessChangeEventHeader.java:48)
	at processchangeeventheader.ProcessChangeEventHeader.main(ProcessChangeEventHeader.java:106)
2023-08-07 16:39:56,376 [main] java.lang.Class -  === Trailers ===
2023-08-07 16:39:56,377 [main] java.lang.Class - [Trailer] = date [Value] = Mon, 07 Aug 2023 07:39:56 GMT
2023-08-07 16:39:56,378 [main] java.lang.Class - [Trailer] = content-type [Value] = application/grpc
2023-08-07 16:39:56,378 [main] java.lang.Class - [Trailer] = rpc-id [Value] = bde2301f-c974-48e2-bf60-1f120da510aa
2023-08-07 16:39:56,378 [main] java.lang.Class - [Trailer] = error-code [Value] = sfdc.platform.eventbus.grpc.topic.not.found
2023-08-07 16:39:56,378 [main] java.lang.Class - [Trailer] = type [Value] = GetTopic
  • トピックの取得に失敗してエラーになったようです
  • 以下のエラーメッセージが見えます
Error during fetching topic
io.grpc.StatusRuntimeException: NOT_FOUND: The topic information for org CORE/prod/00D0l000000Hbby and topic /data/demo__ChangeEvent not found. Ensure the topic name is correct and that the topic exists.

3.5.4. 権限セットライセンスを付与して、カスタムオブジェクトへの参照権限、すべて表示権限を付与した場合

  • demo__cのオブジェクト権限のうち、参照すべて表示を有効にした権限セットを割り当てます
    image.png
% ./run.sh processchangeeventheader.ProcessChangeEventHeader
2023-08-07 16:45:56,212 [main] java.lang.Class - Setting Up Subscriber
2023-08-07 16:45:56,222 [main] java.lang.Class - Using grpcHost api.pubsub.salesforce.com and grpcPort 7443
2023-08-07 16:46:00,693 [main] java.lang.Class - Subscription started for topic: /data/demo__ChangeEvent.
2023-08-07 16:46:05,697 [main] java.lang.Class - Subscription Active. Received 0 events.
  • 正常に接続できたようです

  • demo__cのレコードを新規作成してみます

  • 以下のようなログが出力されました

  • 変更イベントをサブスクライブできました

Payloadの部分のみ
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2af4AAB"],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00062421-826d-39fd-64a0-d561fd2985a4",
    "sequenceNumber": 1,
    "commitTimestamp": 1691394434000,
    "commitNumber": 11070379580702,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": "0050l000007kzJvAAI",
  "Name": "Foo",
  "CreatedDate": 1691394434000,
  "CreatedById": "0050l000007kzJvAAI",
  "LastModifiedDate": 1691394434000,
  "LastModifiedById": "0050l000007kzJvAAI",
  "description__c": null,
  "type__c": null
}
  • 更新や削除の変更イベントも正常にサブスクライブできました

3.5.5. CometD および Pub/Sub API サブスクライバーによって受信される変更イベントに必要な権限

Required Permissions for Change Events Received by CometD and Pub/Sub API Subscribers

変更イベントを受信するには 必要な許可
特定の標準オブジェクトまたはカスタム オブジェクト: オブジェクトのすべてを表示

ここに上記のように必要な権限が記載されていました
他の変更イベントを使用する場合には、他の権限が必要のようなので気をつけましょう

3.5.6. Field-Level Securityの考慮

Field-Level Security
このページを参照すると、FLSも適用されそうです

  • 作成したdemo__cオブジェクトのFLSはデフォルトでチェックされたものをそのまま設定されていました

  • そのため、プロファイルSalesforce API Only System Integrationsも参照可能となっていたようです

  • カスタム項目について、プロファイルSalesforce API Only System Integrationsの参照可能を無効にしてみます

  • IdNameは必須項目のため参照可能を無効にはできません

  • Namedescription__ctype__cを全て設定して、demo__cレコードを新規作成します

execute anonymous apex
insert new demo__c(Name='Foo', description__c='Foo description', type__c='Foo');
  • 参照可能でないdescription__ctype__cの変更内容が送られていないことが分かります
Payloadの部分
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2afJAAR"],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00065a51-cfc8-aae9-d88b-203390cc012a",
    "sequenceNumber": 1,
    "commitTimestamp": 1691395384000,
    "commitNumber": 11070384233657,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": "0050l000007kzJvAAI",
  "Name": "Foo",
  "CreatedDate": 1691395384000,
  "CreatedById": "0050l000007kzJvAAI",
  "LastModifiedDate": 1691395384000,
  "LastModifiedById": "0050l000007kzJvAAI",
  "description__c": null,
  "type__c": null
}
  • 権限セットでdescription__ctype__cに参照権限を付与してみます
    image.png
  • Namedescription__ctype__cを全て設定して、demo__cレコードを新規作成します
execute anonymous apex
insert new demo__c(Name='Bar', description__c='Bar description', type__c='Bar');
  • 参照可能となったdescription__ctype__cの変更内容が送られてきていることが分かります
Payloadの部分
{
  "ChangeEventHeader": {
    "entityName": "demo__c",
    "recordIds": ["a000l00000L2afnAAB"],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/soap/58.0;client=devconsole",
    "transactionKey": "00061e0b-31de-9466-eb38-0b33b5989f16",
    "sequenceNumber": 1,
    "commitTimestamp": 1691396070000,
    "commitNumber": 11070387442420,
    "commitUser": "0050l000007kzJvAAI",
    "nulledFields": [],
    "diffFields": [],
    "changedFields": []
  },
  "OwnerId": "0050l000007kzJvAAI",
  "Name": "Bar",
  "CreatedDate": 1691396070000,
  "CreatedById": "0050l000007kzJvAAI",
  "LastModifiedDate": 1691396070000,
  "LastModifiedById": "0050l000007kzJvAAI",
  "description__c": "Bar description",
  "type__c": "Bar"
}

3.6. 変更イベントの発行と配信の使用状況を監視する

Monitor Change Event Publishing and Delivery Usage
こちらのページに記載されています

  • PlatformEventUsageMetricオブジェクトをクエリすることで分かるようです

変更イベントの場合、これらのメトリクスの使用状況データをクエリできます。最初の値は、クエリで指定したメトリック名の値です。
CHANGE_EVENTS_PUBLISHED- 公開された変更データ キャプチャ イベントの数
CHANGE_EVENTS_DELIVERED- CometD および Pub/Sub API クライアントに配信された変更データ キャプチャ イベントの数、empApi Lightning コンポーネントとイベントリレー

  • 試しに以下のSOQLを実行してみます
SELECT Name, StartDate, EndDate, Value FROM PlatformEventUsageMetric

image.png

  • 直近の24時間で、21件公開および配信されていることが分かります

  • CDCに関するそれぞれの割り当てについては以下のページに記載があります
    Change Data Capture Allocations

    • デフォルトの割り当てについて記載されています
    • アドオンライセンスを購入することで割り当てを増やすことができます

3.7. 変更データ キャプチャ用に選択できるエンティティの最大数

Change Data Capture Allocations
割り当ての中にデフォルトの標準チャネルまたはカスタム チャネルで変更データ キャプチャ用に選択できるエンティティ (標準オブジェクトとカスタム オブジェクトを含む) の最大数があります

5つを超えて変更データキャプチャに選択しようとするとどうなるか見てみます

  • 5つのエンティティを選択済みの状態
    image.png
  • ユーザ (User)を選択されたエンティティに追加します
    image.png
  • 保存すると以下のようなエラーメッセージが表示されて、5つを超えて選択することはできませんでした
    image.png
  • この記事では作成していませんが、エラーメッセージからカスタムチャネルを作成している場合に、そのチャネル数も合計数に含まれるようです

エンティティをこれ以上選択できません。変更データキャプチャで選択された一意のエンティティの最大数を超えました。エンティティの合計数には、ここで選択されたエンティティとカスタムチャネルを介して選択されたエンティティが含まれます。

3.8. tenantIdについて

  • アクセストークンを使用した認証の場合、tenantIdが必要になります
  • 接続する組織の設定画面以外で取得できるのか見てみます
  • 接続アプリケーションを経由してアクセストークンを取得すると、以下のようなレスポンスになります
アクセストークン取得した際のレスポンス(クライアントログインフローで取得)
{
  "access_token": "******",
  "signature": "******",
  "scope": "api",
  "instance_url": "https://MyDomain.my.salesforce.com",
  "id": "https://test.salesforce.com/id/${tenantId}EAC/005***************",
  "token_type": "Bearer",
  "issued_at": "1691397914471"
}
  • id値のid/の後ろのパス部分がtenantIdの値+3文字になっていました
  • 試しに${tenantId}EACを指定して、接続を試みてみます
  • 接続できました! この方法が使えそうです
  • もし他のより良い方法があったら教えていただけると助かります!

4. Appendix

4.1. Get StartedやPub/Sub APIクライアントのJavaサンプルをトレースしながら試してみる

  • Javaサンプルのうち、今回試した部分について抽出してトレースしながら理解をしてみました
    • protoファイルからコードの自動生成
    • 変更イベントをサブスクライブする
    • 認証は既に取得済みのアクセストークンを利用する
    • コードなどで気になるところを調べてみる

参考にした記事

Change Data Capture Developer Guide
Pub/Sub API

おわり。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0