1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Salesforceの変更データをConfluent(Kafka)に即時連携する方法

Last updated at Posted at 2023-08-05

はじめに

この記事は、SalesforceのChange Data Capture (CDC)機能とConfluentのSalesforce用コネクターにより、Salesforce上でのデータ変更内容をConfluent(Kafka)のトピックに即時連携する方法を確認したものです。Salesforceからどんな形のメッセージがトピックに書き込まれるのかを確認できる内容になっています。

参考にしたサイト

  1. SalesforceのChange Data Capture (CDC)機能
  2. ConfluentのSalesforce用コネクター(Salesforce Change Data Capture Source Connector for Confluent Platform)

2のサイトにQuick Startがあり、それを参考にトライしていますが、記載されている通りに実施するだけではうまくいきません。また、Salesforce初心者がゼロからトライするには説明が簡素で難しいです。公式ページや海外サイトを確認してもピンポイントでほしい情報を得られず、何度かトライ&エラーをしました。
以下は、最終的に成功した際の情報を整理して記載したものですので、実績のある方法として誰かの参考になれば幸いです。

バージョン情報

  • OS:Red Hat Enterprise Linux release 8.4 (Ootpa)
  • Confluent:Confluent Platform 7.2.1 (試用版・シングルノード)
  • Connector:SalesforceCdcSourceConnector 2.0.12

手順1. Salesforce Developer Editionのサインアップ

Salesforceには30日間無料のトライアル用アカウントと開発者向けの無料アカウント(Developer Edition)があるようです。SalesforceのChange Data Capture (CDC)機能が使用できるのはDeveloper Editionです。

30日間無料のトライアル用アカウントにはCDCの機能が利用できない:
image.png

Developer EditionではCDCの機能が利用できる:
image.png

Developer Editionサインアップはこのサイトに記載された方法に従います。

(1) サインアップ用のページで必要情報を登録します。
image.png

(2) 登録したメールアドレスにメールが来るので内容を確認します。
image.png

(3) ログインできることを確認します。
image.png

手順2. Salesforce側の設定

(1) 右上隅にある歯車アイコンを選択し、[設定] を選択します。[クイック検索] 検索ボックスに「アプリ」と入力し、フィルターされた結果で [アプリケーションマネージャー] を選択します。
image.png
image.png

(2) 右上にある[新規接続アプリケーション]を選択します。
image.png

(3) [新規接続アプリケーション]作成画面で、基本情報にある必須入力欄を入力します。
image.png

(4) [新規接続アプリケーション]作成画面で、[API(OAuth 設定の有効化)]に以下の設定をします。
[OAuth 設定の有効化][デバイスフローで有効化]にチェック
image.png

[選択した OAuth 範囲]にある[利用可能な OAuth 画面]をすべて選択し[選択した OAuth 範囲]に追加
image.png

[保存]を選択
image.png

下記のような画面が表示されるので「次へ」を選択
image.png

「接続アプリケーションを管理する」の画面が表示される
image.png

(5) 作成した接続アプリケーションの「Consumer Key」と「Consumer Secret」を取得します。
[コンシューマの詳細を管理]を選択
image.png

[コンシューマの詳細を管理]を選択
image.png

下記のような画面が表示されるのでメールを受信したら確認コードを入力して[検証]を選択
image.png

「Consumer Key」と「Consumer Secret」が表示されるので記録
image.png

(6) セキュリティトークンのリセットを行います。

画面右上のプロファイルのアイコンを選択し[設定]を選択
image.png

左側のパネルより[私のセキュリティトークンのリセット]を選択し、[セキュリティトークンのリセット]ボタンをクリック
image.png

下記のような画面になるのでメールが受信されるのを待つ
image.png

メールで受信したユーザ名とセキュリティトークンを記録する
image.png

(7) 右上隅にある歯車アイコンを選択し、[設定] を選択します。[クイック検索] 検索ボックスに「OAuth」と入力し、フィルターされた結果で [OAuth および OpenID Connect 設定] を選択します。[OAuth ユーザ名パスワードフローを許可]をオフからオンに変更します。

image.png

※この手順はConfluentのQuickStartには記載がない手順ですが、必須の手順です。この手順を行わないとConfluentのコネクターで正しい認証情報を設定しても認証エラーになります。この設定に関しては下記のサイトが参考になります。

OAuth 2.0 Username-Password Flow Blocked by Default in New Orgs

手順3. Confluent側でのConnectorの設定

(1) Salesforce Change Data Capture Source Connector for Confluent Platformをインストールします。

Confluent-hubから「Salesforce Connector (Source and Sink)」をインストール
image.png

インストールのためのコマンド実行

confluent-hub install confluentinc/kafka-connect-salesforce:2.0.12

(2) Kafka Connectを再起動します。

confluent local services connect stop
confluent local services connect start

(3) 使用可能なコネクタープラグインにSalesforceのコネクターが追加されたことを確認します。

curl -s localhost:8083/connector-plugins | jq

出力例
image.png

(4) コネクターを作成します。

curl -X PUT localhost:8083/connectors/sfcdc-account-source/config -H "Content-Type: application/json" \
--data '{
    "name":"sfcdc-account-source",
    "connector.class":"io.confluent.salesforce.SalesforceCdcSourceConnector",
    "tasks.max":"1",
    "kafka.topic":"sfcdc-account-topic",
    "salesforce.cdc.channel" : "AccountChangeEvent",
    "salesforce.initial.start" : "all",
    "salesforce.consumer.key" : "3MVG・・(省略)・・edqI",
    "salesforce.consumer.secret" : "9195・・(省略)・・C8B6",
    "salesforce.username" : "xxxxx@xxx.com(Salesforceログイン用メールアドレス)",
    "salesforce.password" : "xxxxxxxxxxxxxx(Salesforceログイン用パスワード)",
    "salesforce.password.token" : "T2L・・(省略)・・Iuj",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
}' | jq
  • nameはコネクター名。
  • connector.classはコネクタープラグインのクラス名(上記固定)
  • tasksはスレッド数。このコネクターは上限が1という仕様。
  • kafka.topicはトピック名。
  • salesforce.cdc.channelは⚪︎⚪︎⚪︎ChangeEventとなる。⚪︎⚪︎⚪︎の部分はSalesforce内でのエンティティ名が入る。取引先(Account)のデータ変更イベントであればAccountChangeEventとなる。
  • salesforce.initial.startはコネクターの作成前にトピック内にある全てのメッセージをコンシューム対象にするか(all)、コネクター作成後からトピックに書き込まれたメッセージをコンシューム対象にするか(latest)。
  • salesforce.consumer.key、salesforce.consumer.secret、salesforce.username、salesforce.password、salesforce.password.tokenはSalesforceおよびSalesforceの接続アプリケーションに接続するための認証情報。
  • confluent.topic.bootstrap.serversはブローカーリスト。
  • confluent.topic.replication.factorはトピックのレプリカ数。デフォルト値が「3」のため試用版のシングルノード構成のConfluentでは「1」を指定する必要がある。

コネクターのステータス確認

curl -s localhost:8083/connectors/sfcdc-account-source/status | jq

出力例
image.png

手順4. Salesforceを使用してAccount(取引先)を追加・変更・削除

(1) Accountに対する変更データのキャプチャーを有効にします。

[設定]から[変更データキャプチャ]を選択
image.png

取引先(Account)を[使用可能なエンティティ]から[選択されたエンティティ]にする
image.png

(2) Accountを追加します。(TestAccount1を追加)
image.png
image.png

(3) Accountを追加します。(TestAccount2を追加)
image.png

(4) Accountを変更します。(TestAccount2を変更)
image.png
Phoneを090-1111-1111から090-2222-2222に変更
image.png

(5) Accountを削除します。(TestAccount2を削除)
image.png
image.png

手順5. Confluent(Kafka)のトピックやスキーマを確認する

スキーマサブジェクト確認

curl -sX GET http://localhost:8081/subjects | jq . | grep sfcdc
  "sfcdc-account-topic-value"
  • スキーマサブジェクトは「トピック名-key」または「トピック名-value」になるが、Salesforce CDC+コネクターによってトピックに書き込まれるメッセージのスキーマには「トピック名-key」のサブジェクトはない。

スキーマ確認

curl -sX GET http://localhost:8081/subjects/sfcdc-account-topic-value/versions/latest | jq .schema -r | jq
{
  "type": "record",
  "name": "AccountChangeEvent",
  "namespace": "io.confluent.salesforce",
  "fields": [
    {
      "name": "Id",
      "type": {
        "type": "string",
        "connect.doc": "Unique identifier for the object."
      }
    },
    {
      "name": "ReplayId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ChangeEventHeader",
      "type": {
        "type": "record",
        "name": "ChangeEventHeader",
        "namespace": "",
        "fields": [
          {
            "name": "entityName",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "recordIds",
            "type": [
              "null",
              {
                "type": "array",
                "items": "string"
              }
            ],
            "default": null
          },
          {
            "name": "changeType",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "changedFields",
            "type": [
              "null",
              {
                "type": "array",
                "items": "string"
              }
            ],
            "default": null
          },
          {
            "name": "changeOrigin",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "transactionKey",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "sequenceNumber",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "commitTimestamp",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "commitUser",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "commitNumber",
            "type": [
              "null",
              "long"
            ],
            "default": null
          }
        ],
        "connect.name": "ChangeEventHeader"
      }
    },
    {
      "name": "Name",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "LastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "FirstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Salutation",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Type",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ParentId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingStreet",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingCity",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingState",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingPostalCode",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingCountry",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingLatitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "BillingLongitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "BillingGeocodeAccuracy",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingAddress",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Address",
          "fields": [
            {
              "name": "GeocodeAccuracy",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "State",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.doc": ""
                }
              ],
              "default": null
            },
            {
              "name": "Street",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "PostalCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Country",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Latitude",
              "type": [
                "null",
                "double"
              ],
              "default": null
            },
            {
              "name": "City",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Longitude",
              "type": [
                "null",
                "double"
              ],
              "default": null
            },
            {
              "name": "CountryCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "StateCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ],
          "connect.name": "io.confluent.salesforce.Address"
        }
      ],
      "default": null
    },
    {
      "name": "ShippingStreet",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingCity",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingState",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingPostalCode",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingCountry",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingLatitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "ShippingLongitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "ShippingGeocodeAccuracy",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingAddress",
      "type": [
        "null",
        "Address"
      ],
      "default": null
    },
    {
      "name": "Phone",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Fax",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "AccountNumber",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Website",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Sic",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Industry",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "AnnualRevenue",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NumberOfEmployees",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "Ownership",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "TickerSymbol",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Description",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Rating",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Site",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "OwnerId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "CreatedDate",
      "type": [
        "null",
        {
          "type": "long",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    },
    {
      "name": "CreatedById",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "LastModifiedDate",
      "type": [
        "null",
        {
          "type": "long",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    },
    {
      "name": "LastModifiedById",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Jigsaw",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "JigsawCompanyId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "CleanStatus",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "AccountSource",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "DunsNumber",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Tradestyle",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NaicsCode",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NaicsDesc",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "YearStarted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SicDesc",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "DandbCompanyId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "OperatingHoursId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "CustomerPriority__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SLA__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Active__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NumberofLocations__c",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "UpsellOpportunity__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SLASerialNumber__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SLAExpirationDate__c",
      "type": [
        "null",
        {
          "type": "int",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Date",
          "logicalType": "date"
        }
      ],
      "default": null
    },
    {
      "name": "_ObjectType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "_EventType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "io.confluent.salesforce.AccountChangeEvent"
}

メッセージ確認

ksql> print 'sfcdc-account-topic' from beginning;
  • Keyはなし。
  • メッセージ毎にrowtimeあり。(下記にイメージあり)
    image.png
  • メッセージのバリュー部だけJson形式でパースしたものが下記の通り。(上からTestAccount1の追加、TestAccount2の追加、TestAccount2の変更、TestAccount2の削除)
{
  "Id": "0015j000016nbijAAA",
  "ReplayId": "9021830",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nbijAAA"
    ],
    "changeType": "CREATE",
    "changedFields": [],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004bf8f-a136-35e0-232f-9f62eee73abb",
    "sequenceNumber": 1,
    "commitTimestamp": 1691131295000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677598623316
  },
  "Name": "TestAccount1",
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": "Prospect",
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": null,
  "Fax": null,
  "AccountNumber": "11111",
  "Website": null,
  "Sic": null,
  "Industry": "Banking",
  "AnnualRevenue": null,
  "NumberOfEmployees": null,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": "0055j0000099yTYAAY",
  "CreatedDate": 1691131295000,
  "CreatedById": "0055j0000099yTYAAY",
  "LastModifiedDate": 1691131295000,
  "LastModifiedById": "0055j0000099yTYAAY",
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": "Pending",
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
  "Id": "0015j000016nzGtAAI",
  "ReplayId": "9022402",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nzGtAAI"
    ],
    "changeType": "CREATE",
    "changedFields": [],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004c09f-20f4-8968-5df4-ab3738e95f28",
    "sequenceNumber": 1,
    "commitTimestamp": 1691132462000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677611261555
  },
  "Name": "TestAccount2",
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": null,
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": "090-1111-1111",
  "Fax": null,
  "AccountNumber": "22222",
  "Website": null,
  "Sic": null,
  "Industry": "Chemicals",
  "AnnualRevenue": null,
  "NumberOfEmployees": 10000,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": "0055j0000099yTYAAY",
  "CreatedDate": 1691132462000,
  "CreatedById": "0055j0000099yTYAAY",
  "LastModifiedDate": 1691132462000,
  "LastModifiedById": "0055j0000099yTYAAY",
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": "Pending",
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
  "Id": "0015j000016nzGtAAI",
  "ReplayId": "9022436",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nzGtAAI"
    ],
    "changeType": "UPDATE",
    "changedFields": [
      "Phone",
      "LastModifiedDate"
    ],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004c0b9-17df-c835-9f02-7ed1af25dea9",
    "sequenceNumber": 1,
    "commitTimestamp": 1691132573000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677612525804
  },
  "Name": null,
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": null,
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": "090-2222-2222",
  "Fax": null,
  "AccountNumber": null,
  "Website": null,
  "Sic": null,
  "Industry": null,
  "AnnualRevenue": null,
  "NumberOfEmployees": null,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": 1691132573000,
  "LastModifiedById": null,
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": null,
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
  "Id": "0015j000016nzGtAAI",
  "ReplayId": "9022465",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nzGtAAI"
    ],
    "changeType": "DELETE",
    "changedFields": [],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004c0cb-a153-ef60-2081-04ff9923ff23",
    "sequenceNumber": 1,
    "commitTimestamp": 1691132653000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677613463020
  },
  "Name": null,
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": null,
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": null,
  "Fax": null,
  "AccountNumber": null,
  "Website": null,
  "Sic": null,
  "Industry": null,
  "AnnualRevenue": null,
  "NumberOfEmployees": null,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": null,
  "LastModifiedById": null,
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": null,
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}

おわりに(考察)

  • Salesforceからキーありでメッセージングできるかもしれませんが、自然体ではメッセージのスキーマにはキーがないようです。メッセージの使い勝手として、キーは必要だと思います。コネクターのSMT(Single Message Transforms)によってキーを付与するか、KsqlDBのクエリーによるバリュー項目のキー昇格を行う必要があります。
  • 既存のテーブルにメッセージをシンクする場合、項目の取捨選択や型変換が必要になります。これはksqlDBで対応することができます。
  • トピックからJDBCSinkConnectorでDBにデータ反映する場合は、changeTypeが"DELETE"のメッセージをTombstoneレコードに変換する必要があります。ksqlDBでこれを実施する方法を確認済みですがテクニックが要ります。(別途方法を投稿できたらと思います)
  • Salesforceのアカウントに割り当てられたチェンジイベント発行数の上限がユースケースに見合ったものかどうかあらかじめ確認が必要です。
  • 上記の検証はパブリックなSalesforceのアカウントでのトライでしたが、プライベートな環境のSalesforceに対する接続には別途検証が必要になります。
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?