はじめに
前回の記事では、差分検出によるDB書き込み削減について解説しました。
本記事では、毎時25万件のデータを同期するCronバッチ処理において、以下の3つの特性をどのように実現しているかを解説します。
| 特性 | 意味 |
|---|---|
| 冪等性 | 同じ処理を何度実行しても結果が同じ |
| 再開可能性 | 途中で落ちても、次の実行で続きから処理できる |
| 障害耐性 | 一時的なエラーで全体が止まらない。異常を検知して安全に中断できる |
これらを組み合わせることで、Safe to Retry(後から実行した方が勝ち) なバッチ処理を実現しています。
この設計により、稼働開始から2年間、手動での修正が必要なエラーは数回しか発生していません(実装変更時のバグを除く)。
処理フローの全体像
[毎時30分] cron_crawling.php
│
├─ 状態チェック(前回処理が実行中でないか確認)
│
└─ hourlyTask()
│
├─ カテゴリごとに:APIからJSON取得 → ファイル保存 → DBマージ
│
└─ ランキング永続化、画像更新、キャッシュパージ
[毎時0分] cron_half_check.php
│
└─ 30分の処理が未完了なら補完実行
毎時30分の処理が完了しなかった場合、毎時0分のチェック処理が補完します。この「二段構え」が障害耐性の基本です。
コード: SyncOpenChat.php
冪等性と再開可能性: ファイルタイムスタンプによるスキップ
オープンチャット公式サイトのAPIから取得したデータは、
カテゴリ(趣味、スポーツ、音楽など)ごとにファイル保存しています。各カテゴリの処理開始時に「今時間帯に取得済みか?」をファイルの更新時刻で判定します。
「常に最新データを取得すれば良くないか?」と思うかもしれません。
APIから取得するデータは1時間毎に更新されており、1カテゴリの取得には30秒程度掛かります。
失敗による再試行が重なり、ある時間帯のデータを取りこぼしてしまうことを防ぐためにもロスを最小限にしています。
// OpenChatApiDbMerger.php
$callbackByCategoryBefore = function (string $category) use ($positionStore): bool {
// ファイルの更新時刻(時間単位に正規化)
$fileTime = $positionStore->getFileDateTime($category)->format('Y-m-d H:i:s');
// 現在のCron実行時刻(時間単位に正規化)
$now = OpenChatServicesUtility::getModifiedCronTime('now')->format('Y-m-d H:i:s');
// 同じ時間帯ならtrue(= スキップ)
return $fileTime === $now;
};
ファイルが存在しない場合は十分古い日時を返すことで、必ず処理が実行されます。
// AbstractRankingPositionStore.php
function getFileDateTime(string $category = '0'): \DateTime
{
if (!file_exists($this->filePath() . "/{$category}.dat")) {
return new \DateTime('2000-01-01 00:00:00'); // 古い日時 → 必ず処理される
}
return $this->getModifiedFileTime($this->filePath() . "/{$category}.dat");
}
この仕組みにより:
- 冪等性: 同じ時間帯に何度実行しても、処理済みカテゴリはスキップされ、結果が同じになる
- 再開可能性: 途中で落ちても、未処理のカテゴリだけが処理される
コード:
再開可能性: 状態フラグによるリトライ判定
処理の実行状態をDBで管理しています。開始時にフラグをON、終了時にOFFにします。途中で落ちるとONのままになるため、次回実行時にリトライが必要だと判断できます。
// SyncOpenChat.php
private function hourlyTask()
{
$this->state->setTrue(StateType::isHourlyTaskActive); // 処理開始
$this->merger->fetchOpenChatApiRankingAll();
$this->state->setFalse(StateType::isHourlyTaskActive); // 正常終了(途中で落ちるとここに到達しない)
$this->hourlyTaskAfterDbMerge(...);
}
毎時0分のチェック処理では、このフラグを見てリトライを判断します。
// SyncOpenChat.php
function handleHalfHourCheck()
{
if ($this->state->getBool(StateType::isHourlyTaskActive)) {
$this->retryHourlyTask(); // フラグがON = 異常終了 → リトライ
} elseif (!$this->rankingPositionHourChecker->isLastHourPersistenceCompleted()) {
$this->hourlyTaskAfterDbMerge(true); // DB永続化だけ未完了 → 補完
}
}
リトライ時も前述のファイルタイムスタンプで処理済みカテゴリはスキップされるため、未処理分だけが効率的に処理されます。
コード: SyncOpenChatStateRepository.php
障害耐性: エラーハンドリングと安全な中断
連続エラーの検知
一時的なAPIサーバーエラーではすぐに止めず、連続エラー回数が閾値を超えた場合のみ中断します。成功すればカウントリセット。
// ErrorCounter.php
class ErrorCounter
{
private int $continuousErrorsCount = 0;
private int $maxContinuousErrors = 3;
public function increaseCount() { $this->continuousErrorsCount++; }
public function resetCount() { $this->continuousErrorsCount = 0; }
public function hasExceededMaxErrors() { return $this->continuousErrorsCount > $this->maxContinuousErrors; }
}
foreach ($openChatIdArray as $id) {
$result = $this->openChatUpdater->fetchUpdateOpenChat($id);
$result === false
? $this->errorCounter->increaseCount()
: $this->errorCounter->resetCount();
if ($this->errorCounter->hasExceededMaxErrors()) {
throw new \RuntimeException('連続エラー回数が上限を超えました');
}
}
コード: ErrorCounter.php
killFlagによるグレースフルシャットダウン
長時間処理を安全に中断するため、killFlagを使います。処理の各ステップでフラグをチェックし、trueなら例外で中断します。
// OpenChatApiDbMerger.php
private function checkKillFlag()
{
$this->syncOpenChatStateRepository
->getBool(SyncOpenChatStateType::openChatApiDbMergerKillFlag)
&& throw new ApplicationException('強制終了しました');
}
static function setKillFlagTrue()
{
app(SyncOpenChatStateRepositoryInterface::class)
->setTrue(SyncOpenChatStateType::openChatApiDbMergerKillFlag);
}
永続化の完了チェック
ファイルの時刻とDBの最終レコード時刻を比較し、一致しなければ毎時0分のチェック処理で補完します。
// RankingPositionHourPersistenceLastHourChecker.php
function isLastHourPersistenceCompleted(): bool
{
$fileTime = $this->rankingPositionStore->getFileDateTime()->format('Y-m-d H:i:s');
$dbTime = $this->rankingPositionHourRepository->getLastHour();
return $fileTime === $dbTime;
}
コード: RankingPositionHourPersistenceLastHourChecker.php
Safe to Retry(後から実行した方が勝ち)
これまでの仕組みと差分更新(前回記事参照)を組み合わせることで、Safe to Retryを実現しています。
[10:30] APIデータ取得 → ファイル保存 → DBマージ
[10:35] カテゴリ3の処理中に落ちる
[11:00] 0分チェックでリトライ → カテゴリ1,2はスキップ、カテゴリ3以降を処理
[11:30] 次の時間帯 → 新しいAPIデータで上書き、DBは差分更新で最新状態に収束
同一時間帯では処理済みカテゴリをスキップし、次の時間帯では新しいデータで上書き。DB更新は変更カラムのみ(冪等)なので、何度処理しても副作用がありません。
まとめ
| 特性 | 実現方法 |
|---|---|
| 冪等性 | ファイルタイムスタンプで処理済みカテゴリをスキップ、差分更新で変更分のみDB書き込み |
| 再開可能性 | 状態フラグで異常終了を検知、カテゴリ単位のスキップで未処理分から再開 |
| 障害耐性 | 連続エラー検知、killFlagによる安全な中断、0分チェックによる補完 |
これらにより、Safe to Retry(後から実行した方が勝ち) なバッチ処理を実現。途中で落ちても次の実行で完了でき、何度実行しても最新状態に収束します。