はじめに
新サービスの立ち上げにあたり、Google Cloud 上で運用しているサービス間のデータベース連携の実現にTransactional Outbox Patternを採用しました。
本記事では、その構築の詳細について解説します。
運用フェーズの対応については次回の記事で取り上げます。
TL; DR
- Google Cloud 上で Transactional Outbox Pattern を用い、サービス間のデータベース連携を実装した
- Outbox Table・Message Relay(Cloud Run + Cloud Scheduler)・Pub/Sub・Database Updater を組み合わせ、データベースのコピー参照を実現
- 他の手法(直接参照・CDC・Event Sourcing 等)と比較し、原子性・独立性・実装容易性の観点から Outbox Pattern を採用した
Transactional Outbox Pattern とは
Transactional Outbox Patternは、あるサービスが自身のDatabase(DB)へデータを保存する際に、同一トランザクションで別テーブル(Outbox Table)にイベントを記録し、そのイベントを非同期に他サービスへ通知するアーキテクチャです。
今回のケースでは、新サービスが既存サービスのDBを参照できる必要がありました。
そこで、Transactional Outbox Patternを用い、既存サービスにてDBへの書き込みと同時にイベントを作成し、非同期的に同様のレコードを新サービス側のDBに書き込むことで既存サービスのDBを参照する仕組みを実現しました(下図)。
Outbox Tableのレコード例
テーブルへのCreate, Update, Deleteのイベントを値と共に保存する
{
"id": "01K3CX6VXDK242M8KZW29W79GA",
"eventType": "CREATED",
"table_name": "Table A",
"table_id": "01K3CXA14AXQ544Q078GMGQ7KR",
"payload": "{\"id\":\"01K3CXA14AXQ544Q078GMGQ7KR\",\"name\":\"山田太郎\",\"job\":\"Recruitment DX\",\"gender\":\"female\"}",
"status": "SENT",
"created_at": "2025-08-24T07:20:30Z"
}
その他のデータ参照手法との比較
サービス間のデータベース連携について、Transactional Outbox Pattern以外で検討した手法を示します。
区分 | 手法 | 概要 | 選択しなかった理由 |
---|---|---|---|
直接参照 | そのサービスのDBを直接参照 | データ参照用APIを追加し、リクエスト毎にデータ取得 | 障害が伝播、独立性が失われる |
コピー参照 | ダブルライト | DB更新後にMessage Brokerへイベント送信 | 原子性がなく、不整合が起きる可能性がある |
コピー参照 | 同一トランザクション内でイベント通知 | トランザクション内でMessage Brokerへイベント送信 | トランザクションが長時間占有される |
コピー参照 | Change Data Capture | DBトランザクションログからイベント送信 | Google Cloudでは構成が複雑になる懸念 |
コピー参照 | Event Sourcing | イベントを保存して状態を再構築 | CRUD設計の既存サービスでは大規模改修が必要 |
その他のデータ参照手法との比較の詳細
そのサービスのDBを直接参照する
概要: 参照先サービスにデータ取得用のAPIを追加して、参照元からそのAPIにリクエストを送りデータを取得する
選択しなかった理由
- 参照先サービスの障害が直接伝播する
- サービス間の独立性が失われる
DBの書き込みとMessage Brokerへのダブルライト
概要: DB書き込み後にMessage Brokerへ直接イベント送信し、Database UpdaterでDB更新を行う
選択しなかった理由
- DB更新とイベント送信の原子性が保たれず、不整合が起きやすい
- 例1: DB更新は成功したが、イベント送信に失敗した
- 例2: イベントは送られたが、DBは更新されていない
同一トランザクション内でイベント通知
概要: 原子性を保つために、同一トランザクション内で DB書き込み後にMessage Brokerへ直接イベント送信し、Database UpdaterでDB更新を行う
選択しなかった理由
- Message Brokerへの送信を含めることで、ブローカー送信やACK待ち・再試行をトランザクション内で実行することとなり、トランザクションが長時間占有される
Change Data Capture(CDC)
概要: Databaseのトランザクションログから変更を抽出し、Message Brokerへイベントを送信し、Database UpdaterでDB更新を行う
選択しなかった理由
- アプリケーションコードの変更が不要であり検討の余地があったが、Google Cloudで実現するためには複雑なアーキテクチャになりそうだったので断念した
- 参考
Event Sourcing
概要: ドメインの状態変化をイベントとして保存し、イベントの再生で現在状態を構築するアーキテクチャ。サービスで作成されるイベントをそのまま送信し値を参照
選択しなかった理由
- Command側で作成したイベントをそのまま送信することでスマートに実現できるが、既存サービスは State Sourcingで設計されており、Event Sourcing への移行は大規模改修が必要となり断念
- 参考
構築
下図のインフラ構成にてデータベース連携を実現しました
データ参照先(イベント送信側)
既存サービスに Outbox Table、Message Relay(Cloud Run + Cloud Scheduler)、Message Broker(Pub/Sub)を追加構築しました。
Message RelayであるCloud RunがOutbox Tableを定期的にポーリングし、未送信イベントをPub/Subへ送信します。
Backend Application + Database
Create, Update, Delete(CUD)処理時に、同一トランザクションでOutbox Tableにイベントを記録します。
Create, Update時は全データを、Delete時は空オブジェクトをDBにpayloadとして保存します。
export async function createTableA(id: string, name: string) {
await prisma.$transaction(async (tx) => {
const createdRecord = await tx.tableA.create({ data: { id, name, status: 'ACTIVE' } });
// 同一トランザクションでOutboxにイベントを書き込む
await tx.outboxEvent.create({
data: {
id: ulid(),
aggregateType: 'TABLE_A',
aggregateId: createdRecord.id,
eventType: 'CREATED',
payload: JSON.stringify(createdRecord),
},
});
});
}
Outbox Tableの設計
model OutboxEvent {
id String @id
aggregateType OutboxAggregateType @map("aggregate_type")
aggregateId String @map("aggregate_id")
eventType OutboxEventType @map("event_type")
payload String @map("payload")
status OutboxEventStatus? @map("status")
createdAt DateTime @default(now()) @map("created_at")
@@map("outbox_events")
}
enum OutboxEventType {
CREATED
UPDATED
DELETED
}
// Message RelayによってPollingされ、Message Brokerにpushが成功したイベントはSENT, 失敗したらFAILEDとする
enum OutboxEventStatus {
SENT // 送信済み
FAILED // 送信失敗
}
// 参照対象のテーブルの種類
// DDDのアプリケーションなので、集約タイプ(AggregateType)とした
enum OutboxAggregateType {
TABLE_A
TABLE_B
}
Message Relay(Cloud Run + Cloud Scheduler)
Outbox Table に保存されたイベントは、Cloud Runからの定期ポーリングにより、未送信または送信失敗のイベントが Pub/Sub へ送信するPolling Publisher Patternを採用しました。
その他のイベント送信手法の候補として、Transaction Log Tailing Pattern によるDBトランザクションログ出力を起点としたイベントを送信も挙げられましたが、送信サイズや処理負荷を制御しづらい点、Google Cloud 上で実現するには構成が複雑になる懸念から、Polling Publisher Patternを採用しました。
Polling Publisher Patternの実装コード
// 大量のイベントをDatabaseUpdaterに送信しないために、一度のpollingで取得するイベント数を制限する
const MAX_POLLING_NUMBER = 1000;
async function getEvents(): Promise<OutboxEvent[]> {
const events = await prisma.outboxEvent.findMany({
where: {
OR: [{ status: OutboxEventStatus.FAILED }, { status: null }],
},
orderBy: {
createdAt: 'asc',
},
take: MAX_POLLING_NUMBER,
});
return events;
}
async function publishEvent(events: OutboxEvent[]) {
const topicName = process.env.TOPIC_NAME;
for (const event of events) {
let isSuccess = false;
try {
const msg = {
id: event.id,
aggregateType: event.aggregateType,
aggregateId: event.aggregateId,
eventType: event.eventType,
payload: event.payload,
};
await pubSubClient
// 順番通り送るためにメッセージのオプションを設定
.topic(topicName, { messageOrdering: true })
.publishMessage({
data: Buffer.from(JSON.stringify(msg)),
orderingKey: e.createdAt.toISOString()
});
isSuccess = true;
}finally{
await OutboxStatusChange(event, isSuccess ? OutboxEventStatus.SENT : OutboxEventStatus.FAILED);
}
}
}
async function main() {
const events = await getEvents();
await publishEvent(events);
}
Message Broker(Pub/Sub)
Pub/Subは順序指定キーを利用し、依存関係のあるイベントも正しい順序で伝播できる設定とします
データ参照元(イベント受信側)
Pub/Sub からイベントを受信し、DBに反映します。
Database
データ連携されたテーブル(参照テーブル)はBackend Applicationからは参照専用とし、書き込みはDatabase Updaterからのみ許可します。
それぞれの参照テーブルには必ず参照先テーブルのレコードのPrimary Key(PK)を含める必要があり、今回はそれぞれのPKと同様の値としました。
また、参照先のテーブル設計をそのまま反映する必要はなく、不要なプロパティを切り落としたり、2つのテーブルを1つに結合して保存することも可能です(その分マッピングは大変となりますが...)。
Outboxイベントの冪等性確認用テーブル
Pub/Subからは1つのメッセージの配信が複数回行われてしまう可能性があります(参考ドキュメント)。
そのため、Database Updaterで冪等性を確認するために、受け取ったイベントを保存するテーブルを作成します。
model OutboxSubscribeEvent {
id String @id
outboxEventId String @map("outbox_event_id")
aggregateType OutboxAggregateType
aggregateId String
eventType OutboxEventType
payload String
result OutboxSubscribeEventResult // outboxのイベントの反映が成功したかどうか
message String? @map("message") // エラー時のメッセージ
createdAt DateTime @map("created_at") // Outboxイベントが作成された時間
appliedAt DateTime @updatedAt @map("updated_at") // Outboxイベントが反映された時間
// 検索の高速化を図る
@@index([outboxEventId, result])
}
enum OutboxSubscribeEventResult {
SUCCESS
FAILED
}
Database Updater
Nest.jsで実装されているBackend Applicationにイベント受信用のエンドポイント(rmu: read model updater)を追加することで実現しました(Backend ApplicationとDatabase Updaterは同様のコンテナイメージがデプロイされているが、使用箇所が異なる)。
rmuは下図に示すような構成とし、2つに分かれたcontrollerの内、最初のcontroller(rmu controller)で冪等性確認を行い、次のcontrollerで参照先テーブルと実際に保存するテーブルのマッピングを行い、Service層でデータ加工し、DAO層でDBに保存しています。
また、アプリケーションのエラーハンドリングは、Pub/Subの再送条件を考慮しながらレスポンスを設計する必要があります。
rmuの具体的な実装
@Controller('rmu')
export class RmuController {
@Post()
async subscribeOutboxEvent(@Body() pubSubMessage: PubSubMessageDto): Promise<void> {
// 1. Pub/Subメッセージからイベントデータをデコード
const event: OutboxEvent = JSON.parse(
Buffer.from(pubSubMessage.message.data, 'base64').toString(),
);
// 2. 冪等性を担保するため、処理済みのイベントはスキップ
if (await this.eventService.isAlreadySucceededEvent(event.id)) return;
try {
// 3. イベントの種類(aggregateType)に応じて呼び出すcontrollerを変える
if (event.aggregateType === OutboxAggregateType.APPLICANT_BASE_INFORMATION) {
await this.applicantBaseInformationController.handleEvent(event);
}
// 4. 成功した結果をDBに保存します。
await this.eventService.storeEventResult({ event, result: 'SUCCESS' });
} catch (error) {
// 5. 失敗した結果は保存し、エラーをスローしてPub/Subに再送を要求
await this.eventService.storeEventResult({ event, result: 'FAILED', message: error.message });
throw new HttpException('イベント処理に失敗しました', 400);
}
}
}
export class ApplicantBaseInformationController {
public async handleEvent(event: OutboxEvent): Promise<void> {
// payloadをJSONパース
const parsedPayload = JSON.parse(event.payload);
// 参照先テーブルと、このサービスのテーブルをマッピングする
const mappedPayload: ApplicantBaseInformationPayload = {
id: event.aggregateId,
fullName: parsedPayload.fullName,
lastName: parsedPayload.lastName,
birthday: parsedPayload.birthday,
};
// 保存するテーブルに紐づくService層に渡す
await this.applicantInformationService.applyApplicantBaseInformationEvent({
eventType: event.eventType,
payload: mappedPayload,
});
}
}
export class ApplicantInformationService {
async applyApplicantBaseInformationEvent({
payload,
eventType,
}: {
payload: ApplicantBaseInformationPayload;
eventType: OutboxEventType;
}): Promise<void> {
// ビジネスロジックで列を追加
const validBirthdayPartyType = this.calculateBirthdayPartyType(payload.birthday)
// eventTypeに応じた処理の分岐
switch (eventType) {
case OUTBOX_EVENT_TYPE.CREATED:
await this.ApplicantInformationDAO.create({...payload, validBirthdayPartyType});
break;
case OUTBOX_EVENT_TYPE.UPDATED:
await this.ApplicantInformationDAO.update({...payload, validBirthdayPartyType});
break;
case OUTBOX_EVENT_TYPE.DELETED:
await this.ApplicantInformationDAO.delete(payload.id);
break;
default:
throw new UnknownEventTypeApplicationError(
`未対応のイベントタイプです: ${eventType}`,
);
}
}
}
DAOの実装は割愛
ディレクトリ構成
.
└── rmu
├── application
│ └── { table-name }
│ ├── { table-name }.service.ts
│ ├── { table-name }.service.error.ts
│ ├── { table-name }.service.spec.ts
│ └── { table-name }.service.dao.interface.ts
├── controller
│ └── http
│ ├── { aggregation-type }
│ │ ├── { aggregation-type }.controller.ts
│ │ ├── { aggregation-type }.controller.error.ts
│ │ └── { aggregation-type }.controller.spec.ts
│ ├── rmu.controller.ts
│ └── rmu.controller.spec.ts
├── dao
│ └── { table-name }
│ ├── { table-name }.dao.ts
│ ├── { table-name }.dao.spec.ts
│ └── { table-name }.dao.error.ts
├── dto
└── types
まとめ
本記事では、サービス間のデータ連携の実現方法として Transactional Outbox Pattern を採用し、Google Cloud 上での構成例とコードを紹介しました。
次回の記事で運用フェーズの対応(ベースデータ移行, パフォーマンスチューニング等)について共有します。