はじめに
この記事は、SalesforceのChange Data Capture (CDC)機能とConfluentのSalesforce用コネクターにより、Salesforce上でのデータ変更内容をConfluent(Kafka)のトピックに即時連携する方法を確認したものです。Salesforceからどんな形のメッセージがトピックに書き込まれるのかを確認できる内容になっています。
参考にしたサイト
- SalesforceのChange Data Capture (CDC)機能
- ConfluentのSalesforce用コネクター(Salesforce Change Data Capture Source Connector for Confluent Platform)
2のサイトにQuick Startがあり、それを参考にトライしていますが、記載されている通りに実施するだけではうまくいきません。また、Salesforce初心者がゼロからトライするには説明が簡素で難しいです。公式ページや海外サイトを確認してもピンポイントでほしい情報を得られず、何度かトライ&エラーをしました。
以下は、最終的に成功した際の情報を整理して記載したものですので、実績のある方法として誰かの参考になれば幸いです。
バージョン情報
- OS:Red Hat Enterprise Linux release 8.4 (Ootpa)
- Confluent:Confluent Platform 7.2.1 (試用版・シングルノード)
- Connector:SalesforceCdcSourceConnector 2.0.12
手順1. Salesforce Developer Editionのサインアップ
Salesforceには30日間無料のトライアル用アカウントと開発者向けの無料アカウント(Developer Edition)があるようです。SalesforceのChange Data Capture (CDC)機能が使用できるのはDeveloper Editionです。
30日間無料のトライアル用アカウントにはCDCの機能が利用できない:
Developer EditionではCDCの機能が利用できる:
Developer Editionサインアップはこのサイトに記載された方法に従います。
(2) 登録したメールアドレスにメールが来るので内容を確認します。
手順2. Salesforce側の設定
(1) 右上隅にある歯車アイコンを選択し、[設定] を選択します。[クイック検索] 検索ボックスに「アプリ」と入力し、フィルターされた結果で [アプリケーションマネージャー] を選択します。
(2) 右上にある[新規接続アプリケーション]を選択します。
(3) [新規接続アプリケーション]作成画面で、基本情報にある必須入力欄を入力します。
(4) [新規接続アプリケーション]作成画面で、[API(OAuth 設定の有効化)]に以下の設定をします。
[OAuth 設定の有効化][デバイスフローで有効化]にチェック
[選択した OAuth 範囲]にある[利用可能な OAuth 画面]をすべて選択し[選択した OAuth 範囲]に追加
(5) 作成した接続アプリケーションの「Consumer Key」と「Consumer Secret」を取得します。
[コンシューマの詳細を管理]を選択
下記のような画面が表示されるのでメールを受信したら確認コードを入力して[検証]を選択
「Consumer Key」と「Consumer Secret」が表示されるので記録
(6) セキュリティトークンのリセットを行います。
左側のパネルより[私のセキュリティトークンのリセット]を選択し、[セキュリティトークンのリセット]ボタンをクリック
(7) 右上隅にある歯車アイコンを選択し、[設定] を選択します。[クイック検索] 検索ボックスに「OAuth」と入力し、フィルターされた結果で [OAuth および OpenID Connect 設定] を選択します。[OAuth ユーザ名パスワードフローを許可]をオフからオンに変更します。
※この手順はConfluentのQuickStartには記載がない手順ですが、必須の手順です。この手順を行わないとConfluentのコネクターで正しい認証情報を設定しても認証エラーになります。この設定に関しては下記のサイトが参考になります。
OAuth 2.0 Username-Password Flow Blocked by Default in New Orgs
手順3. Confluent側でのConnectorの設定
(1) Salesforce Change Data Capture Source Connector for Confluent Platformをインストールします。
Confluent-hubから「Salesforce Connector (Source and Sink)」をインストール
インストールのためのコマンド実行
confluent-hub install confluentinc/kafka-connect-salesforce:2.0.12
(2) Kafka Connectを再起動します。
confluent local services connect stop
confluent local services connect start
(3) 使用可能なコネクタープラグインにSalesforceのコネクターが追加されたことを確認します。
curl -s localhost:8083/connector-plugins | jq
(4) コネクターを作成します。
curl -X PUT localhost:8083/connectors/sfcdc-account-source/config -H "Content-Type: application/json" \
--data '{
"name":"sfcdc-account-source",
"connector.class":"io.confluent.salesforce.SalesforceCdcSourceConnector",
"tasks.max":"1",
"kafka.topic":"sfcdc-account-topic",
"salesforce.cdc.channel" : "AccountChangeEvent",
"salesforce.initial.start" : "all",
"salesforce.consumer.key" : "3MVG・・(省略)・・edqI",
"salesforce.consumer.secret" : "9195・・(省略)・・C8B6",
"salesforce.username" : "xxxxx@xxx.com(Salesforceログイン用メールアドレス)",
"salesforce.password" : "xxxxxxxxxxxxxx(Salesforceログイン用パスワード)",
"salesforce.password.token" : "T2L・・(省略)・・Iuj",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1"
}' | jq
- nameはコネクター名。
- connector.classはコネクタープラグインのクラス名(上記固定)
- tasksはスレッド数。このコネクターは上限が1という仕様。
- kafka.topicはトピック名。
- salesforce.cdc.channelは⚪︎⚪︎⚪︎ChangeEventとなる。⚪︎⚪︎⚪︎の部分はSalesforce内でのエンティティ名が入る。取引先(Account)のデータ変更イベントであればAccountChangeEventとなる。
- salesforce.initial.startはコネクターの作成前にトピック内にある全てのメッセージをコンシューム対象にするか(all)、コネクター作成後からトピックに書き込まれたメッセージをコンシューム対象にするか(latest)。
- salesforce.consumer.key、salesforce.consumer.secret、salesforce.username、salesforce.password、salesforce.password.tokenはSalesforceおよびSalesforceの接続アプリケーションに接続するための認証情報。
- confluent.topic.bootstrap.serversはブローカーリスト。
- confluent.topic.replication.factorはトピックのレプリカ数。デフォルト値が「3」のため試用版のシングルノード構成のConfluentでは「1」を指定する必要がある。
コネクターのステータス確認
curl -s localhost:8083/connectors/sfcdc-account-source/status | jq
手順4. Salesforceを使用してAccount(取引先)を追加・変更・削除
(1) Accountに対する変更データのキャプチャーを有効にします。
取引先(Account)を[使用可能なエンティティ]から[選択されたエンティティ]にする
(2) Accountを追加します。(TestAccount1を追加)
(3) Accountを追加します。(TestAccount2を追加)
(4) Accountを変更します。(TestAccount2を変更)
Phoneを090-1111-1111から090-2222-2222に変更
(5) Accountを削除します。(TestAccount2を削除)
手順5. Confluent(Kafka)のトピックやスキーマを確認する
スキーマサブジェクト確認
curl -sX GET http://localhost:8081/subjects | jq . | grep sfcdc
"sfcdc-account-topic-value"
- スキーマサブジェクトは「トピック名-key」または「トピック名-value」になるが、Salesforce CDC+コネクターによってトピックに書き込まれるメッセージのスキーマには「トピック名-key」のサブジェクトはない。
スキーマ確認
curl -sX GET http://localhost:8081/subjects/sfcdc-account-topic-value/versions/latest | jq .schema -r | jq
{
"type": "record",
"name": "AccountChangeEvent",
"namespace": "io.confluent.salesforce",
"fields": [
{
"name": "Id",
"type": {
"type": "string",
"connect.doc": "Unique identifier for the object."
}
},
{
"name": "ReplayId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ChangeEventHeader",
"type": {
"type": "record",
"name": "ChangeEventHeader",
"namespace": "",
"fields": [
{
"name": "entityName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "recordIds",
"type": [
"null",
{
"type": "array",
"items": "string"
}
],
"default": null
},
{
"name": "changeType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "changedFields",
"type": [
"null",
{
"type": "array",
"items": "string"
}
],
"default": null
},
{
"name": "changeOrigin",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "transactionKey",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "sequenceNumber",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "commitTimestamp",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "commitUser",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "commitNumber",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.name": "ChangeEventHeader"
}
},
{
"name": "Name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "LastName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "FirstName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Salutation",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Type",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ParentId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingStreet",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingCity",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingState",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingPostalCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingCountry",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingLatitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "BillingLongitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "BillingGeocodeAccuracy",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingAddress",
"type": [
"null",
{
"type": "record",
"name": "Address",
"fields": [
{
"name": "GeocodeAccuracy",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "State",
"type": [
"null",
{
"type": "string",
"connect.doc": ""
}
],
"default": null
},
{
"name": "Street",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "PostalCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Country",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Latitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "City",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Longitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "CountryCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "StateCode",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.confluent.salesforce.Address"
}
],
"default": null
},
{
"name": "ShippingStreet",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingCity",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingState",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingPostalCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingCountry",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingLatitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "ShippingLongitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "ShippingGeocodeAccuracy",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingAddress",
"type": [
"null",
"Address"
],
"default": null
},
{
"name": "Phone",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Fax",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "AccountNumber",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Website",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Sic",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Industry",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "AnnualRevenue",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NumberOfEmployees",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "Ownership",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "TickerSymbol",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Description",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Rating",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Site",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "OwnerId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "CreatedDate",
"type": [
"null",
{
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "CreatedById",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "LastModifiedDate",
"type": [
"null",
{
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "LastModifiedById",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Jigsaw",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "JigsawCompanyId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "CleanStatus",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "AccountSource",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "DunsNumber",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Tradestyle",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NaicsCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NaicsDesc",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "YearStarted",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SicDesc",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "DandbCompanyId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "OperatingHoursId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "CustomerPriority__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SLA__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Active__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NumberofLocations__c",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "UpsellOpportunity__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SLASerialNumber__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SLAExpirationDate__c",
"type": [
"null",
{
"type": "int",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Date",
"logicalType": "date"
}
],
"default": null
},
{
"name": "_ObjectType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "_EventType",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.confluent.salesforce.AccountChangeEvent"
}
メッセージ確認
ksql> print 'sfcdc-account-topic' from beginning;
- Keyはなし。
- メッセージ毎にrowtimeあり。(下記にイメージあり)
- メッセージのバリュー部だけJson形式でパースしたものが下記の通り。(上からTestAccount1の追加、TestAccount2の追加、TestAccount2の変更、TestAccount2の削除)
{
"Id": "0015j000016nbijAAA",
"ReplayId": "9021830",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nbijAAA"
],
"changeType": "CREATE",
"changedFields": [],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004bf8f-a136-35e0-232f-9f62eee73abb",
"sequenceNumber": 1,
"commitTimestamp": 1691131295000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677598623316
},
"Name": "TestAccount1",
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": "Prospect",
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": null,
"Fax": null,
"AccountNumber": "11111",
"Website": null,
"Sic": null,
"Industry": "Banking",
"AnnualRevenue": null,
"NumberOfEmployees": null,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": "0055j0000099yTYAAY",
"CreatedDate": 1691131295000,
"CreatedById": "0055j0000099yTYAAY",
"LastModifiedDate": 1691131295000,
"LastModifiedById": "0055j0000099yTYAAY",
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": "Pending",
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
"Id": "0015j000016nzGtAAI",
"ReplayId": "9022402",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nzGtAAI"
],
"changeType": "CREATE",
"changedFields": [],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004c09f-20f4-8968-5df4-ab3738e95f28",
"sequenceNumber": 1,
"commitTimestamp": 1691132462000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677611261555
},
"Name": "TestAccount2",
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": null,
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": "090-1111-1111",
"Fax": null,
"AccountNumber": "22222",
"Website": null,
"Sic": null,
"Industry": "Chemicals",
"AnnualRevenue": null,
"NumberOfEmployees": 10000,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": "0055j0000099yTYAAY",
"CreatedDate": 1691132462000,
"CreatedById": "0055j0000099yTYAAY",
"LastModifiedDate": 1691132462000,
"LastModifiedById": "0055j0000099yTYAAY",
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": "Pending",
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
"Id": "0015j000016nzGtAAI",
"ReplayId": "9022436",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nzGtAAI"
],
"changeType": "UPDATE",
"changedFields": [
"Phone",
"LastModifiedDate"
],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004c0b9-17df-c835-9f02-7ed1af25dea9",
"sequenceNumber": 1,
"commitTimestamp": 1691132573000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677612525804
},
"Name": null,
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": null,
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": "090-2222-2222",
"Fax": null,
"AccountNumber": null,
"Website": null,
"Sic": null,
"Industry": null,
"AnnualRevenue": null,
"NumberOfEmployees": null,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": null,
"CreatedDate": null,
"CreatedById": null,
"LastModifiedDate": 1691132573000,
"LastModifiedById": null,
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": null,
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
"Id": "0015j000016nzGtAAI",
"ReplayId": "9022465",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nzGtAAI"
],
"changeType": "DELETE",
"changedFields": [],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004c0cb-a153-ef60-2081-04ff9923ff23",
"sequenceNumber": 1,
"commitTimestamp": 1691132653000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677613463020
},
"Name": null,
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": null,
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": null,
"Fax": null,
"AccountNumber": null,
"Website": null,
"Sic": null,
"Industry": null,
"AnnualRevenue": null,
"NumberOfEmployees": null,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": null,
"CreatedDate": null,
"CreatedById": null,
"LastModifiedDate": null,
"LastModifiedById": null,
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": null,
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
おわりに(考察)
- Salesforceからキーありでメッセージングできるかもしれませんが、自然体ではメッセージのスキーマにはキーがないようです。メッセージの使い勝手として、キーは必要だと思います。コネクターのSMT(Single Message Transforms)によってキーを付与するか、KsqlDBのクエリーによるバリュー項目のキー昇格を行う必要があります。
- 既存のテーブルにメッセージをシンクする場合、項目の取捨選択や型変換が必要になります。これはksqlDBで対応することができます。
- トピックからJDBCSinkConnectorでDBにデータ反映する場合は、changeTypeが"DELETE"のメッセージをTombstoneレコードに変換する必要があります。ksqlDBでこれを実施する方法を確認済みですがテクニックが要ります。(別途方法を投稿できたらと思います)
- Salesforceのアカウントに割り当てられたチェンジイベント発行数の上限がユースケースに見合ったものかどうかあらかじめ確認が必要です。
- 上記の検証はパブリックなSalesforceのアカウントでのトライでしたが、プライベートな環境のSalesforceに対する接続には別途検証が必要になります。