はじめに
前回の記事では、オプチャグラフのデータパイプライン全体像を解説しました。
本記事では、その中核となる「差分検出」の仕組みを、実際のコードとともに詳しく解説します。
25万件のデータを毎時同期しながら、無駄なDB書き込みを最小限に抑える設計です。
この設計によってすべての処理に掛かる時間は1分程度になりました(APIからデータダウンロードを除く)。
解決したい課題
毎時25万件のデータを取得するが、実際に変更があるのは数千件程度。素朴に全件UPDATEすると:
- DBへの書き込み負荷が高い
- 更新日時が全件書き換わり、「本当に変更があったデータ」がわからなくなる
- レンタルサーバーでは処理が間に合わない
解決策:3つのDTOで責務を分離する
[API JSON]
↓ OpenChatApiDtoFactory
[OpenChatDto] ←── APIからのデータを表現
↓
↓ ←── [OpenChatRepositoryDto] ←── DBの現在のデータを表現
↓ ↑
↓ OpenChatDataForUpdaterWithCacheRepository(メモリキャッシュ)
↓
↓ OpenChatUpdaterDtoFactory(差分検出)
↓
[OpenChatUpdaterDto] ←── 変更があったフィールドだけを保持
↓
↓ UpdateOpenChatRepository
↓
[MySQL UPDATE(変更カラムのみ)]
各DTOの役割
| DTO | 責務 | コード |
|---|---|---|
OpenChatDto |
APIレスポンスを表現 | OpenChatDto.php |
OpenChatRepositoryDto |
DBの現在の状態を表現 | OpenChatRepositoryDto.php |
OpenChatUpdaterDto |
変更フィールドのみ保持 | OpenChatUpdaterDto.php |
実装の詳細
1. OpenChatDto - APIデータの受け皿
class OpenChatDto
{
public string $name;
public string $desc;
public string $profileImageObsHash;
public ?int $memberCount;
public ?string $emid = null;
public ?int $createdAt = null;
public ?int $category = null;
public ?int $emblem = null;
public int $joinMethodType;
}
APIのJSONから必要なフィールドだけを抽出した、シンプルなデータ構造です。
コード: OpenChatDto.php
2. OpenChatApiDtoFactory - バリデーションと変換
class OpenChatApiDtoFactory
{
function validateAndMapToOpenChatDto(array $apiData, \Closure $callback): array
{
$errors = [];
$squares = $apiData['squaresByCategory'][0]['squares'];
foreach ($squares as $square) {
try {
$dto = $this->validateAndMapToOpenChatApiDtoFromSquare($square, $categoryId);
$callback($dto);
} catch (\RuntimeException $e) {
$errors[] = $e->getMessage();
}
}
return $errors;
}
private function validateAndMapToOpenChatApiDtoFromSquare(array $square, int $categoryId): OpenChatDto
{
$dto = new OpenChatDto;
// Validatorで型チェックしながら値を設定
$dto->emid = Validator::str($square['square']['emid'], e: \RuntimeException::class);
$dto->name = Validator::str($square['square']['name'], emptyAble: true, e: \RuntimeException::class);
// ... 他のフィールドも同様
return $dto;
}
}
ポイント:
- バリデーションエラーは例外で処理し、エラーログに記録
- 1件のエラーで全体が止まらないよう、ループ内でtry-catch
- コールバック関数で後続処理を注入できる
コード: OpenChatApiDtoFactory.php
3. OpenChatDataForUpdaterWithCacheRepository - 25万件メモリキャッシュ
class OpenChatDataForUpdaterWithCacheRepository implements OpenChatDataForUpdaterWithCacheRepositoryInterface
{
/** @var OpenChatRepositoryDto[]|null */
private static ?array $openChatDataCache = null;
/** @var string[]|null */
private static ?array $openChatEmidCache = null;
public static function cacheOpenChatData(): void
{
$query = "SELECT id, emid, name, description, img_url, ... FROM open_chat ORDER BY id ASC";
$dataArray = DB::fetchAll($query);
self::$openChatEmidCache = array_column($dataArray, 'emid');
self::$openChatDataCache = [];
foreach ($dataArray as $data) {
self::$openChatDataCache[] = new OpenChatRepositoryDto($data['id'], $data);
}
}
public function getOpenChatDataByEmid(string $emid): OpenChatRepositoryDto|false
{
if (!isset(self::$openChatEmidCache, self::$openChatDataCache)) {
$this->cacheOpenChatData();
}
$key = array_search($emid, self::$openChatEmidCache);
if ($key === false) {
return false;
}
return self::$openChatDataCache[$key];
}
}
ポイント:
-
static変数でプロセス内キャッシュ(同一リクエスト内で再利用) - emidの配列を別に持ち、
array_searchで高速検索 - 25万件を毎回DBから取得するより圧倒的に速い
コード:
- OpenChatDataForUpdaterWithCacheRepository.php
- OpenChatDataForUpdaterWithCacheRepositoryInterface.php
4. OpenChatUpdaterDtoFactory - 差分検出の核心
class OpenChatUpdaterDtoFactory
{
function mapToDto(OpenChatRepositoryDto $repoDto, OpenChatDto $apiDto, bool $updateMember): OpenChatUpdaterDto
{
$updaterDto = new OpenChatUpdaterDto($repoDto->open_chat_id);
// 全フィールドをループで比較
foreach ($repoDto as $prop => $value) {
if ($prop === 'open_chat_id') {
continue;
}
// 値が同じなら null(= 更新不要)
if (!isset($apiDto->$prop) || $apiDto->$prop === $value) {
$updaterDto->$prop = null;
continue;
}
// 値が違うなら新しい値をセット
$updaterDto->$prop = $apiDto->$prop;
}
// メタデータが変わった場合のみ updated_at を更新
if ($updaterDto->name !== null || $updaterDto->desc !== null || ...) {
$updaterDto->rewriteUpdateAtTime($this->dateTime);
}
return $updaterDto;
}
}
ポイント:
-
foreach ($repoDto as $prop => $value)でプロパティを走査 - 変更がないフィールドは
nullにすることで、後続のUPDATE文から除外 -
updated_atは意味のある変更(名前・説明など)があった場合のみ更新
コード: OpenChatUpdaterDtoFactory.php
5. OpenChatUpdaterDto - 変更情報の保持
class OpenChatUpdaterDto
{
public int $open_chat_id;
public ?string $updated_at = null;
// null = 変更なし、値あり = 変更あり
public ?string $name = null;
public ?string $desc = null;
public ?string $profileImageObsHash = null;
public ?int $memberCount = null;
// ...
function getUpdateItems(): ?string
{
$updateItems = [
'name' => $this->name !== null,
'description' => $this->desc !== null,
// ...
];
return array_filter($updateItems, fn($item) => $item)
? json_encode($updateItems)
: null;
}
}
ポイント:
- 全フィールドがnullable。
null= 更新不要という規約 -
getUpdateItems()で「何が変わったか」をJSON化して記録(デバッグ・分析用)
6. UpdateOpenChatRepository - 動的UPDATE文生成
class UpdateOpenChatRepository implements UpdateOpenChatRepositoryInterface
{
public function updateOpenChatRecord(OpenChatUpdaterDto $dto): void
{
$columnsToSet = [
'updated_at' => $dto->updated_at ?? null,
'name' => $dto->name ?? null,
'description' => $dto->desc ?? null,
// ...
];
// null以外のカラムだけを抽出
$columnsToUpdate = array_filter($columnsToSet, fn ($value) => $value !== null);
if (!$columnsToUpdate) {
return; // 更新するものがなければ何もしない
}
// 動的にSET句を生成
$setStatement = implode(',', array_map(
fn ($column) => "{$column} = :{$column}",
array_keys($columnsToUpdate)
));
$columnsToUpdate['id'] = $dto->open_chat_id;
DB::execute(
"UPDATE open_chat SET {$setStatement} WHERE id = :id",
$columnsToUpdate
);
}
}
ポイント:
- 変更があったカラムだけでUPDATE文を動的生成
- 全フィールドがnullなら何もせずreturn
コード:
なぜこの設計にしたか
DTOを分けた理由
最初は1つのクラスで全部やろうとしていました。
しかし、以下の問題が発生しました:
- APIのフィールド名とDBのカラム名が微妙に違う(
descvsdescription) - 「APIから来た値」と「DBにある値」と「更新すべき値」が混在して混乱
- 変更時にどこを直せばいいかわからなくなる
DTOを分けることで、各段階で何を扱っているかが明確になりました。
インターフェースを使う理由
- テスト時にモックに差し替えられる(25万件読み込まなくていい)
- 将来的に別の実装(Redis等)に変えたくなっても、呼び出し側を変更しなくていい
nullを「更新不要」の意味で使った理由
「変更があったかどうか」をboolで別に持つ方法もありますが、冗長になります。
// こうではなく
class UpdaterDto {
public ?string $name;
public bool $nameChanged;
}
// こう
class UpdaterDto {
public ?string $name; // nullなら変更なし、値があれば変更あり
}
nullableを活用することで、フィールドが増えても対応しやすくなりました。
データフロー全体図
クローリングから差分更新までの流れをまとめます。
// SyncOpenChat.php から呼び出し
$this->merger->fetchOpenChatApiRankingAll();
↓
// OpenChatApiDbMerger.php
// 1. APIからデータ取得
// 2. 各データをOpenChatDtoに変換
// 3. 差分検出してDB更新
$this->openChatApiDbMergerProcess->validateAndMapToOpenChatDtoCallback($apiDto);
↓
// OpenChatApiDbMergerProcess.php
// emidでキャッシュを検索
$repoDto = $this->openChatDataWithCache->getOpenChatDataByEmid($apiDto->emid);
if (!$repoDto) {
$this->add($apiDto); // 新規追加
return;
}
// 差分がなければreturn
if ($repoDto->name === $apiDto->name && ...) {
return null;
}
// 差分があれば更新
$this->openChatMargeUpdateProcess->mergeUpdateOpenChat($repoDto, $apiDto);
関連コード:
- SyncOpenChat.php - 毎時実行のエントリーポイント
- OpenChatApiDbMerger.php - クローリング全体制御
- OpenChatApiDbMergerProcess.php - 1件ごとの処理
まとめ
25万件を毎時同期するという要件に対して、以下の設計で対応しました。
- DTO分離: API、DB、更新情報をそれぞれ別のDTOで表現
- メモリキャッシュ: 全件をメモリに載せて高速比較
- 差分検出: 変更があったフィールドのみnull以外にセット
- 動的SQL: 変更カラムだけでUPDATE文を生成
結果として、25万件の取得に対して実際のDB書き込みは数千件程度に抑えられています。
APIからのデータダウンロードを除き、DBの更新処理に掛かる時間は約1分程度です。