Heroku Advent Calendar 2023 21日目の記事です🎄
はじめに
はいどうもー!リバネス開発チームのトミー(@tomyf)です(`・ω・´)
弊社にSalesforceを導入してから10年が経とうとしています。
今回は Data Cloud を使うことになったので、蓄積された20万件以上のユーザアカウントの中で、同一人物と思われるユーザアカウントを統合して、まとめて管理できるようにしようと思います。
Data Cloud についての詳しい説明についてはこちらをご確認してください。
https://help.salesforce.com/s/articleView?id=sf.c360_a_data_cloud.htm&type=5
Data Cloud の動きを簡単に説明すると、「姓名 + 誕生日」や「姓名 + 電話番号」などの組み合わせで一致するユーザアカウントを紐付けて、解決済みIDとして作成します。解決済みIDと解決前IDのリンクデータも作成されるため、どのユーザアカウントが統合されたか知ることができます。
今回は Heroku の記事なので Data Cloud の設定の話はまた今度にしますw
この記事では Heroku を使ってどのように Data Cloud から Sales Cloud にデータ連携したのか解説します。
ガッツリとプログラミングが絡んできますので、ある程度プログラミングができる方向けとなっています(*・ω・)b
なぜ Heroku Connect なのか?
Data Cloud の解決済みIDを Sales Cloud に戻す方法として Salesforce Flow Builder があるのですが、こちらを使わないのか?と疑問に思うかもしれません。
これは解決済みIDのデータ量が多くて、ガバナ制限にかかることが原因です。
弊社では Heroku プラットフォーム上でWebアプリを稼働させているので、これ幸いと Heroku から Data Cloud API 経由でデータを取得することにしました。
取得したデータは Heroku Connect されている Heroku Postgres に格納します。
なぜ Sales Cloud に連携するのか?
Data Cloud の機能ではID解決をしてくれますが、データをグラフィカルに表示する機能がないため、どのユーザアカウントがどれと紐づいたのかがイマイチわからないという状況になります。
そのため、ApexやWebアプリ側で解決済みIDを見やすい形で表示することを目的に Sales Cloud に連携を行います。
環境
- Laravel v9.25.1
- PHP v8.1.26
APIで取得してから Sales Cloud にデータ連携する手順
ここからは Laravel (PHP) での処理の内容になります。
まずは、APIから取得した統合リンクを格納するテーブル (unified_link_individuals) を作成します。
また、 Sales Cloud のリバネスIDのテーブル名を (lnestid__c) とします。
unified_link_individuals
id: uuid
individual_id: varchar(255) // リバネスID
unified_individual_id: varchar(255) // 解決済みID
unified_at: timestamp(0) // Data Cloud で解決した日時
applied_at: timestamp(0) // Sales Cloud のリバネスIDテーブルに反映した日時
created_at: timestamp(0) // このレコードが作成された日時
updated_at: timestamp(0) // このレコードが更新された日時
手順
- Data Cloud API で統合リンクを取得する
- Heroku Postgres の統合リンクテーブル (unified_link_individuals) に格納する
- Heroku Connect されているリバネスIDのテーブル (lnestid__c) と統合リンクテーブルをジョインして、一括でアップデートする
Data Cloud API で統合リンクを取得する
Data Cloud API を使用するためのドキュメントはこちらです。
https://developer.salesforce.com/docs/atlas.en-us.c360a_api.meta/c360a_api/c360a_getting_started_with_cdp.htm
まずは、 Salesforce の認証トークンを取得し、認証トークンを使用して Data Cloud のアクセストークンを取得する必要があります。
この記事では login 関数内でアクセストークンの取得を行っています。
use Illuminate\Support\Carbon;
private function fetchUnifiedLinkIndividual()
{
$dataTransformAPIName = config('services.salesfoce.data_transform_api_name');
$maxUnifiedAt = new Carbon(UnifiedLinkIndividual::max('unified_at') ?? '2023-01-01');
$date = $maxUnifiedAt->copy()->addDay()->toDateString();
$response = self::login();
$data = self::executeQuery(
$response['instance_url'],
$response['access_token'],
"SELECT * FROM UnifiedLinkssotIndividual***__dlm WHERE ssot__DataSourceId__c = 'DataCloudTransforms' AND ssot__DataSourceObjectId__c = '${dataTransformAPIName}' AND CreatedDate__c > Date('${date}')"
);
echo 'api done: ' . date('Y-m-d H:i:s') . ' | ' . memory_get_usage() / (1024 * 1024) . 'MB' . "\n";
echo "data count:".count($data)."\n";
return $data;
}
こちらが Salesforce 認証トークンの取得と Data Cloud アクセストークンの取得を行う login
関数です。
use Illuminate\Support\Facades\Http;
private function login()
{
$response = Http::acceptJson()->asForm()->baseUrl('https://login.salesforce.com')->post('/services/oauth2/token', [
'grant_type' => 'password',
'client_id' => config('services.salesfoce.client_id'),
'client_secret' => config('services.salesfoce.client_secret'),
'username' => config('services.salesfoce.username'),
'password' => config('services.salesfoce.password'),
]);
return Http::acceptJson()->asForm()->baseUrl($response['instance_url'])->post('/services/a360/token', [
'grant_type' => 'urn:salesforce:grant-type:external:cdp',
'subject_token_type' => 'urn:ietf:params:oauth:token-type:access_token',
'subject_token' => $response['access_token'],
]);
}
executeQuery
関数でクエリAPIを実行します。この時、アクセストークンを取得した際に付随してきた $instanceUrl
と $accessToken
を一緒に渡します。
クエリAPI は少し癖があり、レスポンス内の done
が false なら、もう一度クエリAPIを実行する必要があります。
そのため、引数に $nextBatchId
があればクエリAPIの続きを取得するようにしています。
use Illuminate\Support\Facades\Http;
private function executeQuery($instanceUrl, $accessToken, $sql, $nextBatchId = null)
{
if ($nextBatchId) {
$response = Http::withToken($accessToken)->acceptJson()->asJson()->baseUrl('https://' . $instanceUrl)->get('/api/v2/query/' . $nextBatchId , [
'sql' => $sql,
]);
} else {
$response = Http::withToken($accessToken)->acceptJson()->asJson()->baseUrl('https://' . $instanceUrl)->post('/api/v2/query', [
'sql' => $sql,
]);
}
$additionalData = [];
if (!$response['done']) {
$additionalData = self::executeQuery($instanceUrl, $accessToken, $sql, $response['nextBatchId']);
}
$keys = collect($response['metadata'])->sortBy('placeInOrder')->keys()->toArray();
$data = collect($response['data'])->map(function ($item) use ($keys) {
return collect($item)->mapWithKeys(function ($item, $index) use ($keys) {
return [$keys[$index] => $item];
})->toArray();
})->toArray();
return collect($data)->concat($additionalData)->toArray();
}
Heroku Postgres の統合リンクテーブル (unified_link_individuals) に格納する
fetchUnifiedLinkIndividual
関数で取得したデータを渡します。
chunk(5000)
で5000件ずつに分けて処理することでDBへの負荷を抑えています。
use App\Models\UnifiedLinkIndividual;
private function upsertUnifiedLinkIndividual($data)
{
collect($data)
->map(function ($value) {
return [
'id' => Str::orderedUuid()->toString(),
'individual_id' => $value['SourceRecordId__c'],
'unified_individual_id' => $value['UnifiedRecordId__c'],
'unified_at' => $value['CreatedDate__c'],
'applied_at' => null,
];
})
->chunk(5000)
->each(function ($values) {
UnifiedLinkIndividual::upsert($values->toArray(), ['individual_id'], ['unified_individual_id', 'unified_at', 'applied_at']);
});
}
Heroku Connect されているリバネスIDのテーブル (lnestid__c) と統合リンクテーブルをジョインして、一括でアップデートする
こちらも5000件ずつに分けて処理を行います。
lnestid__c テーブルの unifiedrecordid__c に解決済みIDを反映していきます。
反映が終わったデータを対象に unified_link_individuals テーブルの applied_at に日時を格納します。
use App\Models\UnifiedLinkIndividual;
use App\Models\User;
private function applyUnifiedLinkIndividual()
{
$_user = new User;
$_unifiedLinkIndividual = new UnifiedLinkIndividual;
UnifiedLinkIndividual::query()
->whereNull('applied_at')
->chunkById(5000, function ($unifiedLinkIndividuals) use ($_user, $_unifiedLinkIndividual) {
User::query()
->join($_unifiedLinkIndividual->getTable(), function ($query) use ($unifiedLinkIndividuals, $_user, $_unifiedLinkIndividual) {
$query
->on($_unifiedLinkIndividual->qualifyColumn('individual_id'), $_user->qualifyColumn('sfid'))
->whereIn($_unifiedLinkIndividual->qualifyColumn('id'), $unifiedLinkIndividuals->pluck('id')->toArray());
})
->updateFrom([
'unifiedrecordid__c' => \DB::raw($_unifiedLinkIndividual->qualifyColumn('unified_individual_id')),
]);
UnifiedLinkIndividual::query()
->whereIn('id', $unifiedLinkIndividuals->pluck('id')->toArray())
->update([
'applied_at' => now(),
]);
});
}
おわりに
以上の手順で
Data Cloud → Heroku Postgres → Sales Cloud
の連携ができるようになりました!
Data Cloud のID解決はフローによって毎日行われるので、 それに合わせて Heroku 側での処理も Heroku Scheduler にバッチ処理を登録して、毎日行うように設定します。
データ連携が出来たので、次は連携した解決済みIDをグラフィカルに表示するWebアプリケーションを開発したいと思います!
それではまたお会いしましょう!
バイバイ〜(´・ω・`)ノシ